dezembro 21, 2025
14 min de leitura

Perguntas de entrevista para Senior Machine Learning Engineer em produção

interview
career-advice
job-search
Perguntas de entrevista para Senior Machine Learning Engineer em produção
Milad Bonakdar

Milad Bonakdar

Autor

Entrevistas senior de ML avaliam julgamento de produção: design de sistemas, MLOps, treinamento distribuído, latência, monitoramento e trade-offs. Pratique respostas claras e úteis.


Introdução

Entrevistas para Senior Machine Learning Engineer costumam avaliar julgamento de produção: você consegue projetar um sistema de ML preciso o suficiente, rápido, observável, reproduzível e fácil de manter depois do lançamento? Espere perguntas sobre MLOps, design de sistemas de ML, model serving, treinamento distribuído, pipelines de features, drift e experimentação.

Use este guia para praticar respostas que expliquem trade-offs, não apenas ferramentas. Uma boa resposta senior começa por requisitos e métricas, depois conecta dados, features, treinamento, deploy, monitoramento e rollback.


Treinamento Distribuído e Escalabilidade (5 Perguntas)

1. Como você implementa o treinamento distribuído para modelos de deep learning?

Resposta: O treinamento distribuído paraleliza a computação em várias GPUs/máquinas.

  • Estratégias:
    • Paralelismo de Dados: Mesmo modelo, diferentes lotes de dados
    • Paralelismo de Modelo: Divide o modelo entre dispositivos
    • Paralelismo de Pipeline: Divide o modelo em estágios
  • Frameworks: PyTorch DDP, Horovod, TensorFlow MirroredStrategy
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler

# Inicializa o treinamento distribuído
def setup_distributed(rank, world_size):
    dist.init_process_group(
        backend='nccl',  # Use 'gloo' para CPU
        init_method='env://',
        world_size=world_size,
        rank=rank
    )

# Configuração do modelo
class MyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(784, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )
    
    def forward(self, x):
        return self.layers(x)

def train_distributed(rank, world_size):
    setup_distributed(rank, world_size)
    
    # Cria o modelo e move para a GPU
    model = MyModel().to(rank)
    model = DDP(model, device_ids=[rank])
    
    # Cria o sampler distribuído
    train_dataset = MyDataset()
    train_sampler = DistributedSampler(
        train_dataset,
        num_replicas=world_size,
        rank=rank
    )
    
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=32,
        sampler=train_sampler
    )
    
    optimizer = torch.optim.Adam(model.parameters())
    criterion = nn.CrossEntropyLoss()
    
    for epoch in range(10):
        train_sampler.set_epoch(epoch)  # Embaralha de forma diferente a cada época
        
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(rank), target.to(rank)
            
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            
            if rank == 0 and batch_idx % 100 == 0:
                print(f'Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item():.4f}')
    
    dist.destroy_process_group()

# Inicia com torch.multiprocessing
import torch.multiprocessing as mp

if __name__ == '__main__':
    world_size = torch.cuda.device_count()
    mp.spawn(train_distributed, args=(world_size,), nprocs=world_size)

# Treinamento distribuído com TensorFlow
import tensorflow as tf

strategy = tf.distribute.MirroredStrategy()

with strategy.scope():
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(512, activation='relu'),
        tf.keras.layers.Dense(10)
    ])
    
    model.compile(
        optimizer='adam',
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy']
    )

# model.fit irá distribuir o treinamento automaticamente
# model.fit(train_dataset, epochs=10)

Raridade: Comum Dificuldade: Difícil


2. Explique o acúmulo de gradiente e quando usá-lo.

Resposta: O acúmulo de gradiente simula tamanhos de lote maiores quando a memória da GPU é limitada.

  • Como funciona: Acumula gradientes em várias passagens forward antes de atualizar os pesos
  • Casos de uso: Modelos grandes, memória limitada da GPU, treinamento estável
import torch
import torch.nn as nn

model = MyModel()
optimizer = torch.optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()

# Tamanho efetivo do lote = batch_size * accumulation_steps
batch_size = 8
accumulation_steps = 4  # Tamanho efetivo do lote = 32

model.train()
optimizer.zero_grad()

