diciembre 21, 2025
14 min de lectura

Preguntas de entrevista para Senior Machine Learning Engineer en producción

interview
career-advice
job-search
Preguntas de entrevista para Senior Machine Learning Engineer en producción
Milad Bonakdar

Milad Bonakdar

Autor

Las entrevistas senior de ML evalúan criterio de producción: diseño de sistemas, MLOps, entrenamiento distribuido, latencia, monitoreo y trade-offs. Practica respuestas claras y prácticas.


Introducción

Las entrevistas para Senior Machine Learning Engineer suelen evaluar criterio de producción: si puedes diseñar un sistema de ML suficientemente preciso, rápido, observable, reproducible y mantenible después del lanzamiento. Espera preguntas sobre MLOps, diseño de sistemas de ML, model serving, entrenamiento distribuido, pipelines de features, drift y experimentación.

Usa esta guía para practicar respuestas que expliquen trade-offs, no solo herramientas. Una buena respuesta senior empieza por requisitos y métricas, y después conecta datos, features, entrenamiento, despliegue, monitoreo y rollback.


Entrenamiento Distribuido y Escalabilidad (5 Preguntas)

1. ¿Cómo implementa el entrenamiento distribuido para modelos de aprendizaje profundo?

Respuesta: El entrenamiento distribuido paraleliza la computación a través de múltiples GPUs/máquinas.

  • Estrategias:
    • Paralelismo de Datos: Mismo modelo, diferentes lotes de datos
    • Paralelismo de Modelo: Divide el modelo entre dispositivos
    • Paralelismo de Pipeline: Divide el modelo en etapas
  • Frameworks: PyTorch DDP, Horovod, TensorFlow MirroredStrategy
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler

# Inicializar el entrenamiento distribuido
def setup_distributed(rank, world_size):
    dist.init_process_group(
        backend='nccl',  # Usar 'gloo' para CPU
        init_method='env://',
        world_size=world_size,
        rank=rank
    )

# Configuración del modelo
class MyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(784, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )
    
    def forward(self, x):
        return self.layers(x)

def train_distributed(rank, world_size):
    setup_distributed(rank, world_size)
    
    # Crear modelo y mover a la GPU
    model = MyModel().to(rank)
    model = DDP(model, device_ids=[rank])
    
    # Crear muestreador distribuido
    train_dataset = MyDataset()
    train_sampler = DistributedSampler(
        train_dataset,
        num_replicas=world_size,
        rank=rank
    )
    
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=32,
        sampler=train_sampler
    )
    
    optimizer = torch.optim.Adam(model.parameters())
    criterion = nn.CrossEntropyLoss()
    
    for epoch in range(10):
        train_sampler.set_epoch(epoch)  # Mezclar de manera diferente cada época
        
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(rank), target.to(rank)
            
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            
            if rank == 0 and batch_idx % 100 == 0:
                print(f'Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item():.4f}')
    
    dist.destroy_process_group()

# Lanzar con torch.multiprocessing
import torch.multiprocessing as mp

if __name__ == '__main__':
    world_size = torch.cuda.device_count()
    mp.spawn(train_distributed, args=(world_size,), nprocs=world_size)

# Entrenamiento distribuido en TensorFlow
import tensorflow as tf

strategy = tf.distribute.MirroredStrategy()

with strategy.scope():
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(512, activation='relu'),
        tf.keras.layers.Dense(10)
    ])
    
    model.compile(
        optimizer='adam',
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy']
    )

# model.fit distribuirá automáticamente el entrenamiento
# model.fit(train_dataset, epochs=10)

Rareza: Común Dificultad: Difícil


2. Explique la acumulación de gradientes y cuándo usarla.

Respuesta: La acumulación de gradientes simula tamaños de lote más grandes cuando la memoria de la GPU es limitada.

  • Cómo funciona: Acumula gradientes durante múltiples pases hacia adelante antes de actualizar los pesos.
  • Casos de uso: Modelos grandes, memoria de GPU limitada, entrenamiento estable.
import torch
import torch.nn as nn

model = MyModel()
optimizer = torch.optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()

# Tamaño de lote efectivo = batch_size * accumulation_steps
batch_size = 8
accumulation_steps = 4  # Tamaño de lote efectivo = 32

model.train()
optimizer.zero_grad()

