dicembre 21, 2025
14 min di lettura

Domande di colloquio per Senior Machine Learning Engineer: Guida Completa

interview
career-advice
job-search
Domande di colloquio per Senior Machine Learning Engineer: Guida Completa
Milad Bonakdar

Milad Bonakdar

Autore

Padroneggia l'ingegneria ML avanzata con domande di colloquio essenziali che coprono il training distribuito, l'ottimizzazione del modello, MLOps, la progettazione del sistema e l'ML di produzione su larga scala per gli ingegneri senior di machine learning.


Introduzione

I Senior Machine Learning Engineer progettano e scalano sistemi di ML in produzione, ottimizzano le prestazioni dei modelli, costruiscono infrastrutture di ML robuste e guidano iniziative tecniche. Questo ruolo richiede competenze in sistemi distribuiti, tecniche di ottimizzazione avanzate, MLOps e la capacità di risolvere sfide ingegneristiche complesse.

Questa guida completa copre le domande essenziali per i colloqui di Senior Machine Learning Engineer, spaziando dall'addestramento distribuito, all'ottimizzazione dei modelli, all'infrastruttura MLOps, alla progettazione del sistema, all'ingegneria delle feature su larga scala e alle migliori pratiche di produzione. Ogni domanda include risposte dettagliate, valutazione della rarità e valutazioni della difficoltà.


Addestramento Distribuito & Scalabilità (5 Domande)

1. Come implementi l'addestramento distribuito per modelli di deep learning?

Risposta: L'addestramento distribuito parallelizza il calcolo su più GPU/macchine.

  • Strategie:
    • Parallelismo dei Dati: Stesso modello, diversi batch di dati
    • Parallelismo del Modello: Modello diviso tra i dispositivi
    • Parallelismo della Pipeline: Modello diviso in fasi
  • Framework: PyTorch DDP, Horovod, TensorFlow MirroredStrategy
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler

# Inizializza l'addestramento distribuito
def setup_distributed(rank, world_size):
    dist.init_process_group(
        backend='nccl',  # Usa 'gloo' per CPU
        init_method='env://',
        world_size=world_size,
        rank=rank
    )

# Configurazione del modello
class MyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(784, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )
    
    def forward(self, x):
        return self.layers(x)

def train_distributed(rank, world_size):
    setup_distributed(rank, world_size)
    
    # Crea il modello e spostalo sulla GPU
    model = MyModel().to(rank)
    model = DDP(model, device_ids=[rank])
    
    # Crea il sampler distribuito
    train_dataset = MyDataset()
    train_sampler = DistributedSampler(
        train_dataset,
        num_replicas=world_size,
        rank=rank
    )
    
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=32,
        sampler=train_sampler
    )
    
    optimizer = torch.optim.Adam(model.parameters())
    criterion = nn.CrossEntropyLoss()
    
    for epoch in range(10):
        train_sampler.set_epoch(epoch)  # Mescola in modo diverso ogni epoca
        
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(rank), target.to(rank)
            
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            
            if rank == 0 and batch_idx % 100 == 0:
                print(f'Epoca {epoch}, Batch {batch_idx}, Perdita: {loss.item():.4f}')
    
    dist.destroy_process_group()

# Avvia con torch.multiprocessing
import torch.multiprocessing as mp

if __name__ == '__main__':
    world_size = torch.cuda.device_count()
    mp.spawn(train_distributed, args=(world_size,), nprocs=world_size)

# Addestramento distribuito con TensorFlow
import tensorflow as tf

strategy = tf.distribute.MirroredStrategy()

with strategy.scope():
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(512, activation='relu'),
        tf.keras.layers.Dense(10)
    ])
    
    model.compile(
        optimizer='adam',
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy']
    )

# model.fit distribuirà automaticamente l'addestramento
# model.fit(train_dataset, epochs=10)

Rarità: Comune Difficoltà: Difficile


2. Spiega l'accumulo del gradiente e quando usarlo.

Risposta: L'accumulo del gradiente simula batch size più grandi quando la memoria della GPU è limitata.

  • Come funziona: Accumula i gradienti su più passaggi in avanti prima di aggiornare i pesi
  • Casi d'uso: Modelli grandi, memoria GPU limitata, addestramento stabile
import torch
import torch.nn as nn

model = MyModel()
optimizer = torch.optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()

# Batch size effettivo = batch_size * accumulation_steps
batch_size = 8
accumulation_steps = 4  # Batch size effettivo = 32