for batch_idx, (data, target) in enumerate(train_loader):
    # Passagem forward
    output = model(data)
    loss = criterion(output, target)
    
    # Normaliza a perda pelos passos de acúmulo
    loss = loss / accumulation_steps
    
    # Passagem backward (acumula gradientes)
    loss.backward()
    
    # Atualiza os pesos a cada accumulation_steps
    if (batch_idx + 1) % accumulation_steps == 0:
        optimizer.step()
        optimizer.zero_grad()

# Treinamento de precisão mista com acúmulo de gradiente
from torch.amp import autocast, GradScaler

scaler = GradScaler("cuda")
optimizer.zero_grad()

for batch_idx, (data, target) in enumerate(train_loader):
    with autocast("cuda"):
        output = model(data)
        loss = criterion(output, target) / accumulation_steps
    
    scaler.scale(loss).backward()
    
    if (batch_idx + 1) % accumulation_steps == 0:
        scaler.step(optimizer)
        scaler.update()
        optimizer.zero_grad()

Raridade: Comum Dificuldade: Média


3. Como você otimiza a latência de inferência do modelo?

Resposta: Várias técnicas reduzem o tempo de inferência:

  • Otimização do Modelo:
    • Quantização (INT8, FP16)
    • Poda (remove pesos)
    • Destilação de conhecimento
    • Compilação do modelo (TorchScript, ONNX)
  • Otimização de Serving:
    • Batching
    • Caching
    • Paralelismo de modelo
    • Aceleração de hardware (GPU, TPU)
import torch
import torch.nn as nn

# 1. Quantização (INT8)
model = MyModel()
model.eval()

# Quantização dinâmica
quantized_model = torch.quantization.quantize_dynamic(
    model, {nn.Linear}, dtype=torch.qint8
)

# Quantização estática (mais precisa)
model.qconfig = torch.quantization.get_default_qconfig('fbgemm')
torch.quantization.prepare(model, inplace=True)
# Calibra com dados representativos
# for data in calibration_loader:
#     model(data)
torch.quantization.convert(model, inplace=True)

# 2. Compilação com TorchScript
scripted_model = torch.jit.script(model)
scripted_model.save('model_scripted.pt')

# 3. Exportação para ONNX
dummy_input = torch.randn(1, 784)
torch.onnx.export(
    model,
    dummy_input,
    'model.onnx',
    input_names=['input'],
    output_names=['output'],
    dynamic_axes={'input': {0: 'batch_size'}}
)

# 4. Otimização com TensorRT (NVIDIA)
import tensorrt as trt

# 5. Poda
import torch.nn.utils.prune as prune

# Poda 30% dos pesos na camada linear
prune.l1_unstructured(model.fc1, name='weight', amount=0.3)

# Torna a poda permanente
prune.remove(model.fc1, 'weight')

# 6. Destilação de conhecimento
class DistillationLoss(nn.Module):
    def __init__(self, temperature=3.0):
        super().__init__()
        self.temperature = temperature
        self.kl_div = nn.KLDivLoss(reduction='batchmean')
    
    def forward(self, student_logits, teacher_logits, labels):
        # Soft targets do professor
        soft_loss = self.kl_div(
            F.log_softmax(student_logits / self.temperature, dim=1),
            F.softmax(teacher_logits / self.temperature, dim=1)
        ) * (self.temperature ** 2)
        
        # Hard targets
        hard_loss = F.cross_entropy(student_logits, labels)
        
        return 0.5 * soft_loss + 0.5 * hard_loss

# 7. Batching para inferência
class BatchPredictor:
    def __init__(self, model, max_batch_size=32, max_wait_time=0.1):
        self.model = model
        self.max_batch_size = max_batch_size
        self.max_wait_time = max_wait_time
        self.queue = []
    
    async def predict(self, x):
        self.queue.append(x)
        
        if len(self.queue) >= self.max_batch_size:
            return await self._process_batch()
        
        # Espera por mais requisições ou timeout
        await asyncio.sleep(self.max_wait_time)
        return await self._process_batch()
    
    async def _process_batch(self):
        batch = torch.stack(self.queue)
        self.queue = []
        return self.model(batch)

Raridade: Muito Comum Dificuldade: Difícil


4. O que é treinamento de precisão mista e como funciona?

Resposta: A precisão mista usa FP16 e FP32 para acelerar o treinamento, mantendo a precisão.

  • Benefícios:
    • Treinamento mais rápido em hardware adequado
    • Uso reduzido de memória
    • Tamanhos de lote maiores
  • Desafios:
    • Estabilidade numérica
    • Underflow de gradiente
  • Solução: Escalonamento de gradiente
