diciembre 21, 2025
14 min de lectura

Preguntas para Entrevistas de Ingeniero Senior de Machine Learning: Guía Completa

interview
career-advice
job-search
Preguntas para Entrevistas de Ingeniero Senior de Machine Learning: Guía Completa
MB

Milad Bonakdar

Autor

Domina la ingeniería de ML avanzada con preguntas esenciales para entrevistas que cubren el entrenamiento distribuido, la optimización de modelos, MLOps, el diseño de sistemas y el ML de producción a escala para ingenieros senior de machine learning.


Introducción

Los ingenieros sénior de Machine Learning diseñan y escalan sistemas de ML en producción, optimizan el rendimiento de los modelos, construyen una infraestructura de MLOps robusta y lideran iniciativas técnicas. Este rol exige experiencia en sistemas distribuidos, técnicas de optimización avanzadas, MLOps y la capacidad de resolver desafíos de ingeniería complejos.

Esta guía completa cubre preguntas esenciales de entrevista para Ingenieros Sénior de Machine Learning, abarcando entrenamiento distribuido, optimización de modelos, infraestructura de MLOps, diseño de sistemas, ingeniería de características a escala y mejores prácticas de producción. Cada pregunta incluye respuestas detalladas, evaluación de rareza y calificaciones de dificultad.


Entrenamiento Distribuido y Escalabilidad (5 Preguntas)

1. ¿Cómo implementa el entrenamiento distribuido para modelos de aprendizaje profundo?

Respuesta: El entrenamiento distribuido paraleliza la computación a través de múltiples GPUs/máquinas.

  • Estrategias:
    • Paralelismo de Datos: Mismo modelo, diferentes lotes de datos
    • Paralelismo de Modelo: Divide el modelo entre dispositivos
    • Paralelismo de Pipeline: Divide el modelo en etapas
  • Frameworks: PyTorch DDP, Horovod, TensorFlow MirroredStrategy
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler

# Inicializar el entrenamiento distribuido
def setup_distributed(rank, world_size):
    dist.init_process_group(
        backend='nccl',  # Usar 'gloo' para CPU
        init_method='env://',
        world_size=world_size,
        rank=rank
    )

# Configuración del modelo
class MyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(784, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )
    
    def forward(self, x):
        return self.layers(x)

def train_distributed(rank, world_size):
    setup_distributed(rank, world_size)
    
    # Crear modelo y mover a la GPU
    model = MyModel().to(rank)
    model = DDP(model, device_ids=[rank])
    
    # Crear muestreador distribuido
    train_dataset = MyDataset()
    train_sampler = DistributedSampler(
        train_dataset,
        num_replicas=world_size,
        rank=rank
    )
    
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=32,
        sampler=train_sampler
    )
    
    optimizer = torch.optim.Adam(model.parameters())
    criterion = nn.CrossEntropyLoss()
    
    for epoch in range(10):
        train_sampler.set_epoch(epoch)  # Mezclar de manera diferente cada época
        
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(rank), target.to(rank)
            
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            
            if rank == 0 and batch_idx % 100 == 0:
                print(f'Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item():.4f}')
    
    dist.destroy_process_group()

# Lanzar con torch.multiprocessing
import torch.multiprocessing as mp

if __name__ == '__main__':
    world_size = torch.cuda.device_count()
    mp.spawn(train_distributed, args=(world_size,), nprocs=world_size)

# Entrenamiento distribuido en TensorFlow
import tensorflow as tf

strategy = tf.distribute.MirroredStrategy()

with strategy.scope():
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(512, activation='relu'),
        tf.keras.layers.Dense(10)
    ])
    
    model.compile(
        optimizer='adam',
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy']
    )

# model.fit distribuirá automáticamente el entrenamiento
# model.fit(train_dataset, epochs=10)

Rareza: Común Dificultad: Difícil


2. Explique la acumulación de gradientes y cuándo usarla.

Respuesta: La acumulación de gradientes simula tamaños de lote más grandes cuando la memoria de la GPU es limitada.

  • Cómo funciona: Acumula gradientes durante múltiples pases hacia adelante antes de actualizar los pesos.
  • Casos de uso: Modelos grandes, memoria de GPU limitada, entrenamiento estable.
import torch
import torch.nn as nn

model = MyModel()
optimizer = torch.optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()

# Tamaño de lote efectivo = batch_size * accumulation_steps
batch_size = 8
accumulation_steps = 4  # Tamaño de lote efectivo = 32

model.train()
optimizer.zero_grad()