model.train()
optimizer.zero_grad()

for batch_idx, (data, target) in enumerate(train_loader):
    # Passaggio in avanti
    output = model(data)
    loss = criterion(output, target)
    
    # Normalizza la perdita per i passi di accumulo
    loss = loss / accumulation_steps
    
    # Passaggio all'indietro (accumula i gradienti)
    loss.backward()
    
    # Aggiorna i pesi ogni accumulation_steps
    if (batch_idx + 1) % accumulation_steps == 0:
        optimizer.step()
        optimizer.zero_grad()

# Addestramento a precisione mista con accumulo del gradiente
from torch.cuda.amp import autocast, GradScaler

scaler = GradScaler()
optimizer.zero_grad()

for batch_idx, (data, target) in enumerate(train_loader):
    with autocast():
        output = model(data)
        loss = criterion(output, target) / accumulation_steps
    
    scaler.scale(loss).backward()
    
    if (batch_idx + 1) % accumulation_steps == 0:
        scaler.step(optimizer)
        scaler.update()
        optimizer.zero_grad()

Rarità: Comune Difficoltà: Media


3. Come ottimizzi la latenza dell'inferenza del modello?

Risposta: Molteplici tecniche riducono il tempo di inferenza:

  • Ottimizzazione del Modello:
    • Quantizzazione (INT8, FP16)
    • Pruning (rimuovi pesi)
    • Distillazione della conoscenza
    • Compilazione del modello (TorchScript, ONNX)
  • Ottimizzazione del Servizio:
    • Batching
    • Caching
    • Parallelismo del modello
    • Accelerazione hardware (GPU, TPU)
import torch
import torch.nn as nn

# 1. Quantizzazione (INT8)
model = MyModel()
model.eval()

# Quantizzazione dinamica
quantized_model = torch.quantization.quantize_dynamic(
    model, {nn.Linear}, dtype=torch.qint8
)

# Quantizzazione statica (più accurata)
model.qconfig = torch.quantization.get_default_qconfig('fbgemm')
torch.quantization.prepare(model, inplace=True)
# Calibra con dati rappresentativi
# for data in calibration_loader:
#     model(data)
torch.quantization.convert(model, inplace=True)

# 2. Compilazione TorchScript
scripted_model = torch.jit.script(model)
scripted_model.save('model_scripted.pt')

# 3. Esportazione ONNX
dummy_input = torch.randn(1, 784)
torch.onnx.export(
    model,
    dummy_input,
    'model.onnx',
    input_names=['input'],
    output_names=['output'],
    dynamic_axes={'input': {0: 'batch_size'}}
)

# 4. Ottimizzazione TensorRT (NVIDIA)
import tensorrt as trt

# 5. Pruning
import torch.nn.utils.prune as prune

# Pruna il 30% dei pesi nello strato lineare
prune.l1_unstructured(model.fc1, name='weight', amount=0.3)

# Rendi permanente il pruning
prune.remove(model.fc1, 'weight')

# 6. Distillazione della conoscenza
class DistillationLoss(nn.Module):
    def __init__(self, temperature=3.0):
        super().__init__()
        self.temperature = temperature
        self.kl_div = nn.KLDivLoss(reduction='batchmean')
    
    def forward(self, student_logits, teacher_logits, labels):
        # Soft targets dal teacher
        soft_loss = self.kl_div(
            F.log_softmax(student_logits / self.temperature, dim=1),
            F.softmax(teacher_logits / self.temperature, dim=1)
        ) * (self.temperature ** 2)
        
        # Hard targets
        hard_loss = F.cross_entropy(student_logits, labels)
        
        return 0.5 * soft_loss + 0.5 * hard_loss

# 7. Batching per l'inferenza
class BatchPredictor:
    def __init__(self, model, max_batch_size=32, max_wait_time=0.1):
        self.model = model
        self.max_batch_size = max_batch_size
        self.max_wait_time = max_wait_time
        self.queue = []
    
    async def predict(self, x):
        self.queue.append(x)
        
        if len(self.queue) >= self.max_batch_size:
            return await self._process_batch()
        
        # Attendi altre richieste o timeout
        await asyncio.sleep(self.max_wait_time)
        return await self._process_batch()
    
    async def _process_batch(self):
        batch = torch.stack(self.queue)
        self.queue = []
        return self.model(batch)

Rarità: Molto Comune Difficoltà: Difficile


4. Cos'è l'addestramento a precisione mista e come funziona?

