décembre 21, 2025
14 min de lecture

Questions d'entretien pour ingénieur senior en apprentissage automatique : Guide complet

interview
career-advice
job-search
Questions d'entretien pour ingénieur senior en apprentissage automatique : Guide complet
Milad Bonakdar

Milad Bonakdar

Auteur

Maîtrisez l'ingénierie ML avancée grâce à des questions d'entretien essentielles couvrant l'entraînement distribué, l'optimisation des modèles, le MLOps, la conception de systèmes et le ML en production à grande échelle pour les ingénieurs senior en apprentissage automatique.


Introduction

Les ingénieurs en apprentissage automatique senior conçoivent et mettent à l'échelle des systèmes d'apprentissage automatique en production, optimisent les performances des modèles, construisent une infrastructure d'apprentissage automatique robuste et dirigent des initiatives techniques. Ce rôle exige une expertise dans les systèmes distribués, les techniques d'optimisation avancées, le MLOps et la capacité à résoudre des défis d'ingénierie complexes.

Ce guide complet couvre les questions d'entretien essentielles pour les ingénieurs en apprentissage automatique senior, allant de l'entraînement distribué, l'optimisation des modèles, l'infrastructure MLOps, la conception de systèmes, l'ingénierie des caractéristiques à grande échelle et les meilleures pratiques de production. Chaque question comprend des réponses détaillées, une évaluation de la rareté et des niveaux de difficulté.


Entraînement distribué et évolutivité (5 questions)

1. Comment implémentez-vous l'entraînement distribué pour les modèles d'apprentissage profond ?

Réponse : L'entraînement distribué parallélise le calcul sur plusieurs GPU/machines.

  • Stratégies :
    • Parallélisme des données : Même modèle, différents lots de données
    • Parallélisme des modèles : Diviser le modèle entre les appareils
    • Parallélisme de pipeline : Diviser le modèle en étapes
  • Frameworks : PyTorch DDP, Horovod, TensorFlow MirroredStrategy
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler

# Initialiser l'entraînement distribué
def setup_distributed(rank, world_size):
    dist.init_process_group(
        backend='nccl',  # Utiliser 'gloo' pour le CPU
        init_method='env://',
        world_size=world_size,
        rank=rank
    )

# Configuration du modèle
class MyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(784, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )
    
    def forward(self, x):
        return self.layers(x)

def train_distributed(rank, world_size):
    setup_distributed(rank, world_size)
    
    # Créer le modèle et le déplacer vers le GPU
    model = MyModel().to(rank)
    model = DDP(model, device_ids=[rank])
    
    # Créer un échantillonneur distribué
    train_dataset = MyDataset()
    train_sampler = DistributedSampler(
        train_dataset,
        num_replicas=world_size,
        rank=rank
    )
    
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=32,
        sampler=train_sampler
    )
    
    optimizer = torch.optim.Adam(model.parameters())
    criterion = nn.CrossEntropyLoss()
    
    for epoch in range(10):
        train_sampler.set_epoch(epoch)  # Mélanger différemment à chaque époque
        
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(rank), target.to(rank)
            
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            
            if rank == 0 and batch_idx % 100 == 0:
                print(f'Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item():.4f}')
    
    dist.destroy_process_group()

# Lancer avec torch.multiprocessing
import torch.multiprocessing as mp

if __name__ == '__main__':
    world_size = torch.cuda.device_count()
    mp.spawn(train_distributed, args=(world_size,), nprocs=world_size)

# Entraînement distribué TensorFlow
import tensorflow as tf

strategy = tf.distribute.MirroredStrategy()

with strategy.scope():
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(512, activation='relu'),
        tf.keras.layers.Dense(10)
    ])
    
    model.compile(
        optimizer='adam',
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy']
    )

# model.fit distribuera automatiquement l'entraînement
# model.fit(train_dataset, epochs=10)

Rareté : Courant Difficulté : Difficile


2. Expliquez l'accumulation de gradient et quand l'utiliser.

Réponse : L'accumulation de gradient simule des tailles de lots plus importantes lorsque la mémoire GPU est limitée.

  • Fonctionnement : Accumuler les gradients sur plusieurs passes avant de mettre à jour les poids
  • Cas d'utilisation : Grands modèles, mémoire GPU limitée, entraînement stable
import torch
import torch.nn as nn

model = MyModel()
optimizer = torch.optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()

# Taille de lot effective = batch_size * accumulation_steps
batch_size = 8
accumulation_steps = 4  # Taille de lot effective = 32

