Senior Machine Learning Engineer Interview Questions: Complete Guide

Milad Bonakdar
Master advanced ML engineering with essential interview questions covering distributed training, model optimization, MLOps, system design, and production ML at scale for senior machine learning engineers.
Introduction
Senior Machine Learning Engineers architect and scale ML systems in production, optimize model performance, build robust ML infrastructure, and lead technical initiatives. This role demands expertise in distributed systems, advanced optimization techniques, MLOps, and the ability to solve complex engineering challenges.
This comprehensive guide covers essential interview questions for Senior Machine Learning Engineers, spanning distributed training, model optimization, MLOps infrastructure, system design, feature engineering at scale, and production best practices. Each question includes detailed answers, rarity assessment, and difficulty ratings.
Distributed Training & Scalability (5 Questions)
1. How do you implement distributed training for deep learning models?
Answer: Distributed training parallelizes computation across multiple GPUs/machines.
- Strategies:
  - Data Parallelism: Same model, different data batches
  - Model Parallelism: Split the model across devices (a minimal sketch follows the DDP example below)
  - Pipeline Parallelism: Split the model into stages that process micro-batches concurrently
- Frameworks: PyTorch DDP, Horovod, TensorFlow MirroredStrategy
```python
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler

# Initialize distributed training
def setup_distributed(rank, world_size):
    dist.init_process_group(
        backend='nccl',        # Use 'gloo' for CPU
        init_method='env://',
        world_size=world_size,
        rank=rank
    )

# Model setup
class MyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(784, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )

    def forward(self, x):
        return self.layers(x)

def train_distributed(rank, world_size):
    setup_distributed(rank, world_size)

    # Create model and move to GPU
    model = MyModel().to(rank)
    model = DDP(model, device_ids=[rank])

    # Create distributed sampler
    train_dataset = MyDataset()
    train_sampler = DistributedSampler(
        train_dataset,
        num_replicas=world_size,
        rank=rank
    )
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=32,
        sampler=train_sampler
    )

    optimizer = torch.optim.Adam(model.parameters())
    criterion = nn.CrossEntropyLoss()

    for epoch in range(10):
        train_sampler.set_epoch(epoch)  # Shuffle differently each epoch
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(rank), target.to(rank)

            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

            if rank == 0 and batch_idx % 100 == 0:
                print(f'Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item():.4f}')

    dist.destroy_process_group()

# Launch with torch.multiprocessing
import torch.multiprocessing as mp

if __name__ == '__main__':
    world_size = torch.cuda.device_count()
    mp.spawn(train_distributed, args=(world_size,), nprocs=world_size)
```
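The DDP code above covers data parallelism. For completeness, here is a minimal, hypothetical sketch of model parallelism in plain PyTorch: the layers are split across two GPUs and activations are moved between devices in `forward`. It assumes at least two CUDA devices; the class name and layer sizes are illustrative.

```python
import torch
import torch.nn as nn

class TwoStageModel(nn.Module):
    """Illustrative model-parallel split: stage1 lives on cuda:0, stage2 on cuda:1."""
    def __init__(self):
        super().__init__()
        self.stage1 = nn.Sequential(nn.Linear(784, 512), nn.ReLU()).to('cuda:0')
        self.stage2 = nn.Linear(512, 10).to('cuda:1')

    def forward(self, x):
        # Move activations between devices explicitly
        x = self.stage1(x.to('cuda:0'))
        return self.stage2(x.to('cuda:1'))
```

Pipeline parallelism builds on the same split by also dividing each batch into micro-batches so the two stages can work concurrently instead of idling.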
```python
# TensorFlow distributed training
import tensorflow as tf

strategy = tf.distribute.MirroredStrategy()

with strategy.scope():
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(512, activation='relu'),
        tf.keras.layers.Dense(10)
    ])
    model.compile(
        optimizer='adam',
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy']
    )

# model.fit will automatically distribute training
# model.fit(train_dataset, epochs=10)
```
Rarity: Common | Difficulty: Hard
2. Explain gradient accumulation and when to use it.
Answer: Gradient accumulation simulates larger batch sizes when GPU memory is limited.
- How it works: Accumulate gradients over several forward/backward passes before applying a single optimizer step
- Use cases: Large models, limited GPU memory, and the more stable gradients of a larger effective batch size
```python
import torch
import torch.nn as nn

model = MyModel()
optimizer = torch.optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()

# Effective batch size = batch_size * accumulation_steps
batch_size = 8
accumulation_steps = 4  # Effective batch size = 32

model.train()
optimizer.zero_grad()

for batch_idx, (data, target) in enumerate(train_loader):
    # Forward pass
    output = model(data)
    loss = criterion(output, target)

    # Normalize loss by accumulation steps
    loss = loss / accumulation_steps

    # Backward pass (accumulate gradients)
    loss.backward()

    # Update weights every accumulation_steps
    if (batch_idx + 1) % accumulation_steps == 0:
        optimizer.step()
        optimizer.zero_grad()

# Mixed precision training with gradient accumulation
from torch.cuda.amp import autocast, GradScaler

scaler = GradScaler()
optimizer.zero_grad()

for batch_idx, (data, target) in enumerate(train_loader):
    with autocast():
        output = model(data)
        loss = criterion(output, target) / accumulation_steps

    scaler.scale(loss).backward()

    if (batch_idx + 1) % accumulation_steps == 0:
        scaler.step(optimizer)
        scaler.update()
        optimizer.zero_grad()
```
Rarity: Common | Difficulty: Medium
3. How do you optimize model inference latency?
Answer: Multiple techniques reduce inference time:
- Model Optimization:
  - Quantization (INT8, FP16)
  - Pruning (remove weights)
  - Knowledge distillation
  - Model compilation (TorchScript, ONNX)
- Serving Optimization:
  - Batching
  - Caching
  - Model parallelism
  - Hardware acceleration (GPU, TPU)
```python
import torch
import torch.nn as nn

# 1. Quantization (INT8)
model = MyModel()
model.eval()

# Dynamic quantization
quantized_model = torch.quantization.quantize_dynamic(
    model, {nn.Linear}, dtype=torch.qint8
)

# Static quantization (more accurate)
model.qconfig = torch.quantization.get_default_qconfig('fbgemm')
torch.quantization.prepare(model, inplace=True)
# Calibrate with representative data
# for data in calibration_loader:
#     model(data)
torch.quantization.convert(model, inplace=True)

# 2. TorchScript compilation
scripted_model = torch.jit.script(model)
scripted_model.save('model_scripted.pt')

# 3. ONNX export
dummy_input = torch.randn(1, 784)
torch.onnx.export(
    model,
    dummy_input,
    'model.onnx',
    input_names=['input'],
    output_names=['output'],
    dynamic_axes={'input': {0: 'batch_size'}}
)
```
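Once exported, the ONNX graph can be served through an optimized runtime. A minimal sketch with ONNX Runtime, assuming the `onnxruntime` package is installed and reusing the input name `input` from the export above:

```python
import numpy as np
import onnxruntime as ort

# Load the exported model and run a single inference
session = ort.InferenceSession('model.onnx', providers=['CPUExecutionProvider'])
dummy = np.random.randn(1, 784).astype(np.float32)
outputs = session.run(None, {'input': dummy})  # None means "return all outputs"
print(outputs[0].shape)
```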
```python
# 4. TensorRT optimization (NVIDIA), typically applied to the ONNX export
import tensorrt as trt

# 5. Pruning
import torch.nn.utils.prune as prune

# Prune 30% of weights in a linear layer (assuming the model exposes an `fc1` layer)
prune.l1_unstructured(model.fc1, name='weight', amount=0.3)
# Make pruning permanent
prune.remove(model.fc1, 'weight')

# 6. Knowledge distillation
import torch.nn.functional as F

class DistillationLoss(nn.Module):
    def __init__(self, temperature=3.0):
        super().__init__()
        self.temperature = temperature
        self.kl_div = nn.KLDivLoss(reduction='batchmean')

    def forward(self, student_logits, teacher_logits, labels):
        # Soft targets from teacher
        soft_loss = self.kl_div(
            F.log_softmax(student_logits / self.temperature, dim=1),
            F.softmax(teacher_logits / self.temperature, dim=1)
        ) * (self.temperature ** 2)
        # Hard targets
        hard_loss = F.cross_entropy(student_logits, labels)
        return 0.5 * soft_loss + 0.5 * hard_loss

# 7. Batching for inference (simplified illustration)
import asyncio

class BatchPredictor:
    def __init__(self, model, max_batch_size=32, max_wait_time=0.1):
        self.model = model
        self.max_batch_size = max_batch_size
        self.max_wait_time = max_wait_time
        self.queue = []

    async def predict(self, x):
        self.queue.append(x)
        if len(self.queue) >= self.max_batch_size:
            return await self._process_batch()
        # Wait for more requests or timeout
        await asyncio.sleep(self.max_wait_time)
        return await self._process_batch()

    async def _process_batch(self):
        batch = torch.stack(self.queue)
        self.queue = []
        return self.model(batch)
```
Rarity: Very Common | Difficulty: Hard
4. What is mixed precision training and how does it work?
Answer: Mixed precision uses FP16 and FP32 to speed up training while maintaining accuracy.
- Benefits:
  - 2-3x faster training
  - Reduced memory usage
  - Larger batch sizes
- Challenges:
  - Numerical stability
  - Gradient underflow
- Solution: Gradient scaling
```python
import torch
import torch.nn as nn
from torch.cuda.amp import autocast, GradScaler

model = MyModel().cuda()
optimizer = torch.optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()
scaler = GradScaler()

for epoch in range(10):
    for data, target in train_loader:
        data, target = data.cuda(), target.cuda()

        optimizer.zero_grad()

        # Forward pass in FP16
        with autocast():
            output = model(data)
            loss = criterion(output, target)

        # Backward pass with gradient scaling
        scaler.scale(loss).backward()

        # Unscale gradients and clip
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        # Update weights
        scaler.step(optimizer)
        scaler.update()
```

```python
# TensorFlow mixed precision
import tensorflow as tf
from tensorflow.keras import mixed_precision

policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)

# Model automatically uses FP16 for computation
model = tf.keras.Sequential([
    tf.keras.layers.Dense(512, activation='relu'),
    tf.keras.layers.Dense(10)
])

# Loss scaling is applied automatically in model.fit; wrap the optimizer for custom training loops
optimizer = tf.keras.optimizers.Adam()
optimizer = mixed_precision.LossScaleOptimizer(optimizer)
```
Rarity: Common | Difficulty: Medium
5. How do you handle data pipeline bottlenecks?
Answer: Data loading often bottlenecks training. Optimize with:
- Prefetching: Load next batch while training
- Parallel loading: Multiple workers
- Caching: Store preprocessed data
- Data format: Use efficient formats (TFRecord, Parquet)
```python
import torch
from torch.utils.data import DataLoader, Dataset
import multiprocessing as mp

# Efficient DataLoader configuration
train_loader = DataLoader(
    dataset,
    batch_size=32,
    num_workers=mp.cpu_count(),   # Parallel loading
    pin_memory=True,              # Faster GPU transfer
    prefetch_factor=2,            # Batches prefetched per worker
    persistent_workers=True       # Keep workers alive between epochs
)

# Custom dataset with caching
class CachedDataset(Dataset):
    def __init__(self, data_path, cache_size=1000):
        self.data_path = data_path
        self.cache = {}
        self.cache_size = cache_size

    def __getitem__(self, idx):
        if idx in self.cache:
            return self.cache[idx]

        # Load and preprocess (load_and_preprocess stands in for your own I/O + transform code)
        data = load_and_preprocess(self.data_path, idx)

        # Cache if space available
        if len(self.cache) < self.cache_size:
            self.cache[idx] = data
        return data

    # NOTE: a complete Dataset also needs __len__ so the DataLoader knows its size
```
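The answer above also lists Parquet among the efficient on-disk formats. As a minimal illustration with pandas (assuming the pyarrow engine is installed; column names are hypothetical), a columnar Parquet file can be written once and reloaded much faster than CSV, reading only the columns you need:

```python
import pandas as pd

# Write a compressed, columnar Parquet file
df = pd.DataFrame({'feature_1': [0.1, 0.2], 'feature_2': [1.0, 2.0], 'label': [0, 1]})
df.to_parquet('train.parquet', compression='snappy')

# Read back only selected columns
features = pd.read_parquet('train.parquet', columns=['feature_1', 'feature_2'])
```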
```python
# TensorFlow data pipeline optimization
import tensorflow as tf

dataset = tf.data.Dataset.from_tensor_slices((X, y))
dataset = dataset.map(
    preprocess_function,
    num_parallel_calls=tf.data.AUTOTUNE   # Parallel preprocessing
)
dataset = dataset.cache()                     # Cache preprocessed examples in memory
dataset = dataset.shuffle(buffer_size=10000)
dataset = dataset.batch(32)
dataset = dataset.prefetch(tf.data.AUTOTUNE)  # Prefetch last, so batches are ready ahead of training

# Use TFRecord for large datasets
def create_tfrecord(data, labels, filename):
    with tf.io.TFRecordWriter(filename) as writer:
        for x, y in zip(data, labels):
            example = tf.train.Example(features=tf.train.Features(feature={
                'data': tf.train.Feature(float_list=tf.train.FloatList(value=x)),
                'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[y]))
            }))
            writer.write(example.SerializeToString())
```
Rarity: Common | Difficulty: Medium
MLOps & Infrastructure (5 Questions)
6. How do you design a feature store?
Answer: Feature stores centralize feature engineering and serving.
- Components:
  - Offline Store: Historical features for training (S3, BigQuery)
  - Online Store: Low-latency features for serving (Redis, DynamoDB)
  - Feature Registry: Metadata and lineage
- Benefits:
  - Reusability
  - Consistency (train/serve)
  - Monitoring
```python
# Example with Feast (open-source feature store)
from feast import FeatureStore, Entity, Feature, FeatureView, FileSource
from feast.value_type import ValueType
from datetime import timedelta

# Define entity
user = Entity(
    name="user_id",
    value_type=ValueType.INT64,
    description="User ID"
)

# Define feature view
user_features = FeatureView(
    name="user_features",
    entities=["user_id"],
    ttl=timedelta(days=1),
    features=[
        Feature(name="age", dtype=ValueType.INT64),
        Feature(name="total_purchases", dtype=ValueType.FLOAT),
        Feature(name="avg_purchase_value", dtype=ValueType.FLOAT),
    ],
    online=True,
    batch_source=FileSource(
        path="data/user_features.parquet",
        event_timestamp_column="timestamp"
    )
)

# Initialize feature store
fs = FeatureStore(repo_path=".")

# Get features for training (offline)
training_df = fs.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_features:age",
        "user_features:total_purchases",
        "user_features:avg_purchase_value"
    ]
).to_df()

# Get features for serving (online)
online_features = fs.get_online_features(
    features=[
        "user_features:age",
        "user_features:total_purchases"
    ],
    entity_rows=[{"user_id": 123}]
).to_dict()
```
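One step the snippet glosses over: with Feast, features usually have to be materialized from the offline batch source into the online store before `get_online_features` returns fresh values. A short sketch using the Feast SDK's materialization call (the scheduling comment is an assumption about how it would run in production):

```python
from datetime import datetime
from feast import FeatureStore

fs = FeatureStore(repo_path=".")

# Load feature values up to "now" from the batch source into the online store.
# In production this would typically run on a schedule (e.g. an hourly job).
fs.materialize_incremental(end_date=datetime.now())
```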
```python
# Custom feature store implementation
import json

class SimpleFeatureStore:
    def __init__(self, redis_client, s3_client):
        self.redis = redis_client   # Online store
        self.s3 = s3_client         # Offline store

    def get_online_features(self, entity_id, feature_names):
        features = {}
        for feature in feature_names:
            key = f"{entity_id}:{feature}"
            features[feature] = self.redis.get(key)
        return features

    def write_features(self, entity_id, features):
        # Write to online store
        for feature_name, value in features.items():
            key = f"{entity_id}:{feature_name}"
            self.redis.set(key, value, ex=86400)  # 24h TTL

        # Write to offline store for training
        self.s3.put_object(
            Bucket='features',
            Key=f'{entity_id}/features.json',
            Body=json.dumps(features)
        )
```
Rarity: Medium | Difficulty: Hard
7. How do you implement model versioning and experiment tracking?
Answer: Track experiments to reproduce results and compare models.
```python
# MLflow for experiment tracking
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier

# Set experiment
mlflow.set_experiment("model_comparison")

# Track experiment
with mlflow.start_run(run_name="random_forest_v1"):
    # Log parameters
    params = {
        'n_estimators': 100,
        'max_depth': 10,
        'min_samples_split': 2
    }
    mlflow.log_params(params)

    # Train model
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    # Log metrics
    train_score = model.score(X_train, y_train)
    test_score = model.score(X_test, y_test)
    mlflow.log_metrics({
        'train_accuracy': train_score,
        'test_accuracy': test_score
    })

    # Log model
    mlflow.sklearn.log_model(model, "model")

    # Log artifacts
    mlflow.log_artifact("feature_importance.png")

    # Tag run
    mlflow.set_tags({
        'model_type': 'random_forest',
        'dataset_version': 'v2.0'
    })

# Load best model
best_run = mlflow.search_runs(
    experiment_ids=['1'],
    order_by=['metrics.test_accuracy DESC'],
    max_results=1
)
model_uri = f"runs:/{best_run.iloc[0].run_id}/model"
loaded_model = mlflow.sklearn.load_model(model_uri)
```
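For model versioning specifically, MLflow's Model Registry can assign incrementing versions to the logged model. A short sketch building on `model_uri` above (the registered model name is an assumption):

```python
import mlflow

# Register the logged model under a named entry; each call creates a new version
result = mlflow.register_model(model_uri=model_uri, name="churn_classifier")
print(result.version)

# Later, load a specific registered version by URI
loaded = mlflow.sklearn.load_model("models:/churn_classifier/1")
```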
```python
# Weights & Biases alternative
import wandb

wandb.init(project="ml-project", name="experiment-1")

# Log hyperparameters
wandb.config.update({
    'learning_rate': 0.001,
    'batch_size': 32,
    'epochs': 10
})

# Log metrics during training
for epoch in range(10):
    # Training code
    wandb.log({
        'epoch': epoch,
        'train_loss': train_loss,
        'val_loss': val_loss,
        'accuracy': accuracy
    })

# Log model
wandb.save('model.h5')

# DVC for data and model versioning
"""
# Initialize DVC
dvc init

# Track data
dvc add data/train.csv
git add data/train.csv.dvc .gitignore
git commit -m "Add training data"

# Track model
dvc add models/model.pkl
git add models/model.pkl.dvc
git commit -m "Add trained model v1"

# Push to remote storage
dvc remote add -d storage s3://my-bucket/dvc-storage
dvc push
"""
```
Rarity: Very Common | Difficulty: Medium
8. How do you deploy models on Kubernetes?
Answer: Kubernetes orchestrates containerized ML services.
```yaml
# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-model-deployment
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ml-model
  template:
    metadata:
      labels:
        app: ml-model
    spec:
      containers:
      - name: ml-model
        image: ml-model:v1
        ports:
        - containerPort: 5000
        resources:
          requests:
            memory: "2Gi"
            cpu: "1000m"
          limits:
            memory: "4Gi"
            cpu: "2000m"
        env:
        - name: MODEL_PATH
          value: "/models/model.pkl"
        livenessProbe:
          httpGet:
            path: /health
            port: 5000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 5000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: ml-model-service
spec:
  selector:
    app: ml-model
  ports:
  - protocol: TCP
    port: 80
    targetPort: 5000
  type: LoadBalancer
---
# Horizontal Pod Autoscaler
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ml-model-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ml-model-deployment
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
```
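The Deployment above references an ml-model:v1 image. A minimal Dockerfile that could produce it, as a sketch (the base image and file names are assumptions; the model file itself is expected at the MODEL_PATH the Deployment sets):

```dockerfile
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app.py .
EXPOSE 5000
CMD ["python", "app.py"]
```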
```python
# app.py with health checks
from flask import Flask, request, jsonify
import joblib
import logging

app = Flask(__name__)
model = None

@app.route('/health')
def health():
    """Liveness probe"""
    return jsonify({'status': 'healthy'}), 200

@app.route('/ready')
def ready():
    """Readiness probe"""
    if model is not None:
        return jsonify({'status': 'ready'}), 200
    return jsonify({'status': 'not ready'}), 503

@app.route('/predict', methods=['POST'])
def predict():
    try:
        data = request.get_json()
        prediction = model.predict([data['features']])
        return jsonify({'prediction': int(prediction[0])})
    except Exception as e:
        logging.error(f"Prediction error: {e}")
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    model = joblib.load('/models/model.pkl')
    app.run(host='0.0.0.0', port=5000)
```
Rarity: Common | Difficulty: Hard
9. What is model drift and how do you detect it?
Answer: Model drift is the degradation of model performance over time, caused by changes in the input data or in the relationship the model learned.
- Types:
  - Data Drift: Input distribution changes
  - Concept Drift: Relationship between X and y changes
- Detection:
  - Statistical tests (KS test, PSI)
  - Performance monitoring
  - Distribution comparison
```python
import numpy as np
from scipy import stats
from sklearn.metrics import accuracy_score

class DriftDetector:
    def __init__(self, reference_data, threshold=0.05):
        self.reference_data = reference_data
        self.threshold = threshold
        self.reference_mean = np.mean(reference_data, axis=0)
        self.reference_std = np.std(reference_data, axis=0)

    def detect_data_drift(self, new_data):
        """Kolmogorov-Smirnov test for each feature"""
        drift_detected = []
        for i in range(new_data.shape[1]):
            statistic, p_value = stats.ks_2samp(
                self.reference_data[:, i],
                new_data[:, i]
            )
            if p_value < self.threshold:
                drift_detected.append({
                    'feature': i,
                    'p_value': p_value,
                    'statistic': statistic
                })
        return drift_detected

    def calculate_psi(self, expected, actual, buckets=10):
        """Population Stability Index"""
        def scale_range(x, min_val, max_val):
            return (x - min_val) / (max_val - min_val)

        min_val = min(expected.min(), actual.min())
        max_val = max(expected.max(), actual.max())
        expected_scaled = scale_range(expected, min_val, max_val)
        actual_scaled = scale_range(actual, min_val, max_val)

        expected_percents = np.histogram(expected_scaled, bins=buckets)[0] / len(expected)
        actual_percents = np.histogram(actual_scaled, bins=buckets)[0] / len(actual)

        psi = np.sum((actual_percents - expected_percents) *
                     np.log((actual_percents + 1e-10) / (expected_percents + 1e-10)))
        return psi

class PerformanceMonitor:
    def __init__(self, model, window_size=1000):
        self.model = model
        self.window_size = window_size
        self.predictions = []
        self.actuals = []
        self.accuracies = []

    def log_prediction(self, X, y_true):
        y_pred = self.model.predict(X)
        self.predictions.extend(y_pred)
        self.actuals.extend(y_true)

        # Calculate rolling accuracy
        if len(self.predictions) >= self.window_size:
            recent_preds = self.predictions[-self.window_size:]
            recent_actuals = self.actuals[-self.window_size:]
            accuracy = accuracy_score(recent_actuals, recent_preds)
            self.accuracies.append(accuracy)

            # Alert if performance drops
            if len(self.accuracies) > 10:
                recent_avg = np.mean(self.accuracies[-10:])
                baseline_avg = np.mean(self.accuracies[:10])
                if recent_avg < baseline_avg * 0.9:  # 10% drop
                    print(f"ALERT: Performance dropped from {baseline_avg:.3f} to {recent_avg:.3f}")
                    return True
        return False

# Usage
detector = DriftDetector(X_train)
drift = detector.detect_data_drift(X_production)
if drift:
    print(f"Data drift detected in {len(drift)} features")
    # Trigger retraining
```
Rarity: Common | Difficulty: Hard
10. How do you implement A/B testing for ML models?
Answer: A/B testing compares model versions in production.
```python
import hashlib
from datetime import datetime

class ABTestFramework:
    def __init__(self, model_a, model_b, traffic_split=0.5):
        self.model_a = model_a
        self.model_b = model_b
        self.traffic_split = traffic_split
        self.results_a = []
        self.results_b = []

    def get_variant(self, user_id):
        """Consistent assignment based on user_id"""
        hash_value = int(hashlib.md5(str(user_id).encode()).hexdigest(), 16)
        return 'A' if (hash_value % 100) < (self.traffic_split * 100) else 'B'

    def predict(self, user_id, features):
        variant = self.get_variant(user_id)

        if variant == 'A':
            prediction = self.model_a.predict([features])[0]
            model_version = 'A'
        else:
            prediction = self.model_b.predict([features])[0]
            model_version = 'B'

        # Log prediction
        self.log_prediction(user_id, features, prediction, model_version)
        return prediction, model_version

    def log_prediction(self, user_id, features, prediction, variant):
        log_entry = {
            'timestamp': datetime.now(),
            'user_id': user_id,
            'variant': variant,
            'prediction': prediction,
            'converted': False
        }
        if variant == 'A':
            self.results_a.append(log_entry)
        else:
            self.results_b.append(log_entry)

    def log_outcome(self, user_id, converted=True):
        """Log the actual outcome so analyze_results can compute conversion rates"""
        for entry in self.results_a + self.results_b:
            if entry['user_id'] == user_id:
                entry['converted'] = converted

    def analyze_results(self):
        """Statistical analysis of A/B test"""
        from scipy import stats

        # Calculate conversion rates
        conversions_a = sum(1 for r in self.results_a if r.get('converted'))
        conversions_b = sum(1 for r in self.results_b if r.get('converted'))
        rate_a = conversions_a / len(self.results_a)
        rate_b = conversions_b / len(self.results_b)

        # Statistical significance test
        statistic, p_value = stats.chi2_contingency([
            [conversions_a, len(self.results_a) - conversions_a],
            [conversions_b, len(self.results_b) - conversions_b]
        ])[:2]

        return {
            'variant_a_rate': rate_a,
            'variant_b_rate': rate_b,
            'lift': (rate_b - rate_a) / rate_a * 100,
            'p_value': p_value,
            'significant': p_value < 0.05
        }

# Usage
ab_test = ABTestFramework(model_v1, model_v2, traffic_split=0.5)

# Make predictions
for user_id, features in requests:
    prediction, variant = ab_test.predict(user_id, features)

# Analyze after collecting data
results = ab_test.analyze_results()
print(f"Variant B lift: {results['lift']:.2f}%")
print(f"Statistically significant: {results['significant']}")
```
Rarity: Common | Difficulty: Hard
System Design & Architecture (3 Questions)
11. Design a recommendation system architecture.
Answer: Recommendation systems require real-time serving and batch processing.
Components:
- Data Pipeline: Kafka for streaming events
- Feature Store: Online/offline features
- Training: Batch training (daily/weekly)
- Serving: Low-latency predictions (under 100ms)
- Caching: Redis for popular items
- Fallback: Rule-based recommendations
```python
# Simplified recommendation system
class RecommendationSystem:
    def __init__(self, model, feature_store, cache):
        self.model = model
        self.feature_store = feature_store
        self.cache = cache

    def get_recommendations(self, user_id, num_items=10):
        # Check cache first
        cache_key = f"recs:{user_id}"
        cached = self.cache.get(cache_key)
        if cached:
            return cached

        # Get user features
        user_features = self.feature_store.get_online_features(
            entity_id=user_id,
            feature_names=['age', 'preferences', 'history']
        )

        # Get candidate items
        candidates = self.get_candidates(user_id)

        # Score candidates (assumes the ranking model accepts a feature dict)
        scores = []
        for item_id in candidates:
            item_features = self.feature_store.get_online_features(
                entity_id=item_id,
                feature_names=['category', 'popularity', 'price']
            )
            features = {**user_features, **item_features}
            score = self.model.predict([features])[0]
            scores.append((item_id, score))

        # Sort and return top N
        recommendations = sorted(scores, key=lambda x: x[1], reverse=True)[:num_items]

        # Cache results
        self.cache.set(cache_key, recommendations, ex=3600)
        return recommendations

    def get_candidates(self, user_id):
        # Candidate generation (collaborative filtering, content-based, etc.);
        # a popularity-based list here also doubles as the rule-based fallback
        return candidate_items
```
Rarity: Medium | Difficulty: Hard
12. How do you handle model serving at scale?
Answer: Serving millions of predictions requires careful architecture.
- Strategies:
  - Load balancing
  - Auto-scaling
  - Model caching
  - Batch prediction
  - Model optimization
```python
# Model serving with TensorFlow Serving
"""
# Start TensorFlow Serving
docker run -p 8501:8501 \
    --mount type=bind,source=/path/to/model,target=/models/my_model \
    -e MODEL_NAME=my_model \
    tensorflow/serving

# Client code
import requests
import json

data = json.dumps({
    "signature_name": "serving_default",
    "instances": [[1.0, 2.0, 3.0, 4.0]]
})
headers = {"content-type": "application/json"}
response = requests.post(
    'http://localhost:8501/v1/models/my_model:predict',
    data=data,
    headers=headers
)
predictions = response.json()['predictions']
"""

# Custom serving with batching
import asyncio
from collections import deque
import time

class BatchedModelServer:
    def __init__(self, model, max_batch_size=32, max_wait_ms=100):
        self.model = model
        self.max_batch_size = max_batch_size
        self.max_wait = max_wait_ms / 1000  # convert to seconds
        self.queue = deque()
        self.processing = False

    async def predict(self, features):
        # Add to queue
        future = asyncio.Future()
        self.queue.append((features, future))

        # Start processing if not already running
        if not self.processing:
            asyncio.create_task(self.process_batch())

        # Wait for result
        return await future

    async def process_batch(self):
        self.processing = True
        while self.queue:
            # Wait for the batch to fill or the timeout to expire
            start_time = time.time()
            while len(self.queue) < self.max_batch_size:
                if time.time() - start_time > self.max_wait:
                    break
                await asyncio.sleep(0.001)

            # Get batch
            batch_size = min(len(self.queue), self.max_batch_size)
            batch = [self.queue.popleft() for _ in range(batch_size)]

            # Process batch
            features = [item[0] for item in batch]
            futures = [item[1] for item in batch]
            predictions = self.model.predict(features)

            # Return results
            for future, prediction in zip(futures, predictions):
                future.set_result(prediction)
        self.processing = False
```
Rarity: Common | Difficulty: Hard
13. How do you ensure model reproducibility?
Answer: Reproducibility enables debugging and compliance.
- Best Practices:
  - Version control (code, data, models)
  - Seed fixing
  - Environment management (Docker)
  - Experiment tracking
  - Data lineage
```python
import random
import numpy as np
import torch

def set_seed(seed=42):
    """Set all random seeds for reproducibility"""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# requirements.txt with exact versions
"""
numpy==1.21.0
scikit-learn==0.24.2
torch==1.9.0
"""

# Docker for environment
"""
FROM python:3.9
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "train.py"]
"""

# Track everything with MLflow
import mlflow
import mlflow.sklearn

with mlflow.start_run():
    # Log code version
    mlflow.log_param("git_commit", get_git_commit())

    # Log data version
    mlflow.log_param("data_version", "v1.2.3")

    # Log environment
    mlflow.log_artifact("requirements.txt")

    # Log model
    mlflow.sklearn.log_model(model, "model")
```