for batch_idx, (data, target) in enumerate(train_loader):
    # Pase hacia adelante
    output = model(data)
    loss = criterion(output, target)
    
    # Normalizar la pérdida por los pasos de acumulación
    loss = loss / accumulation_steps
    
    # Pase hacia atrás (acumular gradientes)
    loss.backward()
    
    # Actualizar los pesos cada accumulation_steps
    if (batch_idx + 1) % accumulation_steps == 0:
        optimizer.step()
        optimizer.zero_grad()

# Entrenamiento con precisión mixta con acumulación de gradientes
from torch.cuda.amp import autocast, GradScaler

scaler = GradScaler()
optimizer.zero_grad()

for batch_idx, (data, target) in enumerate(train_loader):
    with autocast():
        output = model(data)
        loss = criterion(output, target) / accumulation_steps
    
    scaler.scale(loss).backward()
    
    if (batch_idx + 1) % accumulation_steps == 0:
        scaler.step(optimizer)
        scaler.update()
        optimizer.zero_grad()

Rareza: Común Dificultad: Media


3. ¿Cómo optimiza la latencia de inferencia del modelo?

Respuesta: Múltiples técnicas reducen el tiempo de inferencia:

  • Optimización del Modelo:
    • Cuantización (INT8, FP16)
    • Poda (eliminar pesos)
    • Destilación del conocimiento
    • Compilación del modelo (TorchScript, ONNX)
  • Optimización del Servidor:
    • Lotes (Batching)
    • Almacenamiento en caché (Caching)
    • Paralelismo del modelo
    • Aceleración por hardware (GPU, TPU)
import torch
import torch.nn as nn

# 1. Cuantización (INT8)
model = MyModel()
model.eval()

# Cuantización dinámica
quantized_model = torch.quantization.quantize_dynamic(
    model, {nn.Linear}, dtype=torch.qint8
)

# Cuantización estática (más precisa)
model.qconfig = torch.quantization.get_default_qconfig('fbgemm')
torch.quantization.prepare(model, inplace=True)
# Calibrar con datos representativos
# for data in calibration_loader:
#     model(data)
torch.quantization.convert(model, inplace=True)

# 2. Compilación de TorchScript
scripted_model = torch.jit.script(model)
scripted_model.save('model_scripted.pt')

# 3. Exportación a ONNX
dummy_input = torch.randn(1, 784)
torch.onnx.export(
    model,
    dummy_input,
    'model.onnx',
    input_names=['input'],
    output_names=['output'],
    dynamic_axes={'input': {0: 'batch_size'}}
)

# 4. Optimización de TensorRT (NVIDIA)
import tensorrt as trt

# 5. Poda
import torch.nn.utils.prune as prune

# Poda del 30% de los pesos en la capa lineal
prune.l1_unstructured(model.fc1, name='weight', amount=0.3)

# Hacer la poda permanente
prune.remove(model.fc1, 'weight')

# 6. Destilación del conocimiento
class DistillationLoss(nn.Module):
    def __init__(self, temperature=3.0):
        super().__init__()
        self.temperature = temperature
        self.kl_div = nn.KLDivLoss(reduction='batchmean')
    
    def forward(self, student_logits, teacher_logits, labels):
        # Objetivos blandos del profesor
        soft_loss = self.kl_div(
            F.log_softmax(student_logits / self.temperature, dim=1),
            F.softmax(teacher_logits / self.temperature, dim=1)
        ) * (self.temperature ** 2)
        
        # Objetivos duros
        hard_loss = F.cross_entropy(student_logits, labels)
        
        return 0.5 * soft_loss + 0.5 * hard_loss

# 7. Lotes para la inferencia
class BatchPredictor:
    def __init__(self, model, max_batch_size=32, max_wait_time=0.1):
        self.model = model
        self.max_batch_size = max_batch_size
        self.max_wait_time = max_wait_time
        self.queue = []
    
    async def predict(self, x):
        self.queue.append(x)
        
        if len(self.queue) >= self.max_batch_size:
            return await self._process_batch()
        
        # Esperar más solicitudes o timeout
        await asyncio.sleep(self.max_wait_time)
        return await self._process_batch()
    
    async def _process_batch(self):
        batch = torch.stack(self.queue)
        self.queue = []
        return self.model(batch)

Rareza: Muy Común Dificultad: Difícil


4. ¿Qué es el entrenamiento de precisión mixta y cómo funciona?