model.train()
optimizer.zero_grad()

for batch_idx, (data, target) in enumerate(train_loader):
    # Passe avant
    output = model(data)
    loss = criterion(output, target)
    
    # Normaliser la perte par les étapes d'accumulation
    loss = loss / accumulation_steps
    
    # Passe arrière (accumuler les gradients)
    loss.backward()
    
    # Mettre à jour les poids tous les accumulation_steps
    if (batch_idx + 1) % accumulation_steps == 0:
        optimizer.step()
        optimizer.zero_grad()

# Entraînement en précision mixte avec accumulation de gradient
from torch.cuda.amp import autocast, GradScaler

scaler = GradScaler()
optimizer.zero_grad()

for batch_idx, (data, target) in enumerate(train_loader):
    with autocast():
        output = model(data)
        loss = criterion(output, target) / accumulation_steps
    
    scaler.scale(loss).backward()
    
    if (batch_idx + 1) % accumulation_steps == 0:
        scaler.step(optimizer)
        scaler.update()
        optimizer.zero_grad()

Rareté : Courant Difficulté : Moyen


3. Comment optimisez-vous la latence d'inférence du modèle ?

Réponse : Plusieurs techniques réduisent le temps d'inférence :

  • Optimisation du modèle :
    • Quantification (INT8, FP16)
    • Élagage (supprimer les poids)
    • Distillation des connaissances
    • Compilation du modèle (TorchScript, ONNX)
  • Optimisation du service :
    • Traitement par lots
    • Mise en cache
    • Parallélisme du modèle
    • Accélération matérielle (GPU, TPU)
import torch
import torch.nn as nn

# 1. Quantification (INT8)
model = MyModel()
model.eval()

# Quantification dynamique
quantized_model = torch.quantization.quantize_dynamic(
    model, {nn.Linear}, dtype=torch.qint8
)

# Quantification statique (plus précise)
model.qconfig = torch.quantization.get_default_qconfig('fbgemm')
torch.quantization.prepare(model, inplace=True)
# Calibrer avec des données représentatives
# for data in calibration_loader:
#     model(data)
torch.quantization.convert(model, inplace=True)

# 2. Compilation TorchScript
scripted_model = torch.jit.script(model)
scripted_model.save('model_scripted.pt')

# 3. Exportation ONNX
dummy_input = torch.randn(1, 784)
torch.onnx.export(
    model,
    dummy_input,
    'model.onnx',
    input_names=['input'],
    output_names=['output'],
    dynamic_axes={'input': {0: 'batch_size'}}
)

# 4. Optimisation TensorRT (NVIDIA)
import tensorrt as trt

# 5. Élagage
import torch.nn.utils.prune as prune

# Élaguer 30 % des poids dans la couche linéaire
prune.l1_unstructured(model.fc1, name='weight', amount=0.3)

# Rendre l'élagage permanent
prune.remove(model.fc1, 'weight')

# 6. Distillation des connaissances
class DistillationLoss(nn.Module):
    def __init__(self, temperature=3.0):
        super().__init__()
        self.temperature = temperature
        self.kl_div = nn.KLDivLoss(reduction='batchmean')
    
    def forward(self, student_logits, teacher_logits, labels):
        # Cibles souples du professeur
        soft_loss = self.kl_div(
            F.log_softmax(student_logits / self.temperature, dim=1),
            F.softmax(teacher_logits / self.temperature, dim=1)
        ) * (self.temperature ** 2)
        
        # Cibles dures
        hard_loss = F.cross_entropy(student_logits, labels)
        
        return 0.5 * soft_loss + 0.5 * hard_loss

# 7. Traitement par lots pour l'inférence
class BatchPredictor:
    def __init__(self, model, max_batch_size=32, max_wait_time=0.1):
        self.model = model
        self.max_batch_size = max_batch_size
        self.max_wait_time = max_wait_time
        self.queue = []
    
    async def predict(self, x):
        self.queue.append(x)
        
        if len(self.queue) >= self.max_batch_size:
            return await self._process_batch()
        
        # Attendre plus de requêtes ou un délai d'attente
        await asyncio.sleep(self.max_wait_time)
        return await self._process_batch()
    
    async def _process_batch(self):
        batch = torch.stack(self.queue)
        self.queue = []
        return self.model(batch)

Rareté : Très courant Difficulté : Difficile


