高级 SRE 面试题与回答示例

Milad Bonakdar
作者
用实战问题准备高级 SRE 面试,覆盖 SLO、错误预算、容量规划、事故指挥、混沌测试、值班负担和可靠性取舍。
简介
高级站点可靠性工程师需要具备架构大规模可靠系统的能力,领导事件响应,推动 SRE 文化,并就可靠性投资做出战略决策。这个职位需要深厚的技术专长、领导技能以及在可靠性和业务速度之间取得平衡的能力。
本综合指南涵盖了高级 SRE 的重要面试问题,重点关注高级概念、系统设计和组织影响。每个问题都包含详细的解释和实际示例。
高级 SLO 设计
1. 如何为数据有限的新服务设计 SLI 和 SLO?
回答: 为新服务设计 SLO 需要在目标和可实现性之间取得平衡:
方法:
1. 从用户旅程映射开始:
# 识别关键用户旅程
user_journeys = {
'search_product': {
'steps': ['search_query', 'results_display', 'product_click'],
'criticality': 'high',
'expected_latency': '< 500ms'
},
'checkout': {
'steps': ['add_to_cart', 'payment', 'confirmation'],
'criticality': 'critical',
'expected_latency': '< 2s'
},
'browse_recommendations': {
'steps': ['load_page', 'fetch_recommendations'],
'criticality': 'medium',
'expected_latency': '< 1s'
}
}2. 根据用户体验定义 SLI:
# SLI 规范
slis:
availability:
description: "成功请求的百分比"
measurement: "count(http_status < 500) / count(http_requests)"
latency:
description: "第 95 百分位的请求延迟"
measurement: "histogram_quantile(0.95, http_request_duration_seconds)"
correctness:
description: "返回正确数据的请求的百分比"
measurement: "count(validation_passed) / count(requests)"3. 保守地设置初始 SLO:
def calculate_initial_slo(service_type, criticality):
"""
根据服务特性计算初始 SLO
"""
base_slos = {
'critical': {
'availability': 0.999, # 99.9%
'latency_p95': 1.0, # 1 秒
'latency_p99': 2.0 # 2 秒
},
'high': {
'availability': 0.995, # 99.5%
'latency_p95': 2.0,
'latency_p99': 5.0
},
'medium': {
'availability': 0.99, # 99%
'latency_p95': 5.0,
'latency_p99': 10.0
}
}
return base_slos.get(criticality, base_slos['medium'])
# 示例
checkout_slo = calculate_initial_slo('payment', 'critical')
print(f"Checkout SLO: {checkout_slo}")4. 计划迭代:
- 从 4 周的测量窗口开始
- 每周审查 SLO 性能
- 根据实际性能和用户反馈进行调整
- 随着系统成熟,收紧 SLO
5. 记录假设:
## SLO 假设(初始)
### 可用性:99.9%
- 假设:标准云基础设施可靠性
- 错误预算:43 分钟/月
- 审查:在 3 个月的数据后
### 延迟 (p95):< 1 秒
- 假设:数据库查询 < 100 毫秒
- 假设:没有复杂的计算
- 审查:如果查询模式改变
### 依赖关系
- 外部 API 可用性:99.95%
- 数据库可用性:99.99%稀有度: 常见
难度: 困难
2. 如何处理不同用户群体的 SLO 冲突?
回答: 不同的用户群体通常有不同的可靠性需求:
策略:多层 SLO
class SLOTier:
def __init__(self, name, availability, latency_p95, latency_p99):
self.name = name
self.availability = availability
self.latency_p95 = latency_p95
self.latency_p99 = latency_p99
self.error_budget = 1 - availability
# 定义层级
tiers = {
'premium': SLOTier(
name='Premium',
availability=0.9999, # 99.99% - 4.3 分钟/月
latency_p95=0.5,
latency_p99=1.0
),
'standard': SLOTier(
name='Standard',
availability=0.999, # 99.9% - 43 分钟/月
latency_p95=1.0,
latency_p99=2.0
),
'free': SLOTier(
name='Free',
availability=0.99, # 99% - 7.2 小时/月
latency_p95=2.0,
latency_p99=5.0
)
}
# 根据层级路由请求
def get_user_tier(user_id):
# 查找用户的订阅层级
return user_subscription_tier(user_id)
def apply_slo_policy(user_id, request):
tier = get_user_tier(user_id)
slo = tiers[tier]
# 应用特定于层级的策略
request.timeout = slo.latency_p99
request.priority = tier # 用于队列优先级排序
request.retry_budget = calculate_retry_budget(slo.error_budget)
return request使用流量路由实施:
# Kubernetes 示例:每个层级单独部署
apiVersion: apps/v1
kind: Deployment
metadata:
name: api-premium
spec:
replicas: 10
template:
spec:
containers:
- name: api
resources:
requests:
cpu: "2"
memory: "4Gi"
limits:
cpu: "4"
memory: "8Gi"
priorityClassName: high-priority
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: api-standard
spec:
replicas: 5
template:
spec:
containers:
- name: api
resources:
requests:
cpu: "1"
memory: "2Gi"按层级监控:
# 按层级划分的可用性
sum(rate(http_requests_total{status!~"5.."}[5m])) by (tier)
/
sum(rate(http_requests_total[5m])) by (tier)
# 按层级划分的延迟
histogram_quantile(0.95,
rate(http_request_duration_seconds_bucket[5m])
) by (tier)稀有度: 不常见
难度: 困难
容量规划
3. 详细介绍您为快速增长的服务制定的容量规划流程。
回答: 容量规划确保资源满足需求,同时优化成本:
容量规划框架:
1. 测量基线:
import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression
class CapacityPlanner:
def __init__(self, metrics_data):
self.data = pd.DataFrame(metrics_data)
def analyze_trends(self, metric_name, days=30):
"""分析历史趋势"""
metric_data = self.data[metric_name].tail(days * 24) # 每小时数据
# 计算增长率
start_value = metric_data.iloc[0]
end_value = metric_data.iloc[-1]
growth_rate = ((end_value - start_value) / start_value) * 100
# 识别峰值使用量
peak_value = metric_data.max()
peak_time = metric_data.idxmax()
# 计算百分位数
p50 = metric_data.quantile(0.50)
p95 = metric_data.quantile(0.95)
p99 = metric_data.quantile(0.99)
return {
'growth_rate': growth_rate,
'peak_value': peak_value,
'peak_time': peak_time,
'p50': p50,
'p95': p95,
'p99': p99
}
def forecast_capacity(self, metric_name, months_ahead=3):
"""预测未来容量需求"""
# 准备数据
df = self.data[[metric_name]].reset_index()
df['days'] = (df.index / 24).astype(int) # 将小时转换为天数
# 训练模型
X = df[['days']].values
y = df[metric_name].values
model = LinearRegression()
model.fit(X, y)
# 预测
future_days = np.array([[df['days'].max() + (30 * months_ahead)]])
forecast = model.predict(future_days)[0]
# 添加安全边际 (20%)
forecast_with_margin = forecast * 1.2
return {
'forecast': forecast,
'with_margin': forecast_with_margin,
'current': y[-1],
'growth_factor': forecast / y[-1]
}
def calculate_resource_needs(self, requests_per_second,
requests_per_instance=100,
headroom=0.3):
"""计算所需的实例数"""
# 基本容量
base_instances = np.ceil(requests_per_second / requests_per_instance)
# 为峰值和维护添加预留空间
total_instances = np.ceil(base_instances * (1 + headroom))
return {
'base_instances': int(base_instances),
'total_instances': int(total_instances),
'headroom_instances': int(total_instances - base_instances)
}
# 示例用法
metrics = {
'requests_per_second': [100, 105, 110, 115, 120, ...], # 历史数据
'cpu_usage': [45, 48, 50, 52, 55, ...],
'memory_usage': [60, 62, 65, 67, 70, ...]
}
planner = CapacityPlanner(metrics)
# 分析趋势
trends = planner.analyze_trends('requests_per_second', days=30)
print(f"增长率: {trends['growth_rate']:.2f}%")
print(f"峰值 RPS: {trends['peak_value']}")
# 预测容量
forecast = planner.forecast_capacity('requests_per_second', months_ahead=3)
print(f"3 个月后的预测 RPS: {forecast['forecast']:.0f}")
print(f"带安全边际: {forecast['with_margin']:.0f}")
# 计算资源需求
resources = planner.calculate_resource_needs(
requests_per_second=forecast['with_margin'],
requests_per_instance=100,
headroom=0.3
)
print(f"所需的实例数: {resources['total_instances']}")2. 考虑增长驱动因素:
- 用户增长率
- 功能发布
- 季节性模式
- 营销活动
- 地理扩张
3. 规划预留空间:
- N+1: 应对一个实例故障
- N+2: 应对两个故障或一个区域中断
- 流量峰值: 正常容量的 2-3 倍
- 维护窗口: 20-30% 的开销
4. 成本优化:
def optimize_instance_mix(workload_profile):
"""
优化实例类型以降低成本
"""
# 实例类型的组合
instance_types = {
'on_demand': {
'cost_per_hour': 0.10,
'reliability': 1.0,
'percentage': 0.3 # 30% 按需用于基线
},
'spot': {
'cost_per_hour': 0.03,
'reliability': 0.95,
'percentage': 0.5 # 50% 竞价型用于节省成本
},
'reserved': {
'cost_per_hour': 0.06,
'reliability': 1.0,
'percentage': 0.2 # 20% 预留用于可预测的负载
}
}
total_instances = workload_profile['total_instances']
allocation = {}
for instance_type, config in instance_types.items():
count = int(total_instances * config['percentage'])
allocation[instance_type] = {
'count': count,
'monthly_cost': count * config['cost_per_hour'] * 730
}
return allocation稀有度: 非常常见
难度: 困难
混沌工程
4. 如何在生产环境中实施混沌工程?
回答: 混沌工程通过注入故障主动测试系统弹性:
混沌工程原则:
- 围绕稳定状态构建假设
- 模拟真实世界事件
- 在生产环境中运行实验
- 自动化实验
- 最小化爆炸半径
实施:
# 混沌实验框架
from dataclasses import dataclass
from enum import Enum
import random
import time
class ExperimentStatus(Enum):
PLANNED = "planned"
RUNNING = "running"
COMPLETED = "completed"
ABORTED = "aborted"
@dataclass
class ChaosExperiment:
name: str
hypothesis: str
blast_radius: float # 受影响的流量百分比
duration_seconds: int
rollback_criteria: dict
def __post_init__(self):
self.status = ExperimentStatus.PLANNED
self.metrics_before = {}
self.metrics_during = {}
self.metrics_after = {}
class ChaosRunner:
def __init__(self, monitoring_client):
self.monitoring = monitoring_client
self.experiments = []
def run_experiment(self, experiment: ChaosExperiment):
"""执行带有安全检查的混沌实验"""
print(f"开始实验: {experiment.name}")
# 1. 测量基线
experiment.metrics_before = self.measure_metrics()
print(f"基线指标: {experiment.metrics_before}")
# 2. 验证系统是否健康
if not self.is_system_healthy(experiment.metrics_before):
print("系统不健康,中止实验")
return False
# 3. 注入故障
experiment.status = ExperimentStatus.RUNNING
failure_injection = self.inject_failure(experiment)
try:
# 4. 在实验期间监控
start_time = time.time()
while time.time() - start_time < experiment.duration_seconds:
experiment.metrics_during = self.measure_metrics()
# 检查回滚标准
if self.should_rollback(experiment):
print("满足回滚标准,停止实验")
self.rollback(failure_injection)
experiment.status = ExperimentStatus.ABORTED
return False
time.sleep(10) # 每 10 秒检查一次
# 5. 回滚故障注入
self.rollback(failure_injection)
# 6. 测量恢复情况
time.sleep(60) # 等待系统稳定
experiment.metrics_after = self.measure_metrics()
# 7. 分析结果
experiment.status = ExperimentStatus.COMPLETED
return self.analyze_results(experiment)
except Exception as e:
print(f"实验失败: {e}")
self.rollback(failure_injection)
experiment.status = ExperimentStatus.ABORTED
return False
def inject_failure(self, experiment):
"""注入特定故障类型"""
# 实现取决于故障类型
pass
def measure_metrics(self):
"""测量关键系统指标"""
return {
'error_rate': self.monitoring.get_error_rate(),
'latency_p95': self.monitoring.get_latency_p95(),
'requests_per_second': self.monitoring.get_rps(),
'availability': self.monitoring.get_availability()
}
def is_system_healthy(self, metrics):
"""检查系统是否满足 SLO"""
return (
metrics['error_rate'] < 0.01 and # < 1% 错误
metrics['latency_p95'] < 1.0 and # < 1 秒延迟
metrics['availability'] > 0.999 # > 99.9% 可用
)
def should_rollback(self, experiment):
"""检查是否应该中止实验"""
current = experiment.metrics_during
criteria = experiment.rollback_criteria
return (
current['error_rate'] > criteria.get('max_error_rate', 0.05) or
current['latency_p95'] > criteria.get('max_latency', 5.0) or
current['availability'] < criteria.get('min_availability', 0.99)
)
def rollback(self, failure_injection):
"""移除故障注入"""
print("回滚故障注入")
# 实现取决于故障类型
def analyze_results(self, experiment):
"""分析实验结果"""
before = experiment.metrics_before
during = experiment.metrics_during
after = experiment.metrics_after
print(f"\n实验结果: {experiment.name}")
print(f"假设: {experiment.hypothesis}")
print(f"\n指标:")
print(f" 之前: {before}")
print(f" 期间: {during}")
print(f" 之后: {after}")
# 确定假设是否得到验证
hypothesis_validated = (
during['availability'] >= experiment.rollback_criteria['min_availability']
)
return hypothesis_validated
# 示例实验
experiment = ChaosExperiment(
name="数据库故障转移测试",
hypothesis="系统在数据库故障转移期间保持可用",
blast_radius=0.1, # 10% 的流量
duration_seconds=300, # 5 分钟
rollback_criteria={
'max_error_rate': 0.05,
'max_latency': 5.0,
'min_availability': 0.99
}
)常见的混沌实验:
1. 网络延迟:
# 使用 tc (traffic control) 添加延迟
tc qdisc add dev eth0 root netem delay 100ms 20ms
# 回滚
tc qdisc del dev eth0 root2. Pod 故障 (Kubernetes):
# 杀死随机 Pod
kubectl delete pod -l app=myapp --random=1
# 使用 Chaos Mesh
kubectl apply -f - <<EOF
apiVersion: chaos-mesh.org/v1alpha1
kind: PodChaos
metadata:
name: pod-failure
spec:
action: pod-failure
mode: one
selector:
namespaces:
- production
labelSelectors:
app: myapp
duration: "30s"
EOF3. 资源耗尽:
# CPU 压力
stress-ng --cpu 4 --timeout 60s
# 内存压力
stress-ng --vm 2 --vm-bytes 2G --timeout 60s稀有度: 常见
难度: 困难
事件领导力
5. 如何领导从检测到事后分析的高危事件?
回答: 高级 SRE 通常担任关键中断的事件指挥官:
事件指挥结构:
事件指挥官的职责:
1. 初始响应(0-5 分钟):
## 事件指挥官检查清单
### 立即行动
- [ ] 确认事件
- [ ] 评估严重性(SEV-1、SEV-2、SEV-3)
- [ ] 创建事件频道(#incident-YYYY-MM-DD-NNN)
- [ ] 通知适当的团队
- [ ] 指定角色:
- [ ] 技术负责人
- [ ] 沟通负责人
- [ ] 记录员(记录时间线)
### 严重性评估
**SEV-1(严重):**
- 完全服务中断
- 数据丢失
- 安全漏洞
- > 50% 的用户受到影响
**SEV-2(高):**
- 部分中断
- 性能下降
- 10-50% 的用户受到影响
**SEV-3(中):**
- 轻微退化
- < 10% 的用户受到影响
- 提供解决方法2. 调查阶段:
class IncidentCommander:
def __init__(self, incident_id):
self.incident_id = incident_id
self.timeline = []
self.status_updates = []
self.action_items = []
def coordinate_investigation(self):
"""协调技术调查"""
# 并行调查轨道
tracks = {
'recent_changes': self.check_recent_deployments(),
'infrastructure': self.check_infrastructure_health(),
'dependencies': self.check_external_dependencies(),
'metrics': self.analyze_metrics_anomalies()
}
return tracks
def make_decision(self, options, deadline_minutes=5):
"""做出限时决策"""
print(f"需要在 {deadline_minutes} 分钟内做出决定")
print(f"选项: {options}")
# 从技术负责人那里收集意见
# 根据以下因素做出决定:
# - 用户影响
# - 每个选项的风险
# - 实施时间
# - 可逆性
return selected_option
def communicate_status(self, interval_minutes=15):
"""定期状态更新"""
update = {
'timestamp': datetime.now(),
'status': self.get_current_status(),
'impact': self.assess_user_impact(),
'eta': self.estimate_resolution_time(),
'next_update': datetime.now() + timedelta(minutes=interval_minutes)
}
# 发送给利益相关者
self.send_status_update(update)
self.status_updates.append(update)
def log_timeline_event(self, event, timestamp=None):
"""记录事件时间线"""
self.timeline.append({
'timestamp': timestamp or datetime.now(),
'event': event,
'logged_by': self.get_current_user()
})3. 缓解策略:
def evaluate_mitigation_options():
"""评估和优先排序缓解选项"""
options = [
{
'action': '回滚部署',
'time_to_implement': 5, # 分钟
'risk': 'low',
'effectiveness': 'high',
'reversible': True
},
{
'action': '扩展资源',
'time_to_implement': 2,
'risk': 'low',
'effectiveness': 'medium',
'reversible': True
},
{
'action': '禁用功能标志',
'time_to_implement': 1,
'risk': 'low',
'effectiveness': 'high',
'reversible': True
},
{
'action': '数据库故障转移',
'time_to_implement': 10,
'risk': 'medium',
'effectiveness': 'high',
'reversible': False
}
]
# 按以下因素排序:低风险、高效率、快速实施
sorted_options = sorted(
options,
key=lambda x: (
x['risk'] == 'low',
x['effectiveness'] == 'high',
-x['time_to_implement']
),
reverse=True
)
return sorted_options4. 事后分析(无责):
# 事件事后分析:API 中断
**日期:** 2024-11-25
**持续时间:** 45 分钟
**严重性:** SEV-1
**事件指挥官:** Alice
**技术负责人:** Bob
## 执行摘要
由于数据库连接池耗尽,导致所有用户受到影响的完整 API 中断。
## 影响
- **受影响的用户:** 100%(所有用户)
- **持续时间:** 45 分钟
- **收入影响:** ~$50,000
- **SLO 影响:** 消耗了 75% 的每月错误预算
## 根本原因
由于在事件发生前 2 小时部署的新功能中存在连接泄漏,导致数据库连接池耗尽。
## 时间线
| 时间 | 事件 |
|------|-------|
| 14:00 | 部署 v2.5.0 |
| 15:45 | 首次收到延迟增加的警报 |
| 15:50 | 完整的 API 中断 |
| 15:52 | 声明事件 (SEV-1) |
| 15:55 | 确定数据库为瓶颈 |
| 16:05 | 决定回滚部署 |
| 16:15 | 回滚完成 |
| 16:20 | 服务恢复 |
| 16:35 | 确认完全恢复 |
## 进展顺利
- 快速检测(从首次症状出现后 5 分钟)
- 清晰的事件指挥结构
- 快速决定回滚
- 与利益相关者进行良好的沟通
## 出现问题
- 连接泄漏未在测试中捕获
- 没有连接池监控
- 在高峰时段部署
## 行动项目
| 行动 | 负责人 | 截止日期 | 状态 |
|--------|-------|----------|--------|
| 添加连接池监控 | Alice | 2024-12-01 | 开放 |
| 在测试中实施连接泄漏检测 | Bob | 2024-12-05 | 开放 |
| 更新部署策略(避免高峰时段) | Charlie | 2024-11-30 | 开放 |
| 为数据库连接添加断路器 | David | 2024-12-10 | 开放 |
## 经验教训
- 监控差距可能会隐藏关键问题
- 部署时间很重要
- 需要更好的集成测试来检测资源泄漏稀有度: 非常常见
难度: 困难
分布式系统可靠性
6. 如何确保分布式微服务架构中的可靠性?
回答: 分布式系统引入了独特的可靠性挑战:
关键模式:
1. 用于弹性的服务网格:
# Istio 示例:断路和重试
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
name: api-service
spec:
host: api-service
trafficPolicy:
connectionPool:
tcp:
maxConnections: 100
http:
http1MaxPendingRequests: 50
http2MaxRequests: 100
maxRequestsPerConnection: 2
outlierDetection:
consecutiveErrors: 5
interval: 30s
baseEjectionTime: 30s
maxEjectionPercent: 50
minHealthPercent: 40
loadBalancer:
simple: LEAST_REQUEST
---
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: api-service
spec:
hosts:
- api-service
http:
- retries:
attempts: 3
perTryTimeout: 2s
retryOn: 5xx,reset,connect-failure,refused-stream
timeout: 10s
route:
- destination:
host: api-service2. 分布式跟踪:
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
# 设置跟踪
trace.set_tracer_provider(TracerProvider())
jaeger_exporter = JaegerExporter(
agent_host_name="jaeger",
agent_port=6831,
)
trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(jaeger_exporter)
)
# 仪器化请求库
RequestsInstrumentor().instrument()
# 在代码中使用
tracer = trace.get_tracer(__name__)
def process_order(order_id):
with tracer.start_as_current_span("process_order") as span:
span.set_attribute("order.id", order_id)
# 调用支付服务
with tracer.start_as_current_span("call_payment_service"):
payment_result = call_payment_service(order_id)
# 调用库存服务
with tracer.start_as_current_span("call_inventory_service"):
inventory_result = call_inventory_service(order_id)
return combine_results(payment_result, inventory_result)3. 舱壁模式:
import asyncio
from asyncio import Semaphore
class BulkheadExecutor:
def __init__(self, max_concurrent=10):
self.semaphore = Semaphore(max_concurrent)
self.active_requests = 0
async def execute(self, func, *args, **kwargs):
"""执行具有并发限制的函数"""
async with self.semaphore:
self.active_requests += 1
try:
result = await func(*args, **kwargs)
return result
finally:
self.active_requests -= 1
# 为不同的服务分隔舱壁
payment_bulkhead = BulkheadExecutor(max_concurrent=20)
inventory_bulkhead = BulkheadExecutor(max_concurrent=10)
notification_bulkhead = BulkheadExecutor(max_concurrent=5)
# 用法
async def process_order(order_id):
# 每个服务都有隔离的资源池
payment = await payment_bulkhead.execute(call_payment_service, order_id)
inventory = await inventory_bulkhead.execute(call_inventory_service, order_id)
notification = await notification_bulkhead.execute(send_notification, order_id)4. 用于分布式事务的 Saga 模式:
class SagaOrchestrator:
def __init__(self):
self.steps = []
self.compensations = []
def add_step(self, action, compensation):
"""添加带有补偿的步骤"""
self.steps.append(action)
self.compensations.append(compensation)
async def execute(self):
"""执行 Saga,并在失败时自动补偿"""
completed_steps = []
try:
for step in self.steps:
result = await step()
completed_steps.append(result)
return completed_steps
except Exception as e:
# 按相反的顺序补偿
print(f"Saga 失败: {e}. 开始补偿...")
for i in range(len(completed_steps) - 1, -1, -1):
try:
await self.compensations[i](completed_steps[i])
except Exception as comp_error:
print(f"补偿失败: {comp_error}")
raise
# 示例:订单处理 Saga
async def process_order_saga(order_id):
saga = SagaOrchestrator()
# 步骤 1:预留库存
saga.结论
高级 SRE 面试中,最有说服力的回答听起来像真实生产判断,而不是背诵定义。练习说明你如何从用户旅程设定 SLO,如何用错误预算判断发布风险,如何用真实负载数据验证容量,如何运行受控的混沌实验,如何用清晰角色指挥事故,以及如何在不掩盖风险的前提下降低 toil。
面试前准备两到三个具体故事:一次你主导的事故,一次你影响过的可靠性取舍,一次改变团队行为的自动化或可观测性改进。每个故事都要能说清楚信号、决策、取舍和后续改进。