for batch_idx, (data, target) in enumerate(train_loader):
    # Pase hacia adelante
    output = model(data)
    loss = criterion(output, target)
    
    # Normalizar la pérdida por los pasos de acumulación
    loss = loss / accumulation_steps
    
    # Pase hacia atrás (acumular gradientes)
    loss.backward()
    
    # Actualizar los pesos cada accumulation_steps
    if (batch_idx + 1) % accumulation_steps == 0:
        optimizer.step()
        optimizer.zero_grad()

# Entrenamiento con precisión mixta con acumulación de gradientes
from torch.amp import autocast, GradScaler

scaler = GradScaler("cuda")
optimizer.zero_grad()

for batch_idx, (data, target) in enumerate(train_loader):
    with autocast("cuda"):
        output = model(data)
        loss = criterion(output, target) / accumulation_steps
    
    scaler.scale(loss).backward()
    
    if (batch_idx + 1) % accumulation_steps == 0:
        scaler.step(optimizer)
        scaler.update()
        optimizer.zero_grad()

Rareza: Común Dificultad: Media


3. ¿Cómo optimiza la latencia de inferencia del modelo?

Respuesta: Múltiples técnicas reducen el tiempo de inferencia:

  • Optimización del Modelo:
    • Cuantización (INT8, FP16)
    • Poda (eliminar pesos)
    • Destilación del conocimiento
    • Compilación del modelo (TorchScript, ONNX)
  • Optimización del Servidor:
    • Lotes (Batching)
    • Almacenamiento en caché (Caching)
    • Paralelismo del modelo
    • Aceleración por hardware (GPU, TPU)
import torch
import torch.nn as nn

# 1. Cuantización (INT8)
model = MyModel()
model.eval()

# Cuantización dinámica
quantized_model = torch.quantization.quantize_dynamic(
    model, {nn.Linear}, dtype=torch.qint8
)

# Cuantización estática (más precisa)
model.qconfig = torch.quantization.get_default_qconfig('fbgemm')
torch.quantization.prepare(model, inplace=True)
# Calibrar con datos representativos
# for data in calibration_loader:
#     model(data)
torch.quantization.convert(model, inplace=True)

# 2. Compilación de TorchScript
scripted_model = torch.jit.script(model)
scripted_model.save('model_scripted.pt')

# 3. Exportación a ONNX
dummy_input = torch.randn(1, 784)
torch.onnx.export(
    model,
    dummy_input,
    'model.onnx',
    input_names=['input'],
    output_names=['output'],
    dynamic_axes={'input': {0: 'batch_size'}}
)

# 4. Optimización de TensorRT (NVIDIA)
import tensorrt as trt

# 5. Poda
import torch.nn.utils.prune as prune

# Poda del 30% de los pesos en la capa lineal
prune.l1_unstructured(model.fc1, name='weight', amount=0.3)

# Hacer la poda permanente
prune.remove(model.fc1, 'weight')

# 6. Destilación del conocimiento
class DistillationLoss(nn.Module):
    def __init__(self, temperature=3.0):
        super().__init__()
        self.temperature = temperature
        self.kl_div = nn.KLDivLoss(reduction='batchmean')
    
    def forward(self, student_logits, teacher_logits, labels):
        # Objetivos blandos del profesor
        soft_loss = self.kl_div(
            F.log_softmax(student_logits / self.temperature, dim=1),
            F.softmax(teacher_logits / self.temperature, dim=1)
        ) * (self.temperature ** 2)
        
        # Objetivos duros
        hard_loss = F.cross_entropy(student_logits, labels)
        
        return 0.5 * soft_loss + 0.5 * hard_loss

# 7. Lotes para la inferencia
class BatchPredictor:
    def __init__(self, model, max_batch_size=32, max_wait_time=0.1):
        self.model = model
        self.max_batch_size = max_batch_size
        self.max_wait_time = max_wait_time
        self.queue = []
    
    async def predict(self, x):
        self.queue.append(x)
        
        if len(self.queue) >= self.max_batch_size:
            return await self._process_batch()
        
        # Esperar más solicitudes o timeout
        await asyncio.sleep(self.max_wait_time)
        return await self._process_batch()
    
    async def _process_batch(self):
        batch = torch.stack(self.queue)
        self.queue = []
        return self.model(batch)

Rareza: Muy Común Dificultad: Difícil


4. ¿Qué es el entrenamiento de precisión mixta y cómo funciona?

Respuesta: La precisión mixta utiliza FP16 y FP32 para acelerar el entrenamiento manteniendo la precisión.

  • Beneficios:
    • Entrenamiento 2-3 veces más rápido
    • Reducción del uso de memoria
    • Tamaños de lote más grandes
  • Desafíos:
    • Estabilidad numérica
    • Subdesbordamiento del gradiente
  • Solución: Escala del gradiente