4. Qu'est-ce que l'entraînement en précision mixte et comment fonctionne-t-il ?

Réponse : La précision mixte utilise FP16 et FP32 pour accélérer l'entraînement tout en conservant la précision.

  • Avantages :
    • Entraînement 2 à 3 fois plus rapide
    • Utilisation réduite de la mémoire
    • Tailles de lots plus importantes
  • Défis :
    • Stabilité numérique
    • Dépassement de capacité du gradient
  • Solution : Mise à l'échelle du gradient
import torch
from torch.cuda.amp import autocast, GradScaler

model = MyModel().cuda()
optimizer = torch.optim.Adam(model.parameters())
scaler = GradScaler()

for epoch in range(10):
    for data, target in train_loader:
        data, target = data.cuda(), target.cuda()
        
        optimizer.zero_grad()
        
        # Passe avant en FP16
        with autocast():
            output = model(data)
            loss = criterion(output, target)
        
        # Passe arrière avec mise à l'échelle du gradient
        scaler.scale(loss).backward()
        
        # Supprimer la mise à l'échelle des gradients et écrêter
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        
        # Mettre à jour les poids
        scaler.step(optimizer)
        scaler.update()

# Précision mixte TensorFlow
from tensorflow.keras import mixed_precision

policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)

# Le modèle utilise automatiquement FP16 pour le calcul
model = tf.keras.Sequential([
    tf.keras.layers.Dense(512, activation='relu'),
    tf.keras.layers.Dense(10)
])

# La mise à l'échelle de la perte est gérée automatiquement
optimizer = tf.keras.optimizers.Adam()
optimizer = mixed_precision.LossScaleOptimizer(optimizer)

Rareté : Courant Difficulté : Moyen


5. Comment gérez-vous les goulots d'étranglement du pipeline de données ?

Réponse : Le chargement des données constitue souvent un goulot d'étranglement pour l'entraînement. Optimiser avec :

  • Prélecture : Charger le lot suivant pendant l'entraînement
  • Chargement parallèle : Plusieurs workers
  • Mise en cache : Stocker les données prétraitées
  • Format de données : Utiliser des formats efficaces (TFRecord, Parquet)
import torch
from torch.utils.data import DataLoader, Dataset
import multiprocessing as mp

# Configuration efficace de DataLoader
train_loader = DataLoader(
    dataset,
    batch_size=32,
    num_workers=mp.cpu_count(),  # Chargement parallèle
    pin_memory=True,  # Transfert GPU plus rapide
    prefetch_factor=2,  # Prélecture des lots
    persistent_workers=True  # Garder les workers en vie
)

# Ensemble de données personnalisé avec mise en cache
class CachedDataset(Dataset):
    def __init__(self, data_path, cache_size=1000):
        self.data_path = data_path
        self.cache = {}
        self.cache_size = cache_size
    
    def __getitem__(self, idx):
        if idx in self.cache:
            return self.cache[idx]
        
        # Charger et prétraiter
        data = load_and_preprocess(self.data_path, idx)
        
        # Mettre en cache si l'espace est disponible
        if len(self.cache) < self.cache_size:
            self.cache[idx] = data
        
        return data

# Optimisation du pipeline de données TensorFlow
import tensorflow as tf

dataset = tf.data.Dataset.from_tensor_slices((X, y))

dataset = dataset.cache()  # Mettre en cache en mémoire
dataset = dataset.shuffle(buffer_size=10000)
dataset = dataset.batch(32)
dataset = dataset.prefetch(tf.data.AUTOTUNE)  # Prélecture automatique
dataset = dataset.map(
    preprocess_function,
    num_parallel_calls=tf.data.AUTOTUNE
)

# Utiliser TFRecord pour les grands ensembles de données
def create_tfrecord(data, labels, filename):
    with tf.io.TFRecordWriter(filename) as writer:
        for x, y in zip(data, labels):
            example = tf.train.Example(features=tf.train.Features(feature={
                'data': tf.train.Feature(float_list=tf.train.FloatList(value=x)),
                'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[y]))
            }))
            writer.write(example.SerializeToString())

Rareté : Courant Difficulté : Moyen


MLOps et infrastructure (5 questions)

6. Comment concevez-vous un magasin de caractéristiques ?

Réponse : Les magasins de caractéristiques centralisent l'ingénierie et le service des caractéristiques.