Risposta: La precisione mista utilizza FP16 e FP32 per accelerare l'addestramento mantenendo l'accuratezza.

  • Vantaggi:
    • Addestramento 2-3 volte più veloce
    • Uso ridotto della memoria
    • Batch size più grandi
  • Sfide:
    • Stabilità numerica
    • Underflow del gradiente
  • Soluzione: Scaling del gradiente
import torch
from torch.cuda.amp import autocast, GradScaler

model = MyModel().cuda()
optimizer = torch.optim.Adam(model.parameters())
scaler = GradScaler()

for epoch in range(10):
    for data, target in train_loader:
        data, target = data.cuda(), target.cuda()
        
        optimizer.zero_grad()
        
        # Passaggio in avanti in FP16
        with autocast():
            output = model(data)
            loss = criterion(output, target)
        
        # Passaggio all'indietro con scaling del gradiente
        scaler.scale(loss).backward()
        
        # De-scala i gradienti e applica il clipping
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        
        # Aggiorna i pesi
        scaler.step(optimizer)
        scaler.update()

# Precisione mista con TensorFlow
from tensorflow.keras import mixed_precision

policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)

# Il modello utilizza automaticamente FP16 per il calcolo
model = tf.keras.Sequential([
    tf.keras.layers.Dense(512, activation='relu'),
    tf.keras.layers.Dense(10)
])

# Lo scaling della perdita è gestito automaticamente
optimizer = tf.keras.optimizers.Adam()
optimizer = mixed_precision.LossScaleOptimizer(optimizer)

Rarità: Comune Difficoltà: Media


5. Come gestisci i colli di bottiglia della pipeline di dati?

Risposta: Il caricamento dei dati spesso rappresenta un collo di bottiglia per l'addestramento. Ottimizza con:

  • Prefetching: Carica il batch successivo durante l'addestramento
  • Caricamento parallelo: Più worker
  • Caching: Memorizza i dati pre-elaborati
  • Formato dati: Usa formati efficienti (TFRecord, Parquet)
import torch
from torch.utils.data import DataLoader, Dataset
import multiprocessing as mp

# Configurazione efficiente del DataLoader
train_loader = DataLoader(
    dataset,
    batch_size=32,
    num_workers=mp.cpu_count(),  # Caricamento parallelo
    pin_memory=True,  # Trasferimento più veloce alla GPU
    prefetch_factor=2,  # Prefetching dei batch
    persistent_workers=True  # Mantieni i worker attivi
)

# Dataset personalizzato con caching
class CachedDataset(Dataset):
    def __init__(self, data_path, cache_size=1000):
        self.data_path = data_path
        self.cache = {}
        self.cache_size = cache_size
    
    def __getitem__(self, idx):
        if idx in self.cache:
            return self.cache[idx]
        
        # Carica e pre-elabora
        data = load_and_preprocess(self.data_path, idx)
        
        # Memorizza nella cache se c'è spazio disponibile
        if len(self.cache) < self.cache_size:
            self.cache[idx] = data
        
        return data

# Ottimizzazione della pipeline di dati con TensorFlow
import tensorflow as tf

dataset = tf.data.Dataset.from_tensor_slices((X, y))

dataset = dataset.cache()  # Memorizza nella cache in memoria
dataset = dataset.shuffle(buffer_size=10000)
dataset = dataset.batch(32)
dataset = dataset.prefetch(tf.data.AUTOTUNE)  # Prefetching automatico
dataset = dataset.map(
    preprocess_function,
    num_parallel_calls=tf.data.AUTOTUNE
)

# Usa TFRecord per dataset di grandi dimensioni
def create_tfrecord(data, labels, filename):
    with tf.io.TFRecordWriter(filename) as writer:
        for x, y in zip(data, labels):
            example = tf.train.Example(features=tf.train.Features(feature={
                'data': tf.train.Feature(float_list=tf.train.FloatList(value=x)),
                'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[y]))
            }))
            writer.write(example.SerializeToString())

Rarità: Comune Difficoltà: Media


MLOps & Infrastruttura (5 Domande)

6. Come progetti un feature store?

Risposta: I feature store centralizzano l'ingegneria delle feature e il serving.

Loading diagram...
  • Componenti:
    • Offline Store: Feature storiche per l'addestramento (S3, BigQuery)
    • Online Store: Feature a bassa latenza per il serving (Redis, DynamoDB)
    • Feature Registry: Metadati e lineage
  • Vantaggi:
    • Riutilizzabilità
    • Coerenza (addestramento/serving)
    • Monitoraggio