import torch
from torch.amp import autocast, GradScaler

model = MyModel().cuda()
optimizer = torch.optim.Adam(model.parameters())
scaler = GradScaler("cuda")

for epoch in range(10):
    for data, target in train_loader:
        data, target = data.cuda(), target.cuda()
        
        optimizer.zero_grad()
        
        # Pase hacia adelante en FP16
        with autocast("cuda"):
            output = model(data)
            loss = criterion(output, target)
        
        # Pase hacia atrás con escala del gradiente
        scaler.scale(loss).backward()
        
        # Desescalar los gradientes y recortar
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        
        # Actualizar los pesos
        scaler.step(optimizer)
        scaler.update()

# Precisión mixta en TensorFlow
from tensorflow.keras import mixed_precision

policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)

# El modelo usa automáticamente FP16 para la computación
model = tf.keras.Sequential([
    tf.keras.layers.Dense(512, activation='relu'),
    tf.keras.layers.Dense(10)
])

# Escala de la pérdida manejada automáticamente
optimizer = tf.keras.optimizers.Adam()
optimizer = mixed_precision.LossScaleOptimizer(optimizer)

Rareza: Común Dificultad: Media


5. ¿Cómo maneja los cuellos de botella en la canalización de datos?

Respuesta: La carga de datos a menudo crea cuellos de botella en el entrenamiento. Optimice con:

  • Prefetching (Precarga): Cargar el siguiente lote mientras se entrena.
  • Carga paralela: Múltiples trabajadores
  • Caching (Caché): Almacenar datos preprocesados
  • Formato de datos: Utilizar formatos eficientes (TFRecord, Parquet)
import torch
from torch.utils.data import DataLoader, Dataset
import multiprocessing as mp

# Configuración eficiente de DataLoader
train_loader = DataLoader(
    dataset,
    batch_size=32,
    num_workers=mp.cpu_count(),  # Carga paralela
    pin_memory=True,  # Transferencia más rápida a la GPU
    prefetch_factor=2,  # Precargar lotes
    persistent_workers=True  # Mantener los trabajadores activos
)

# Dataset personalizado con caché
class CachedDataset(Dataset):
    def __init__(self, data_path, cache_size=1000):
        self.data_path = data_path
        self.cache = {}
        self.cache_size = cache_size
    
    def __getitem__(self, idx):
        if idx in self.cache:
            return self.cache[idx]
        
        # Cargar y preprocesar
        data = load_and_preprocess(self.data_path, idx)
        
        # Almacenar en caché si hay espacio disponible
        if len(self.cache) < self.cache_size:
            self.cache[idx] = data
        
        return data

# Optimización de la canalización de datos en TensorFlow
import tensorflow as tf

dataset = tf.data.Dataset.from_tensor_slices((X, y))

dataset = dataset.cache()  # Almacenar en caché en la memoria
dataset = dataset.shuffle(buffer_size=10000)
dataset = dataset.batch(32)
dataset = dataset.prefetch(tf.data.AUTOTUNE)  # Precarga automática
dataset = dataset.map(
    preprocess_function,
    num_parallel_calls=tf.data.AUTOTUNE
)

# Usar TFRecord para conjuntos de datos grandes
def create_tfrecord(data, labels, filename):
    with tf.io.TFRecordWriter(filename) as writer:
        for x, y in zip(data, labels):
            example = tf.train.Example(features=tf.train.Features(feature={
                'data': tf.train.Feature(float_list=tf.train.FloatList(value=x)),
                'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[y]))
            }))
            writer.write(example.SerializeToString())

Rareza: Común Dificultad: Media


MLOps e Infraestructura (5 Preguntas)

6. ¿Cómo diseña un almacén de características (Feature Store)?

Respuesta: Los almacenes de características centralizan la ingeniería y el servicio de características.

Loading diagram...
  • Componentes:
    • Almacén Offline: Características históricas para el entrenamiento (S3, BigQuery)
    • Almacén Online: Características de baja latencia para el servicio (Redis, DynamoDB)
    • Registro de Características: Metadatos y linaje
  • Beneficios:
    • Reutilización
    • Consistencia (entrenamiento/servicio)
    • Monitorización
# Ejemplo con Feast (almacén de características de código abierto)
from feast import FeatureStore, Entity, Feature, FeatureView, FileSource
from feast.value_type import ValueType
from datetime import timedelta