Loading diagram...
  • Composants :
    • Magasin hors ligne : Caractéristiques historiques pour l'entraînement (S3, BigQuery)
    • Magasin en ligne : Caractéristiques à faible latence pour le service (Redis, DynamoDB)
    • Registre de caractéristiques : Métadonnées et filiation
  • Avantages :
    • Réutilisabilité
    • Cohérence (entraînement/service)
    • Surveillance
# Exemple avec Feast (magasin de caractéristiques open source)
from feast import FeatureStore, Entity, Feature, FeatureView, FileSource
from feast.value_type import ValueType
from datetime import timedelta

# Définir l'entité
user = Entity(
    name="user_id",
    value_type=ValueType.INT64,
    description="ID d'utilisateur"
)

# Définir la vue des caractéristiques
user_features = FeatureView(
    name="user_features",
    entities=["user_id"],
    ttl=timedelta(days=1),
    features=[
        Feature(name="age", dtype=ValueType.INT64),
        Feature(name="total_purchases", dtype=ValueType.FLOAT),
        Feature(name="avg_purchase_value", dtype=ValueType.FLOAT),
    ],
    online=True,
    batch_source=FileSource(
        path="data/user_features.parquet",
        event_timestamp_column="timestamp"
    )
)

# Initialiser le magasin de caractéristiques
fs = FeatureStore(repo_path=".")

# Obtenir les caractéristiques pour l'entraînement (hors ligne)
training_df = fs.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_features:age",
        "user_features:total_purchases",
        "user_features:avg_purchase_value"
    ]
).to_df()

# Obtenir les caractéristiques pour le service (en ligne)
online_features = fs.get_online_features(
    features=[
        "user_features:age",
        "user_features:total_purchases"
    ],
    entity_rows=[{"user_id": 123}]
).to_dict()

# Implémentation personnalisée du magasin de caractéristiques
class SimpleFeatureStore:
    def __init__(self, redis_client, s3_client):
        self.redis = redis_client  # Magasin en ligne
        self.s3 = s3_client  # Magasin hors ligne
    
    def get_online_features(self, entity_id, feature_names):
        features = {}
        for feature in feature_names:
            key = f"{entity_id}:{feature}"
            features[feature] = self.redis.get(key)
        return features
    
    def write_features(self, entity_id, features):
        # Écrire dans le magasin en ligne
        for feature_name, value in features.items():
            key = f"{entity_id}:{feature_name}"
            self.redis.set(key, value, ex=86400)  # TTL de 24 h
        
        # Écrire dans le magasin hors ligne pour l'entraînement
        self.s3.put_object(
            Bucket='features',
            Key=f'{entity_id}/features.json',
            Body=json.dumps(features)
        )

Rareté : Moyen Difficulté : Difficile


7. Comment implémentez-vous le contrôle de version du modèle et le suivi des expériences ?

Réponse : Suivre les expériences pour reproduire les résultats et comparer les modèles.

# MLflow pour le suivi des expériences
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier

# Définir l'expérience
mlflow.set_experiment("model_comparison")

# Suivre l'expérience
with mlflow.start_run(run_name="random_forest_v1"):
    # Enregistrer les paramètres
    params = {
        'n_estimators': 100,
        'max_depth': 10,
        'min_samples_split': 2
    }
    mlflow.log_params(params)
    
    # Entraîner le modèle
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)
    
    # Enregistrer les métriques
    train_score = model.score(X_train, y_train)
    test_score = model.score(X_test, y_test)
    mlflow.log_metrics({
        'train_accuracy': train_score,
        'test_accuracy': test_score
    })
    
    # Enregistrer le modèle
    mlflow.sklearn.log_model(model, "model")
    
    # Enregistrer les artefacts
    mlflow.log_artifact("feature_importance.png")
    
    # Étiqueter l'exécution
    mlflow.set_tags({
        'model_type': 'random_forest',
        'dataset_version': 'v2.0'
    })

# Charger le meilleur modèle
best_run = mlflow.search_runs(
    experiment_ids=['1'],
    order_by=['metrics.test_accuracy DESC'],
    max_results=1
)

model_uri = f"runs:/{best_run.iloc[0].run_id}/model"
loaded_model = mlflow.sklearn.load_model(model_uri)

# Alternative Weights & Biases
import wandb

wandb.init(project="ml-project", name="experiment-1")

# Enregistrer les hyperparamètres
wandb.config.update({
    'learning_rate': 0.001,
    'batch_size': 32,
    'epochs': 10
})