# Esempio con Feast (feature store open-source)
from feast import FeatureStore, Entity, Feature, FeatureView, FileSource
from feast.value_type import ValueType
from datetime import timedelta

# Definisci l'entità
user = Entity(
    name="user_id",
    value_type=ValueType.INT64,
    description="ID Utente"
)

# Definisci la feature view
user_features = FeatureView(
    name="user_features",
    entities=["user_id"],
    ttl=timedelta(days=1),
    features=[
        Feature(name="age", dtype=ValueType.INT64),
        Feature(name="total_purchases", dtype=ValueType.FLOAT),
        Feature(name="avg_purchase_value", dtype=ValueType.FLOAT),
    ],
    online=True,
    batch_source=FileSource(
        path="data/user_features.parquet",
        event_timestamp_column="timestamp"
    )
)

# Inizializza il feature store
fs = FeatureStore(repo_path=".")

# Ottieni le feature per l'addestramento (offline)
training_df = fs.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_features:age",
        "user_features:total_purchases",
        "user_features:avg_purchase_value"
    ]
).to_df()

# Ottieni le feature per il serving (online)
online_features = fs.get_online_features(
    features=[
        "user_features:age",
        "user_features:total_purchases"
    ],
    entity_rows=[{"user_id": 123}]
).to_dict()

# Implementazione personalizzata del feature store
class SimpleFeatureStore:
    def __init__(self, redis_client, s3_client):
        self.redis = redis_client  # Online store
        self.s3 = s3_client  # Offline store
    
    def get_online_features(self, entity_id, feature_names):
        features = {}
        for feature in feature_names:
            key = f"{entity_id}:{feature}"
            features[feature] = self.redis.get(key)
        return features
    
    def write_features(self, entity_id, features):
        # Scrivi nell'online store
        for feature_name, value in features.items():
            key = f"{entity_id}:{feature_name}"
            self.redis.set(key, value, ex=86400)  # TTL di 24h
        
        # Scrivi nell'offline store per l'addestramento
        self.s3.put_object(
            Bucket='features',
            Key=f'{entity_id}/features.json',
            Body=json.dumps(features)
        )

Rarità: Media Difficoltà: Difficile


7. Come implementi il versionamento del modello e il tracciamento degli esperimenti?

Risposta: Traccia gli esperimenti per riprodurre i risultati e confrontare i modelli.

# MLflow per il tracciamento degli esperimenti
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier

# Imposta l'esperimento
mlflow.set_experiment("model_comparison")

# Traccia l'esperimento
with mlflow.start_run(run_name="random_forest_v1"):
    # Registra i parametri
    params = {
        'n_estimators': 100,
        'max_depth': 10,
        'min_samples_split': 2
    }
    mlflow.log_params(params)
    
    # Addestra il modello
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)
    
    # Registra le metriche
    train_score = model.score(X_train, y_train)
    test_score = model.score(X_test, y_test)
    mlflow.log_metrics({
        'train_accuracy': train_score,
        'test_accuracy': test_score
    })
    
    # Registra il modello
    mlflow.sklearn.log_model(model, "model")
    
    # Registra gli artefatti
    mlflow.log_artifact("feature_importance.png")
    
    # Tagga l'esecuzione
    mlflow.set_tags({
        'model_type': 'random_forest',
        'dataset_version': 'v2.0'
    })

# Carica il modello migliore
best_run = mlflow.search_runs(
    experiment_ids=['1'],
    order_by=['metrics.test_accuracy DESC'],
    max_results=1
)

model_uri = f"runs:/{best_run.iloc[0].run_id}/model"
loaded_model = mlflow.sklearn.load_model(model_uri)

# Alternativa Weights & Biases
import wandb

wandb.init(project="ml-project", name="experiment-1")

# Registra gli iperparametri
wandb.config.update({
    'learning_rate': 0.001,
    'batch_size': 32,
    'epochs': 10
})

# Registra le metriche durante l'addestramento
for epoch in range(10):
    # Codice di addestramento
    wandb.log({
        'epoch': epoch,
        'train_loss': train_loss,
        'val_loss': val_loss,
        'accuracy': accuracy
    })

# Registra il modello
wandb.save('model.h5')

