dezembro 21, 2025
14 min de leitura

Perguntas para Entrevista de Engenheiro Sênior de Machine Learning: Guia Completo

interview
career-advice
job-search
Perguntas para Entrevista de Engenheiro Sênior de Machine Learning: Guia Completo
Milad Bonakdar

Milad Bonakdar

Autor

Domine a engenharia de ML avançada com perguntas essenciais para entrevistas, abrangendo treinamento distribuído, otimização de modelos, MLOps, design de sistemas e ML em produção em escala para engenheiros seniores de machine learning.


Introdução

Engenheiros de Machine Learning Sênior arquitetam e escalam sistemas de ML em produção, otimizam o desempenho do modelo, constroem infraestrutura de ML robusta e lideram iniciativas técnicas. Essa função exige experiência em sistemas distribuídos, técnicas avançadas de otimização, MLOps e a capacidade de resolver desafios complexos de engenharia.

Este guia abrangente aborda questões essenciais de entrevista para Engenheiros de Machine Learning Sênior, abrangendo treinamento distribuído, otimização de modelo, infraestrutura de MLOps, design de sistema, engenharia de features em escala e melhores práticas de produção. Cada pergunta inclui respostas detalhadas, avaliação de raridade e classificação de dificuldade.


Treinamento Distribuído e Escalabilidade (5 Perguntas)

1. Como você implementa o treinamento distribuído para modelos de deep learning?

Resposta: O treinamento distribuído paraleliza a computação em várias GPUs/máquinas.

  • Estratégias:
    • Paralelismo de Dados: Mesmo modelo, diferentes lotes de dados
    • Paralelismo de Modelo: Divide o modelo entre dispositivos
    • Paralelismo de Pipeline: Divide o modelo em estágios
  • Frameworks: PyTorch DDP, Horovod, TensorFlow MirroredStrategy
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler

# Inicializa o treinamento distribuído
def setup_distributed(rank, world_size):
    dist.init_process_group(
        backend='nccl',  # Use 'gloo' para CPU
        init_method='env://',
        world_size=world_size,
        rank=rank
    )

# Configuração do modelo
class MyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(784, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )
    
    def forward(self, x):
        return self.layers(x)

def train_distributed(rank, world_size):
    setup_distributed(rank, world_size)
    
    # Cria o modelo e move para a GPU
    model = MyModel().to(rank)
    model = DDP(model, device_ids=[rank])
    
    # Cria o sampler distribuído
    train_dataset = MyDataset()
    train_sampler = DistributedSampler(
        train_dataset,
        num_replicas=world_size,
        rank=rank
    )
    
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=32,
        sampler=train_sampler
    )
    
    optimizer = torch.optim.Adam(model.parameters())
    criterion = nn.CrossEntropyLoss()
    
    for epoch in range(10):
        train_sampler.set_epoch(epoch)  # Embaralha de forma diferente a cada época
        
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(rank), target.to(rank)
            
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            
            if rank == 0 and batch_idx % 100 == 0:
                print(f'Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item():.4f}')
    
    dist.destroy_process_group()

# Inicia com torch.multiprocessing
import torch.multiprocessing as mp

if __name__ == '__main__':
    world_size = torch.cuda.device_count()
    mp.spawn(train_distributed, args=(world_size,), nprocs=world_size)

# Treinamento distribuído com TensorFlow
import tensorflow as tf

strategy = tf.distribute.MirroredStrategy()

with strategy.scope():
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(512, activation='relu'),
        tf.keras.layers.Dense(10)
    ])
    
    model.compile(
        optimizer='adam',
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy']
    )

# model.fit irá distribuir o treinamento automaticamente
# model.fit(train_dataset, epochs=10)

Raridade: Comum Dificuldade: Difícil


2. Explique o acúmulo de gradiente e quando usá-lo.

Resposta: O acúmulo de gradiente simula tamanhos de lote maiores quando a memória da GPU é limitada.

  • Como funciona: Acumula gradientes em várias passagens forward antes de atualizar os pesos
  • Casos de uso: Modelos grandes, memória limitada da GPU, treinamento estável
import torch
import torch.nn as nn

model = MyModel()
optimizer = torch.optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()

# Tamanho efetivo do lote = batch_size * accumulation_steps
batch_size = 8
accumulation_steps = 4  # Tamanho efetivo do lote = 32

model.train()
optimizer.zero_grad()