import torch
from torch.amp import autocast, GradScaler

model = MyModel().cuda()
optimizer = torch.optim.Adam(model.parameters())
scaler = GradScaler("cuda")

for epoch in range(10):
    for data, target in train_loader:
        data, target = data.cuda(), target.cuda()
        
        optimizer.zero_grad()
        
        # Passagem forward em FP16
        with autocast("cuda"):
            output = model(data)
            loss = criterion(output, target)
        
        # Passagem backward com escalonamento de gradiente
        scaler.scale(loss).backward()
        
        # Desescala os gradientes e clipa
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        
        # Atualiza os pesos
        scaler.step(optimizer)
        scaler.update()

# Precisão mista no TensorFlow
from tensorflow.keras import mixed_precision

policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)

# O modelo usa automaticamente FP16 para computação
model = tf.keras.Sequential([
    tf.keras.layers.Dense(512, activation='relu'),
    tf.keras.layers.Dense(10)
])

# O escalonamento da perda é tratado automaticamente
optimizer = tf.keras.optimizers.Adam()
optimizer = mixed_precision.LossScaleOptimizer(optimizer)

Raridade: Comum Dificuldade: Média


5. Como você lida com gargalos no pipeline de dados?

Resposta: O carregamento de dados geralmente cria gargalos no treinamento. Otimize com:

  • Prefetching: Carrega o próximo lote durante o treinamento
  • Carregamento paralelo: Vários workers
  • Caching: Armazena dados pré-processados
  • Formato de dados: Use formatos eficientes (TFRecord, Parquet)
import torch
from torch.utils.data import DataLoader, Dataset
import multiprocessing as mp

# Configuração eficiente do DataLoader
train_loader = DataLoader(
    dataset,
    batch_size=32,
    num_workers=mp.cpu_count(),  # Carregamento paralelo
    pin_memory=True,  # Transferência mais rápida para a GPU
    prefetch_factor=2,  # Prefetch de lotes
    persistent_workers=True  # Mantém os workers ativos
)

# Dataset customizado com caching
class CachedDataset(Dataset):
    def __init__(self, data_path, cache_size=1000):
        self.data_path = data_path
        self.cache = {}
        self.cache_size = cache_size
    
    def __getitem__(self, idx):
        if idx in self.cache:
            return self.cache[idx]
        
        # Carrega e pré-processa
        data = load_and_preprocess(self.data_path, idx)
        
        # Armazena em cache se houver espaço disponível
        if len(self.cache) < self.cache_size:
            self.cache[idx] = data
        
        return data

# Otimização do pipeline de dados no TensorFlow
import tensorflow as tf

dataset = tf.data.Dataset.from_tensor_slices((X, y))

dataset = dataset.cache()  # Armazena em cache na memória
dataset = dataset.shuffle(buffer_size=10000)
dataset = dataset.batch(32)
dataset = dataset.prefetch(tf.data.AUTOTUNE)  # Prefetching automático
dataset = dataset.map(
    preprocess_function,
    num_parallel_calls=tf.data.AUTOTUNE
)

# Use TFRecord para datasets grandes
def create_tfrecord(data, labels, filename):
    with tf.io.TFRecordWriter(filename) as writer:
        for x, y in zip(data, labels):
            example = tf.train.Example(features=tf.train.Features(feature={
                'data': tf.train.Feature(float_list=tf.train.FloatList(value=x)),
                'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[y]))
            }))
            writer.write(example.SerializeToString())

Raridade: Comum Dificuldade: Média


MLOps e Infraestrutura (5 Perguntas)

6. Como você projeta um feature store?

Resposta: Os feature stores centralizam a engenharia e o serving de features.

Loading diagram...
  • Componentes:
    • Offline Store: Features históricas para treinamento (S3, BigQuery)
    • Online Store: Features de baixa latência para serving (Redis, DynamoDB)
    • Feature Registry: Metadados e linhagem
  • Benefícios:
    • Reusabilidade
    • Consistência (treinamento/serving)
    • Monitoramento
# Exemplo com Feast (feature store de código aberto)
from feast import FeatureStore, Entity, Feature, FeatureView, FileSource
from feast.value_type import ValueType
from datetime import timedelta

# Define a entidade
user = Entity(
    name="user_id",
    value_type=ValueType.INT64,
    description="ID do usuário"
)

