November 23, 2025
16 min read

Senior Machine Learning Engineer Interview Questions: Complete Guide

Milad Bonakdar

Author

Master advanced ML engineering with essential interview questions covering distributed training, model optimization, MLOps, system design, and production ML at scale for senior machine learning engineers.


Introduction

Senior Machine Learning Engineers architect and scale ML systems in production, optimize model performance, build robust ML infrastructure, and lead technical initiatives. This role demands expertise in distributed systems, advanced optimization techniques, MLOps, and the ability to solve complex engineering challenges.

This comprehensive guide covers essential interview questions for Senior Machine Learning Engineers, spanning distributed training, model optimization, MLOps infrastructure, system design, feature engineering at scale, and production best practices. Each question includes a detailed answer, a rarity assessment, and a difficulty rating.


Distributed Training & Scalability (5 Questions)

1. How do you implement distributed training for deep learning models?

Answer: Distributed training parallelizes computation across multiple GPUs/machines.

  • Strategies:
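    • Data Parallelism: Every device holds a full model replica and processes a different shard of each batch; gradients are averaged across replicas
    • Model Parallelism: Split a model too large for one device across several devices
    • Pipeline Parallelism: Split the model into sequential stages and stream micro-batches through them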
  • Frameworks: PyTorch DDP, Horovod, TensorFlow MirroredStrategy
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler

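# Initialize distributed training
import os

def setup_distributed(rank, world_size):
    # init_method='env://' reads the rendezvous address from these variables;
    # launchers such as torchrun set them, but mp.spawn does not
    os.environ.setdefault('MASTER_ADDR', 'localhost')
    os.environ.setdefault('MASTER_PORT', '29500')
    dist.init_process_group(
        backend='nccl',  # Use 'gloo' for CPU
        init_method='env://',
        world_size=world_size,
        rank=rank
    )
    torch.cuda.set_device(rank)  # Bind this process to its own GPU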

# Model setup
class MyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(784, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )
    
    def forward(self, x):
        return self.layers(x)

def train_distributed(rank, world_size):
    setup_distributed(rank, world_size)
    
    # Create model and move to GPU
    model = MyModel().to(rank)
    model = DDP(model, device_ids=[rank])
    
    # Create distributed sampler
    train_dataset = MyDataset()  # Placeholder for your own torch.utils.data.Dataset
    train_sampler = DistributedSampler(
        train_dataset,
        num_replicas=world_size,
        rank=rank
    )
    
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=32,
        sampler=train_sampler
    )
    
    optimizer = torch.optim.Adam(model.parameters())
    criterion = nn.CrossEntropyLoss()
    
    for epoch in range(10):
        train_sampler.set_epoch(epoch)  # Shuffle differently each epoch
        
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(rank), target.to(rank)
            
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            
            if rank == 0 and batch_idx % 100 == 0:
                print(f'Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item():.4f}')
    
    dist.destroy_process_group()

# Launch with torch.multiprocessing
import torch.multiprocessing as mp

if __name__ == '__main__':
    world_size = torch.cuda.device_count()
    mp.spawn(train_distributed, args=(world_size,), nprocs=world_size)

# TensorFlow distributed training
import tensorflow as tf

strategy = tf.distribute.MirroredStrategy()

with strategy.scope():
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(512, activation='relu'),
        tf.keras.layers.Dense(10)
    ])
    
    model.compile(
        optimizer='adam',
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy']
    )

# model.fit will automatically distribute training
# model.fit(train_dataset, epochs=10)
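
To make the data-parallel split concrete, here is a minimal sketch of how a distributed sampler can assign sample indices to ranks. It is a simplified model written for illustration, without shuffling, and `shard_indices` is a hypothetical helper rather than part of PyTorch. The padding step assumes every rank must receive the same number of samples so that all processes run the same number of steps.

```python
import math


def shard_indices(dataset_size: int, world_size: int, rank: int) -> list:
    """Return the sample indices one rank would process (no shuffling)."""
    per_rank = math.ceil(dataset_size / world_size)
    total = per_rank * world_size
    indices = list(range(dataset_size))
    # Pad by wrapping around so every rank gets the same number of samples
    indices += indices[: total - dataset_size]
    # Strided slice: rank 0 takes positions 0, W, 2W, ...; rank 1 takes 1, W+1, ...
    return indices[rank:total:world_size]


shards = [shard_indices(10, 4, rank) for rank in range(4)]
for rank, shard in enumerate(shards):
    print(f"rank {rank}: {shard}")
```

With 10 samples on 4 ranks, two samples are processed twice per epoch because of the padding, which is worth remembering when computing evaluation metrics across ranks.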

Rarity: Common Difficulty: Hard


2. Explain gradient accumulation and when to use it.

Answer: Gradient accumulation simulates larger batch sizes when GPU memory is limited.

  • How it works: Accumulate gradients over multiple forward passes before updating weights
  • Use cases: Large models, limited GPU memory, training that is unstable at the small batch size that fits in memory
import torch
import torch.nn as nn

model = MyModel()
optimizer = torch.optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()

# Effective batch size = batch_size * accumulation_steps
batch_size = 8
accumulation_steps = 4  # Effective batch size = 32

model.train()
optimizer.zero_grad()

for batch_idx, (data, target) in enumerate(train_loader):
    # Forward pass
    output = model(data)
    loss = criterion(output, target)
    
    # Normalize loss by accumulation steps
    loss = loss / accumulation_steps
    
    # Backward pass (accumulate gradients)
    loss.backward()
    
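    # Update weights every accumulation_steps, and once more at the end so
    # gradients from a final partial group are not silently dropped
    if (batch_idx + 1) % accumulation_steps == 0 or (batch_idx + 1) == len(train_loader):
        optimizer.step()
        optimizer.zero_grad()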

# Mixed precision training with gradient accumulation
from torch.cuda.amp import autocast, GradScaler

scaler = GradScaler()
optimizer.zero_grad()

for batch_idx, (data, target) in enumerate(train_loader):
    with autocast():
        output = model(data)
        loss = criterion(output, target) / accumulation_steps
    
    scaler.scale(loss).backward()
    
    if (batch_idx + 1) % accumulation_steps == 0:
        scaler.step(optimizer)
        scaler.update()
        optimizer.zero_grad()
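
The reason for dividing the loss by `accumulation_steps` can be checked numerically. The sketch below uses a one-parameter model and made-up data (both are assumptions for illustration) to show that summing the scaled micro-batch gradients reproduces the gradient of the full batch, provided the micro-batches have equal size.

```python
def mse_grad(w, xs, ys):
    """Gradient of mean squared error for the 1-D model y_hat = w * x."""
    return sum(2 * (w * x - y) * x for x, y in zip(xs, ys)) / len(xs)


# Illustrative data: 8 samples, processed either at once or as 4 micro-batches
xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
ys = [2.0, 4.1, 5.9, 8.2, 9.8, 12.1, 14.0, 15.9]
w = 0.5

full_batch_grad = mse_grad(w, xs, ys)

accumulation_steps = 4
micro_batch_size = len(xs) // accumulation_steps
accumulated_grad = 0.0
for step in range(accumulation_steps):
    lo = step * micro_batch_size
    hi = lo + micro_batch_size
    # Same normalization as `loss = loss / accumulation_steps` in the loop above
    accumulated_grad += mse_grad(w, xs[lo:hi], ys[lo:hi]) / accumulation_steps

print(full_batch_grad, accumulated_grad)
```

Without the division, the accumulated gradient would be `accumulation_steps` times too large, which acts like an unintended learning-rate increase.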

Rarity: Common Difficulty: Medium


3. How do you optimize model inference latency?

Answer: Multiple techniques reduce inference time:

  • Model Optimization:
    • Quantization (INT8, FP16)
    • Pruning (remove weights)
    • Knowledge distillation
    • Model compilation (TorchScript, ONNX)
  • Serving Optimization:
    • Batching
    • Caching
    • Model parallelism
    • Hardware acceleration (GPU, TPU)
import torch
import torch.nn as nn

# 1. Quantization (INT8)
model = MyModel()
model.eval()

# Dynamic quantization
quantized_model = torch.quantization.quantize_dynamic(
    model, {nn.Linear}, dtype=torch.qint8
)

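# Static quantization (also quantizes activations, so it needs calibration data;
# in eager mode the model must wrap its forward pass in QuantStub/DeQuantStub)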
model.qconfig = torch.quantization.get_default_qconfig('fbgemm')
torch.quantization.prepare(model, inplace=True)
# Calibrate with representative data
# for data in calibration_loader:
#     model(data)
torch.quantization.convert(model, inplace=True)

# 2. TorchScript compilation
scripted_model = torch.jit.script(model)
scripted_model.save('model_scripted.pt')

# 3. ONNX export
dummy_input = torch.randn(1, 784)
torch.onnx.export(
    model,
    dummy_input,
    'model.onnx',
    input_names=['input'],
    output_names=['output'],
    dynamic_axes={'input': {0: 'batch_size'}}
)

# 4. TensorRT optimization (NVIDIA)
import tensorrt as trt

# 5. Pruning
import torch.nn.utils.prune as prune

# Prune 30% of weights in the first linear layer (MyModel keeps its layers in `layers`)
prune.l1_unstructured(model.layers[0], name='weight', amount=0.3)

# Make pruning permanent
prune.remove(model.layers[0], 'weight')

# 6. Knowledge distillation
import torch.nn.functional as F

class DistillationLoss(nn.Module):
    def __init__(self, temperature=3.0):
        super().__init__()
        self.temperature = temperature
        self.kl_div = nn.KLDivLoss(reduction='batchmean')
    
    def forward(self, student_logits, teacher_logits, labels):
        # Soft targets from teacher
        soft_loss = self.kl_div(
            F.log_softmax(student_logits / self.temperature, dim=1),
            F.softmax(teacher_logits / self.temperature, dim=1)
        ) * (self.temperature ** 2)
        
        # Hard targets
        hard_loss = F.cross_entropy(student_logits, labels)
        
        return 0.5 * soft_loss + 0.5 * hard_loss

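# 7. Batching for inference
import asyncio

class BatchPredictor:
    def __init__(self, model, max_batch_size=32, max_wait_time=0.1):
        self.model = model
        self.max_batch_size = max_batch_size
        self.max_wait_time = max_wait_time
        self.queue = []  # (input tensor, future) pairs awaiting a batch
    
    async def predict(self, x):
        future = asyncio.get_running_loop().create_future()
        self.queue.append((x, future))
        
        if len(self.queue) >= self.max_batch_size:
            self._process_batch()
        else:
            # Wait for more requests or timeout; another caller may flush
            # the queue (and resolve this future) in the meantime
            await asyncio.sleep(self.max_wait_time)
            if not future.done():
                self._process_batch()
        
        return await future
    
    def _process_batch(self):
        pending, self.queue = self.queue, []
        batch = torch.stack([x for x, _ in pending])
        with torch.no_grad():
            outputs = self.model(batch)
        # Hand each caller only its own row of the batched output
        for (_, future), output in zip(pending, outputs):
            future.set_result(output)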
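
What INT8 quantization does to a tensor can be shown without any framework. The following is a minimal sketch of affine (scale and zero-point) quantization in plain Python; the helper names and the sample weights are assumptions for illustration, and real libraries add per-channel scales and calibrated ranges on top of this idea.

```python
def quantization_params(x_min, x_max, qmin=-128, qmax=127):
    """Compute scale and zero point mapping [x_min, x_max] onto [qmin, qmax]."""
    # The range must contain 0.0 so that zero is represented exactly
    x_min, x_max = min(x_min, 0.0), max(x_max, 0.0)
    scale = (x_max - x_min) / (qmax - qmin)
    if scale == 0.0:
        scale = 1.0  # Degenerate all-zero tensor; any scale works
    zero_point = round(qmin - x_min / scale)
    return scale, zero_point


def quantize(x, scale, zero_point, qmin=-128, qmax=127):
    """Map a float to the nearest representable integer level, with clipping."""
    q = round(x / scale) + zero_point
    return max(qmin, min(qmax, q))


def dequantize(q, scale, zero_point):
    """Map an integer level back to its float value."""
    return (q - zero_point) * scale


weights = [-1.0, -0.25, 0.0, 0.4, 1.55]
scale, zero_point = quantization_params(min(weights), max(weights))
restored = [dequantize(quantize(w, scale, zero_point), scale, zero_point) for w in weights]
max_error = max(abs(a - b) for a, b in zip(weights, restored))
print(f"scale={scale:.4f}, zero_point={zero_point}, max_error={max_error:.6f}")
```

For values inside the calibrated range, the round-trip error is bounded by half the scale, which is why a tight, representative range matters more than the bit width alone.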

Rarity: Very Common Difficulty: Hard


4. What is mixed precision training and how does it work?

Answer: Mixed precision runs most operations in FP16 while keeping an FP32 copy of the weights, which speeds up training while maintaining accuracy.

  • Benefits:
    • Up to 2-3x faster training on GPUs with FP16 tensor cores
    • Reduced memory usage
    • Larger batch sizes
  • Challenges:
    • Numerical stability
    • Gradient underflow
  • Solution: Gradient scaling
import torch
from torch.cuda.amp import autocast, GradScaler

model = MyModel().cuda()
optimizer = torch.optim.Adam(model.parameters())
criterion = torch.nn.CrossEntropyLoss()
scaler = GradScaler()

for epoch in range(10):
    for data, target in train_loader:
        data, target = data.cuda(), target.cuda()
        
        optimizer.zero_grad()
        
        # Forward pass in FP16
        with autocast():
            output = model(data)
            loss = criterion(output, target)
        
        # Backward pass with gradient scaling
        scaler.scale(loss).backward()
        
        # Unscale gradients and clip
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        
        # Update weights
        scaler.step(optimizer)
        scaler.update()

# TensorFlow mixed precision
import tensorflow as tf
from tensorflow.keras import mixed_precision

policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)

# Layers compute in FP16 but keep their variables in FP32
model = tf.keras.Sequential([
    tf.keras.layers.Dense(512, activation='relu'),
    tf.keras.layers.Dense(10, dtype='float32')  # Keep output logits in FP32 for stability
])

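# model.compile()/fit() apply loss scaling automatically under this policy;
# the explicit wrapper below is only needed in a custom training loop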
optimizer = tf.keras.optimizers.Adam()
optimizer = mixed_precision.LossScaleOptimizer(optimizer)
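
Gradient underflow and the effect of scaling can be reproduced with the standard library alone, since `struct` supports IEEE 754 half precision through the `'e'` format. This is a small sketch with an illustrative gradient value and an illustrative scale of 2**16; the helper `to_fp16` is hypothetical.

```python
import struct


def to_fp16(x: float) -> float:
    """Round a Python float to IEEE 754 half precision and back."""
    return struct.unpack("e", struct.pack("e", x))[0]


tiny_grad = 1e-8        # Below the smallest positive FP16 value (about 6e-8)
loss_scale = 65536.0    # 2**16, an illustrative loss scale

# Without scaling, the gradient vanishes when stored in FP16
unscaled_fp16 = to_fp16(tiny_grad)

# Scaling the loss scales every gradient, lifting it into FP16 range
scaled_fp16 = to_fp16(tiny_grad * loss_scale)

# Unscaling happens in higher precision before the optimizer step
recovered = scaled_fp16 / loss_scale

print(unscaled_fp16, scaled_fp16, recovered)
```

The recovered value matches the original to within FP16 rounding error, whereas the unscaled value is lost entirely. This is the step that `scaler.scale(loss)` and `scaler.unscale_(optimizer)` perform around the backward pass.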

Rarity: Common Difficulty: Medium


5. How do you handle data pipeline bottlenecks?

Answer: Data loading often bottlenecks training. Optimize with:

  • Prefetching: Load next batch while training
  • Parallel loading: Multiple workers
  • Caching: Store preprocessed data
  • Data format: Use efficient formats (TFRecord, Parquet)
import torch
from torch.utils.data import DataLoader, Dataset
import multiprocessing as mp

# Efficient DataLoader configuration
train_loader = DataLoader(
    dataset,
    batch_size=32,
    num_workers=mp.cpu_count(),  # Parallel loading
    pin_memory=True,  # Faster GPU transfer
    prefetch_factor=2,  # Prefetch batches
    persistent_workers=True  # Keep workers alive
)

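# Custom dataset with caching
class CachedDataset(Dataset):
    def __init__(self, data_path, num_samples, cache_size=1000):
        self.data_path = data_path
        self.num_samples = num_samples
        self.cache = {}  # With num_workers > 0, each worker holds its own copy
        self.cache_size = cache_size
    
    def __len__(self):
        # Required by DataLoader's default sampler
        return self.num_samples
    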
    def __getitem__(self, idx):
        if idx in self.cache:
            return self.cache[idx]
        
        # Load and preprocess
        data = load_and_preprocess(self.data_path, idx)
        
        # Cache if space available
        if len(self.cache) < self.cache_size:
            self.cache[idx] = data
        
        return data

# TensorFlow data pipeline optimization
import tensorflow as tf

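dataset = tf.data.Dataset.from_tensor_slices((X, y))

# Order matters: map before cache so deterministic preprocessing runs once
# (keep random augmentation after cache), shuffle before batch so individual
# examples are mixed, and prefetch last
dataset = dataset.map(
    preprocess_function,
    num_parallel_calls=tf.data.AUTOTUNE
)
dataset = dataset.cache()  # Cache preprocessed examples in memory
dataset = dataset.shuffle(buffer_size=10000)
dataset = dataset.batch(32)
dataset = dataset.prefetch(tf.data.AUTOTUNE)  # Overlap input pipeline with training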

# Use TFRecord for large datasets
def create_tfrecord(data, labels, filename):
    with tf.io.TFRecordWriter(filename) as writer:
        for x, y in zip(data, labels):
            example = tf.train.Example(features=tf.train.Features(feature={
                'data': tf.train.Feature(float_list=tf.train.FloatList(value=x)),
                'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[y]))
            }))
            writer.write(example.SerializeToString())
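
Prefetching itself is a simple producer/consumer pattern. The sketch below shows the idea with a background thread and a bounded queue from the standard library; `prefetch` and `slow_batches` are hypothetical names, and the sleep stands in for disk or network latency. Framework loaders implement the same idea with worker processes.

```python
import queue
import threading
import time


def prefetch(iterable, buffer_size=2):
    """Yield items from `iterable`, loading up to `buffer_size` ahead in a thread."""
    buffer = queue.Queue(maxsize=buffer_size)
    sentinel = object()

    def producer():
        try:
            for item in iterable:
                buffer.put(item)  # Blocks while the buffer is full
        finally:
            buffer.put(sentinel)  # Always unblock the consumer

    threading.Thread(target=producer, daemon=True).start()
    while True:
        item = buffer.get()
        if item is sentinel:
            return
        yield item


def slow_batches(n):
    """Simulated loader: each batch takes 10 ms to produce."""
    for i in range(n):
        time.sleep(0.01)
        yield [i, i + 1]


batches = list(prefetch(slow_batches(5)))
print(batches)
```

This sketch stops quietly if the loader raises, and the producer thread stays blocked if the consumer stops early; a production version would propagate the exception and support cancellation.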

Rarity: Common Difficulty: Medium


MLOps & Infrastructure (5 Questions)

6. How do you design a feature store?

Answer: Feature stores centralize feature engineering and serving.

  • Components:
    • Offline Store: Historical features for training (S3, BigQuery)
    • Online Store: Low-latency features for serving (Redis, DynamoDB)
    • Feature Registry: Metadata and lineage
  • Benefits:
    • Reusability
    • Consistency (train/serve)
    • Monitoring
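# Example with Feast (open-source feature store)
# Note: this uses the older Feast API (Feature, ValueType, string entity names);
# recent releases define schemas with Field and feast.types instead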
from feast import FeatureStore, Entity, Feature, FeatureView, FileSource
from feast.value_type import ValueType
from datetime import timedelta

# Define entity
user = Entity(
    name="user_id",
    value_type=ValueType.INT64,
    description="User ID"
)

# Define feature view
user_features = FeatureView(
    name="user_features",
    entities=["user_id"],
    ttl=timedelta(days=1),
    features=[
        Feature(name="age", dtype=ValueType.INT64),
        Feature(name="total_purchases", dtype=ValueType.FLOAT),
        Feature(name="avg_purchase_value", dtype=ValueType.FLOAT),
    ],
    online=True,
    batch_source=FileSource(
        path="data/user_features.parquet",
        event_timestamp_column="timestamp"
    )
)

# Initialize feature store
fs = FeatureStore(repo_path=".")

# Get features for training (offline)
training_df = fs.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_features:age",
        "user_features:total_purchases",
        "user_features:avg_purchase_value"
    ]
).to_df()

# Get features for serving (online)
online_features = fs.get_online_features(
    features=[
        "user_features:age",
        "user_features:total_purchases"
    ],
    entity_rows=[{"user_id": 123}]
).to_dict()

# Custom feature store implementation
import json

class SimpleFeatureStore:
    def __init__(self, redis_client, s3_client):
        self.redis = redis_client  # Online store
        self.s3 = s3_client  # Offline store
    
    def get_online_features(self, entity_id, feature_names):
        features = {}
        for feature in feature_names:
            key = f"{entity_id}:{feature}"
            features[feature] = self.redis.get(key)
        return features
    
    def write_features(self, entity_id, features):
        # Write to online store
        for feature_name, value in features.items():
            key = f"{entity_id}:{feature_name}"
            self.redis.set(key, value, ex=86400)  # 24h TTL
        
        # Write to offline store for training
        self.s3.put_object(
            Bucket='features',
            Key=f'{entity_id}/features.json',
            Body=json.dumps(features)
        )
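
Train/serve consistency depends on point-in-time correctness: a training row may only see feature values that were already known at the time of its label event. Here is a minimal sketch of that lookup in plain Python, assuming each feature is stored as a timestamp-sorted history; `point_in_time_lookup` and the sample data are hypothetical.

```python
from bisect import bisect_right


def point_in_time_lookup(feature_history, event_ts):
    """Return the latest feature value recorded at or before `event_ts`.

    feature_history: list of (timestamp, value) pairs sorted by timestamp.
    Returns None when no value existed yet, which avoids leaking the future.
    """
    timestamps = [ts for ts, _ in feature_history]
    pos = bisect_right(timestamps, event_ts)
    if pos == 0:
        return None
    return feature_history[pos - 1][1]


# total_purchases for one user, recorded at three points in time
history = [(100, 1.0), (200, 3.0), (300, 7.0)]

# Label events at different times each see a different feature value
label_events = [50, 150, 200, 999]
training_rows = [(ts, point_in_time_lookup(history, ts)) for ts in label_events]
print(training_rows)
```

Joining on the latest value regardless of time would give every row the value 7.0 and leak future information into training, which is the kind of train/serve skew a feature store is meant to prevent.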

Rarity: Medium Difficulty: Hard


7. How do you implement model versioning and experiment tracking?

Answer: Track experiments to reproduce results and compare models.

# MLflow for experiment tracking
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier

# Set experiment
mlflow.set_experiment("model_comparison")

# Track experiment
with mlflow.start_run(run_name="random_forest_v1"):
    # Log parameters
    params = {
        'n_estimators': 100,
        'max_depth': 10,
        'min_samples_split': 2
    }
    mlflow.log_params(params)
    
    # Train model
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)
    
    # Log metrics
    train_score = model.score(X_train, y_train)
    test_score = model.score(X_test, y_test)
    mlflow.log_metrics({
        'train_accuracy': train_score,
        'test_accuracy': test_score
    })
    
    # Log model
    mlflow.sklearn.log_model(model, "model")
    
    # Log artifacts
    mlflow.log_artifact("feature_importance.png")
    
    # Tag run
    mlflow.set_tags({
        'model_type': 'random_forest',
        'dataset_version': 'v2.0'
    })

# Load best model
best_run = mlflow.search_runs(
    experiment_ids=['1'],
    order_by=['metrics.test_accuracy DESC'],
    max_results=1
)

model_uri = f"runs:/{best_run.iloc[0].run_id}/model"
loaded_model = mlflow.sklearn.load_model(model_uri)

# Weights & Biases alternative
import wandb

wandb.init(project="ml-project", name="experiment-1")

# Log hyperparameters
wandb.config.update({
    'learning_rate': 0.001,
    'batch_size': 32,
    'epochs': 10
})

# Log metrics during training
for epoch in range(10):
    # Training code
    wandb.log({
        'epoch': epoch,
        'train_loss': train_loss,
        'val_loss': val_loss,
        'accuracy': accuracy
    })

# Log model
wandb.save('model.h5')

# DVC for data and model versioning
"""
# Initialize DVC
dvc init

# Track data
dvc add data/train.csv
git add data/train.csv.dvc .gitignore
git commit -m "Add training data"

# Track model
dvc add models/model.pkl
git add models/model.pkl.dvc
git commit -m "Add trained model v1"

# Push to remote storage
dvc remote add -d storage s3://my-bucket/dvc-storage
dvc push
"""

Rarity: Very Common Difficulty: Medium


8. How do you deploy models on Kubernetes?

Answer: Kubernetes orchestrates containerized ML services.

# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-model-deployment
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ml-model
  template:
    metadata:
      labels:
        app: ml-model
    spec:
      containers:
      - name: ml-model
        image: ml-model:v1
        ports:
        - containerPort: 5000
        resources:
          requests:
            memory: "2Gi"
            cpu: "1000m"
          limits:
            memory: "4Gi"
            cpu: "2000m"
        env:
        - name: MODEL_PATH
          value: "/models/model.pkl"
        livenessProbe:
          httpGet:
            path: /health
            port: 5000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 5000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: ml-model-service
spec:
  selector:
    app: ml-model
  ports:
  - protocol: TCP
    port: 80
    targetPort: 5000
  type: LoadBalancer
---
# Horizontal Pod Autoscaler
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ml-model-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ml-model-deployment
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
# app.py with health checks
from flask import Flask, request, jsonify
import joblib
import logging

app = Flask(__name__)
model = None

@app.route('/health')
def health():
    """Liveness probe"""
    return jsonify({'status': 'healthy'}), 200

@app.route('/ready')
def ready():
    """Readiness probe"""
    if model is not None:
        return jsonify({'status': 'ready'}), 200
    return jsonify({'status': 'not ready'}), 503

@app.route('/predict', methods=['POST'])
def predict():
    try:
        data = request.get_json()
        prediction = model.predict([data['features']])
        return jsonify({'prediction': int(prediction[0])})
    except Exception as e:
        logging.error(f"Prediction error: {e}")
        return jsonify({'error': str(e)}), 500

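if __name__ == '__main__':
    # Loaded here for the development server only; under a WSGI server such as
    # gunicorn this block never runs, so load the model at import time instead
    model = joblib.load('/models/model.pkl')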
    app.run(host='0.0.0.0', port=5000)
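
The autoscaler in the manifest above scales on the ratio between observed and target utilization. The sketch below reproduces the documented core rule, desired = ceil(current replicas x current metric / target metric), with the replica bounds from the manifest. The function name is hypothetical, the 10% tolerance is the usual default as far as I know, and the real controller adds stabilization windows and other safeguards not modeled here.

```python
import math


def desired_replicas(current_replicas, current_utilization, target_utilization=70,
                     min_replicas=2, max_replicas=10, tolerance=0.1):
    """Approximate the HPA scaling decision for a single resource metric."""
    ratio = current_utilization / target_utilization
    # Skip scaling when the metric is close enough to the target
    if abs(ratio - 1.0) <= tolerance:
        return current_replicas
    desired = math.ceil(current_replicas * ratio)
    # Clamp to the bounds declared in the HPA spec
    return max(min_replicas, min(max_replicas, desired))


for replicas, cpu in [(3, 90), (3, 72), (3, 20), (8, 140)]:
    print(f"{replicas} replicas at {cpu}% CPU -> {desired_replicas(replicas, cpu)}")
```

For ML services, CPU utilization is often a weak proxy for load, so it is worth being ready to discuss scaling on request latency or queue depth through custom metrics.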

Rarity: Common Difficulty: Hard


9. What is model drift and how do you detect it?

Answer: Model drift occurs when model performance degrades over time because production data no longer matches what the model was trained on.

  • Types:
    • Data Drift: Input distribution changes
    • Concept Drift: Relationship between X and y changes
  • Detection:
    • Statistical tests (KS test, PSI)
    • Performance monitoring
    • Distribution comparison
import numpy as np
from scipy import stats
from sklearn.metrics import accuracy_score

class DriftDetector:
    def __init__(self, reference_data, threshold=0.05):
        self.reference_data = reference_data
        self.threshold = threshold
        self.reference_mean = np.mean(reference_data, axis=0)
        self.reference_std = np.std(reference_data, axis=0)
    
    def detect_data_drift(self, new_data):
        """Kolmogorov-Smirnov test for each feature"""
        drift_detected = []
        
        for i in range(new_data.shape[1]):
            statistic, p_value = stats.ks_2samp(
                self.reference_data[:, i],
                new_data[:, i]
            )
            
            if p_value < self.threshold:
                drift_detected.append({
                    'feature': i,
                    'p_value': p_value,
                    'statistic': statistic
                })
        
        return drift_detected
    
    def calculate_psi(self, expected, actual, buckets=10):
        """Population Stability Index"""
        def scale_range(x, min_val, max_val):
            return (x - min_val) / (max_val - min_val)
        
        min_val = min(expected.min(), actual.min())
        max_val = max(expected.max(), actual.max())
        
        expected_scaled = scale_range(expected, min_val, max_val)
        actual_scaled = scale_range(actual, min_val, max_val)
        
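        # Use shared bin edges so both histograms cover the same intervals
        bin_edges = np.linspace(0, 1, buckets + 1)
        expected_percents = np.histogram(expected_scaled, bins=bin_edges)[0] / len(expected)
        actual_percents = np.histogram(actual_scaled, bins=bin_edges)[0] / len(actual)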
        
        psi = np.sum((actual_percents - expected_percents) * 
                     np.log((actual_percents + 1e-10) / (expected_percents + 1e-10)))
        
        return psi

class PerformanceMonitor:
    def __init__(self, model, window_size=1000):
        self.model = model
        self.window_size = window_size
        self.predictions = []
        self.actuals = []
        self.accuracies = []
    
    def log_prediction(self, X, y_true):
        y_pred = self.model.predict(X)
        
        self.predictions.extend(y_pred)
        self.actuals.extend(y_true)
        
        # Calculate rolling accuracy
        if len(self.predictions) >= self.window_size:
            recent_preds = self.predictions[-self.window_size:]
            recent_actuals = self.actuals[-self.window_size:]
            accuracy = accuracy_score(recent_actuals, recent_preds)
            self.accuracies.append(accuracy)
            
            # Alert if performance drops
            if len(self.accuracies) > 10:
                recent_avg = np.mean(self.accuracies[-10:])
                baseline_avg = np.mean(self.accuracies[:10])
                
                if recent_avg < baseline_avg * 0.9:  # 10% drop
                    print(f"ALERT: Performance dropped from {baseline_avg:.3f} to {recent_avg:.3f}")
                    return True
        
        return False

# Usage
detector = DriftDetector(X_train)
drift = detector.detect_data_drift(X_production)

if drift:
    print(f"Data drift detected in {len(drift)} features")
    # Trigger retraining

Rarity: Common Difficulty: Hard


10. How do you implement A/B testing for ML models?

Answer: A/B testing compares model versions in production.

import random
import hashlib
from datetime import datetime

class ABTestFramework:
    def __init__(self, model_a, model_b, traffic_split=0.5):
        self.model_a = model_a
        self.model_b = model_b
        self.traffic_split = traffic_split
        self.results_a = []
        self.results_b = []
    
    def get_variant(self, user_id):
        """Consistent assignment based on user_id"""
        hash_value = int(hashlib.md5(str(user_id).encode()).hexdigest(), 16)
        return 'A' if (hash_value % 100) < (self.traffic_split * 100) else 'B'
    
    def predict(self, user_id, features):
        variant = self.get_variant(user_id)
        
        if variant == 'A':
            prediction = self.model_a.predict([features])[0]
            model_version = 'A'
        else:
            prediction = self.model_b.predict([features])[0]
            model_version = 'B'
        
        # Log prediction
        self.log_prediction(user_id, features, prediction, model_version)
        
        return prediction, model_version
    
    def log_prediction(self, user_id, features, prediction, variant):
        log_entry = {
            'timestamp': datetime.now(),
            'user_id': user_id,
            'variant': variant,
            'prediction': prediction
        }
        
        if variant == 'A':
            self.results_a.append(log_entry)
        else:
            self.results_b.append(log_entry)
    
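    def log_outcome(self, user_id, actual_value):
        """Log actual outcome for analysis"""
        # Attach the outcome to this user's most recent logged prediction;
        # a truthy actual_value is treated as a conversion
        results = self.results_a if self.get_variant(user_id) == 'A' else self.results_b
        for entry in reversed(results):
            if entry['user_id'] == user_id:
                entry['converted'] = bool(actual_value)
                return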
    
    def analyze_results(self):
        """Statistical analysis of A/B test"""
        from scipy import stats
        
        # Calculate conversion rates
        conversions_a = sum(1 for r in self.results_a if r.get('converted'))
        conversions_b = sum(1 for r in self.results_b if r.get('converted'))
        
        rate_a = conversions_a / len(self.results_a)
        rate_b = conversions_b / len(self.results_b)
        
        # Statistical significance test
        statistic, p_value = stats.chi2_contingency([
            [conversions_a, len(self.results_a) - conversions_a],
            [conversions_b, len(self.results_b) - conversions_b]
        ])[:2]
        
        return {
            'variant_a_rate': rate_a,
            'variant_b_rate': rate_b,
            'lift': (rate_b - rate_a) / rate_a * 100,
            'p_value': p_value,
            'significant': p_value < 0.05
        }

# Usage
ab_test = ABTestFramework(model_v1, model_v2, traffic_split=0.5)

# Make predictions
for user_id, features in requests:
    prediction, variant = ab_test.predict(user_id, features)
    
# Analyze after collecting data
results = ab_test.analyze_results()
print(f"Variant B lift: {results['lift']:.2f}%")
print(f"Statistically significant: {results['significant']}")
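
Before launching a test, it helps to estimate how many users each variant needs, since stopping as soon as the p-value dips below 0.05 inflates false positives. Below is a rough sketch using the standard normal-approximation formula for comparing two proportions; the function name, the 10% baseline rate, and the 10% relative lift are assumptions for illustration.

```python
from math import ceil
from statistics import NormalDist


def sample_size_per_variant(baseline_rate, relative_lift, alpha=0.05, power=0.8):
    """Approximate users needed per variant to detect a relative lift."""
    p1 = baseline_rate
    p2 = baseline_rate * (1 + relative_lift)
    z_alpha = NormalDist().inv_cdf(1 - alpha / 2)  # Two-sided significance level
    z_beta = NormalDist().inv_cdf(power)           # Desired statistical power
    variance = p1 * (1 - p1) + p2 * (1 - p2)
    return ceil((z_alpha + z_beta) ** 2 * variance / (p2 - p1) ** 2)


n_small_lift = sample_size_per_variant(baseline_rate=0.10, relative_lift=0.10)
n_large_lift = sample_size_per_variant(baseline_rate=0.10, relative_lift=0.30)
print(f"10% lift: {n_small_lift} users per variant")
print(f"30% lift: {n_large_lift} users per variant")
```

The required sample grows roughly with the inverse square of the effect size, so small expected improvements between model versions call for long-running tests.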

Rarity: Common Difficulty: Hard


System Design & Architecture (3 Questions)

11. Design a recommendation system architecture.

Answer: Recommendation systems require real-time serving and batch processing.


Components:

  • Data Pipeline: Kafka for streaming events
  • Feature Store: Online/offline features
  • Training: Batch training (daily/weekly)
  • Serving: Low-latency predictions (under 100ms)
  • Caching: Redis for popular items
  • Fallback: Rule-based recommendations
# Simplified recommendation system
class RecommendationSystem:
    def __init__(self, model, feature_store, cache):
        self.model = model
        self.feature_store = feature_store
        self.cache = cache
    
    def get_recommendations(self, user_id, num_items=10):
        # Check cache first
        cache_key = f"recs:{user_id}"
        cached = self.cache.get(cache_key)
        if cached:
            return cached
        
        # Get user features
        user_features = self.feature_store.get_online_features(
            entity_id=user_id,
            feature_names=['age', 'preferences', 'history']
        )
        
        # Get candidate items
        candidates = self.get_candidates(user_id)
        
        # Score candidates
        scores = []
        for item_id in candidates:
            item_features = self.feature_store.get_online_features(
                entity_id=item_id,
                feature_names=['category', 'popularity', 'price']
            )
            
            features = {**user_features, **item_features}
            score = self.model.predict([features])[0]
            scores.append((item_id, score))
        
        # Sort and return top N
        recommendations = sorted(scores, key=lambda x: x[1], reverse=True)[:num_items]
        
        # Cache results
        self.cache.set(cache_key, recommendations, ex=3600)
        
        return recommendations
    
    def get_candidates(self, user_id):
        # Candidate generation (collaborative filtering, content-based, etc.)
        # Left abstract here: plug in a retrieval method that returns item IDs
        raise NotImplementedError
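
Candidate generation is usually a cheap retrieval step that narrows the catalog before the ranking model scores anything. One simple option is item co-occurrence, sketched below in plain Python. The function names and the interaction data are hypothetical, and production systems more often use approximate nearest-neighbor search over learned embeddings.

```python
from collections import Counter, defaultdict
from itertools import combinations


def build_cooccurrence(user_histories):
    """Count how often each pair of items appears in the same user's history."""
    cooccurrence = defaultdict(Counter)
    for items in user_histories.values():
        for a, b in combinations(sorted(set(items)), 2):
            cooccurrence[a][b] += 1
            cooccurrence[b][a] += 1
    return cooccurrence


def get_candidates(user_id, user_histories, cooccurrence, max_candidates=100):
    """Return unseen items ranked by co-occurrence with the user's history."""
    seen = set(user_histories.get(user_id, []))
    scores = Counter()
    for item in seen:
        for other, count in cooccurrence.get(item, {}).items():
            if other not in seen:
                scores[other] += count
    return [item for item, _ in scores.most_common(max_candidates)]


histories = {
    "u1": ["a", "b", "c"],
    "u2": ["a", "b", "d"],
    "u3": ["b", "d"],
    "u4": ["a"],
}
cooccurrence = build_cooccurrence(histories)
print(get_candidates("u3", histories, cooccurrence))
print(get_candidates("u4", histories, cooccurrence))
```

Users with no history get an empty list here, which is where the rule-based fallback from the component list comes in.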

Rarity: Medium Difficulty: Hard


12. How do you handle model serving at scale?

Answer: Serving millions of predictions requires an architecture that holds latency steady as traffic grows.

  • Strategies:
    • Load balancing
    • Auto-scaling
    • Model caching
    • Batch prediction
    • Model optimization
# Model serving with TensorFlow Serving
"""
# Start TensorFlow Serving
docker run -p 8501:8501 \
  --mount type=bind,source=/path/to/model,target=/models/my_model \
  -e MODEL_NAME=my_model \
  tensorflow/serving

# Client code
import requests
import json

data = json.dumps({
    "signature_name": "serving_default",
    "instances": [[1.0, 2.0, 3.0, 4.0]]
})

headers = {"content-type": "application/json"}
response = requests.post(
    'http://localhost:8501/v1/models/my_model:predict',
    data=data,
    headers=headers
)

predictions = response.json()['predictions']
"""

# Custom serving with batching
import asyncio
from collections import deque
import time

class BatchedModelServer:
    def __init__(self, model, max_batch_size=32, max_wait_ms=100):
        self.model = model
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms / 1000
        self.queue = deque()
        self.processing = False
    
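    async def predict(self, features):
        # Add to queue
        future = asyncio.get_running_loop().create_future()
        self.queue.append((features, future))
        
        # Start processing if not already; set the flag here rather than
        # inside the task so concurrent callers cannot spawn duplicate workers
        if not self.processing:
            self.processing = True
            # Keep a reference so the task is not garbage-collected mid-run
            self._worker = asyncio.create_task(self.process_batch())
        
        # Wait for result
        return await future
    
    async def process_batch(self):
        self.processing = True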
        
        while self.queue:
            # Wait for batch to fill or timeout
            start_time = time.time()
            while len(self.queue) < self.max_batch_size:
                if time.time() - start_time > self.max_wait_ms:
                    break
                await asyncio.sleep(0.001)
            
            # Get batch
            batch_size = min(len(self.queue), self.max_batch_size)
            batch = [self.queue.popleft() for _ in range(batch_size)]
            
            # Process batch
            features = [item[0] for item in batch]
            futures = [item[1] for item in batch]
            
            predictions = self.model.predict(features)
            
            # Return results
            for future, prediction in zip(futures, predictions):
                future.set_result(prediction)
        
        self.processing = False

Rarity: Common Difficulty: Hard


13. How do you ensure model reproducibility?

Answer: Reproducibility enables debugging and compliance.

  • Best Practices:
    • Version control (code, data, models)
    • Seed fixing
    • Environment management (Docker)
    • Experiment tracking
    • Data lineage
import random
import numpy as np
import torch

def set_seed(seed=42):
    """Set all random seeds for reproducibility"""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# requirements.txt with exact versions
"""
numpy==1.21.0
scikit-learn==0.24.2
torch==1.9.0
"""

# Docker for environment
"""
FROM python:3.9
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "train.py"]
"""

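# Track everything with MLflow
import subprocess
import mlflow
import mlflow.sklearn

def get_git_commit():
    """Return the commit hash of the current checkout"""
    return subprocess.check_output(['git', 'rev-parse', 'HEAD']).decode().strip()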

with mlflow.start_run():
    # Log code version
    mlflow.log_param("git_commit", get_git_commit())
    
    # Log data version
    mlflow.log_param("data_version", "v1.2.3")
    
    # Log environment
    mlflow.log_artifact("requirements.txt")
    
    # Log model
    mlflow.sklearn.log_model(model, "model")
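
A hand-written label like "v1.2.3" only helps if someone remembers to bump it. One way to tie a run to the exact data it saw is to log a content hash alongside the version. The sketch below assumes the dataset fits in memory as a list of JSON-serializable rows; `dataset_fingerprint` is a hypothetical helper, and large datasets would hash files in chunks instead.

```python
import hashlib
import json


def dataset_fingerprint(rows):
    """Return a SHA-256 hex digest that changes whenever any row changes."""
    digest = hashlib.sha256()
    for row in rows:
        # Canonical JSON: sorted keys and fixed separators make the hash
        # independent of dict key order
        encoded = json.dumps(row, sort_keys=True, separators=(",", ":"))
        digest.update(encoded.encode("utf-8"))
        digest.update(b"\n")
    return digest.hexdigest()


rows = [{"user_id": 1, "age": 30}, {"user_id": 2, "age": 41}]
same_rows_reordered_keys = [{"age": 30, "user_id": 1}, {"age": 41, "user_id": 2}]
edited_rows = [{"user_id": 1, "age": 31}, {"user_id": 2, "age": 41}]

fingerprint = dataset_fingerprint(rows)
print(fingerprint)
```

The digest could then be logged as a run parameter next to the git commit, so two runs are comparable only when both code and data hashes match. Note that row order still affects the hash in this sketch, which is intentional if order matters for training.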

Rarity: Common Difficulty: Medium