for batch_idx, (data, target) in enumerate(train_loader):
    # Passagem forward
    output = model(data)
    loss = criterion(output, target)
    
    # Normaliza a perda pelos passos de acúmulo
    loss = loss / accumulation_steps
    
    # Passagem backward (acumula gradientes)
    loss.backward()
    
    # Atualiza os pesos a cada accumulation_steps
    if (batch_idx + 1) % accumulation_steps == 0:
        optimizer.step()
        optimizer.zero_grad()

# Treinamento de precisão mista com acúmulo de gradiente
from torch.cuda.amp import autocast, GradScaler

scaler = GradScaler()
optimizer.zero_grad()

for batch_idx, (data, target) in enumerate(train_loader):
    with autocast():
        output = model(data)
        loss = criterion(output, target) / accumulation_steps
    
    scaler.scale(loss).backward()
    
    if (batch_idx + 1) % accumulation_steps == 0:
        scaler.step(optimizer)
        scaler.update()
        optimizer.zero_grad()

Raridade: Comum Dificuldade: Média


3. Como você otimiza a latência de inferência do modelo?

Resposta: Várias técnicas reduzem o tempo de inferência:

  • Otimização do Modelo:
    • Quantização (INT8, FP16)
    • Poda (remove pesos)
    • Destilação de conhecimento
    • Compilação do modelo (TorchScript, ONNX)
  • Otimização de Serving:
    • Batching
    • Caching
    • Paralelismo de modelo
    • Aceleração de hardware (GPU, TPU)
import torch
import torch.nn as nn

# 1. Quantização (INT8)
model = MyModel()
model.eval()

# Quantização dinâmica
quantized_model = torch.quantization.quantize_dynamic(
    model, {nn.Linear}, dtype=torch.qint8
)

# Quantização estática (mais precisa)
model.qconfig = torch.quantization.get_default_qconfig('fbgemm')
torch.quantization.prepare(model, inplace=True)
# Calibra com dados representativos
# for data in calibration_loader:
#     model(data)
torch.quantization.convert(model, inplace=True)

# 2. Compilação com TorchScript
scripted_model = torch.jit.script(model)
scripted_model.save('model_scripted.pt')

# 3. Exportação para ONNX
dummy_input = torch.randn(1, 784)
torch.onnx.export(
    model,
    dummy_input,
    'model.onnx',
    input_names=['input'],
    output_names=['output'],
    dynamic_axes={'input': {0: 'batch_size'}}
)

# 4. Otimização com TensorRT (NVIDIA)
import tensorrt as trt

# 5. Poda
import torch.nn.utils.prune as prune

# Poda 30% dos pesos na camada linear
prune.l1_unstructured(model.fc1, name='weight', amount=0.3)

# Torna a poda permanente
prune.remove(model.fc1, 'weight')

# 6. Destilação de conhecimento
class DistillationLoss(nn.Module):
    def __init__(self, temperature=3.0):
        super().__init__()
        self.temperature = temperature
        self.kl_div = nn.KLDivLoss(reduction='batchmean')
    
    def forward(self, student_logits, teacher_logits, labels):
        # Soft targets do professor
        soft_loss = self.kl_div(
            F.log_softmax(student_logits / self.temperature, dim=1),
            F.softmax(teacher_logits / self.temperature, dim=1)
        ) * (self.temperature ** 2)
        
        # Hard targets
        hard_loss = F.cross_entropy(student_logits, labels)
        
        return 0.5 * soft_loss + 0.5 * hard_loss

# 7. Batching para inferência
class BatchPredictor:
    def __init__(self, model, max_batch_size=32, max_wait_time=0.1):
        self.model = model
        self.max_batch_size = max_batch_size
        self.max_wait_time = max_wait_time
        self.queue = []
    
    async def predict(self, x):
        self.queue.append(x)
        
        if len(self.queue) >= self.max_batch_size:
            return await self._process_batch()
        
        # Espera por mais requisições ou timeout
        await asyncio.sleep(self.max_wait_time)
        return await self._process_batch()
    
    async def _process_batch(self):
        batch = torch.stack(self.queue)
        self.queue = []
        return self.model(batch)

Raridade: Muito Comum Dificuldade: Difícil


4. O que é treinamento de precisão mista e como funciona?