# Define a feature view
user_features = FeatureView(
    name="user_features",
    entities=["user_id"],
    ttl=timedelta(days=1),
    features=[
        Feature(name="age", dtype=ValueType.INT64),
        Feature(name="total_purchases", dtype=ValueType.FLOAT),
        Feature(name="avg_purchase_value", dtype=ValueType.FLOAT),
    ],
    online=True,
    batch_source=FileSource(
        path="data/user_features.parquet",
        event_timestamp_column="timestamp"
    )
)

# Inicializa o feature store
fs = FeatureStore(repo_path=".")

# Obtém as features para treinamento (offline)
training_df = fs.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_features:age",
        "user_features:total_purchases",
        "user_features:avg_purchase_value"
    ]
).to_df()

# Obtém as features para serving (online)
online_features = fs.get_online_features(
    features=[
        "user_features:age",
        "user_features:total_purchases"
    ],
    entity_rows=[{"user_id": 123}]
).to_dict()

# Implementação customizada de feature store
class SimpleFeatureStore:
    def __init__(self, redis_client, s3_client):
        self.redis = redis_client  # Online store
        self.s3 = s3_client  # Offline store
    
    def get_online_features(self, entity_id, feature_names):
        features = {}
        for feature in feature_names:
            key = f"{entity_id}:{feature}"
            features[feature] = self.redis.get(key)
        return features
    
    def write_features(self, entity_id, features):
        # Escreve no online store
        for feature_name, value in features.items():
            key = f"{entity_id}:{feature_name}"
            self.redis.set(key, value, ex=86400)  # TTL de 24h
        
        # Escreve no offline store para treinamento
        self.s3.put_object(
            Bucket='features',
            Key=f'{entity_id}/features.json',
            Body=json.dumps(features)
        )

Raridade: Média Dificuldade: Difícil


7. Como você implementa o versionamento de modelos e o rastreamento de experimentos?

Resposta: Rastreie experimentos para reproduzir resultados e comparar modelos.

# MLflow para rastreamento de experimentos
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier

# Define o experimento
mlflow.set_experiment("model_comparison")

# Rastreia o experimento
with mlflow.start_run(run_name="random_forest_v1"):
    # Loga os parâmetros
    params = {
        'n_estimators': 100,
        'max_depth': 10,
        'min_samples_split': 2
    }
    mlflow.log_params(params)
    
    # Treina o modelo
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)
    
    # Loga as métricas
    train_score = model.score(X_train, y_train)
    test_score = model.score(X_test, y_test)
    mlflow.log_metrics({
        'train_accuracy': train_score,
        'test_accuracy': test_score
    })
    
    # Loga o modelo
    mlflow.sklearn.log_model(model, "model")
    
    # Loga os artefatos
    mlflow.log_artifact("feature_importance.png")
    
    # Marca a execução
    mlflow.set_tags({
        'model_type': 'random_forest',
        'dataset_version': 'v2.0'
    })

# Carrega o melhor modelo
best_run = mlflow.search_runs(
    experiment_ids=['1'],
    order_by=['metrics.test_accuracy DESC'],
    max_results=1
)

model_uri = f"runs:/{best_run.iloc[0].run_id}/model"
loaded_model = mlflow.sklearn.load_model(model_uri)

# Alternativa com Weights & Biases
import wandb

wandb.init(project="ml-project", name="experiment-1")

# Loga os hiperparâmetros
wandb.config.update({
    'learning_rate': 0.001,
    'batch_size': 32,
    'epochs': 10
})

# Loga as métricas durante o treinamento
for epoch in range(10):
    # Código de treinamento
    wandb.log({
        'epoch': epoch,
        'train_loss': train_loss,
        'val_loss': val_loss,
        'accuracy': accuracy
    })

# Loga o modelo
wandb.save('model.h5')

# DVC para versionamento de dados e modelos
"""
# Inicializa o DVC
dvc init

# Rastreia os dados
dvc add data/train.csv
git add data/train.csv.dvc .gitignore
git commit -m "Adiciona os dados de treinamento"

# Rastreia o modelo
dvc add models/model.pkl
git add models/model.pkl.dvc
git commit -m "Adiciona o modelo treinado v1"

# Envia para o armazenamento remoto
dvc remote add -d storage s3://my-bucket/dvc-storage
dvc push
"""

Raridade: Muito Comum Dificuldade: Média