Respuesta: La precisión mixta utiliza FP16 y FP32 para acelerar el entrenamiento manteniendo la precisión.

  • Beneficios:
    • Entrenamiento 2-3 veces más rápido
    • Reducción del uso de memoria
    • Tamaños de lote más grandes
  • Desafíos:
    • Estabilidad numérica
    • Subdesbordamiento del gradiente
  • Solución: Escala del gradiente
import torch
from torch.cuda.amp import autocast, GradScaler

model = MyModel().cuda()
optimizer = torch.optim.Adam(model.parameters())
scaler = GradScaler()

for epoch in range(10):
    for data, target in train_loader:
        data, target = data.cuda(), target.cuda()
        
        optimizer.zero_grad()
        
        # Pase hacia adelante en FP16
        with autocast():
            output = model(data)
            loss = criterion(output, target)
        
        # Pase hacia atrás con escala del gradiente
        scaler.scale(loss).backward()
        
        # Desescalar los gradientes y recortar
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        
        # Actualizar los pesos
        scaler.step(optimizer)
        scaler.update()

# Precisión mixta en TensorFlow
from tensorflow.keras import mixed_precision

policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)

# El modelo usa automáticamente FP16 para la computación
model = tf.keras.Sequential([
    tf.keras.layers.Dense(512, activation='relu'),
    tf.keras.layers.Dense(10)
])

# Escala de la pérdida manejada automáticamente
optimizer = tf.keras.optimizers.Adam()
optimizer = mixed_precision.LossScaleOptimizer(optimizer)

Rareza: Común Dificultad: Media


5. ¿Cómo maneja los cuellos de botella en la canalización de datos?

Respuesta: La carga de datos a menudo crea cuellos de botella en el entrenamiento. Optimice con:

  • Prefetching (Precarga): Cargar el siguiente lote mientras se entrena.
  • Carga paralela: Múltiples trabajadores
  • Caching (Caché): Almacenar datos preprocesados
  • Formato de datos: Utilizar formatos eficientes (TFRecord, Parquet)
import torch
from torch.utils.data import DataLoader, Dataset
import multiprocessing as mp

# Configuración eficiente de DataLoader
train_loader = DataLoader(
    dataset,
    batch_size=32,
    num_workers=mp.cpu_count(),  # Carga paralela
    pin_memory=True,  # Transferencia más rápida a la GPU
    prefetch_factor=2,  # Precargar lotes
    persistent_workers=True  # Mantener los trabajadores activos
)

# Dataset personalizado con caché
class CachedDataset(Dataset):
    def __init__(self, data_path, cache_size=1000):
        self.data_path = data_path
        self.cache = {}
        self.cache_size = cache_size
    
    def __getitem__(self, idx):
        if idx in self.cache:
            return self.cache[idx]
        
        # Cargar y preprocesar
        data = load_and_preprocess(self.data_path, idx)
        
        # Almacenar en caché si hay espacio disponible
        if len(self.cache) < self.cache_size:
            self.cache[idx] = data
        
        return data

# Optimización de la canalización de datos en TensorFlow
import tensorflow as tf

dataset = tf.data.Dataset.from_tensor_slices((X, y))

dataset = dataset.cache()  # Almacenar en caché en la memoria
dataset = dataset.shuffle(buffer_size=10000)
dataset = dataset.batch(32)
dataset = dataset.prefetch(tf.data.AUTOTUNE)  # Precarga automática
dataset = dataset.map(
    preprocess_function,
    num_parallel_calls=tf.data.AUTOTUNE
)

# Usar TFRecord para conjuntos de datos grandes
def create_tfrecord(data, labels, filename):
    with tf.io.TFRecordWriter(filename) as writer:
        for x, y in zip(data, labels):
            example = tf.train.Example(features=tf.train.Features(feature={
                'data': tf.train.Feature(float_list=tf.train.FloatList(value=x)),
                'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[y]))
            }))
            writer.write(example.SerializeToString())

Rareza: Común Dificultad: Media


MLOps e Infraestructura (5 Preguntas)

6. ¿Cómo diseña un almacén de características (Feature Store)?

Respuesta: Los almacenes de características centralizan la ingeniería y el servicio de características.

Loading diagram...
  • Componentes:
    • Almacén Offline: Características históricas para el entrenamiento (S3, BigQuery)
    • Almacén Online: Características de baja latencia para el servicio (Redis, DynamoDB)
    • Registro de Características: Metadatos y linaje
  • Beneficios:
    • Reutilización
    • Consistencia (entrenamiento/servicio)
    • Monitorización