Resposta: A precisão mista usa FP16 e FP32 para acelerar o treinamento, mantendo a precisão.

  • Benefícios:
    • Treinamento 2-3x mais rápido
    • Uso reduzido de memória
    • Tamanhos de lote maiores
  • Desafios:
    • Estabilidade numérica
    • Underflow de gradiente
  • Solução: Escalonamento de gradiente
import torch
from torch.cuda.amp import autocast, GradScaler

model = MyModel().cuda()
optimizer = torch.optim.Adam(model.parameters())
scaler = GradScaler()

for epoch in range(10):
    for data, target in train_loader:
        data, target = data.cuda(), target.cuda()
        
        optimizer.zero_grad()
        
        # Passagem forward em FP16
        with autocast():
            output = model(data)
            loss = criterion(output, target)
        
        # Passagem backward com escalonamento de gradiente
        scaler.scale(loss).backward()
        
        # Desescala os gradientes e clipa
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        
        # Atualiza os pesos
        scaler.step(optimizer)
        scaler.update()

# Precisão mista no TensorFlow
from tensorflow.keras import mixed_precision

policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)

# O modelo usa automaticamente FP16 para computação
model = tf.keras.Sequential([
    tf.keras.layers.Dense(512, activation='relu'),
    tf.keras.layers.Dense(10)
])

# O escalonamento da perda é tratado automaticamente
optimizer = tf.keras.optimizers.Adam()
optimizer = mixed_precision.LossScaleOptimizer(optimizer)

Raridade: Comum Dificuldade: Média


5. Como você lida com gargalos no pipeline de dados?

Resposta: O carregamento de dados geralmente cria gargalos no treinamento. Otimize com:

  • Prefetching: Carrega o próximo lote durante o treinamento
  • Carregamento paralelo: Vários workers
  • Caching: Armazena dados pré-processados
  • Formato de dados: Use formatos eficientes (TFRecord, Parquet)
import torch
from torch.utils.data import DataLoader, Dataset
import multiprocessing as mp

# Configuração eficiente do DataLoader
train_loader = DataLoader(
    dataset,
    batch_size=32,
    num_workers=mp.cpu_count(),  # Carregamento paralelo
    pin_memory=True,  # Transferência mais rápida para a GPU
    prefetch_factor=2,  # Prefetch de lotes
    persistent_workers=True  # Mantém os workers ativos
)

# Dataset customizado com caching
class CachedDataset(Dataset):
    def __init__(self, data_path, cache_size=1000):
        self.data_path = data_path
        self.cache = {}
        self.cache_size = cache_size
    
    def __getitem__(self, idx):
        if idx in self.cache:
            return self.cache[idx]
        
        # Carrega e pré-processa
        data = load_and_preprocess(self.data_path, idx)
        
        # Armazena em cache se houver espaço disponível
        if len(self.cache) < self.cache_size:
            self.cache[idx] = data
        
        return data

# Otimização do pipeline de dados no TensorFlow
import tensorflow as tf

dataset = tf.data.Dataset.from_tensor_slices((X, y))

dataset = dataset.cache()  # Armazena em cache na memória
dataset = dataset.shuffle(buffer_size=10000)
dataset = dataset.batch(32)
dataset = dataset.prefetch(tf.data.AUTOTUNE)  # Prefetching automático
dataset = dataset.map(
    preprocess_function,
    num_parallel_calls=tf.data.AUTOTUNE
)

# Use TFRecord para datasets grandes
def create_tfrecord(data, labels, filename):
    with tf.io.TFRecordWriter(filename) as writer:
        for x, y in zip(data, labels):
            example = tf.train.Example(features=tf.train.Features(feature={
                'data': tf.train.Feature(float_list=tf.train.FloatList(value=x)),
                'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[y]))
            }))
            writer.write(example.SerializeToString())

Raridade: Comum Dificuldade: Média


MLOps e Infraestrutura (5 Perguntas)

6. Como você projeta um feature store?

Resposta: Os feature stores centralizam a engenharia e o serving de features.

Loading diagram...
  • Componentes:
    • Offline Store: Features históricas para treinamento (S3, BigQuery)
    • Online Store: Features de baixa latência para serving (Redis, DynamoDB)
    • Feature Registry: Metadados e linhagem
  • Benefícios:
    • Reusabilidade
    • Consistência (treinamento/serving)
    • Monitoramento
# Exemplo com Feast (feature store de código aberto)
from feast import FeatureStore, Entity, Feature, FeatureView, FileSource
from feast.value_type import ValueType
from datetime import timedelta

