Junior Machine Learning Engineer Interview Questions: The Complete Guide

Milad Bonakdar
Author
Master the machine learning engineering fundamentals, including Python, ML algorithms, model training, deployment basics, and MLOps, with these essential interview questions for junior machine learning engineers.
Introduction
Machine learning engineers build, deploy, and maintain machine learning systems in production. A junior ML engineer is expected to have solid programming skills, an understanding of ML algorithms, experience with ML frameworks, and knowledge of deployment practices.
This guide covers the essential interview questions for junior machine learning engineers. We will walk through Python programming, ML algorithms, model training and evaluation, deployment basics, and MLOps fundamentals to help you prepare for your first ML engineering role.
Python & Programming (5 questions)
1. How do you handle large datasets that don't fit in memory?
Answer: Several techniques handle data larger than the available RAM:
- Chunked processing: process the data in chunks
- Generators: produce data on demand
- Dask/Ray: distributed computing frameworks
- Database queries: load only the data you need
- Memory-mapped files: access disk as if it were memory
- Streaming: process data as it arrives
import pandas as pd
import numpy as np

# Bad: load the entire dataset into memory
# df = pd.read_csv('large_file.csv')  # may crash

# Good: read in chunks
chunk_size = 10000
for chunk in pd.read_csv('large_file.csv', chunksize=chunk_size):
    # Process each chunk
    processed = chunk[chunk['value'] > 0]
    # Save or aggregate the results
    processed.to_csv('output.csv', mode='a', header=False)

# Using a generator
def data_generator(filename, batch_size=32):
    while True:
        batch = []
        with open(filename, 'r') as f:
            for line in f:
                batch.append(process_line(line))  # process_line: your per-line parser
                if len(batch) == batch_size:
                    yield np.array(batch)
                    batch = []

# Distributed computing with Dask
import dask.dataframe as dd
ddf = dd.read_csv('large_file.csv')
result = ddf.groupby('category').mean().compute()
Rarity: Very Common | Difficulty: Medium
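Of the techniques listed above, memory-mapped files are the only one not shown in the snippet. A minimal sketch with np.memmap (the filename and shape here are illustrative, not from the original):

```python
import numpy as np

# Create a disk-backed array; only the slices you touch are paged into RAM.
shape = (100_000, 10)
data = np.memmap('features.dat', dtype='float32', mode='w+', shape=shape)
data[:1000] = np.random.rand(1000, 10).astype('float32')  # write one slice
data.flush()

# Reopen read-only; slicing loads data lazily from disk
mm = np.memmap('features.dat', dtype='float32', mode='r', shape=shape)
batch = mm[:1000]  # only this slice is materialized in memory
print(batch.shape)
```

This gives array-style indexing over files far larger than RAM, at the cost of disk-speed access for cold slices.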
2. Explain decorators in Python and give an ML use case.
Answer: Decorators modify or enhance functions without changing their code.
- ML use cases:
  - Timing function execution
  - Logging predictions
  - Caching results
  - Input validation
import time
import functools
import numpy as np

# Timing decorator
def timer(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(f"{func.__name__} took {end - start:.2f} seconds")
        return result
    return wrapper

# Logging decorator
def log_predictions(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        predictions = func(*args, **kwargs)
        print(f"Made {len(predictions)} predictions")
        print(f"Prediction distribution: {np.bincount(predictions)}")
        return predictions
    return wrapper

# Usage
@timer
@log_predictions
def predict_batch(model, X):
    return model.predict(X)

# Caching decorator (memoization)
def cache_results(func):
    cache = {}
    @functools.wraps(func)
    def wrapper(*args):
        if args not in cache:
            cache[args] = func(*args)
        return cache[args]
    return wrapper

@cache_results
def expensive_feature_engineering(data_id):
    # Expensive computation (processed_features is a placeholder for its result)
    return processed_features
Rarity: Common | Difficulty: Medium
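The input-validation use case from the list above can also be sketched as a decorator. The NaN and empty-batch checks here are one reasonable choice, not a fixed convention:

```python
import functools
import numpy as np

def validate_input(func):
    """Reject empty or NaN-containing batches before they reach the model."""
    @functools.wraps(func)
    def wrapper(model, X, *args, **kwargs):
        X = np.asarray(X)
        if X.size == 0:
            raise ValueError("Empty input batch")
        if np.isnan(X).any():
            raise ValueError("Input contains NaN values")
        return func(model, X, *args, **kwargs)
    return wrapper

@validate_input
def predict_batch(model, X):
    return model.predict(X)
```

Failing fast here keeps bad inputs from producing silent garbage predictions downstream.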
3. What is the difference between @staticmethod and @classmethod?
Answer: Both define methods that don't require an instance.
- @staticmethod: has no access to the class or the instance
- @classmethod: receives the class as its first argument
class MLModel:
    model_type = "classifier"

    def __init__(self, name):
        self.name = name

    # Regular method - requires an instance
    def predict(self, X):
        return self.model.predict(X)

    # Static method - utility function
    @staticmethod
    def preprocess_data(X):
        # No access to self or cls
        return (X - X.mean()) / X.std()

    # Class method - factory pattern
    @classmethod
    def create_default(cls):
        # Has access to cls
        return cls(name=f"default_{cls.model_type}")

    @classmethod
    def from_config(cls, config):
        return cls(name=config['name'])

# Usage
# Static method - no instance required
processed = MLModel.preprocess_data(X_train)
# Class methods - create instances
model = MLModel.create_default()
model2 = MLModel.from_config({'name': 'my_model'})
Rarity: Medium | Difficulty: Medium
4. How do you handle exceptions in an ML pipeline?
Answer: Proper error handling prevents pipeline failures and helps with debugging.
import logging
from typing import Optional

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ModelTrainingError(Exception):
    """Custom exception for model training failures"""
    pass

def train_model(X, y, model_type='random_forest'):
    try:
        logger.info(f"Starting training with {model_type}")
        # Validate the inputs
        if X.shape[0] != y.shape[0]:
            raise ValueError("X and y must have the same number of samples")
        if X.shape[0] < 100:
            raise ModelTrainingError("Insufficient training data")
        # Train the model
        if model_type == 'random_forest':
            from sklearn.ensemble import RandomForestClassifier
            model = RandomForestClassifier()
        else:
            raise ValueError(f"Unknown model type: {model_type}")
        model.fit(X, y)
        logger.info("Training completed successfully")
        return model
    except ValueError as e:
        logger.error(f"Validation error: {e}")
        raise
    except ModelTrainingError as e:
        logger.error(f"Training error: {e}")
        # Could fall back to a simpler model (train_fallback_model: your fallback routine)
        return train_fallback_model(X, y)
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        raise
    finally:
        logger.info("Training attempt finished")

# Context manager for resource management
class ModelLoader:
    def __init__(self, model_path):
        self.model_path = model_path
        self.model = None

    def __enter__(self):
        logger.info(f"Loading model from {self.model_path}")
        self.model = load_model(self.model_path)  # load_model: e.g. joblib.load
        return self.model

    def __exit__(self, exc_type, exc_val, exc_tb):
        logger.info("Cleaning up resources")
        if self.model:
            del self.model
        return False  # do not suppress exceptions

# Usage
with ModelLoader('model.pkl') as model:
    predictions = model.predict(X_test)
Rarity: Common | Difficulty: Medium
5. What are Python generators, and why are they useful in ML?
Answer: Generators produce one value at a time, which saves memory.
- Advantages:
  - Memory efficient
  - Lazy evaluation
  - Infinite sequences
- ML use cases:
  - Data loading
  - Batch processing
  - Data augmentation
import numpy as np

# Generator for batch processing
def batch_generator(X, y, batch_size=32):
    n_samples = len(X)
    indices = np.arange(n_samples)
    np.random.shuffle(indices)
    for start_idx in range(0, n_samples, batch_size):
        end_idx = min(start_idx + batch_size, n_samples)
        batch_indices = indices[start_idx:end_idx]
        yield X[batch_indices], y[batch_indices]

# Use during training
for epoch in range(10):
    for X_batch, y_batch in batch_generator(X_train, y_train):
        model.train_on_batch(X_batch, y_batch)

# Data augmentation generator
def augment_images(images, labels):
    for img, label in zip(images, labels):
        # Original
        yield img, label
        # Flipped
        yield np.fliplr(img), label
        # Rotated
        yield np.rot90(img), label

# Infinite generator for training
def infinite_batch_generator(X, y, batch_size=32):
    while True:
        indices = np.random.choice(len(X), batch_size)
        yield X[indices], y[indices]

# Use with steps_per_epoch
gen = infinite_batch_generator(X_train, y_train)
# model.fit(gen, steps_per_epoch=100, epochs=10)
Rarity: Common | Difficulty: Medium
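The memory advantage is easy to demonstrate directly. A small sketch comparing a materialized list with a generator over the same sequence:

```python
import sys

n = 1_000_000
as_list = [i * i for i in range(n)]   # materializes every element up front
as_gen = (i * i for i in range(n))    # stores only the iteration state

print(sys.getsizeof(as_list))  # several megabytes
print(sys.getsizeof(as_gen))   # a few hundred bytes
```

Note that getsizeof reports the container overhead only, but the contrast still illustrates why streaming batches through a generator scales where a full list would not.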
ML Algorithms & Theory (5 questions)
6. Explain the difference between bagging and boosting.
Answer: Both are ensemble methods, but they work differently:
- Bagging (Bootstrap Aggregating):
  - Trains models in parallel on random subsets
  - Reduces variance
  - Example: Random Forest
- Boosting:
  - Trains models sequentially, each correcting the previous one's errors
  - Reduces bias
  - Examples: AdaBoost, Gradient Boosting, XGBoost
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import cross_val_score

# Generate data
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)

# Bagging - Random Forest
rf = RandomForestClassifier(n_estimators=100, random_state=42)
rf_scores = cross_val_score(rf, X, y, cv=5)
print(f"Random Forest CV: {rf_scores.mean():.3f} (+/- {rf_scores.std():.3f})")

# Boosting - Gradient Boosting
gb = GradientBoostingClassifier(n_estimators=100, random_state=42)
gb_scores = cross_val_score(gb, X, y, cv=5)
print(f"Gradient Boosting CV: {gb_scores.mean():.3f} (+/- {gb_scores.std():.3f})")

# XGBoost (advanced boosting)
import xgboost as xgb
xgb_model = xgb.XGBClassifier(n_estimators=100, random_state=42)
xgb_scores = cross_val_score(xgb_model, X, y, cv=5)
print(f"XGBoost CV: {xgb_scores.mean():.3f} (+/- {xgb_scores.std():.3f})")
Rarity: Very Common | Difficulty: Medium
7. How do you handle imbalanced datasets?
Answer: Imbalanced data can bias a model toward the majority class.
- Techniques:
  - Resampling: SMOTE, undersampling
  - Class weights: penalize misclassification of the minority class
  - Ensemble methods: balanced random forests
  - Evaluation: use F1, precision, and recall (not accuracy)
  - Threshold tuning: optimize the decision threshold
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler
from collections import Counter

# Create an imbalanced dataset
X, y = make_classification(
    n_samples=1000, n_features=20,
    weights=[0.95, 0.05],  # 95% class 0, 5% class 1
    random_state=42
)
print(f"Original distribution: {Counter(y)}")
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)

# 1. Class weights
model_weighted = RandomForestClassifier(class_weight='balanced', random_state=42)
model_weighted.fit(X_train, y_train)
print("\nWith class weights:")
print(classification_report(y_test, model_weighted.predict(X_test)))

# 2. SMOTE (oversample the minority class)
smote = SMOTE(random_state=42)
X_train_smote, y_train_smote = smote.fit_resample(X_train, y_train)
print(f"After SMOTE: {Counter(y_train_smote)}")
model_smote = RandomForestClassifier(random_state=42)
model_smote.fit(X_train_smote, y_train_smote)
print("\nWith SMOTE:")
print(classification_report(y_test, model_smote.predict(X_test)))

# 3. Threshold tuning
y_proba = model_weighted.predict_proba(X_test)[:, 1]
threshold = 0.3  # a lower threshold favors the minority class
y_pred_adjusted = (y_proba >= threshold).astype(int)
print("\nWith adjusted threshold:")
print(classification_report(y_test, y_pred_adjusted))
Rarity: Very Common | Difficulty: Medium
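The snippet imports RandomUnderSampler but never uses it. The same idea can be sketched in plain NumPy to show what undersampling actually does (random_undersample is a hypothetical helper, not part of imblearn):

```python
import numpy as np
from collections import Counter

def random_undersample(X, y, random_state=42):
    """Downsample every class to the size of the smallest class."""
    rng = np.random.default_rng(random_state)
    classes, counts = np.unique(y, return_counts=True)
    n_min = counts.min()
    keep = np.concatenate([
        rng.choice(np.where(y == c)[0], size=n_min, replace=False)
        for c in classes
    ])
    rng.shuffle(keep)
    return X[keep], y[keep]

# 95/5 imbalanced toy labels
y = np.array([0] * 95 + [1] * 5)
X = np.arange(100).reshape(-1, 1)
X_res, y_res = random_undersample(X, y)
print(Counter(y_res))  # both classes end up with 5 samples
```

The trade-off versus SMOTE: undersampling discards majority-class information, so it is best suited to large datasets where the minority class alone is still sizeable.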
8. What is cross-validation, and why is it important?
Answer: Cross-validation evaluates model performance more reliably than a single train-test split.
- Types:
  - K-Fold: split into k folds
  - Stratified K-Fold: preserves the class distribution
  - Time Series Split: respects temporal order
- Advantages:
  - More robust performance estimates
  - Uses all the data for both training and validation
  - Helps detect overfitting
from sklearn.model_selection import (
    cross_val_score, KFold, StratifiedKFold,
    TimeSeriesSplit, cross_validate
)
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris

# Load data
data = load_iris()
X, y = data.data, data.target
model = RandomForestClassifier(random_state=42)

# Standard K-Fold
kfold = KFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(model, X, y, cv=kfold)
print(f"K-Fold scores: {scores}")
print(f"Mean: {scores.mean():.3f} (+/- {scores.std():.3f})")

# Stratified K-Fold (preserves the class distribution)
stratified_kfold = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
stratified_scores = cross_val_score(model, X, y, cv=stratified_kfold)
print(f"\nStratified K-Fold: {stratified_scores.mean():.3f}")

# Time Series Split (for temporal data). Shown here only to illustrate the API:
# iris is not temporal and its rows are sorted by class, so these scores are poor.
tscv = TimeSeriesSplit(n_splits=5)
ts_scores = cross_val_score(model, X, y, cv=tscv)
print(f"Time Series CV: {ts_scores.mean():.3f}")

# Multiple metrics
cv_results = cross_validate(
    model, X, y, cv=5,
    scoring=['accuracy', 'precision_macro', 'recall_macro', 'f1_macro'],
    return_train_score=True
)
print(f"\nAccuracy: {cv_results['test_accuracy'].mean():.3f}")
print(f"Precision: {cv_results['test_precision_macro'].mean():.3f}")
print(f"Recall: {cv_results['test_recall_macro'].mean():.3f}")
print(f"F1: {cv_results['test_f1_macro'].mean():.3f}")
Rarity: Very Common | Difficulty: Easy
9. Explain precision, recall, and F1-score.
Answer: Classification metrics for evaluating model performance:
- Precision: of the samples predicted positive, how many were correct
  - Formula: TP / (TP + FP)
  - Use when: false positives are costly
- Recall: of the actual positives, how many were found
  - Formula: TP / (TP + FN)
  - Use when: false negatives are costly
- F1-score: the harmonic mean of precision and recall
  - Formula: 2 × (Precision × Recall) / (Precision + Recall)
  - Use when: you need a balance between precision and recall
from sklearn.metrics import (
    precision_score, recall_score, f1_score,
    classification_report, confusion_matrix
)
import numpy as np

# Example predictions
y_true = np.array([0, 1, 1, 0, 1, 1, 0, 1, 0, 0])
y_pred = np.array([0, 1, 0, 0, 1, 1, 0, 1, 1, 0])

# Compute the metrics
precision = precision_score(y_true, y_pred)
recall = recall_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred)
print(f"Precision: {precision:.3f}")  # 0.800
print(f"Recall: {recall:.3f}")        # 0.800
print(f"F1-Score: {f1:.3f}")          # 0.800

# Confusion matrix
cm = confusion_matrix(y_true, y_pred)
print(f"\nConfusion Matrix:\n{cm}")
# [[4 1]
#  [1 4]]

# Classification report (all metrics)
print("\nClassification Report:")
print(classification_report(y_true, y_pred))

# Trade-off example (assumes a fitted model and a test split from an earlier pipeline)
from sklearn.metrics import precision_recall_curve
import matplotlib.pyplot as plt

# Get probability predictions
y_proba = model.predict_proba(X_test)[:, 1]
# Compute precision and recall at different thresholds
precisions, recalls, thresholds = precision_recall_curve(y_test, y_proba)
# Find the optimal threshold (maximizes F1)
f1_scores = 2 * (precisions * recalls) / (precisions + recalls + 1e-10)
optimal_idx = np.argmax(f1_scores)
optimal_threshold = thresholds[optimal_idx]
print(f"Optimal threshold: {optimal_threshold:.3f}")
Rarity: Very Common | Difficulty: Easy
10. What is regularization, and when do you use it?
Answer: Regularization prevents overfitting by penalizing model complexity.
- Types:
  - L1 (Lasso): adds the absolute values of the coefficients to the loss
  - L2 (Ridge): adds the squared coefficients to the loss
  - Elastic Net: combines L1 and L2
- When to use it:
  - High variance (overfitting)
  - Many features
  - Multicollinearity
from sklearn.linear_model import Ridge, Lasso, ElasticNet, LinearRegression
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
import numpy as np

# Generate data with many features
X, y = make_regression(
    n_samples=100, n_features=50,
    n_informative=10, noise=10, random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)

# No regularization
lr = LinearRegression()
lr.fit(X_train, y_train)
lr_score = r2_score(y_test, lr.predict(X_test))
print(f"Linear Regression R²: {lr_score:.3f}")

# L2 regularization (Ridge)
ridge = Ridge(alpha=1.0)
ridge.fit(X_train, y_train)
ridge_score = r2_score(y_test, ridge.predict(X_test))
print(f"Ridge R²: {ridge_score:.3f}")

# L1 regularization (Lasso)
lasso = Lasso(alpha=0.1)
lasso.fit(X_train, y_train)
lasso_score = r2_score(y_test, lasso.predict(X_test))
print(f"Lasso R²: {lasso_score:.3f}")
print(f"Lasso non-zero coefficients: {np.sum(lasso.coef_ != 0)}")

# Elastic Net (L1 + L2)
elastic = ElasticNet(alpha=0.1, l1_ratio=0.5)
elastic.fit(X_train, y_train)
elastic_score = r2_score(y_test, elastic.predict(X_test))
print(f"Elastic Net R²: {elastic_score:.3f}")

# Hyperparameter tuning for alpha
from sklearn.model_selection import GridSearchCV
param_grid = {'alpha': [0.001, 0.01, 0.1, 1.0, 10.0]}
grid = GridSearchCV(Ridge(), param_grid, cv=5)
grid.fit(X_train, y_train)
print(f"\nBest alpha: {grid.best_params_['alpha']}")
print(f"Best CV score: {grid.best_score_:.3f}")
Rarity: Very Common | Difficulty: Medium
Model Training & Deployment (5 questions)
11. How do you save and load models in production?
Answer: Model serialization enables deployment and reuse.
import joblib
import pickle
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris

# Train a model
X, y = load_iris(return_X_y=True)
model = RandomForestClassifier()
model.fit(X, y)

# Method 1: Joblib (recommended for scikit-learn)
joblib.dump(model, 'model.joblib')
loaded_model = joblib.load('model.joblib')

# Method 2: Pickle (only unpickle files you trust; pickle can execute arbitrary code)
with open('model.pkl', 'wb') as f:
    pickle.dump(model, f)
with open('model.pkl', 'rb') as f:
    loaded_model = pickle.load(f)

# For TensorFlow/Keras
import tensorflow as tf
# Save the entire model
# keras_model.save('model.h5')
# loaded_model = tf.keras.models.load_model('model.h5')
# Save only the weights
# keras_model.save_weights('model_weights.h5')
# new_model.load_weights('model_weights.h5')

# For PyTorch
import torch
# Save the model's state dict
# torch.save(model.state_dict(), 'model.pth')
# model.load_state_dict(torch.load('model.pth'))
# Save the entire model
# torch.save(model, 'model_complete.pth')
# model = torch.load('model_complete.pth')

# Model versioning
import datetime
model_version = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
model_path = f'models/model_{model_version}.joblib'
joblib.dump(model, model_path)
print(f"Model saved to {model_path}")
Rarity: Very Common | Difficulty: Easy
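Beyond the timestamped filename, teams commonly save a small metadata record next to each versioned model. A sketch with entirely illustrative fields (the file name, feature list, and metric value are assumptions, not part of the original):

```python
import json
import datetime

# Hypothetical metadata written alongside the versioned model artifact
metadata = {
    "version": datetime.datetime.now().strftime("%Y%m%d_%H%M%S"),
    "framework": "scikit-learn",
    "features": ["sepal_length", "sepal_width", "petal_length", "petal_width"],
    "metrics": {"accuracy": 0.96},  # illustrative validation score
}
with open("model_metadata.json", "w") as f:
    json.dump(metadata, f, indent=2)
```

Keeping feature order and training metrics next to the binary makes it much easier to debug serving mismatches later.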
12. How do you create a REST API for model serving?
Answer: A REST API makes the model accessible to applications.
from flask import Flask, request, jsonify
import joblib
import numpy as np

app = Flask(__name__)
# Load the model at startup
model = joblib.load('model.joblib')

@app.route('/predict', methods=['POST'])
def predict():
    try:
        # Get the data from the request
        data = request.get_json()
        features = np.array(data['features']).reshape(1, -1)
        # Make a prediction
        prediction = model.predict(features)
        probability = model.predict_proba(features)
        # Return the response
        return jsonify({
            'prediction': int(prediction[0]),
            'probability': probability[0].tolist(),
            'status': 'success'
        })
    except Exception as e:
        return jsonify({
            'error': str(e),
            'status': 'error'
        }), 400

@app.route('/health', methods=['GET'])
def health():
    return jsonify({'status': 'healthy'})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

# FastAPI alternative (modern, faster)
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

class PredictionRequest(BaseModel):
    features: list

class PredictionResponse(BaseModel):
    prediction: int
    probability: list
    status: str

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    try:
        features = np.array(request.features).reshape(1, -1)
        prediction = model.predict(features)
        probability = model.predict_proba(features)
        return PredictionResponse(
            prediction=int(prediction[0]),
            probability=probability[0].tolist(),
            status="success"
        )
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

# Usage:
# curl -X POST "http://localhost:5000/predict" \
#   -H "Content-Type: application/json" \
#   -d '{"features": [5.1, 3.5, 1.4, 0.2]}'
Rarity: Very Common | Difficulty: Medium
13. What is Docker, and why is it useful for ML deployment?
Answer: Docker containers package an application together with all of its dependencies.
- Advantages:
  - Reproducibility
  - Consistency across environments
  - Easy deployment
  - Isolation
# Dockerfile for an ML model
FROM python:3.9-slim
WORKDIR /app

# Copy the requirements
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the model and code
COPY model.joblib .
COPY app.py .

# Expose the port
EXPOSE 5000

# Run the application
CMD ["python", "app.py"]

# Build the Docker image
docker build -t ml-model:v1 .
# Run the container
docker run -p 5000:5000 ml-model:v1

# Docker Compose for multi-container setups
# docker-compose.yml
version: '3.8'
services:
  model:
    build: .
    ports:
      - "5000:5000"
    environment:
      - MODEL_PATH=/app/model.joblib
    volumes:
      - ./models:/app/models
Rarity: Common | Difficulty: Medium
14. How do you monitor model performance in production?
Answer: Monitoring detects model degradation and ensures reliability.
- What to monitor:
  - Prediction metrics: accuracy, latency
  - Data drift: changes in the input distribution
  - Model drift: performance degradation
  - System metrics: CPU, memory, errors
import logging
import time
from datetime import datetime
import numpy as np

class ModelMonitor:
    def __init__(self, model):
        self.model = model
        self.predictions = []
        self.actuals = []
        self.latencies = []
        self.input_stats = []
        # Set up logging
        logging.basicConfig(
            filename='model_monitor.log',
            level=logging.INFO,
            format='%(asctime)s - %(message)s'
        )

    def predict(self, X):
        # Track input statistics
        self.input_stats.append({
            'mean': X.mean(),
            'std': X.std(),
            'min': X.min(),
            'max': X.max()
        })
        # Measure latency
        start = time.time()
        prediction = self.model.predict(X)
        latency = time.time() - start
        self.predictions.append(prediction)
        self.latencies.append(latency)
        # Log the prediction
        logging.info(f"Prediction: {prediction}, Latency: {latency:.3f}s")
        # Alert if latency is too high
        if latency > 1.0:
            logging.warning(f"High latency detected: {latency:.3f}s")
        return prediction

    def log_actual(self, y_true):
        self.actuals.append(y_true)
        # Compute accuracy once there is enough data
        if len(self.actuals) >= 100:
            accuracy = np.mean(
                np.array(self.predictions[-100:]) == np.array(self.actuals[-100:])
            )
            logging.info(f"Rolling accuracy (last 100): {accuracy:.3f}")
            if accuracy < 0.8:
                logging.error(f"Model performance degraded: {accuracy:.3f}")

    def check_data_drift(self, reference_stats):
        if not self.input_stats:
            return
        current_stats = self.input_stats[-1]
        # Simple drift detection (compare the means)
        mean_diff = abs(current_stats['mean'] - reference_stats['mean'])
        if mean_diff > 2 * reference_stats['std']:
            logging.warning(f"Data drift detected: mean diff = {mean_diff:.3f}")

# Usage
monitor = ModelMonitor(model)
prediction = monitor.predict(X_new)
# Later, when the actual label becomes available
monitor.log_actual(y_true)
Rarity: Common | Difficulty: Medium
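The mean comparison in check_data_drift is deliberately simple. A more statistical per-feature check could use a two-sample Kolmogorov-Smirnov test; this is a sketch assuming SciPy is available, with a conventional 0.05 significance threshold:

```python
import numpy as np
from scipy import stats

def detect_drift(reference, current, alpha=0.05):
    """Run a two-sample KS test per feature; return the features whose
    current distribution differs significantly from the reference window."""
    drifted = []
    for j in range(reference.shape[1]):
        _, p_value = stats.ks_2samp(reference[:, j], current[:, j])
        if p_value < alpha:
            drifted.append((j, p_value))
    return drifted

rng = np.random.default_rng(0)
reference = rng.normal(0, 1, size=(1000, 3))
current = rng.normal(0, 1, size=(1000, 3))
current[:, 2] += 1.5  # inject a mean shift into feature 2
print(detect_drift(reference, current))  # feature 2 is flagged
```

With many features, remember to correct for multiple comparisons (for example a Bonferroni-adjusted alpha), or the monitor will raise spurious alerts.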
15. What is CI/CD for machine learning?
Answer: CI/CD automates the testing and deployment of ML models.
- Continuous Integration:
  - Automated tests
  - Code quality checks
  - Model validation
- Continuous Deployment:
  - Automated deployment
  - Rollback capability
  - A/B testing
# .github/workflows/ml-pipeline.yml
name: ML Pipeline
on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: 3.9
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest
      - name: Run tests
        run: pytest tests/
      - name: Train model
        run: python train.py
      - name: Validate model
        run: python validate_model.py
  deploy:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Deploy to production
        run: |
          docker build -t ml-model:latest .
          docker push ml-model:latest

# tests/test_model.py
import pytest
import joblib
import numpy as np
from sklearn.datasets import load_iris

def test_model_accuracy():
    from train import train_model
    X, y = load_iris(return_X_y=True)
    model, accuracy = train_model(X, y)
    assert accuracy > 0.9, f"Model accuracy {accuracy} below threshold"

def test_model_prediction_shape():
    model = joblib.load('model.joblib')
    X_test = np.random.rand(10, 4)
    predictions = model.predict(X_test)
    assert predictions.shape == (10,), "Unexpected prediction shape"

def test_model_prediction_range():
    model = joblib.load('model.joblib')
    X_test = np.random.rand(10, 4)
    predictions = model.predict(X_test)
    assert all(pred in [0, 1, 2] for pred in predictions), "Prediction outside expected classes"