# Ejemplo con Feast (almacén de características de código abierto)
from feast import FeatureStore, Entity, Feature, FeatureView, FileSource
from feast.value_type import ValueType
from datetime import timedelta

# Definir entidad
user = Entity(
    name="user_id",
    value_type=ValueType.INT64,
    description="ID de Usuario"
)

# Definir vista de características
user_features = FeatureView(
    name="user_features",
    entities=["user_id"],
    ttl=timedelta(days=1),
    features=[
        Feature(name="age", dtype=ValueType.INT64),
        Feature(name="total_purchases", dtype=ValueType.FLOAT),
        Feature(name="avg_purchase_value", dtype=ValueType.FLOAT),
    ],
    online=True,
    batch_source=FileSource(
        path="data/user_features.parquet",
        event_timestamp_column="timestamp"
    )
)

# Inicializar el almacén de características
fs = FeatureStore(repo_path=".")

# Obtener características para el entrenamiento (offline)
training_df = fs.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_features:age",
        "user_features:total_purchases",
        "user_features:avg_purchase_value"
    ]
).to_df()

# Obtener características para el servicio (online)
online_features = fs.get_online_features(
    features=[
        "user_features:age",
        "user_features:total_purchases"
    ],
    entity_rows=[{"user_id": 123}]
).to_dict()

# Implementación personalizada de un almacén de características
class SimpleFeatureStore:
    def __init__(self, redis_client, s3_client):
        self.redis = redis_client  # Almacén online
        self.s3 = s3_client  # Almacén offline
    
    def get_online_features(self, entity_id, feature_names):
        features = {}
        for feature in feature_names:
            key = f"{entity_id}:{feature}"
            features[feature] = self.redis.get(key)
        return features
    
    def write_features(self, entity_id, features):
        # Escribir en el almacén online
        for feature_name, value in features.items():
            key = f"{entity_id}:{feature_name}"
            self.redis.set(key, value, ex=86400)  # TTL de 24h
        
        # Escribir en el almacén offline para el entrenamiento
        self.s3.put_object(
            Bucket='features',
            Key=f'{entity_id}/features.json',
            Body=json.dumps(features)
        )

Rareza: Media Dificultad: Difícil


7. ¿Cómo implementa el versionado de modelos y el seguimiento de experimentos?

Respuesta: Realice un seguimiento de los experimentos para reproducir los resultados y comparar los modelos.

# MLflow para el seguimiento de experimentos
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier

# Establecer experimento
mlflow.set_experiment("model_comparison")

# Realizar un seguimiento del experimento
with mlflow.start_run(run_name="random_forest_v1"):
    # Registrar parámetros
    params = {
        'n_estimators': 100,
        'max_depth': 10,
        'min_samples_split': 2
    }
    mlflow.log_params(params)
    
    # Entrenar el modelo
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)
    
    # Registrar métricas
    train_score = model.score(X_train, y_train)
    test_score = model.score(X_test, y_test)
    mlflow.log_metrics({
        'train_accuracy': train_score,
        'test_accuracy': test_score
    })
    
    # Registrar el modelo
    mlflow.sklearn.log_model(model, "model")
    
    # Registrar artefactos
    mlflow.log_artifact("feature_importance.png")
    
    # Etiquetar la ejecución
    mlflow.set_tags({
        'model_type': 'random_forest',
        'dataset_version': 'v2.0'
    })

# Cargar el mejor modelo
best_run = mlflow.search_runs(
    experiment_ids=['1'],
    order_by=['metrics.test_accuracy DESC'],
    max_results=1
)

model_uri = f"runs:/{best_run.iloc[0].run_id}/model"
loaded_model = mlflow.sklearn.load_model(model_uri)

# Alternativa con Weights & Biases
import wandb

wandb.init(project="ml-project", name="experiment-1")

# Registrar los hiperparámetros
wandb.config.update({
    'learning_rate': 0.001,
    'batch_size': 32,
    'epochs': 10
})

# Registrar métricas durante el entrenamiento
for epoch in range(10):
    # Código de entrenamiento
    wandb.log({
        'epoch': epoch,
        'train_loss': train_loss,
        'val_loss': val_loss,
        'accuracy': accuracy
    })

# Registrar el modelo
wandb.save('model.h5')

# DVC para el versionado de datos y modelos
"""
# Inicializar DVC
dvc init

# Realizar un seguimiento de los datos
dvc add data/train.csv
git add data/train.csv.dvc .gitignore
git commit -m "Añadir datos de entrenamiento"

# Realizar un seguimiento del modelo
dvc add models/model.pkl
git add models/model.pkl.dvc
git commit -m "Añadir modelo entrenado v1"

# Enviar al almacenamiento remoto
dvc remote add -d storage s3://my-bucket/dvc-storage
dvc push
"""