# Define a entidade
user = Entity(
    name="user_id",
    value_type=ValueType.INT64,
    description="ID do usuário"
)

# Define a feature view
user_features = FeatureView(
    name="user_features",
    entities=["user_id"],
    ttl=timedelta(days=1),
    features=[
        Feature(name="age", dtype=ValueType.INT64),
        Feature(name="total_purchases", dtype=ValueType.FLOAT),
        Feature(name="avg_purchase_value", dtype=ValueType.FLOAT),
    ],
    online=True,
    batch_source=FileSource(
        path="data/user_features.parquet",
        event_timestamp_column="timestamp"
    )
)

# Inicializa o feature store
fs = FeatureStore(repo_path=".")

# Obtém as features para treinamento (offline)
training_df = fs.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_features:age",
        "user_features:total_purchases",
        "user_features:avg_purchase_value"
    ]
).to_df()

# Obtém as features para serving (online)
online_features = fs.get_online_features(
    features=[
        "user_features:age",
        "user_features:total_purchases"
    ],
    entity_rows=[{"user_id": 123}]
).to_dict()

# Implementação customizada de feature store
class SimpleFeatureStore:
    def __init__(self, redis_client, s3_client):
        self.redis = redis_client  # Online store
        self.s3 = s3_client  # Offline store
    
    def get_online_features(self, entity_id, feature_names):
        features = {}
        for feature in feature_names:
            key = f"{entity_id}:{feature}"
            features[feature] = self.redis.get(key)
        return features
    
    def write_features(self, entity_id, features):
        # Escreve no online store
        for feature_name, value in features.items():
            key = f"{entity_id}:{feature_name}"
            self.redis.set(key, value, ex=86400)  # TTL de 24h
        
        # Escreve no offline store para treinamento
        self.s3.put_object(
            Bucket='features',
            Key=f'{entity_id}/features.json',
            Body=json.dumps(features)
        )

Raridade: Média Dificuldade: Difícil


7. Como você implementa o versionamento de modelos e o rastreamento de experimentos?

Resposta: Rastreie experimentos para reproduzir resultados e comparar modelos.

# MLflow para rastreamento de experimentos
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier

# Define o experimento
mlflow.set_experiment("model_comparison")

# Rastreia o experimento
with mlflow.start_run(run_name="random_forest_v1"):
    # Loga os parâmetros
    params = {
        'n_estimators': 100,
        'max_depth': 10,
        'min_samples_split': 2
    }
    mlflow.log_params(params)
    
    # Treina o modelo
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)
    
    # Loga as métricas
    train_score = model.score(X_train, y_train)
    test_score = model.score(X_test, y_test)
    mlflow.log_metrics({
        'train_accuracy': train_score,
        'test_accuracy': test_score
    })
    
    # Loga o modelo
    mlflow.sklearn.log_model(model, "model")
    
    # Loga os artefatos
    mlflow.log_artifact("feature_importance.png")
    
    # Marca a execução
    mlflow.set_tags({
        'model_type': 'random_forest',
        'dataset_version': 'v2.0'
    })

# Carrega o melhor modelo
best_run = mlflow.search_runs(
    experiment_ids=['1'],
    order_by=['metrics.test_accuracy DESC'],
    max_results=1
)

model_uri = f"runs:/{best_run.iloc[0].run_id}/model"
loaded_model = mlflow.sklearn.load_model(model_uri)

# Alternativa com Weights & Biases
import wandb

wandb.init(project="ml-project", name="experiment-1")

# Loga os hiperparâmetros
wandb.config.update({
    'learning_rate': 0.001,
    'batch_size': 32,
    'epochs': 10
})

# Loga as métricas durante o treinamento
for epoch in range(10):
    # Código de treinamento
    wandb.log({
        'epoch': epoch,
        'train_loss': train_loss,
        'val_loss': val_loss,
        'accuracy': accuracy
    })

# Loga o modelo
wandb.save('model.h5')

# DVC para versionamento de dados e modelos
"""
# Inicializa o DVC
dvc init

# Rastreia os dados
dvc add data/train.csv
git add data/train.csv.dvc .gitignore
git commit -m "Adiciona os dados de treinamento"

# Rastreia o modelo
dvc add models/model.pkl
git add models/model.pkl.dvc
git commit -m "Adiciona o modelo treinado v1"

# Envia para o armazenamento remoto
dvc remote add -d storage s3://my-bucket/dvc-storage
dvc push
"""