# DVC per il versionamento dei dati e del modello
"""
# Inizializza DVC
dvc init

# Traccia i dati
dvc add data/train.csv
git add data/train.csv.dvc .gitignore
git commit -m "Aggiungi dati di addestramento"

# Traccia il modello
dvc add models/model.pkl
git add models/model.pkl.dvc
git commit -m "Aggiungi modello addestrato v1"

# Esegui il push allo storage remoto
dvc remote add -d storage s3://my-bucket/dvc-storage
dvc push
"""

Rarità: Molto Comune Difficoltà: Media


8. Come distribuisci i modelli su Kubernetes?

Risposta: Kubernetes orchestra i servizi ML containerizzati.

# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-model-deployment
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ml-model
  template:
    metadata:
      labels:
        app: ml-model
    spec:
      containers:
      - name: ml-model
        image: ml-model:v1
        ports:
        - containerPort: 5000
        resources:
          requests:
            memory: "2Gi"
            cpu: "1000m"
          limits:
            memory: "4Gi"
            cpu: "2000m"
        env:
        - name: MODEL_PATH
          value: "/models/model.pkl"
        livenessProbe:
          httpGet:
            path: /health
            port: 5000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 5000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: ml-model-service
spec:
  selector:
    app: ml-model
  ports:
  - protocol: TCP
    port: 80
    targetPort: 5000
  type: LoadBalancer
---
# Horizontal Pod Autoscaler
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ml-model-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ml-model-deployment
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
# app.py con controlli di integrità
from flask import Flask, request, jsonify
import joblib
import logging

app = Flask(__name__)
model = None

@app.route('/health')
def health():
    """Liveness probe"""
    return jsonify({'status': 'healthy'}), 200

@app.route('/ready')
def ready():
    """Readiness probe"""
    if model is not None:
        return jsonify({'status': 'ready'}), 200
    return jsonify({'status': 'not ready'}), 503

@app.route('/predict', methods=['POST'])
def predict():
    try:
        data = request.get_json()
        prediction = model.predict([data['features']])
        return jsonify({'prediction': int(prediction[0])})
    except Exception as e:
        logging.error(f"Prediction error: {e}")
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    model = joblib.load('/models/model.pkl')
    app.run(host='0.0.0.0', port=5000)

Rarità: Comune Difficoltà: Difficile


9. Cos'è il model drift e come lo rilevi?

Risposta: Il model drift si verifica quando le prestazioni del modello si degradano nel tempo.

  • Tipi:
    • Data Drift: Cambia la distribuzione degli input
    • Concept Drift: Cambia la relazione tra X e y
  • Rilevamento:
    • Test statistici (test KS, PSI)
    • Monitoraggio delle prestazioni
    • Confronto della distribuzione
import numpy as np
from scipy import stats
from sklearn.metrics import accuracy_score

class DriftDetector:
    def __init__(self, reference_data, threshold=0.05):
        self.reference_data = reference_data
        self.threshold = threshold
        self.reference_mean = np.mean(reference_data, axis=0)
        self.reference_std = np.std(reference_data, axis=0)
    
    def detect_data_drift(self, new_data):
        """Test di Kolmogorov-Smirnov per ogni feature"""
        drift_detected = []
        
        for i in range(new_data.shape[1]):
            statistic, p_value = stats.ks_2samp(
                self.reference_data[:, i],
                new_data[:, i]
            )
            
            if p_value < self.threshold:
                drift_detected.append({
                    'feature': i,
                    'p_value': p_value,
                    'statistic': statistic
                })
        
        return drift_detected
    
    def calculate_psi(self, expected, actual, buckets=10):
        """Population Stability Index"""
        def scale_range(x, min_val, max_val):
            return (x - min_val) / (max_val - min_val)
        
        min_val = min(expected.min(), actual.min())
        max_val = max(expected.max(), actual.max())
        
        expected_scaled = scale_range(expected, min_val, max_val)
        actual_scaled = scale_range(actual, min_val, max_val)
        
        expected_percents = np.histogram(expected_scaled, bins=buckets)[0] / len(expected)
        actual_percents = np.histogram(actual_scaled, bins=buckets)[0] / len(actual)
        
        psi = np.sum((actual_percents - expected_percents) * 
                     np.log((actual_percents + 1e-10) / (expected_percents + 1e-10)))
        
        return psi