# Definir entidad
user = Entity(
    name="user_id",
    value_type=ValueType.INT64,
    description="ID de Usuario"
)

# Definir vista de características
user_features = FeatureView(
    name="user_features",
    entities=["user_id"],
    ttl=timedelta(days=1),
    features=[
        Feature(name="age", dtype=ValueType.INT64),
        Feature(name="total_purchases", dtype=ValueType.FLOAT),
        Feature(name="avg_purchase_value", dtype=ValueType.FLOAT),
    ],
    online=True,
    batch_source=FileSource(
        path="data/user_features.parquet",
        event_timestamp_column="timestamp"
    )
)

# Inicializar el almacén de características
fs = FeatureStore(repo_path=".")

# Obtener características para el entrenamiento (offline)
training_df = fs.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_features:age",
        "user_features:total_purchases",
        "user_features:avg_purchase_value"
    ]
).to_df()

# Obtener características para el servicio (online)
online_features = fs.get_online_features(
    features=[
        "user_features:age",
        "user_features:total_purchases"
    ],
    entity_rows=[{"user_id": 123}]
).to_dict()

# Implementación personalizada de un almacén de características
class SimpleFeatureStore:
    def __init__(self, redis_client, s3_client):
        self.redis = redis_client  # Almacén online
        self.s3 = s3_client  # Almacén offline
    
    def get_online_features(self, entity_id, feature_names):
        features = {}
        for feature in feature_names:
            key = f"{entity_id}:{feature}"
            features[feature] = self.redis.get(key)
        return features
    
    def write_features(self, entity_id, features):
        # Escribir en el almacén online
        for feature_name, value in features.items():
            key = f"{entity_id}:{feature_name}"
            self.redis.set(key, value, ex=86400)  # TTL de 24h
        
        # Escribir en el almacén offline para el entrenamiento
        self.s3.put_object(
            Bucket='features',
            Key=f'{entity_id}/features.json',
            Body=json.dumps(features)
        )

Rareza: Media Dificultad: Difícil


7. ¿Cómo implementa el versionado de modelos y el seguimiento de experimentos?

Respuesta: Realice un seguimiento de los experimentos para reproducir los resultados y comparar los modelos.

# MLflow para el seguimiento de experimentos
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier

# Establecer experimento
mlflow.set_experiment("model_comparison")

# Realizar un seguimiento del experimento
with mlflow.start_run(run_name="random_forest_v1"):
    # Registrar parámetros
    params = {
        'n_estimators': 100,
        'max_depth': 10,
        'min_samples_split': 2
    }
    mlflow.log_params(params)
    
    # Entrenar el modelo
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)
    
    # Registrar métricas
    train_score = model.score(X_train, y_train)
    test_score = model.score(X_test, y_test)
    mlflow.log_metrics({
        'train_accuracy': train_score,
        'test_accuracy': test_score
    })
    
    # Registrar el modelo
    mlflow.sklearn.log_model(model, "model")
    
    # Registrar artefactos
    mlflow.log_artifact("feature_importance.png")
    
    # Etiquetar la ejecución
    mlflow.set_tags({
        'model_type': 'random_forest',
        'dataset_version': 'v2.0'
    })

# Cargar el mejor modelo
best_run = mlflow.search_runs(
    experiment_ids=['1'],
    order_by=['metrics.test_accuracy DESC'],
    max_results=1
)

model_uri = f"runs:/{best_run.iloc[0].run_id}/model"
loaded_model = mlflow.sklearn.load_model(model_uri)

# Alternativa con Weights & Biases
import wandb

wandb.init(project="ml-project", name="experiment-1")

# Registrar los hiperparámetros
wandb.config.update({
    'learning_rate': 0.001,
    'batch_size': 32,
    'epochs': 10
})

# Registrar métricas durante el entrenamiento
for epoch in range(10):
    # Código de entrenamiento
    wandb.log({
        'epoch': epoch,
        'train_loss': train_loss,
        'val_loss': val_loss,
        'accuracy': accuracy
    })

# Registrar el modelo
wandb.save('model.h5')

# DVC para el versionado de datos y modelos
"""
# Inicializar DVC
dvc init

# Realizar un seguimiento de los datos
dvc add data/train.csv
git add data/train.csv.dvc .gitignore
git commit -m "Añadir datos de entrenamiento"

# Realizar un seguimiento del modelo
dvc add models/model.pkl
git add models/model.pkl.dvc
git commit -m "Añadir modelo entrenado v1"

# Enviar al almacenamiento remoto
dvc remote add -d storage s3://my-bucket/dvc-storage
dvc push
"""