# Enregistrer les métriques pendant l'entraînement
for epoch in range(10):
    # Code d'entraînement
    wandb.log({
        'epoch': epoch,
        'train_loss': train_loss,
        'val_loss': val_loss,
        'accuracy': accuracy
    })

# Enregistrer le modèle
wandb.save('model.h5')

# DVC pour le contrôle de version des données et des modèles
"""
# Initialiser DVC
dvc init

# Suivre les données
dvc add data/train.csv
git add data/train.csv.dvc .gitignore
git commit -m "Ajouter les données d'entraînement"

# Suivre le modèle
dvc add models/model.pkl
git add models/model.pkl.dvc
git commit -m "Ajouter le modèle entraîné v1"

# Envoyer vers le stockage distant
dvc remote add -d storage s3://my-bucket/dvc-storage
dvc push
"""

Rareté : Très courant Difficulté : Moyen


8. Comment déployez-vous des modèles sur Kubernetes ?

Réponse : Kubernetes orchestre les services ML conteneurisés.

# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-model-deployment
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ml-model
  template:
    metadata:
      labels:
        app: ml-model
    spec:
      containers:
      - name: ml-model
        image: ml-model:v1
        ports:
        - containerPort: 5000
        resources:
          requests:
            memory: "2Gi"
            cpu: "1000m"
          limits:
            memory: "4Gi"
            cpu: "2000m"
        env:
        - name: MODEL_PATH
          value: "/models/model.pkl"
        livenessProbe:
          httpGet:
            path: /health
            port: 5000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 5000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: ml-model-service
spec:
  selector:
    app: ml-model
  ports:
  - protocol: TCP
    port: 80
    targetPort: 5000
  type: LoadBalancer
---
# Autoscaler horizontal de pod
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ml-model-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ml-model-deployment
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
# app.py avec des contrôles d'intégrité
from flask import Flask, request, jsonify
import joblib
import logging

app = Flask(__name__)
model = None

@app.route('/health')
def health():
    """Sonde d'activité"""
    return jsonify({'status': 'healthy'}), 200

@app.route('/ready')
def ready():
    """Sonde de disponibilité"""
    if model is not None:
        return jsonify({'status': 'ready'}), 200
    return jsonify({'status': 'not ready'}), 503

@app.route('/predict', methods=['POST'])
def predict():
    try:
        data = request.get_json()
        prediction = model.predict([data['features']])
        return jsonify({'prediction': int(prediction[0])})
    except Exception as e:
        logging.error(f"Erreur de prédiction : {e}")
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    model = joblib.load('/models/model.pkl')
    app.run(host='0.0.0.0', port=5000)

Rareté : Courant Difficulté : Difficile


9. Qu'est-ce que la dérive du modèle et comment la détectez-vous ?

Réponse : La dérive du modèle se produit lorsque les performances du modèle se dégradent au fil du temps.

  • Types :
    • Dérive des données : Les changements de distribution des entrées
    • Dérive du concept : La relation entre X et y change
  • Détection :
    • Tests statistiques (test KS, PSI)
    • Surveillance des performances
    • Comparaison de la distribution
import numpy as np
from scipy import stats
from sklearn.metrics import accuracy_score

class DriftDetector:
    def __init__(self, reference_data, threshold=0.05):
        self.reference_data = reference_data
        self.threshold = threshold
        self.reference_mean = np.mean(reference_data, axis=0)
        self.reference_std = np.std(reference_data, axis=0)
    
    def detect_data_drift(self, new_data):
        """Test de Kolmogorov-Smirnov pour chaque caractéristique"""
        drift_detected = []
        
        for i in range(new_data.shape[1]):
            statistic, p_value = stats.ks_2samp(
                self.reference_data[:, i],
                new_data[:, i]
            )
            
            if p_value < self.threshold:
                drift_detected.append({
                    'feature': i,
                    'p_value': p_value,
                    'statistic': statistic
                })
        
        return drift_detected
    
    def calculate_psi(self, expected, actual, buckets=10):
        """Indice de stabilité de la population"""
        def scale_range(x, min_val, max_val):
            return (x - min_val) / (max_val - min_val)
        
        min_val = min(expected.min(), actual.min())
        max_val = max(expected.max(), actual.max())
        
        expected_scaled = scale_range(expected, min_val, max_val)
        actual_scaled = scale_range(actual, min_val, max_val)
        
        expected_percents = np.histogram(expected_scaled, bins=buckets)[0] / len(expected)
        actual_percents = np.histogram(actual_scaled, bins=buckets)[0] / len(actual)
        
        psi = np.sum((actual_percents - expected_percents) * 
                     np.log((actual_percents + 1e-10) / (expected_percents + 1e-10)))
        
        return psi