class PerformanceMonitor:
    def __init__(self, model, window_size=1000):
        self.model = model
        self.window_size = window_size
        self.predictions = []
        self.actuals = []
        self.accuracies = []
    
    def log_prediction(self, X, y_true):
        y_pred = self.model.predict(X)
        
        self.predictions.extend(y_pred)
        self.actuals.extend(y_true)
        
        # Calcola l'accuratezza mobile
        if len(self.predictions) >= self.window_size:
            recent_preds = self.predictions[-self.window_size:]
            recent_actuals = self.actuals[-self.window_size:]
            accuracy = accuracy_score(recent_actuals, recent_preds)
            self.accuracies.append(accuracy)
            
            # Avvisa se le prestazioni calano
            if len(self.accuracies) > 10:
                recent_avg = np.mean(self.accuracies[-10:])
                baseline_avg = np.mean(self.accuracies[:10])
                
                if recent_avg < baseline_avg * 0.9:  # Calo del 10%
                    print(f"ALLARME: Le prestazioni sono calate da {baseline_avg:.3f} a {recent_avg:.3f}")
                    return True
        
        return False

# Utilizzo
detector = DriftDetector(X_train)
drift = detector.detect_data_drift(X_production)

if drift:
    print(f"Data drift rilevato in {len(drift)} feature")
    # Avvia il retraining

Rarità: Comune Difficoltà: Difficile


10. Come implementi l'A/B testing per i modelli ML?

Risposta: L'A/B testing confronta le versioni del modello in produzione.

import random
import hashlib
from datetime import datetime

class ABTestFramework:
    def __init__(self, model_a, model_b, traffic_split=0.5):
        self.model_a = model_a
        self.model_b = model_b
        self.traffic_split = traffic_split
        self.results_a = []
        self.results_b = []
    
    def get_variant(self, user_id):
        """Assegnazione coerente basata sull'user_id"""
        hash_value = int(hashlib.md5(str(user_id).encode()).hexdigest(), 16)
        return 'A' if (hash_value % 100) < (self.traffic_split * 100) else 'B'
    
    def predict(self, user_id, features):
        variant = self.get_variant(user_id)
        
        if variant == 'A':
            prediction = self.model_a.predict([features])[0]
            model_version = 'A'
        else:
            prediction = self.model_b.predict([features])[0]
            model_version = 'B'
        
        # Registra la previsione
        self.log_prediction(user_id, features, prediction, model_version)
        
        return prediction, model_version
    
    def log_prediction(self, user_id, features, prediction, variant):
        log_entry = {
            'timestamp': datetime.now(),
            'user_id': user_id,
            'variant': variant,
            'prediction': prediction
        }
        
        if variant == 'A':
            self.results_a.append(log_entry)
        else:
            self.results_b.append(log_entry)
    
    def log_outcome(self, user_id, actual_value):
        """Registra il risultato effettivo per l'analisi"""
        # Trova la previsione nei log e aggiorna
        pass
    
    def analyze_results(self):
        """Analisi statistica dell'A/B test"""
        from scipy import stats
        
        # Calcola i tassi di conversione
        conversions_a = sum(1 for r in self.results_a if r.get('converted'))
        conversions_b = sum(1 for r in self.results_b if r.get('converted'))
        
        rate_a = conversions_a / len(self.results_a)
        rate_b = conversions_b / len(self.results_b)
        
        # Test di significatività statistica
        statistic, p_value = stats.chi2_contingency([
            [conversions_a, len(self.results_a) - conversions_a],
            [conversions_b, len(self.results_b) - conversions_b]
        ])[:2]
        
        return {
            'variant_a_rate': rate_a,
            'variant_b_rate': rate_b,
            'lift': (rate_b - rate_a) / rate_a * 100,
            'p_value': p_value,
            'significant': p_value < 0.05
        }

# Utilizzo
ab_test = ABTestFramework(model_v1, model_v2, traffic_split=0.5)

# Esegui le previsioni
for user_id, features in requests:
    prediction, variant = ab_test.predict(user_
Newsletter subscription

Consigli di carriera settimanali che funzionano davvero

Ricevi le ultime idee direttamente nella tua casella di posta

Distinguiti dai Reclutatori e Ottieni il Lavoro dei Tuoi Sogni

Unisciti a migliaia di persone che hanno trasformato la loro carriera con curriculum potenziati dall'IA che superano l'ATS e impressionano i responsabili delle assunzioni.

Inizia a creare ora

Condividi questo post

Vieni Assunto il 50% Più Velocemente

Le persone in cerca di lavoro che utilizzano curriculum professionali migliorati dall'IA trovano ruoli in una media di 5 settimane rispetto alle 10 standard. Smetti di aspettare e inizia a fare colloqui.