Raridade: Muito Comum Dificuldade: Média


8. Como você implanta modelos no Kubernetes?

Resposta: O Kubernetes orquestra serviços de ML conteinerizados.

# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-model-deployment
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ml-model
  template:
    metadata:
      labels:
        app: ml-model
    spec:
      containers:
      - name: ml-model
        image: ml-model:v1
        ports:
        - containerPort: 5000
        resources:
          requests:
            memory: "2Gi"
            cpu: "1000m"
          limits:
            memory: "4Gi"
            cpu: "2000m"
        env:
        - name: MODEL_PATH
          value: "/models/model.pkl"
        livenessProbe:
          httpGet:
            path: /health
            port: 5000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 5000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: ml-model-service
spec:
  selector:
    app: ml-model
  ports:
  - protocol: TCP
    port: 80
    targetPort: 5000
  type: LoadBalancer
---
# Horizontal Pod Autoscaler
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ml-model-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ml-model-deployment
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
# app.py com health checks
from flask import Flask, request, jsonify
import joblib
import logging

app = Flask(__name__)
model = None

@app.route('/health')
def health():
    """Liveness probe"""
    return jsonify({'status': 'healthy'}), 200

@app.route('/ready')
def ready():
    """Readiness probe"""
    if model is not None:
        return jsonify({'status': 'ready'}), 200
    return jsonify({'status': 'not ready'}), 503

@app.route('/predict', methods=['POST'])
def predict():
    try:
        data = request.get_json()
        prediction = model.predict([data['features']])
        return jsonify({'prediction': int(prediction[0])})
    except Exception as e:
        logging.error(f"Prediction error: {e}")
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    model = joblib.load('/models/model.pkl')
    app.run(host='0.0.0.0', port=5000)

Raridade: Comum Dificuldade: Difícil


9. O que é model drift e como você o detecta?

Resposta: O model drift ocorre quando o desempenho do modelo se degrada ao longo do tempo.

  • Tipos:
    • Data Drift: A distribuição de entrada muda
    • Concept Drift: A relação entre X e y muda
  • Detecção:
    • Testes estatísticos (teste KS, PSI)
    • Monitoramento de desempenho
    • Comparação de distribuição
import numpy as np
from scipy import stats
from sklearn.metrics import accuracy_score

class DriftDetector:
    def __init__(self, reference_data, threshold=0.05):
        self.reference_data = reference_data
        self.threshold = threshold
        self.reference_mean = np.mean(reference_data, axis=0)
        self.reference_std = np.std(reference_data, axis=0)
    
    def detect_data_drift(self, new_data):
        """Teste de Kolmogorov-Smirnov para cada feature"""
        drift_detected = []
        
        for i in range(new_data.shape[1]):
            statistic, p_value = stats.ks_2samp(
                self.reference_data[:, i],
                new_data[:, i]
            )
            
            if p_value < self.threshold:
                drift_detected.append({
                    'feature': i,
                    'p_value': p_value,
                    'statistic': statistic
                })
        
        return drift_detected
    
    def calculate_psi(self, expected, actual, buckets=10):
        """Population Stability Index"""
        def scale_range(x, min_val, max_val):
            return (x - min_val) / (max_val - min_val)
        
        min_val = min(expected.min(), actual.min())
        max_val = max(expected.max(), actual.max())
        
        expected_scaled = scale_range(expected, min_val, max_val)
        actual_scaled = scale_range(actual, min_val, max_val)
        
        expected_percents = np.histogram(expected_scaled, bins=buckets)[0] / len(expected)
        actual_percents = np.histogram(actual_scaled, bins=buckets)[0] / len(actual)
        
        psi = np.sum((actual_percents - expected_percents) * 
                     np.log((actual_percents + 1e-10) / (expected_percents + 1e-10)))
        
        return psi

class PerformanceMonitor:
    def __init__(self, model, window_size=1000):
        self.model = model
        self.window_size = window_size
        self.predictions = []
        self.actuals = []
        self.accuracies = []
    
    def log_prediction(self, X, y_true):
        y_pred = self.model.predict(X)
        
        self.predictions.extend(y_pred)
        self.actuals.extend(y_true)
        
        # Calcula a acurácia rolante
        if len(self.predictions) >= self.window_size:
            recent_preds = self.predictions[-self.window_size:]
            recent_actuals = self.actuals[-self.window_size:]
            accuracy = accuracy_score(recent_actuals, recent_preds)
            self.accuracies.append(accuracy)
            
            # Alerta se o desempenho cair
            if len(self.accuracies) > 10:
                recent_avg = np.mean(self.accuracies[-10:])
                baseline_avg = np.mean(self.accuracies[:10])
                
                if recent_avg < baseline_avg * 0.9:  # Queda de 10%
                    print(f"ALERTA: O desempenho caiu de {baseline_avg:.3f} para {recent_avg:.3f}")
                    return True
        
        return False