8. Como você implanta modelos no Kubernetes?

Resposta: O Kubernetes orquestra serviços de ML conteinerizados.

# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-model-deployment
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ml-model
  template:
    metadata:
      labels:
        app: ml-model
    spec:
      containers:
      - name: ml-model
        image: ml-model:v1
        ports:
        - containerPort: 5000
        resources:
          requests:
            memory: "2Gi"
            cpu: "1000m"
          limits:
            memory: "4Gi"
            cpu: "2000m"
        env:
        - name: MODEL_PATH
          value: "/models/model.pkl"
        livenessProbe:
          httpGet:
            path: /health
            port: 5000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 5000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: ml-model-service
spec:
  selector:
    app: ml-model
  ports:
  - protocol: TCP
    port: 80
    targetPort: 5000
  type: LoadBalancer
---
# Horizontal Pod Autoscaler
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ml-model-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ml-model-deployment
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
# app.py com health checks
from flask import Flask, request, jsonify
import joblib
import logging

app = Flask(__name__)
model = None

@app.route('/health')
def health():
    """Liveness probe"""
    return jsonify({'status': 'healthy'}), 200

@app.route('/ready')
def ready():
    """Readiness probe"""
    if model is not None:
        return jsonify({'status': 'ready'}), 200
    return jsonify({'status': 'not ready'}), 503

@app.route('/predict', methods=['POST'])
def predict():
    try:
        data = request.get_json()
        prediction = model.predict([data['features']])
        return jsonify({'prediction': int(prediction[0])})
    except Exception as e:
        logging.error(f"Prediction error: {e}")
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    model = joblib.load('/models/model.pkl')
    app.run(host='0.0.0.0', port=5000)

Raridade: Comum Dificuldade: Difícil


9. O que é model drift e como você o detecta?

Resposta: O model drift ocorre quando o desempenho do modelo se degrada ao longo do tempo.

  • Tipos:
    • Data Drift: A distribuição de entrada muda
    • Concept Drift: A relação entre X e y muda
  • Detecção:
    • Testes estatísticos (teste KS, PSI)
    • Monitoramento de desempenho
    • Comparação de distribuição
import numpy as np
from scipy import stats
from sklearn.metrics import accuracy_score

class DriftDetector:
    def __init__(self, reference_data, threshold=0.05):
        self.reference_data = reference_data
        self.threshold = threshold
        self.reference_mean = np.mean(reference_data, axis=0)
        self.reference_std = np.std(reference_data, axis=0)
    
    def detect_data_drift(self, new_data):
        """Teste de Kolmogorov-Smirnov para cada feature"""
        drift_detected = []
        
        for i in range(new_data.shape[1]):
            statistic, p_value = stats.ks_2samp(
                self.reference_data[:, i],
                new_data[:, i]
            )
            
            if p_value < self.threshold:
                drift_detected.append({
                    'feature': i,
                    'p_value': p_value,
                    'statistic': statistic
                })
        
        return drift_detected
    
    def calculate_psi(self, expected, actual, buckets=10):
        """Population Stability Index"""
        def scale_range(x, min_val, max_val):
            return (x - min_val) / (max_val - min_val)
        
        min_val = min(expected.min(), actual.min())
        max_val = max(expected.max(), actual.max())
        
        expected_scaled = scale_range(expected, min_val, max_val)
        actual_scaled = scale_range(actual, min_val, max_val)
        
        expected_percents = np.histogram(expected_scaled, bins=buckets)[0] / len(expected)
        actual_percents = np.histogram(actual_scaled, bins=buckets)[0] / len(actual)
        
        psi = np.sum((actual_percents - expected_percents) * 
                     np.log((actual_percents + 1e-10) / (expected_percents + 1e-10)))
        
        return psi

class PerformanceMonitor:
    def __init__(self, model, window_size=1000):
        self.model = model
        self.window_size = window_size
        self.predictions = []
        self.actuals = []
        self.accuracies = []
    
    def log_prediction(self, X, y_true):
        y_pred = self.model.predict(X)
        
        self.predictions.extend(y_pred)
        self.actuals.extend(y_true)
        
        # Calcula a acurácia rolante
        if len(self.predictions) >= self.window_size:
            recent_preds = self.predictions[-self.window_size:]
            recent_actuals = self.actuals[-self.window_size:]
            accuracy = accuracy_score(recent_actuals, recent_preds)
            self.accuracies.append(accuracy)
            
            # Alerta se o desempenho cair
            if len(self.accuracies) > 10:
                recent_avg = np.mean(self.accuracies[-10:])
                baseline_avg = np.mean(self.accuracies[:10])
                
                if recent_avg < baseline_avg * 0.9:  # Queda de 10%
                    print(f"ALERTA: O desempenho caiu de {baseline_avg:.3f} para {recent_avg:.3f}")
                    return True
        
        return False