Rareza: Muy Común Dificultad: Media


8. ¿Cómo implementa el despliegue de modelos en Kubernetes?

Respuesta: Kubernetes orquesta servicios de ML en contenedores.

# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-model-deployment
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ml-model
  template:
    metadata:
      labels:
        app: ml-model
    spec:
      containers:
      - name: ml-model
        image: ml-model:v1
        ports:
        - containerPort: 5000
        resources:
          requests:
            memory: "2Gi"
            cpu: "1000m"
          limits:
            memory: "4Gi"
            cpu: "2000m"
        env:
        - name: MODEL_PATH
          value: "/models/model.pkl"
        livenessProbe:
          httpGet:
            path: /health
            port: 5000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 5000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: ml-model-service
spec:
  selector:
    app: ml-model
  ports:
  - protocol: TCP
    port: 80
    targetPort: 5000
  type: LoadBalancer
---
# Autoescalador Horizontal de Pods
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ml-model-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ml-model-deployment
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
# app.py con comprobaciones de salud
from flask import Flask, request, jsonify
import joblib
import logging

app = Flask(__name__)
model = None

@app.route('/health')
def health():
    """Prueba de vivacidad"""
    return jsonify({'status': 'healthy'}), 200

@app.route('/ready')
def ready():
    """Prueba de preparación"""
    if model is not None:
        return jsonify({'status': 'ready'}), 200
    return jsonify({'status': 'not ready'}), 503

@app.route('/predict', methods=['POST'])
def predict():
    try:
        data = request.get_json()
        prediction = model.predict([data['features']])
        return jsonify({'prediction': int(prediction[0])})
    except Exception as e:
        logging.error(f"Error de predicción: {e}")
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    model = joblib.load('/models/model.pkl')
    app.run(host='0.0.0.0', port=5000)

Rareza: Común Dificultad: Difícil


9. ¿Qué es la deriva del modelo y cómo la detecta?

Respuesta: La deriva del modelo ocurre cuando el rendimiento del modelo se degrada con el tiempo.

  • Tipos:
    • Deriva de Datos: Cambia la distribución de entrada
    • Deriva de Concepto: Cambia la relación entre X e y
  • Detección:
    • Pruebas estadísticas (prueba KS, PSI)
    • Monitorización del rendimiento
    • Comparación de la distribución
import numpy as np
from scipy import stats
from sklearn.metrics import accuracy_score

class DriftDetector:
    def __init__(self, reference_data, threshold=0.05):
        self.reference_data = reference_data
        self.threshold = threshold
        self.reference_mean = np.mean(reference_data, axis=0)
        self.reference_std = np.std(reference_data, axis=0)
    
    def detect_data_drift(self, new_data):
        """Prueba de Kolmogorov-Smirnov para cada característica"""
        drift_detected = []
        
        for i in range(new_data.shape[1]):
            statistic, p_value = stats.ks_2samp(
                self.reference_data[:, i],
                new_data[:, i]
            )
            
            if p_value < self.threshold:
                drift_detected.append({
                    'feature': i,
                    'p_value': p_value,
                    'statistic': statistic
                })
        
        return drift_detected
    
    def calculate_psi(self, expected, actual, buckets=10):
        """Índice de Estabilidad de la Población"""
        def scale_range(x, min_val, max_val):
            return (x - min_val) / (max_val - min_val)
        
        min_val = min(expected.min(), actual.min())
        max_val = max(expected.max(), actual.max())
        
        expected_scaled = scale_range(expected, min_val, max_val)
        actual_scaled = scale_range(actual, min_val, max_val)
        
        expected_percents = np.histogram(expected_scaled, bins=buckets)[0] / len(expected)
        actual_percents = np.histogram(actual_scaled, bins=buckets)[0] / len(actual)
        
        psi = np.sum((actual_percents - expected_percents) * 
                     np.log((actual_percents + 1e-10) / (expected_percents + 1e-10)))
        
        return psi