# Uso
detector = DriftDetector(X_train)
drift = detector.detect_data_drift(X_production)

if drift:
    print(f"Data drift detectado em {len(drift)} features")
    # Aciona o retreinamento

Raridade: Comum Dificuldade: Difícil


10. Como você implementa testes A/B para modelos de ML?

Resposta: O teste A/B compara versões de modelo em produção.

import random
import hashlib
from datetime import datetime

class ABTestFramework:
    def __init__(self, model_a, model_b, traffic_split=0.5):
        self.model_a = model_a
        self.model_b = model_b
        self.traffic_split = traffic_split
        self.results_a = []
        self.results_b = []
    
    def get_variant(self, user_id):
        """Atribuição consistente baseada no user_id"""
        hash_value = int(hashlib.md5(str(user_id).encode()).hexdigest(), 16)
        return 'A' if (hash_value % 100) < (self.traffic_split * 100) else 'B'
    
    def predict(self, user_id, features):
        variant = self.get_variant(user_id)
        
        if variant == 'A':
            prediction = self.model_a.predict([features])[0]
            model_version = 'A'
        else:
            prediction = self.model_b.predict([features])[0]
            model_version = 'B'
        
        # Loga a predição
        self.log_prediction(user_id, features, prediction, model_version)
        
        return prediction, model_version
    
    def log_prediction(self, user_id, features, prediction, variant):
        log_entry = {
            'timestamp': datetime.now(),
            'user_id': user_id,
            'variant': variant,
            'prediction': prediction
        }
        
        if variant == 'A':
            self.results_a.append(log_entry)
        else:
            self.results_b.append(log_entry)
    
    def log_outcome(self, user_id, actual_value):
        """Loga o resultado real para análise"""
        # Encontra a predição nos logs e atualiza
        pass
    
    def analyze_results(self):
        """Análise estatística do teste A/B"""
        from scipy import stats
        
        # Calcula as taxas de conversão
        conversions_a = sum(1 for r in self.results_a if r.get('converted'))
        conversions_b = sum(1 for r in self.results_b if r.get('converted'))
        
        rate_a = conversions_a / len(self.results_a)
        rate_b = conversions_b / len(self.results_b)
        
        # Teste de significância estatística
        statistic, p_value = stats.chi2_contingency([
            [conversions_a, len(self.results_a) - conversions_a],
            [conversions_b, len(self.results_b) - conversions_b]
        ])[:2]
        
        return {
            'variant_a_rate': rate_a,
            'variant_b_rate': rate_b,
            'lift': (rate_b - rate_a) / rate_a * 100,
            'p_value': p_value,
            'significant': p_value < 0.05
        }

# Uso
ab_test = ABTestFramework(model_v1, model_v2, traffic_split=0.5)

# Faz as predições
for user_id, features in requests:
    prediction, variant = ab_test.predict(user_id, features)
    
# Analisa após coletar os dados
results = ab_test.analyze_results()
print(f"Lift da variante B: {results['lift']:.2f}%")
print(f"Estatisticamente significante: {results['significant']}")

Raridade: Comum Dificuldade: Difícil


Design de Sistema e Arquitetura (3 Perguntas)

11. Projete uma arquitetura de sistema de recomendação.

Resposta: Os sistemas de recomend

Newsletter subscription

Dicas de carreira semanais que realmente funcionam

Receba as últimas ideias diretamente na sua caixa de entrada

Destaque-se para Recrutadores e Conquiste o Emprego dos Seus Sonhos

Junte-se a milhares que transformaram suas carreiras com currículos impulsionados por IA que passam no ATS e impressionam gerentes de contratação.

Comece a criar agora

Compartilhar esta publicação

Faça Seus 6 Segundos Contarem

Os recrutadores escaneiam currículos por uma média de apenas 6 a 7 segundos. Nossos modelos comprovados são projetados para capturar atenção instantaneamente e mantê-los lendo.