シニア機械学習エンジニア面接質問:本番ML対策

Milad Bonakdar
著者
シニアMLエンジニア面接では、システム設計、MLOps、分散学習、レイテンシ、監視、トレードオフなど本番運用の判断力が問われます。実践的な回答を準備しましょう。
はじめに
シニア機械学習エンジニアの面接では、本番運用での判断力が問われます。十分な精度、低いレイテンシ、観測可能性、再現性、リリース後の保守性を備えたMLシステムを設計できるかが重要です。MLOps、MLシステム設計、モデルサービング、分散学習、特徴量パイプライン、ドリフト、実験設計に関する質問を想定しましょう。
このガイドでは、単にツール名を並べるのではなく、トレードオフを説明する回答を練習します。強いシニア回答は、要件と指標から始め、データ、特徴量、学習、デプロイ、監視、ロールバックまでつなげて説明します。
分散トレーニングとスケーラビリティ(5つの質問)
1. 深層学習モデルの分散トレーニングをどのように実装しますか?
回答: 分散トレーニングは、複数のGPU/マシンに計算を並列化します。
- 戦略:
- データ並列処理: 同じモデル、異なるデータバッチ
- モデル並列処理: デバイス間でモデルを分割
- パイプライン並列処理: モデルをステージに分割
- フレームワーク: PyTorch DDP、Horovod、TensorFlow MirroredStrategy
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler
# 分散トレーニングの初期化
def setup_distributed(rank, world_size):
dist.init_process_group(
backend='nccl', # CPUの場合は'gloo'を使用
init_method='env://',
world_size=world_size,
rank=rank
)
# モデルのセットアップ
class MyModel(nn.Module):
def __init__(self):
super().__init__()
self.layers = nn.Sequential(
nn.Linear(784, 512),
nn.ReLU(),
nn.Linear(512, 10)
)
def forward(self, x):
return self.layers(x)
def train_distributed(rank, world_size):
setup_distributed(rank, world_size)
# モデルを作成してGPUに移動
model = MyModel().to(rank)
model = DDP(model, device_ids=[rank])
# 分散サンプラーを作成
train_dataset = MyDataset()
train_sampler = DistributedSampler(
train_dataset,
num_replicas=world_size,
rank=rank
)
train_loader = torch.utils.data.DataLoader(
train_dataset,
batch_size=32,
sampler=train_sampler
)
optimizer = torch.optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()
for epoch in range(10):
train_sampler.set_epoch(epoch) # エポックごとに異なるシャッフル
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(rank), target.to(rank)
optimizer.zero_grad()
output = model(data)
loss = criterion(output, target)
loss.backward()
optimizer.step()
if rank == 0 and batch_idx % 100 == 0:
print(f'Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item():.4f}')
dist.destroy_process_group()
# torch.multiprocessingで起動
import torch.multiprocessing as mp
if __name__ == '__main__':
world_size = torch.cuda.device_count()
mp.spawn(train_distributed, args=(world_size,), nprocs=world_size)
# TensorFlow分散トレーニング
import tensorflow as tf
strategy = tf.distribute.MirroredStrategy()
with strategy.scope():
model = tf.keras.Sequential([
tf.keras.layers.Dense(512, activation='relu'),
tf.keras.layers.Dense(10)
])
model.compile(
optimizer='adam',
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=['accuracy']
)
# model.fitは自動的にトレーニングを分散
# model.fit(train_dataset, epochs=10)希少性: 一般的 難易度: 難しい
2. 勾配累積について説明し、いつ使用すべきかを説明してください。
回答: 勾配累積は、GPUメモリが限られている場合に、より大きなバッチサイズをシミュレートします。
- 仕組み: 重みを更新する前に、複数のフォワードパスにわたって勾配を累積
- ユースケース: 大規模モデル、限られたGPUメモリ、安定したトレーニング
import torch
import torch.nn as nn
model = MyModel()
optimizer = torch.optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()
# 実効バッチサイズ = batch_size * accumulation_steps
batch_size = 8
accumulation_steps = 4 # 実効バッチサイズ = 32
model.train()
optimizer.zero_grad()
for batch_idx, (data, target) in enumerate(train_loader):
# フォワードパス
output = model(data)
loss = criterion(output, target)
# 累積ステップで損失を正規化
loss = loss / accumulation_steps
# バックワードパス(勾配を累積)
loss.backward()
# accumulation_stepsごとに重みを更新
if (batch_idx + 1) % accumulation_steps == 0:
optimizer.step()
optimizer.zero_grad()
# 勾配累積による混合精度トレーニング
from torch.amp import autocast, GradScaler
scaler = GradScaler("cuda")
optimizer.zero_grad()
for batch_idx, (data, target) in enumerate(train_loader):
with autocast("cuda"):
output = model(data)
loss = criterion(output, target) / accumulation_steps
scaler.scale(loss).backward()
if (batch_idx + 1) % accumulation_steps == 0:
scaler.step(optimizer)
scaler.update()
optimizer.zero_grad()希少性: 一般的 難易度: 普通
3. モデル推論のレイテンシをどのように最適化しますか?
回答: 複数の手法で推論時間を短縮できます。
- モデルの最適化:
- 量子化(INT8、FP16)
- プルーニング(重みの削除)
- 知識蒸留
- モデルコンパイル(TorchScript、ONNX)
- サービングの最適化:
- バッチ処理
- キャッシング
- モデル並列処理
- ハードウェアアクセラレーション(GPU、TPU)
import torch
import torch.nn as nn
# 1. 量子化(INT8)
model = MyModel()
model.eval()
# 動的量子化
quantized_model = torch.quantization.quantize_dynamic(
model, {nn.Linear}, dtype=torch.qint8
)
# 静的量子化(より正確)
model.qconfig = torch.quantization.get_default_qconfig('fbgemm')
torch.quantization.prepare(model, inplace=True)
# 代表的なデータでキャリブレーション
# for data in calibration_loader:
# model(data)
torch.quantization.convert(model, inplace=True)
# 2. TorchScriptコンパイル
scripted_model = torch.jit.script(model)
scripted_model.save('model_scripted.pt')
# 3. ONNXエクスポート
dummy_input = torch.randn(1, 784)
torch.onnx.export(
model,
dummy_input,
'model.onnx',
input_names=['input'],
output_names=['output'],
dynamic_axes={'input': {0: 'batch_size'}}
)
# 4. TensorRT最適化(NVIDIA)
import tensorrt as trt
# 5. プルーニング
import torch.nn.utils.prune as prune
# 線形レイヤーの重みの30%をプルーニング
prune.l1_unstructured(model.fc1, name='weight', amount=0.3)
# プルーニングを永続化
prune.remove(model.fc1, 'weight')
# 6. 知識蒸留
class DistillationLoss(nn.Module):
def __init__(self, temperature=3.0):
super().__init__()
self.temperature = temperature
self.kl_div = nn.KLDivLoss(reduction='batchmean')
def forward(self, student_logits, teacher_logits, labels):
# 教師からのソフトターゲット
soft_loss = self.kl_div(
F.log_softmax(student_logits / self.temperature, dim=1),
F.softmax(teacher_logits / self.temperature, dim=1)
) * (self.temperature ** 2)
# ハードターゲット
hard_loss = F.cross_entropy(student_logits, labels)
return 0.5 * soft_loss + 0.5 * hard_loss
# 7. 推論のバッチ処理
class BatchPredictor:
def __init__(self, model, max_batch_size=32, max_wait_time=0.1):
self.model = model
self.max_batch_size = max_batch_size
self.max_wait_time = max_wait_time
self.queue = []
async def predict(self, x):
self.queue.append(x)
if len(self.queue) >= self.max_batch_size:
return await self._process_batch()
# より多くのリクエストを待つか、タイムアウト
await asyncio.sleep(self.max_wait_time)
return await self._process_batch()
async def _process_batch(self):
batch = torch.stack(self.queue)
self.queue = []
return self.model(batch)希少性: 非常に一般的 難易度: 難しい
4. 混合精度トレーニングとは何ですか?また、どのように機能しますか?
回答: 混合精度は、FP16とFP32を使用して、精度を維持しながらトレーニングを高速化します。
- 利点:
- 適したハードウェアでの学習高速化
- メモリ使用量の削減
- より大きなバッチサイズ
- 課題:
- 数値的安定性
- 勾配のアンダーフロー
- 解決策: 勾配スケーリング
import torch
from torch.amp import autocast, GradScaler
model = MyModel().cuda()
optimizer = torch.optim.Adam(model.parameters())
scaler = GradScaler("cuda")
for epoch in range(10):
for data, target in train_loader:
data, target = data.cuda(), target.cuda()
optimizer.zero_grad()
# FP16でのフォワードパス
with autocast("cuda"):
output = model(data)
loss = criterion(output, target)
# 勾配スケーリングによるバックワードパス
scaler.scale(loss).backward()
# 勾配をアン スケールしてクリップ
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
# 重みを更新
scaler.step(optimizer)
scaler.update()
# TensorFlow混合精度
from tensorflow.keras import mixed_precision
policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)
# モデルは自動的にFP16を計算に使用
model = tf.keras.Sequential([
tf.keras.layers.Dense(512, activation='relu'),
tf.keras.layers.Dense(10)
])
# 損失スケーリングは自動的に処理
optimizer = tf.keras.optimizers.Adam()
optimizer = mixed_precision.LossScaleOptimizer(optimizer)希少性: 一般的 難易度: 普通
5. データパイプラインのボトルネックをどのように処理しますか?
回答: データロードは、トレーニングのボトルネックになることがよくあります。以下を使用して最適化します。
- プリフェッチ: トレーニング中に次のバッチをロード
- 並列ロード: 複数のワーカー
- キャッシング: 前処理されたデータを保存
- データ形式: 効率的な形式を使用(TFRecord、Parquet)
import torch
from torch.utils.data import DataLoader, Dataset
import multiprocessing as mp
# 効率的なDataLoader構成
train_loader = DataLoader(
dataset,
batch_size=32,
num_workers=mp.cpu_count(), # 並列ロード
pin_memory=True, # より高速なGPU転送
prefetch_factor=2, # バッチのプリフェッチ
persistent_workers=True # ワーカーを維持
)
# キャッシュ付きのカスタムデータセット
class CachedDataset(Dataset):
def __init__(self, data_path, cache_size=1000):
self.data_path = data_path
self.cache = {}
self.cache_size = cache_size
def __getitem__(self, idx):
if idx in self.cache:
return self.cache[idx]
# ロードして前処理
data = load_and_preprocess(self.data_path, idx)
# スペースがあればキャッシュ
if len(self.cache) < self.cache_size:
self.cache[idx] = data
return data
# TensorFlowデータパイプラインの最適化
import tensorflow as tf
dataset = tf.data.Dataset.from_tensor_slices((X, y))
dataset = dataset.cache() # メモリにキャッシュ
dataset = dataset.shuffle(buffer_size=10000)
dataset = dataset.batch(32)
dataset = dataset.prefetch(tf.data.AUTOTUNE) # 自動プリフェッチ
dataset = dataset.map(
preprocess_function,
num_parallel_calls=tf.data.AUTOTUNE
)
# 大規模データセットにTFRecordを使用
def create_tfrecord(data, labels, filename):
with tf.io.TFRecordWriter(filename) as writer:
for x, y in zip(data, labels):
example = tf.train.Example(features=tf.train.Features(feature={
'data': tf.train.Feature(float_list=tf.train.FloatList(value=x)),
'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[y]))
}))
writer.write(example.SerializeToString())希少性: 一般的 難易度: 普通
MLOpsとインフラストラクチャ(5つの質問)
6. 特徴量ストアをどのように設計しますか?
回答: 特徴量ストアは、特徴量エンジニアリングとサービングを一元化します。
- コンポーネント:
- オフラインストア: トレーニング用の履歴特徴量(S3、BigQuery)
- オンラインストア: サービング用の低レイテンシ特徴量(Redis、DynamoDB)
- 特徴量レジストリ: メタデータとリネージ
- 利点:
- 再利用性
- 一貫性(トレーニング/サービング)
- モニタリング
# Feast(オープンソースの特徴量ストア)の例
from feast import FeatureStore, Entity, Feature, FeatureView, FileSource
from feast.value_type import ValueType
from datetime import timedelta
# エンティティの定義
user = Entity(
name="user_id",
value_type=ValueType.INT64,
description="ユーザーID"
)
# 特徴量ビューの定義
user_features = FeatureView(
name="user_features",
entities=["user_id"],
ttl=timedelta(days=1),
features=[
Feature(name="age", dtype=ValueType.INT64),
Feature(name="total_purchases", dtype=ValueType.FLOAT),
Feature(name="avg_purchase_value", dtype=ValueType.FLOAT),
],
online=True,
batch_source=FileSource(
path="data/user_features.parquet",
event_timestamp_column="timestamp"
)
)
# 特徴量ストアの初期化
fs = FeatureStore(repo_path=".")
# トレーニング用の特徴量の取得(オフライン)
training_df = fs.get_historical_features(
entity_df=entity_df,
features=[
"user_features:age",
"user_features:total_purchases",
"user_features:avg_purchase_value"
]
).to_df()
# サービング用の特徴量の取得(オンライン)
online_features = fs.get_online_features(
features=[
"user_features:age",
"user_features:total_purchases"
],
entity_rows=[{"user_id": 123}]
).to_dict()
# カスタム特徴量ストアの実装
class SimpleFeatureStore:
def __init__(self, redis_client, s3_client):
self.redis = redis_client # オンラインストア
self.s3 = s3_client # オフラインストア
def get_online_features(self, entity_id, feature_names):
features = {}
for feature in feature_names:
key = f"{entity_id}:{feature}"
features[feature] = self.redis.get(key)
return features
def write_features(self, entity_id, features):
# オンラインストアに書き込み
for feature_name, value in features.items():
key = f"{entity_id}:{feature_name}"
self.redis.set(key, value, ex=86400) # 24時間のTTL
# トレーニング用にオフラインストアに書き込み
self.s3.put_object(
Bucket='features',
Key=f'{entity_id}/features.json',
Body=json.dumps(features)
)希少性: 普通 難易度: 難しい
7. モデルのバージョン管理と実験の追跡をどのように実装しますか?
回答: 結果を再現し、モデルを比較するために実験を追跡します。
# 実験追跡のためのMLflow
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
# 実験の設定
mlflow.set_experiment("model_comparison")
# 実験の追跡
with mlflow.start_run(run_name="random_forest_v1"):
# パラメータのログ
params = {
'n_estimators': 100,
'max_depth': 10,
'min_samples_split': 2
}
mlflow.log_params(params)
# モデルのトレーニング
model = RandomForestClassifier(**params)
model.fit(X_train, y_train)
# メトリクスのログ
train_score = model.score(X_train, y_train)
test_score = model.score(X_test, y_test)
mlflow.log_metrics({
'train_accuracy': train_score,
'test_accuracy': test_score
})
# モデルのログ
mlflow.sklearn.log_model(model, "model")
# アーティファクトのログ
mlflow.log_artifact("feature_importance.png")
# 実行のタグ付け
mlflow.set_tags({
'model_type': 'random_forest',
'dataset_version': 'v2.0'
})
# 最適なモデルのロード
best_run = mlflow.search_runs(
experiment_ids=['1'],
order_by=['metrics.test_accuracy DESC'],
max_results=1
)
model_uri = f"runs:/{best_run.iloc[0].run_id}/model"
loaded_model = mlflow.sklearn.load_model(model_uri)
# Weights & Biasesの代替
import wandb
wandb.init(project="ml-project", name="experiment-1")
# ハイパーパラメータのログ
wandb.config.update({
'learning_rate': 0.001,
'batch_size': 32,
'epochs': 10
})
# トレーニング中のメトリクスのログ
for epoch in range(10):
# トレーニングコード
wandb.log({
'epoch': epoch,
'train_loss': train_loss,
'val_loss': val_loss,
'accuracy': accuracy
})
# モデルのログ
wandb.save('model.h5')
# データとモデルのバージョン管理のためのDVC
"""
# DVCの初期化
dvc init
# データの追跡
dvc add data/train.csv
git add data/train.csv.dvc .gitignore
git commit -m "トレーニングデータの追加"
# モデルの追跡
dvc add models/model.pkl
git add models/model.pkl.dvc
git commit -m "トレーニング済みモデルv1の追加"
# リモートストレージへのプッシュ
dvc remote add -d storage s3://my-bucket/dvc-storage
dvc push
"""希少性: 非常に一般的 難易度: 普通
8. Kubernetesでモデルをどのようにデプロイしますか?
回答: Kubernetesは、コンテナ化されたMLサービスをオーケストレーションします。
# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: ml-model-deployment
spec:
replicas: 3
selector:
matchLabels:
app: ml-model
template:
metadata:
labels:
app: ml-model
spec:
containers:
- name: ml-model
image: ml-model:v1
ports:
- containerPort: 5000
resources:
requests:
memory: "2Gi"
cpu: "1000m"
limits:
memory: "4Gi"
cpu: "2000m"
env:
- name: MODEL_PATH
value: "/models/model.pkl"
livenessProbe:
httpGet:
path: /health
port: 5000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 5000
initialDelaySeconds: 5
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: ml-model-service
spec:
selector:
app: ml-model
ports:
- protocol: TCP
port: 80
targetPort: 5000
type: LoadBalancer
---
# Horizontal Pod Autoscaler
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: ml-model-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: ml-model-deployment
minReplicas: 2
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70# ヘルスチェック付きのapp.py
from flask import Flask, request, jsonify
import joblib
import logging
app = Flask(__name__)
model = None
@app.route('/health')
def health():
"""Livenessプローブ"""
return jsonify({'status': 'healthy'}), 200
@app.route('/ready')
def ready():
"""Readinessプローブ"""
if model is not None:
return jsonify({'status': 'ready'}), 200
return jsonify({'status': 'not ready'}), 503
@app.route('/predict', methods=['POST'])
def predict():
try:
data = request.get_json()
prediction = model.predict([data['features']])
return jsonify({'prediction': int(prediction[0])})
except Exception as e:
logging.error(f"Prediction error: {e}")
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
model = joblib.load('/models/model.pkl')
app.run(host='0.0.0.0', port=5000)希少性: 一般的 難易度: 難しい
9. モデルドリフトとは何ですか?また、どのように検出しますか?
回答: モデルドリフトは、モデルのパフォーマンスが時間とともに低下するときに発生します。
- 種類:
- データドリフト: 入力分布の変化
- コンセプトドリフト: Xとyの関係の変化
- 検出:
- 統計テスト(KSテスト、PSI)
- パフォーマンスモニタリング
- 分布の比較
import numpy as np
from scipy import stats
from sklearn.metrics import accuracy_score
class DriftDetector:
def __init__(self, reference_data, threshold=0.05):
self.reference_data = reference_data
self.threshold = threshold
self.reference_mean = np.mean(reference_data, axis=0)
self.reference_std = np.std(reference_data, axis=0)
def detect_data_drift(self, new_data):
"""各特徴量のKolmogorov-Smirnovテスト"""
drift_detected = []
for i in range(new_data.shape[1]):
statistic, p_value = stats.ks_2samp(
self.reference_data[:, i],
new_data[:, i]
)
if p_value < self.threshold:
drift_detected.append({
'feature': i,
'p_value': p_value,
'statistic': statistic
})
return drift_detected
def calculate_psi(self, expected, actual, buckets=10):
"""Population Stability Index"""
def scale_range(x, min_val, max_val):
return (x - min_val) / (max_val - min_val)
min_val = min(expected.min(), actual.min())
max_val = max(expected.max(), actual.max())
expected_scaled = scale_range(expected, min_val, max_val)
actual_scaled = scale_range(actual, min_val, max_val)
expected_percents = np.histogram(expected_scaled, bins=buckets)[0] / len(expected)
actual_percents = np.histogram(actual_scaled, bins=buckets)[0] / len(actual)
psi = np.sum((actual_percents - expected_percents) *
np.log((actual_percents + 1e-10) / (expected_percents + 1e-10)))
return psi
class PerformanceMonitor:
def __init__(self, model, window_size=1000):
self.model = model
self.window_size = window_size
self.predictions = []
self.actuals = []
self.accuracies = []
def log_prediction(self, X, y_true):
y_pred = self.model.predict(X)
self.predictions.extend(y_pred)
self.actuals.extend(y_true)
# ローリング精度を計算
if len(self.predictions) >= self.window_size:
recent_preds = self.predictions[-self.window_size:]
recent_actuals = self.actuals[-self.window_size:]
accuracy = accuracy_score(recent_actuals, recent_preds)
self.accuracies.append(accuracy)
# パフォーマンスが低下した場合に警告
if len(self.accuracies) > 10:
recent_avg = np.mean(self.accuracies[-10:])
baseline_avg = np.mean(self.accuracies[:10])
if recent_avg < baseline_avg * 0.9: # 10%低下
print(f"ALERT: パフォーマンスが{baseline_avg:.3f}から{recent_avg:.3f}に低下しました")
return True
return False
# 使い方
detector = DriftDetector(X_train)
drift = detector.detect_data_drift(X_production)
if drift:
print(f"データドリフトが{len(drift)}個の特徴量で検出されました")
# 再トレーニングをトリガー希少性: 一般的 難易度: 難しい
10. MLモデルのA/Bテストをどのように実装しますか?
回答: A/Bテストは、本番環境でモデルバージョンを比較します。
import random
import hashlib
from datetime import datetime
class ABTestFramework:
def __init__(self, model_a, model_b, traffic_split=0.5):
self.model_a = model_a
self.model_b = model_b
self.traffic_split = traffic_split
self.results_a = []
self.results_b = []
def get_variant(self, user_id):
"""user_idに基づく一貫した割り当て"""
hash_value = int(hashlib.md5(str(user_id).encode()).hexdigest(), 16)
return 'A' if (hash_value % 100) < (self.traffic_split * 100) else 'B'
def predict(self, user_id, features):
variant = self.get_variant(user_id)
if variant == 'A':
prediction = self.model_a.predict([features])[0]
model_version = 'A'
else:
prediction = self.model_b.predict([features])[0]
model_version = 'B'
# 予測のログ
self.log_prediction(user_id, features, prediction, model_version)
return prediction, model_version
def log_prediction(self, user_id, features, prediction, variant):
log_entry = {
'timestamp': datetime.now(),
'user_id': user_id,
'variant': variant,
'prediction': prediction
}
if variant == 'A':
self.results_a.append(log_entry)
else:
self.results_b.append(log_entry)
def log_outcome(self, user_id, actual_value):
"""分析のために実際の結果をログに記録"""
# ログで予測を見つけて更新
pass
def analyze_results(self):
"""A/Bテストの統計分析"""
from scipy import stats
# コンバージョン率を計算
conversions_a = sum(1 for r in self.results_a if r.get('converted'))
conversions_b = sum(1 for r in self.results_b if r.get('converted'))
rate_a = conversions_a / len(self.results_a)
rate_b = conversions_b / len(self.results_b)
# 統計的有意性テスト
statistic, p_value = stats.chi2_contingency([
[conversions_a, len(self.results_a) - conversions_a],
[conversions_b, len(self.results_b) - conversions_b]
])[:2]
return {
'variant_a_rate': rate_a,
'variant_b_rate': rate_b,
'lift': (rate_b - rate_a) / rate_a * 100,
'p_value': p_value,
'significant': p_value < 0.05
}
# 使い方
ab_test = ABTestFramework(model_v1, model_v2, traffic_split=0.5)
# 予測を行う
for user_id, features in requests:
prediction, variant = ab_test.predict(user_id, features)
# データを収集した後で分析
results = ab_test.analyze_results()
print(f"バリアントBのリフト:{results['lift']:.2f}%")
print(f"統計的に有意:{results['significant']}")希少性: 一般的