class PerformanceMonitor:
    def __init__(self, model, window_size=1000):
        self.model = model
        self.window_size = window_size
        self.predictions = []
        self.actuals = []
        self.accuracies = []
    
    def log_prediction(self, X, y_true):
        y_pred = self.model.predict(X)
        
        self.predictions.extend(y_pred)
        self.actuals.extend(y_true)
        
        # Calcular la precisión rodante
        if len(self.predictions) >= self.window_size:
            recent_preds = self.predictions[-self.window_size:]
            recent_actuals = self.actuals[-self.window_size:]
            accuracy = accuracy_score(recent_actuals, recent_preds)
            self.accuracies.append(accuracy)
            
            # Alertar si el rendimiento disminuye
            if len(self.accuracies) > 10:
                recent_avg = np.mean(self.accuracies[-10:])
                baseline_avg = np.mean(self.accuracies[:10])
                
                if recent_avg < baseline_avg * 0.9:  # Disminución del 10%
                    print(f"ALERTA: El rendimiento disminuyó de {baseline_avg:.3f} a {recent_avg:.3f}")
                    return True
        
        return False

# Uso
detector = DriftDetector(X_train)
drift = detector.detect_data_drift(X_production)

if drift:
    print(f"Se detectó deriva de datos en {len(drift)} características")
    # Desencadenar el reentrenamiento

Rareza: Común Dificultad: Difícil


10. ¿Cómo implementa las pruebas A/B para modelos de ML?

Respuesta: Las pruebas A/B comparan las versiones del modelo en producción.

import random
import hashlib
from datetime import datetime

class ABTestFramework:
    def __init__(self, model_a, model_b, traffic_split=0.5):
        self.model_a = model_a
        self.model_b = model_b
        self.traffic_split = traffic_split
        self.results_a = []
        self.results_b = []
    
    def get_variant(self, user_id):
        """Asignación consistente basada en user_id"""
        hash_value = int(hashlib.md5(str(user_id).encode()).hexdigest(), 16)
        return 'A' if (hash_value % 100) < (self.traffic_split * 100) else 'B'
    
    def predict(self, user_id, features):
        variant = self.get_variant(user_id)
        
        if variant == 'A':
            prediction = self.model_a.predict([features])[0]
            model_version = 'A'
        else:
            prediction = self.model_b.predict([features])[0]
            model_version = 'B'
        
        # Registrar la predicción
        self.log_prediction(user_id, features, prediction, model_version)
        
        return prediction, model_version
    
    def log_prediction(self, user_id, features, prediction, variant):
        log_entry = {
            'timestamp': datetime.now(),
            'user_id': user_id,
            'variant': variant,
            'prediction': prediction
        }
        
        if variant == 'A':
            self.results_a.append(log_entry)
        else:
            self.results_b.append(log_entry)
    
    def log_outcome(self, user_id, actual_value):
        """Registrar el resultado real para el análisis"""
        # Encontrar la predicción en los registros y actualizar
        pass
    
    def analyze_results(self):
        """Análisis estadístico de la prueba A/B"""
        from scipy import stats
        
        # Calcular las tasas de conversión
        conversions_a = sum(1 for r in self.results_a if r.get('converted'))
        conversions_b = sum(1 for r in self.results_b if r.get('converted'))
        
        rate_a = conversions_a / len(self.results_a)
        rate_b = conversions_b / len(self.results_b)
        
        # Prueba de significación estadística
        statistic, p_value = stats.chi2_contingency([
            [conversions_a, len(self.results_a) - conversions_a],
            [conversions_b, len(self.results_b) - conversions_b]
        ])[:2]
        
        return {
            'variant_a_rate': rate_a,
            'variant_b_rate': rate_b,
            'lift': (rate_b - rate_a) / rate_a * 100,
            'p_value': p_value,
            'significant': p_value < 0.05
        }

# Uso
ab_test = ABTestFramework(model_v1, model_v2, traffic_split=0.5)

# Hacer predicciones
for user_id, features in requests:
    prediction, variant = ab_test.predict(user_id, features)
    
# Analizar después de recopilar datos
results = ab_test.analyze_results()
print(f"Elevación de la variante B: {results['lift']:.2f}%")
print(f"Estadíst
Newsletter subscription

Consejos de carrera semanales que realmente funcionan

Recibe las últimas ideas directamente en tu bandeja de entrada

Tu Próxima Entrevista Está a Solo un Currículum de Distancia

Crea un currículum profesional y optimizado en minutos. No se necesitan habilidades de diseño, solo resultados comprobados.

Crea mi currículum

Compartir esta publicación

Consigue Empleo 50% Más Rápido

Los buscadores de empleo que usan currículums profesionales mejorados con IA consiguen puestos en un promedio de 5 semanas en comparación con las 10 estándar. Deja de esperar y empieza a entrevistar.