Rareza: Muy Común Dificultad: Media


8. ¿Cómo implementa el despliegue de modelos en Kubernetes?

Respuesta: Kubernetes orquesta servicios de ML en contenedores.

# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-model-deployment
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ml-model
  template:
    metadata:
      labels:
        app: ml-model
    spec:
      containers:
      - name: ml-model
        image: ml-model:v1
        ports:
        - containerPort: 5000
        resources:
          requests:
            memory: "2Gi"
            cpu: "1000m"
          limits:
            memory: "4Gi"
            cpu: "2000m"
        env:
        - name: MODEL_PATH
          value: "/models/model.pkl"
        livenessProbe:
          httpGet:
            path: /health
            port: 5000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 5000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: ml-model-service
spec:
  selector:
    app: ml-model
  ports:
  - protocol: TCP
    port: 80
    targetPort: 5000
  type: LoadBalancer
---
# Autoescalador Horizontal de Pods
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ml-model-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ml-model-deployment
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
# app.py con comprobaciones de salud
from flask import Flask, request, jsonify
import joblib
import logging

app = Flask(__name__)
model = None

@app.route('/health')
def health():
    """Prueba de vivacidad"""
    return jsonify({'status': 'healthy'}), 200

@app.route('/ready')
def ready():
    """Prueba de preparación"""
    if model is not None:
        return jsonify({'status': 'ready'}), 200
    return jsonify({'status': 'not ready'}), 503

@app.route('/predict', methods=['POST'])
def predict():
    try:
        data = request.get_json()
        prediction = model.predict([data['features']])
        return jsonify({'prediction': int(prediction[0])})
    except Exception as e:
        logging.error(f"Error de predicción: {e}")
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    model = joblib.load('/models/model.pkl')
    app.run(host='0.0.0.0', port=5000)

Rareza: Común Dificultad: Difícil


9. ¿Qué es la deriva del modelo y cómo la detecta?

Respuesta: La deriva del modelo ocurre cuando el rendimiento del modelo se degrada con el tiempo.

  • Tipos:
    • Deriva de Datos: Cambia la distribución de entrada
    • Deriva de Concepto: Cambia la relación entre X e y
  • Detección:
    • Pruebas estadísticas (prueba KS, PSI)
    • Monitorización del rendimiento
    • Comparación de la distribución
import numpy as np
from scipy import stats
from sklearn.metrics import accuracy_score

class DriftDetector:
    def __init__(self, reference_data, threshold=0.05):
        self.reference_data = reference_data
        self.threshold = threshold
        self.reference_mean = np.mean(reference_data, axis=0)
        self.reference_std = np.std(reference_data, axis=0)
    
    def detect_data_drift(self, new_data):
        """Prueba de Kolmogorov-Smirnov para cada característica"""
        drift_detected = []
        
        for i in range(new_data.shape[1]):
            statistic, p_value = stats.ks_2samp(
                self.reference_data[:, i],
                new_data[:, i]
            )
            
            if p_value < self.threshold:
                drift_detected.append({
                    'feature': i,
                    'p_value': p_value,
                    'statistic': statistic
                })
        
        return drift_detected
    
    def calculate_psi(self, expected, actual, buckets=10):
        """Índice de Estabilidad de la Población"""
        def scale_range(x, min_val, max_val):
            return (x - min_val) / (max_val - min_val)
        
        min_val = min(expected.min(), actual.min())
        max_val = max(expected.max(), actual.max())
        
        expected_scaled = scale_range(expected, min_val, max_val)
        actual_scaled = scale_range(actual, min_val, max_val)
        
        expected_percents = np.histogram(expected_scaled, bins=buckets)[0] / len(expected)
        actual_percents = np.histogram(actual_scaled, bins=buckets)[0] / len(actual)
        
        psi = np.sum((actual_percents - expected_percents) * 
                     np.log((actual_percents + 1e-10) / (expected_percents + 1e-10)))
        
        return psi

