December 21, 2025
45 min read

Junior Machine Learning Engineer Interview Questions: A Complete Guide

interview
career-advice
job-search
entry-level
Milad Bonakdar

Author

Prepare for junior machine learning engineer interviews by mastering essential questions on Python, ML algorithms, model training, deployment basics, and MLOps.


Introduction

Machine learning engineers build, deploy, and maintain machine learning systems in production. A junior ML engineer is expected to bring solid programming skills, an understanding of ML algorithms, experience with ML frameworks, and knowledge of deployment practices.

This guide covers the essential interview questions for junior machine learning engineers. We will work through Python programming, ML algorithms, model training and evaluation, deployment basics, and MLOps fundamentals to help you prepare for your first ML engineering role.


Python & Programming (5 Questions)

1. How do you handle large datasets that do not fit in memory?

Answer: Several techniques handle data larger than the available RAM:

  • Batch processing: process the data in chunks
  • Generators: produce data on demand
  • Dask/Ray: distributed computing frameworks
  • Database queries: load only the data you need
  • Memory-mapped files: access data on disk as if it were in memory (see the sketch after the code below)
  • Streaming: process data as it arrives
import pandas as pd
import numpy as np

# Bad: load the entire dataset into memory
# df = pd.read_csv('large_file.csv')  # may crash

# Good: read in chunks
chunk_size = 10000
for chunk in pd.read_csv('large_file.csv', chunksize=chunk_size):
    # Process each chunk
    processed = chunk[chunk['value'] > 0]
    # Save or aggregate the results
    processed.to_csv('output.csv', mode='a', header=False)

# Using a generator (process_line is a placeholder for your parsing logic)
def data_generator(filename, batch_size=32):
    while True:
        batch = []
        with open(filename, 'r') as f:
            for line in f:
                batch.append(process_line(line))
                if len(batch) == batch_size:
                    yield np.array(batch)
                    batch = []

# Distributed computing with Dask
import dask.dataframe as dd
ddf = dd.read_csv('large_file.csv')
result = ddf.groupby('category').mean().compute()
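
The memory-mapped files technique from the list above deserves a quick illustration. This is a minimal sketch, assuming a raw float32 array was previously written to 'features.dat' (the filename and shape are hypothetical):
# Memory-mapped file: the array stays on disk; pages load only on access
mm = np.memmap('features.dat', dtype='float32', mode='r',
               shape=(1_000_000, 100))

# Slicing reads just the touched pages, not the whole file
batch = mm[:10_000]
print(batch.mean())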

Rarity: Very Common · Difficulty: Medium


2. Explain decorators in Python and give an ML use case.

Answer: Decorators modify or extend a function without changing its code.

  • Use cases in ML:
    • Timing function execution
    • Logging predictions
    • Caching results
    • Input validation (sketched after the code below)
import time
import functools
import numpy as np

# Timing decorator
def timer(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(f"{func.__name__} took {end - start:.2f} seconds")
        return result
    return wrapper

# Logging decorator
def log_predictions(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        predictions = func(*args, **kwargs)
        print(f"Made {len(predictions)} predictions")
        print(f"Prediction distribution: {np.bincount(predictions)}")
        return predictions
    return wrapper

# Usage
@timer
@log_predictions
def predict_batch(model, X):
    return model.predict(X)

# Caching decorator (memoization)
def cache_results(func):
    cache = {}
    @functools.wraps(func)
    def wrapper(*args):
        if args not in cache:
            cache[args] = func(*args)
        return cache[args]
    return wrapper

@cache_results
def expensive_feature_engineering(data_id):
    # Expensive computation; processed_features is a placeholder
    return processed_features
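
The input-validation use case from the list above could look like this; a minimal sketch where the expected feature count and the fitted `model` are illustrative assumptions:
import functools
import numpy as np

def validate_input(n_features):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(X, *args, **kwargs):
            X = np.asarray(X)
            # Reject inputs whose shape doesn't match what the model expects
            if X.ndim != 2 or X.shape[1] != n_features:
                raise ValueError(f"Expected shape (n, {n_features}), got {X.shape}")
            return func(X, *args, **kwargs)
        return wrapper
    return decorator

@validate_input(n_features=4)
def predict(X):
    return model.predict(X)  # assumes a fitted `model` in scope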

Rarity: Common · Difficulty: Medium


3. What is the difference between @staticmethod and @classmethod?

Answer: Both define methods that can be called without an instance.

  • @staticmethod: has no access to the class or the instance
  • @classmethod: receives the class as its first argument
class MLModel:
    model_type = "classifier"
    
    def __init__(self, name):
        self.name = name
    
    # Regular method - requires an instance
    def predict(self, X):
        return self.model.predict(X)  # assumes self.model is set elsewhere
    
    # Static method - a utility function
    @staticmethod
    def preprocess_data(X):
        # No access to self or cls
        return (X - X.mean()) / X.std()
    
    # Class method - factory pattern
    @classmethod
    def create_default(cls):
        # Has access to cls
        return cls(name=f"default_{cls.model_type}")
    
    @classmethod
    def from_config(cls, config):
        return cls(name=config['name'])

# Usage
# Static method - no instance needed
processed = MLModel.preprocess_data(X_train)

# Class methods - create instances
model = MLModel.create_default()
model2 = MLModel.from_config({'name': 'my_model'})

Rarity: Moderate · Difficulty: Medium


4. How do you handle exceptions in an ML pipeline?

Answer: Proper error handling prevents pipeline failures and helps with debugging.

import logging
from typing import Optional

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ModelTrainingError(Exception):
    """模型训练失败的自定义异常"""
    pass

def train_model(X, y, model_type='random_forest'):
    try:
        logger.info(f"Starting training with {model_type}")
        
        # Validate inputs
        if X.shape[0] != y.shape[0]:
            raise ValueError("X and y must have the same number of samples")
        
        if X.shape[0] < 100:
            raise ModelTrainingError("Insufficient training data")
        
        # Train the model
        if model_type == 'random_forest':
            from sklearn.ensemble import RandomForestClassifier
            model = RandomForestClassifier()
        else:
            raise ValueError(f"Unknown model type: {model_type}")
        
        model.fit(X, y)
        logger.info("Training completed successfully")
        return model
        
    except ValueError as e:
        logger.error(f"Validation error: {e}")
        raise
    except ModelTrainingError as e:
        logger.error(f"Training error: {e}")
        # Could fall back to a simpler model (train_fallback_model defined elsewhere)
        return train_fallback_model(X, y)
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        raise
    finally:
        logger.info("Training attempt finished")

# Context manager for resource management
class ModelLoader:
    def __init__(self, model_path):
        self.model_path = model_path
        self.model = None
    
    def __enter__(self):
        logger.info(f"Loading model from {self.model_path}")
        self.model = load_model(self.model_path)  # placeholder loader
        return self.model
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        logger.info("Cleaning up resources")
        if self.model:
            del self.model
        return False  # do not suppress exceptions

# Usage
with ModelLoader('model.pkl') as model:
    predictions = model.predict(X_test)

Rarity: Common · Difficulty: Medium


5. What are Python generators, and why are they useful in ML?

Answer: Generators yield values one at a time, which saves memory.

  • Advantages:
    • Memory efficient
    • Lazy evaluation
    • Can represent infinite sequences
  • ML use cases:
    • Data loading
    • Batch processing
    • Data augmentation
import numpy as np

# Generator for batch processing
def batch_generator(X, y, batch_size=32):
    n_samples = len(X)
    indices = np.arange(n_samples)
    np.random.shuffle(indices)
    
    for start_idx in range(0, n_samples, batch_size):
        end_idx = min(start_idx + batch_size, n_samples)
        batch_indices = indices[start_idx:end_idx]
        yield X[batch_indices], y[batch_indices]

# Use in training
for epoch in range(10):
    for X_batch, y_batch in batch_generator(X_train, y_train):
        model.train_on_batch(X_batch, y_batch)

# Data augmentation generator
def augment_images(images, labels):
    for img, label in zip(images, labels):
        # Original
        yield img, label
        # Flipped
        yield np.fliplr(img), label
        # Rotated
        yield np.rot90(img), label

# Infinite generator for training
def infinite_batch_generator(X, y, batch_size=32):
    while True:
        indices = np.random.choice(len(X), batch_size)
        yield X[indices], y[indices]

# Use with steps_per_epoch
gen = infinite_batch_generator(X_train, y_train)
# model.fit(gen, steps_per_epoch=100, epochs=10)

Rarity: Common · Difficulty: Medium


ML Algorithms & Theory (5 Questions)

6. Explain the difference between bagging and boosting.

Answer: Both are ensemble methods, but they work differently:

  • Bagging (Bootstrap Aggregating):
    • Trains models in parallel on random subsets
    • Reduces variance
    • Example: Random Forest
  • Boosting:
    • Trains models sequentially, each correcting the previous one's errors
    • Reduces bias
    • Examples: AdaBoost, Gradient Boosting, XGBoost
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import cross_val_score

# Generate data
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)

# Bagging - Random Forest
rf = RandomForestClassifier(n_estimators=100, random_state=42)
rf_scores = cross_val_score(rf, X, y, cv=5)
print(f"Random Forest CV: {rf_scores.mean():.3f} (+/- {rf_scores.std():.3f})")

# Boosting - Gradient Boosting
gb = GradientBoostingClassifier(n_estimators=100, random_state=42)
gb_scores = cross_val_score(gb, X, y, cv=5)
print(f"Gradient Boosting CV: {gb_scores.mean():.3f} (+/- {gb_scores.std():.3f})")

# XGBoost (advanced boosting)
import xgboost as xgb
xgb_model = xgb.XGBClassifier(n_estimators=100, random_state=42)
xgb_scores = cross_val_score(xgb_model, X, y, cv=5)
print(f"XGBoost CV: {xgb_scores.mean():.3f} (+/- {xgb_scores.std():.3f})")

Rarity: Very Common · Difficulty: Medium


7. How do you handle imbalanced datasets?

Answer: Imbalanced data can bias a model toward the majority class.

  • Techniques:
    • Resampling: SMOTE, undersampling (both demonstrated in the code below)
    • Class weights: penalize misclassifications of the minority class
    • Ensemble methods: balanced random forests
    • Evaluation: use F1, precision, and recall (not accuracy)
    • Threshold tuning: optimize the decision threshold
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler
from collections import Counter

# Create an imbalanced dataset
X, y = make_classification(
    n_samples=1000, n_features=20,
    weights=[0.95, 0.05],  # 95% class 0, 5% class 1
    random_state=42
)

print(f"Original distribution: {Counter(y)}")

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)

# 1. Class weights
model_weighted = RandomForestClassifier(class_weight='balanced', random_state=42)
model_weighted.fit(X_train, y_train)
print("\nWith class weights:")
print(classification_report(y_test, model_weighted.predict(X_test)))

# 2. SMOTE (oversample the minority class)
smote = SMOTE(random_state=42)
X_train_smote, y_train_smote = smote.fit_resample(X_train, y_train)
print(f"After SMOTE: {Counter(y_train_smote)}")

model_smote = RandomForestClassifier(random_state=42)
model_smote.fit(X_train_smote, y_train_smote)
print("\nWith SMOTE:")
print(classification_report(y_test, model_smote.predict(X_test)))

# 3. Threshold tuning
y_proba = model_weighted.predict_proba(X_test)[:, 1]
threshold = 0.3  # a lower threshold favors the minority class
y_pred_adjusted = (y_proba >= threshold).astype(int)
print("\nWith adjusted threshold:")
print(classification_report(y_test, y_pred_adjusted))
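
# 4. Random undersampling (RandomUnderSampler was imported above;
# it shrinks the majority class instead of growing the minority class)
undersampler = RandomUnderSampler(random_state=42)
X_train_under, y_train_under = undersampler.fit_resample(X_train, y_train)
print(f"After undersampling: {Counter(y_train_under)}")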

Rarity: Very Common · Difficulty: Medium


8. What is cross-validation and why is it important?

Answer: Cross-validation evaluates model performance more reliably than a single train-test split.

  • Types:
    • K-Fold: splits the data into k folds
    • Stratified K-Fold: preserves the class distribution
    • Time Series Split: respects temporal order
  • Advantages:
    • More robust performance estimates
    • All data is used for both training and validation
    • Helps detect overfitting
from sklearn.model_selection import (
    cross_val_score, KFold, StratifiedKFold,
    TimeSeriesSplit, cross_validate
)
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris

# Load data
data = load_iris()
X, y = data.data, data.target

model = RandomForestClassifier(random_state=42)

# Standard K-Fold
kfold = KFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(model, X, y, cv=kfold)
print(f"K-Fold scores: {scores}")
print(f"Mean: {scores.mean():.3f} (+/- {scores.std():.3f})")

# Stratified K-Fold (preserves the class distribution)
stratified_kfold = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
stratified_scores = cross_val_score(model, X, y, cv=stratified_kfold)
print(f"\nStratified K-Fold: {stratified_scores.mean():.3f}")

# Time Series Split (for temporal data)
tscv = TimeSeriesSplit(n_splits=5)
ts_scores = cross_val_score(model, X, y, cv=tscv)
print(f"Time Series CV: {ts_scores.mean():.3f}")

# Multiple metrics
cv_results = cross_validate(
    model, X, y, cv=5,
    scoring=['accuracy', 'precision_macro', 'recall_macro', 'f1_macro'],
    return_train_score=True
)

print(f"\nAccuracy: {cv_results['test_accuracy'].mean():.3f}")
print(f"Precision: {cv_results['test_precision_macro'].mean():.3f}")
print(f"Recall: {cv_results['test_recall_macro'].mean():.3f}")
print(f"F1: {cv_results['test_f1_macro'].mean():.3f}")

Rarity: Very Common · Difficulty: Easy


9. Explain precision, recall, and F1-score.

Answer: Classification metrics for evaluating model performance:

  • Precision: of the samples predicted positive, how many were correct
    • Formula: TP / (TP + FP)
    • Use when: false positives are costly
  • Recall: of the actual positives, how many were found
    • Formula: TP / (TP + FN)
    • Use when: false negatives are costly
  • F1-score: the harmonic mean of precision and recall
    • Formula: 2 × (Precision × Recall) / (Precision + Recall)
    • Use when: you need to balance precision and recall
from sklearn.metrics import (
    precision_score, recall_score, f1_score,
    classification_report, confusion_matrix
)
import numpy as np

# Example predictions
y_true = np.array([0, 1, 1, 0, 1, 1, 0, 1, 0, 0])
y_pred = np.array([0, 1, 0, 0, 1, 1, 0, 1, 1, 0])

# Compute the metrics
precision = precision_score(y_true, y_pred)
recall = recall_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred)

print(f"Precision: {precision:.3f}")  # 0.800
print(f"Recall: {recall:.3f}")        # 0.800
print(f"F1-Score: {f1:.3f}")          # 0.800

# Confusion matrix
cm = confusion_matrix(y_true, y_pred)
print(f"\nConfusion Matrix:\n{cm}")
# [[4 1]
#  [1 4]]

# Classification report (all metrics)
print("\nClassification Report:")
print(classification_report(y_true, y_pred))
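
# Manual check against the formulas above: unpack the confusion matrix
tn, fp, fn, tp = cm.ravel()
print(f"Precision = TP/(TP+FP) = {tp}/{tp + fp} = {tp / (tp + fp):.3f}")
print(f"Recall    = TP/(TP+FN) = {tp}/{tp + fn} = {tp / (tp + fn):.3f}")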

# Trade-off example (assumes a fitted `model` and X_test, y_test from earlier)
from sklearn.metrics import precision_recall_curve
import matplotlib.pyplot as plt

# Get probability predictions
y_proba = model.predict_proba(X_test)[:, 1]

# Compute precision and recall at different thresholds
precisions, recalls, thresholds = precision_recall_curve(y_test, y_proba)

# Find the optimal threshold (maximizing F1)
f1_scores = 2 * (precisions * recalls) / (precisions + recalls + 1e-10)
optimal_idx = np.argmax(f1_scores)
optimal_threshold = thresholds[optimal_idx]
print(f"Optimal threshold: {optimal_threshold:.3f}")

Rarity: Very Common · Difficulty: Easy


10. What is regularization, and when do you use it?

Answer: Regularization prevents overfitting by penalizing model complexity.

  • Types:
    • L1 (Lasso): penalizes the absolute values of the coefficients
    • L2 (Ridge): penalizes the squared coefficients
    • Elastic Net: combines L1 and L2
  • When to use:
    • High variance (overfitting)
    • Many features
    • Multicollinearity
from sklearn.linear_model import Ridge, Lasso, ElasticNet, LinearRegression
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
import numpy as np

# Generate data with many features
X, y = make_regression(
    n_samples=100, n_features=50,
    n_informative=10, noise=10, random_state=42
)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)

# No regularization
lr = LinearRegression()
lr.fit(X_train, y_train)
lr_score = r2_score(y_test, lr.predict(X_test))
print(f"Linear Regression R²: {lr_score:.3f}")

# L2 regularization (Ridge)
ridge = Ridge(alpha=1.0)
ridge.fit(X_train, y_train)
ridge_score = r2_score(y_test, ridge.predict(X_test))
print(f"Ridge R²: {ridge_score:.3f}")

# L1 regularization (Lasso)
lasso = Lasso(alpha=0.1)
lasso.fit(X_train, y_train)
lasso_score = r2_score(y_test, lasso.predict(X_test))
print(f"Lasso R²: {lasso_score:.3f}")
print(f"Lasso non-zero coefficients: {np.sum(lasso.coef_ != 0)}")

# Elastic Net (L1 + L2)
elastic = ElasticNet(alpha=0.1, l1_ratio=0.5)
elastic.fit(X_train, y_train)
elastic_score = r2_score(y_test, elastic.predict(X_test))
print(f"Elastic Net R²: {elastic_score:.3f}")

# Hyperparameter tuning for alpha
from sklearn.model_selection import GridSearchCV

param_grid = {'alpha': [0.001, 0.01, 0.1, 1.0, 10.0]}
grid = GridSearchCV(Ridge(), param_grid, cv=5)
grid.fit(X_train, y_train)
print(f"\nBest alpha: {grid.best_params_['alpha']}")
print(f"Best CV score: {grid.best_score_:.3f}")

Rarity: Very Common · Difficulty: Medium


Model Training & Deployment (5 Questions)

11. How do you save and load models in production?

Answer: Model serialization enables deployment and reuse.

import joblib
import pickle
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris

# Train a model
X, y = load_iris(return_X_y=True)
model = RandomForestClassifier()
model.fit(X, y)

# Method 1: Joblib (recommended for scikit-learn)
joblib.dump(model, 'model.joblib')
loaded_model = joblib.load('model.joblib')

# Method 2: Pickle
with open('model.pkl', 'wb') as f:
    pickle.dump(model, f)

with open('model.pkl', 'rb') as f:
    loaded_model = pickle.load(f)

# For TensorFlow/Keras
import tensorflow as tf

# Save the entire model
# keras_model.save('model.h5')
# loaded_model = tf.keras.models.load_model('model.h5')

# Save only the weights
# keras_model.save_weights('model_weights.h5')
# new_model.load_weights('model_weights.h5')

# For PyTorch
import torch

# Save the model's state dict
# torch.save(model.state_dict(), 'model.pth')
# model.load_state_dict(torch.load('model.pth'))

# Save the entire model
# torch.save(model, 'model_complete.pth')
# model = torch.load('model_complete.pth')

# Model versioning
import datetime

model_version = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
model_path = f'models/model_{model_version}.joblib'
joblib.dump(model, model_path)
print(f"Model saved to {model_path}")

Rarity: Very Common · Difficulty: Easy


12. How do you create a REST API for model serving?

Answer: A REST API makes the model accessible to applications.

from flask import Flask, request, jsonify
import joblib
import numpy as np

app = Flask(__name__)

# Load the model at startup
model = joblib.load('model.joblib')

@app.route('/predict', methods=['POST'])
def predict():
    try:
        # Get data from the request
        data = request.get_json()
        features = np.array(data['features']).reshape(1, -1)
        
        # Make a prediction
        prediction = model.predict(features)
        probability = model.predict_proba(features)
        
        # Return the response
        return jsonify({
            'prediction': int(prediction[0]),
            'probability': probability[0].tolist(),
            'status': 'success'
        })
    
    except Exception as e:
        return jsonify({
            'error': str(e),
            'status': 'error'
        }), 400

@app.route('/health', methods=['GET'])
def health():
    return jsonify({'status': 'healthy'})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

# FastAPI alternative (modern, faster)
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

class PredictionRequest(BaseModel):
    features: list

class PredictionResponse(BaseModel):
    prediction: int
    probability: list
    status: str

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    try:
        features = np.array(request.features).reshape(1, -1)
        prediction = model.predict(features)
        probability = model.predict_proba(features)
        
        return PredictionResponse(
            prediction=int(prediction[0]),
            probability=probability[0].tolist(),
            status="success"
        )
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

# Usage:
# curl -X POST "http://localhost:5000/predict" \
#      -H "Content-Type: application/json" \
#      -d '{"features": [5.1, 3.5, 1.4, 0.2]}'
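
The same endpoint can also be called from Python with the requests library; a sketch assuming the Flask server above is running locally:
import requests

# POST a feature vector to the prediction endpoint
response = requests.post(
    'http://localhost:5000/predict',
    json={'features': [5.1, 3.5, 1.4, 0.2]}
)
print(response.json())  # e.g. {'prediction': 0, 'probability': [...], 'status': 'success'}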

Rarity: Very Common · Difficulty: Medium


13. What is Docker, and why is it useful for ML deployment?

Answer: Docker containers package an application together with all of its dependencies.

  • Advantages:
    • Reproducibility
    • Consistency across environments
    • Easy deployment
    • Isolation
# Dockerfile for an ML model
FROM python:3.9-slim

WORKDIR /app

# Copy requirements
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the model and code
COPY model.joblib .
COPY app.py .

# Expose the port
EXPOSE 5000

# Run the application
CMD ["python", "app.py"]

# Shell: build the Docker image
docker build -t ml-model:v1 .

# Shell: run the container
docker run -p 5000:5000 ml-model:v1

# Docker Compose for multi-container setups
# docker-compose.yml
version: '3.8'
services:
  model:
    build: .
    ports:
      - "5000:5000"
    environment:
      - MODEL_PATH=/app/model.joblib
    volumes:
      - ./models:/app/models

Rarity: Common · Difficulty: Medium


14. How do you monitor model performance in production?

Answer: Monitoring detects model degradation and keeps the service reliable.

  • What to monitor:
    • Prediction metrics: accuracy, latency
    • Data drift: changes in the input distribution
    • Model drift: performance degradation over time
    • System metrics: CPU, memory, errors
import logging
from datetime import datetime
import numpy as np

class ModelMonitor:
    def __init__(self, model):
        self.model = model
        self.predictions = []
        self.actuals = []
        self.latencies = []
        self.input_stats = []
        
        # Set up logging
        logging.basicConfig(
            filename='model_monitor.log',
            level=logging.INFO,
            format='%(asctime)s - %(message)s'
        )
    
    def predict(self, X):
        import time
        
        # Track input statistics
        self.input_stats.append({
            'mean': X.mean(),
            'std': X.std(),
            'min': X.min(),
            'max': X.max()
        })
        
        # Measure latency
        start = time.time()
        prediction = self.model.predict(X)
        latency = time.time() - start
        
        self.predictions.append(prediction)
        self.latencies.append(latency)
        
        # Log the prediction
        logging.info(f"Prediction: {prediction}, Latency: {latency:.3f}s")
        
        # Alert if latency is too high
        if latency > 1.0:
            logging.warning(f"High latency detected: {latency:.3f}s")
        
        return prediction
    
    def log_actual(self, y_true):
        self.actuals.append(y_true)
        
        # Compute accuracy once enough data is available
        if len(self.actuals) >= 100:
            accuracy = np.mean(
                np.array(self.predictions[-100:]) == np.array(self.actuals[-100:])
            )
            logging.info(f"Rolling accuracy (last 100): {accuracy:.3f}")
            
            if accuracy < 0.8:
                logging.error(f"Model performance degraded: {accuracy:.3f}")
    
    def check_data_drift(self, reference_stats):
        if not self.input_stats:
            return
        
        current_stats = self.input_stats[-1]
        
        # Simple drift detection (compare means)
        mean_diff = abs(current_stats['mean'] - reference_stats['mean'])
        if mean_diff > 2 * reference_stats['std']:
            logging.warning(f"Data drift detected: mean diff = {mean_diff:.3f}")

# Usage (X_new is a placeholder for incoming data)
monitor = ModelMonitor(model)
prediction = monitor.predict(X_new)
# Later, when the actual label becomes available
monitor.log_actual(y_true)
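
check_data_drift needs reference statistics computed from the training data; a sketch of how it might be wired up (X_train is assumed to be the original training set):
# Reference statistics captured at training time, used for drift comparison
reference_stats = {'mean': X_train.mean(), 'std': X_train.std()}
monitor.check_data_drift(reference_stats)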

Rarity: Common · Difficulty: Medium


15. What is CI/CD for machine learning?

Answer: CI/CD automates the testing and deployment of ML models.

  • Continuous Integration:
    • Automated testing
    • Code quality checks
    • Model validation
  • Continuous Deployment:
    • Automated deployment
    • Rollback capability
    • A/B testing
# .github/workflows/ml-pipeline.yml
name: ML Pipeline

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: 3.9
      
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest
      
      - name: Run tests
        run: pytest tests/
      
      - name: Train model
        run: python train.py
      
      - name: Validate model
        run: python validate_model.py
  
  deploy:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Deploy to production
        run: |
          docker build -t ml-model:latest .
          docker push ml-model:latest
# tests/test_model.py
import pytest
import joblib
import numpy as np
from sklearn.datasets import load_iris

def test_model_accuracy():
    from train import train_model
    X, y = load_iris(return_X_y=True)
    model, accuracy = train_model(X, y)
    assert accuracy > 0.9, f"Model accuracy {accuracy} below threshold"

def test_model_prediction_shape():
    model = joblib.load('model.joblib')
    X_test = np.random.rand(10, 4)
    predictions = model.predict(X_test)
    assert predictions.shape == (10,), "Unexpected prediction shape"

def test_model_prediction_range():
    model = joblib.load('model.joblib')
    X_test = np.random.rand(10, 4)
    predictions = model.predict(X_test)
    # Iris has three classes, so every prediction should be 0, 1, or 2
    assert all(p in {0, 1, 2} for p in predictions), "Prediction out of range"