class PerformanceMonitor:
    def __init__(self, model, window_size=1000):
        self.model = model
        self.window_size = window_size
        self.predictions = []
        self.actuals = []
        self.accuracies = []
    
    def log_prediction(self, X, y_true):
        y_pred = self.model.predict(X)
        
        self.predictions.extend(y_pred)
        self.actuals.extend(y_true)
        
        # Calculer la précision glissante
        if len(self.predictions) >= self.window_size:
            recent_preds = self.predictions[-self.window_size:]
            recent_actuals = self.actuals[-self.window_size:]
            accuracy = accuracy_score(recent_actuals, recent_preds)
            self.accuracies.append(accuracy)
            
            # Alerter si les performances chutent
            if len(self.accuracies) > 10:
                recent_avg = np.mean(self.accuracies[-10:])
                baseline_avg = np.mean(self.accuracies[:10])
                
                if recent_avg < baseline_avg * 0.9:  # Chute de 10 %
                    print(f"ALERTE : Les performances ont chuté de {baseline_avg:.3f} à {recent_avg:.3f}")
                    return True
        
        return False

# Utilisation
detector = DriftDetector(X_train)
drift = detector.detect_data_drift(X_production)

if drift:
    print(f"Dérive des données détectée dans {len(drift)} caractéristiques")
    # Déclencher le réentraînement

Rareté : Courant Difficulté : Difficile


10. Comment implémentez-vous les tests A/B pour les modèles ML ?

Réponse : Les tests A/B comparent les versions de modèle en production.

import random
import hashlib
from datetime import datetime

class ABTestFramework:
    def __init__(self, model_a, model_b, traffic_split=0.5):
        self.model_a = model_a
        self.model_b = model_b
        self.traffic_split = traffic_split
        self.results_a = []
        self.results_b = []
    
    def get_variant(self, user_id):
        """Attribution cohérente basée sur l'ID d'utilisateur"""
        hash_value = int(hashlib.md5(str(user_id).encode()).hexdigest(), 16)
        return 'A' if (hash_value % 100) < (self.traffic_split * 100) else 'B'
    
    def predict(self, user_id, features):
        variant = self.get_variant(user_id)
        
        if variant == 'A':
            prediction = self.model_a.predict([features])[0]
            model_version = 'A'
        else:
            prediction = self.model_b.predict([features])[0]
            model_version = 'B'
        
        # Enregistrer la prédiction
        self.log_prediction(user_id, features, prediction, model_version)
        
        return prediction, model_version
    
    def log_prediction(self, user_id, features, prediction, variant):
        log_entry = {
            'timestamp': datetime.now(),
            'user_id': user_id,
            'variant': variant,
            'prediction': prediction
        }
        
        if variant == 'A':
            self.results_a.append(log_entry)
        else:
            self.results_b.append(log_entry)
    
    def log_outcome(self, user_id, actual_value):
        """Enregistrer le résultat réel pour l'analyse"""
        # Rechercher la prédiction dans les journaux et mettre à jour
        pass
    
    def analyze_results(self):
        """Analyse statistique des tests A/B"""
        from scipy import stats
        
        # Calculer les taux de conversion
        conversions_a = sum(1 for r in self.results_a if r.get('converted'))
        conversions_b = sum(1 for r in self.results_b if r.get('converted'))
        
        rate_a = conversions_a / len(self.results_a)
        rate_b = conversions_b / len(self.results_b)
        
        # Test de signification statistique
        statistic, p_value = stats.chi2_contingency([
            [conversions_a, len(self.results_
Newsletter subscription

Conseils de carrière hebdomadaires qui fonctionnent vraiment

Recevez les dernières idées directement dans votre boîte de réception

Démarquez-vous auprès des Recruteurs et Décrochez Votre Emploi de Rêve

Rejoignez des milliers de personnes qui ont transformé leur carrière avec des CV alimentés par l'IA qui passent les ATS et impressionnent les responsables du recrutement.

Commencer maintenant

Partager cet article

Faites Compter Vos 6 Secondes

Les recruteurs scannent les CV pendant seulement 6 à 7 secondes en moyenne. Nos modèles éprouvés sont conçus pour capter l'attention instantanément et les inciter à continuer la lecture.