class PerformanceMonitor:
    def __init__(self, model, window_size=1000):
        self.model = model
        self.window_size = window_size
        self.predictions = []
        self.actuals = []
        self.accuracies = []
    
    def log_prediction(self, X, y_true):
        y_pred = self.model.predict(X)
        
        self.predictions.extend(y_pred)
        self.actuals.extend(y_true)
        
        # Calcular la precisión rodante
        if len(self.predictions) >= self.window_size:
            recent_preds = self.predictions[-self.window_size:]
            recent_actuals = self.actuals[-self.window_size:]
            accuracy = accuracy_score(recent_actuals, recent_preds)
            self.accuracies.append(accuracy)
            
            # Alertar si el rendimiento disminuye
            if len(self.accuracies) > 10:
                recent_avg = np.mean(self.accuracies[-10:])
                baseline_avg = np.mean(self.accuracies[:10])
                
                if recent_avg < baseline_avg * 0.9:  # Disminución del 10%
                    print(f"ALERTA: El rendimiento disminuyó de {baseline_avg:.3f} a {recent_avg:.3f}")
                    return True
        
        return False

# Uso
detector = DriftDetector(X_train)
drift = detector.detect_data_drift(X_production)

if drift:
    print(f"Se detectó deriva de datos en {len(drift)} características")
    # Desencadenar el reentrenamiento

Rareza: Común Dificultad: Difícil


10. ¿Cómo implementa las pruebas A/B para modelos de ML?

Respuesta: Las pruebas A/B comparan las versiones del modelo en producción.

import random
import hashlib
from datetime import datetime

class ABTestFramework:
    def __init__(self, model_a, model_b, traffic_split=0.5):
        self.model_a = model_a
        self.model_b = model_b
        self.traffic_split = traffic_split
        self.results_a = []
        self.results_b = []
    
    def get_variant(self, user_id):
        """Asignación consistente basada en user_id"""
        hash_value = int(hashlib.md5(str(user_id).encode()).hexdigest(), 16)
        return 'A' if (hash_value % 100) < (self.traffic_split * 100) else 'B'
    
    def predict(self, user_id, features):
        variant = self.get_variant(user_id)
        
        if variant == 'A':
            prediction = self.model_a.predict([features])[0]
            model_version = 'A'
        else:
            prediction = self.model_b.predict([features])[0]
            model_version = 'B'
        
        # Registrar la predicción
        self.log_prediction(user_id, features, prediction, model_version)
        
        return prediction, model_version
    
    def log_prediction(self, user_id, features, prediction, variant):
        log_entry = {
            'timestamp': datetime.now(),
            'user_id': user_id,
            'variant': variant,
            'prediction': prediction
        }
        
        if variant == 'A':
            self.results_a.append(log_entry)
        else:
            self.results_b.append(log_entry)
    
    def log_outcome(self, user_id, actual_value):
        """Registrar el resultado real para el análisis"""
        # Encontrar la predicción en los registros y actualizar
        pass
    
    def analyze_results(self):
        """Análisis estadístico de la prueba A/B"""
        from scipy import stats
        
        # Calcular las tasas de conversión
        conversions_a = sum(1 for r in self.results_a if r.get('converted'))
        conversions_b = sum(1 for r in self.results_b if r.get('converted'))
        
        rate_a = conversions_a / len(self.results_a)
        rate_b = conversions_b / len(self.results_b)
        
        # Prueba de significación estadística
        statistic, p_value = stats.chi2_contingency([
            [conversions_a, len(self.results_a) - conversions_a],
            [conversions_b, len(self.results_b) - conversions_b]
        ])[:2]
        
        return {
            'variant_a_rate': rate_a,
            'variant_b_rate': rate_b,
            'lift': (rate_b - rate_a) / rate_a * 100,
            'p_value': p_value,
            'significant': p_value < 0.05
        }

# Uso
ab_test = ABTestFramework(model_v1, model_v2, traffic_split=0.5)

# Hacer predicciones
for user_id, features in requests:
    prediction, variant = ab_test.predict(user_id, features)
    
# Analizar después de recopilar datos
results = ab_test.analyze_results()
print(f"Elevación de la variante B: {results['lift']:.2f}%")
print(f"Estadíst
Newsletter subscription

Consejos de carrera semanales que realmente funcionan

Recibe las últimas ideas directamente en tu bandeja de entrada

Decorative doodle

Tu Próxima Entrevista Está a Solo un Currículum de Distancia

Crea un currículum profesional y optimizado en minutos. No se necesitan habilidades de diseño, solo resultados comprobados.

Crea mi currículum

Compartir esta publicación

Consigue Empleo 50% Más Rápido

Los buscadores de empleo que usan currículums profesionales mejorados con IA consiguen puestos en un promedio de 5 semanas en comparación con las 10 estándar. Deja de esperar y empieza a entrevistar.