# Uso
detector = DriftDetector(X_train)
drift = detector.detect_data_drift(X_production)

if drift:
    print(f"Data drift detectado em {len(drift)} features")
    # Aciona o retreinamento

Raridade: Comum Dificuldade: Difícil


10. Como você implementa testes A/B para modelos de ML?

Resposta: O teste A/B compara versões de modelo em produção.

import random
import hashlib
from datetime import datetime

class ABTestFramework:
    def __init__(self, model_a, model_b, traffic_split=0.5):
        self.model_a = model_a
        self.model_b = model_b
        self.traffic_split = traffic_split
        self.results_a = []
        self.results_b = []
    
    def get_variant(self, user_id):
        """Atribuição consistente baseada no user_id"""
        hash_value = int(hashlib.md5(str(user_id).encode()).hexdigest(), 16)
        return 'A' if (hash_value % 100) < (self.traffic_split * 100) else 'B'
    
    def predict(self, user_id, features):
        variant = self.get_variant(user_id)
        
        if variant == 'A':
            prediction = self.model_a.predict([features])[0]
            model_version = 'A'
        else:
            prediction = self.model_b.predict([features])[0]
            model_version = 'B'
        
        # Loga a predição
        self.log_prediction(user_id, features, prediction, model_version)
        
        return prediction, model_version
    
    def log_prediction(self, user_id, features, prediction, variant):
        log_entry = {
            'timestamp': datetime.now(),
            'user_id': user_id,
            'variant': variant,
            'prediction': prediction
        }
        
        if variant == 'A':
            self.results_a.append(log_entry)
        else:
            self.results_b.append(log_entry)
    
    def log_outcome(self, user_id, actual_value):
        """Loga o resultado real para análise"""
        # Encontra a predição nos logs e atualiza
        pass
    
    def analyze_results(self):
        """Análise estatística do teste A/B"""
        from scipy import stats
        
        # Calcula as taxas de conversão
        conversions_a = sum(1 for r in self.results_a if r.get('converted'))
        conversions_b = sum(1 for r in self.results_b if r.get('converted'))
        
        rate_a = conversions_a / len(self.results_a)
        rate_b = conversions_b / len(self.results_b)
        
        # Teste de significância estatística
        statistic, p_value = stats.chi2_contingency([
            [conversions_a, len(self.results_a) - conversions_a],
            [conversions_b, len(self.results_b) - conversions_b]
        ])[:2]
        
        return {
            'variant_a_rate': rate_a,
            'variant_b_rate': rate_b,
            'lift': (rate_b - rate_a) / rate_a * 100,
            'p_value': p_value,
            'significant': p_value < 0.05
        }

# Uso
ab_test = ABTestFramework(model_v1, model_v2, traffic_split=0.5)

# Faz as predições
for user_id, features in requests:
    prediction, variant = ab_test.predict(user_id, features)
    
# Analisa após coletar os dados
results = ab_test.analyze_results()
print(f"Lift da variante B: {results['lift']:.2f}%")
print(f"Estatisticamente significante: {results['significant']}")

Raridade: Comum Dificuldade: Difícil


Design de Sistema e Arquitetura (3 Perguntas)

11. Projete uma arquitetura de sistema de recomendação.

Resposta: Os sistemas de recomend

Newsletter subscription

Dicas de carreira semanais que realmente funcionam

Receba as últimas ideias diretamente na sua caixa de entrada

Destaque-se para Recrutadores e Conquiste o Emprego dos Seus Sonhos

Junte-se a milhares que transformaram suas carreiras com currículos impulsionados por IA que passam no ATS e impressionam gerentes de contratação.

Comece a criar agora

Compartilhar esta publicação

Faça Seus 6 Segundos Contarem

Os recrutadores escaneiam currículos por uma média de apenas 6 a 7 segundos. Nossos modelos comprovados são projetados para capturar atenção instantaneamente e mantê-los lendo.