十二月 21, 2025
47 分钟阅读

高级 SRE 面试题与回答示例

interview
career-advice
job-search
高级 SRE 面试题与回答示例
Milad Bonakdar

Milad Bonakdar

作者

用实战问题准备高级 SRE 面试,覆盖 SLO、错误预算、容量规划、事故指挥、混沌测试、值班负担和可靠性取舍。


简介

高级站点可靠性工程师需要具备架构大规模可靠系统的能力,领导事件响应,推动 SRE 文化,并就可靠性投资做出战略决策。这个职位需要深厚的技术专长、领导技能以及在可靠性和业务速度之间取得平衡的能力。

本综合指南涵盖了高级 SRE 的重要面试问题,重点关注高级概念、系统设计和组织影响。每个问题都包含详细的解释和实际示例。


高级 SLO 设计

1. 如何为数据有限的新服务设计 SLI 和 SLO?

回答: 为新服务设计 SLO 需要在目标和可实现性之间取得平衡:

方法:

1. 从用户旅程映射开始:

# 识别关键用户旅程
user_journeys = {
    'search_product': {
        'steps': ['search_query', 'results_display', 'product_click'],
        'criticality': 'high',
        'expected_latency': '< 500ms'
    },
    'checkout': {
        'steps': ['add_to_cart', 'payment', 'confirmation'],
        'criticality': 'critical',
        'expected_latency': '< 2s'
    },
    'browse_recommendations': {
        'steps': ['load_page', 'fetch_recommendations'],
        'criticality': 'medium',
        'expected_latency': '< 1s'
    }
}

2. 根据用户体验定义 SLI:

# SLI 规范
slis:
  availability:
    description: "成功请求的百分比"
    measurement: "count(http_status < 500) / count(http_requests)"
    
  latency:
    description: "第 95 百分位的请求延迟"
    measurement: "histogram_quantile(0.95, http_request_duration_seconds)"
    
  correctness:
    description: "返回正确数据的请求的百分比"
    measurement: "count(validation_passed) / count(requests)"

3. 保守地设置初始 SLO:

def calculate_initial_slo(service_type, criticality):
    """
    根据服务特性计算初始 SLO
    """
    base_slos = {
        'critical': {
            'availability': 0.999,  # 99.9%
            'latency_p95': 1.0,     # 1 秒
            'latency_p99': 2.0      # 2 秒
        },
        'high': {
            'availability': 0.995,  # 99.5%
            'latency_p95': 2.0,
            'latency_p99': 5.0
        },
        'medium': {
            'availability': 0.99,   # 99%
            'latency_p95': 5.0,
            'latency_p99': 10.0
        }
    }
    
    return base_slos.get(criticality, base_slos['medium'])

# 示例
checkout_slo = calculate_initial_slo('payment', 'critical')
print(f"Checkout SLO: {checkout_slo}")

4. 计划迭代:

  • 从 4 周的测量窗口开始
  • 每周审查 SLO 性能
  • 根据实际性能和用户反馈进行调整
  • 随着系统成熟,收紧 SLO

5. 记录假设:

## SLO 假设(初始)

### 可用性:99.9%
- 假设:标准云基础设施可靠性
- 错误预算:43 分钟/月
- 审查:在 3 个月的数据后

### 延迟 (p95):< 1 秒
- 假设:数据库查询 < 100 毫秒
- 假设:没有复杂的计算
- 审查:如果查询模式改变

### 依赖关系
- 外部 API 可用性:99.95%
- 数据库可用性:99.99%

稀有度: 常见
难度: 困难


2. 如何处理不同用户群体的 SLO 冲突?

回答: 不同的用户群体通常有不同的可靠性需求:

策略:多层 SLO

class SLOTier:
    def __init__(self, name, availability, latency_p95, latency_p99):
        self.name = name
        self.availability = availability
        self.latency_p95 = latency_p95
        self.latency_p99 = latency_p99
        self.error_budget = 1 - availability

# 定义层级
tiers = {
    'premium': SLOTier(
        name='Premium',
        availability=0.9999,  # 99.99% - 4.3 分钟/月
        latency_p95=0.5,
        latency_p99=1.0
    ),
    'standard': SLOTier(
        name='Standard',
        availability=0.999,   # 99.9% - 43 分钟/月
        latency_p95=1.0,
        latency_p99=2.0
    ),
    'free': SLOTier(
        name='Free',
        availability=0.99,    # 99% - 7.2 小时/月
        latency_p95=2.0,
        latency_p99=5.0
    )
}

# 根据层级路由请求
def get_user_tier(user_id):
    # 查找用户的订阅层级
    return user_subscription_tier(user_id)

def apply_slo_policy(user_id, request):
    tier = get_user_tier(user_id)
    slo = tiers[tier]
    
    # 应用特定于层级的策略
    request.timeout = slo.latency_p99
    request.priority = tier  # 用于队列优先级排序
    request.retry_budget = calculate_retry_budget(slo.error_budget)
    
    return request

使用流量路由实施:

# Kubernetes 示例:每个层级单独部署
apiVersion: apps/v1
kind: Deployment
metadata:
  name: api-premium
spec:
  replicas: 10
  template:
    spec:
      containers:
      - name: api
        resources:
          requests:
            cpu: "2"
            memory: "4Gi"
          limits:
            cpu: "4"
            memory: "8Gi"
      priorityClassName: high-priority
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: api-standard
spec:
  replicas: 5
  template:
    spec:
      containers:
      - name: api
        resources:
          requests:
            cpu: "1"
            memory: "2Gi"

按层级监控:

# 按层级划分的可用性
sum(rate(http_requests_total{status!~"5.."}[5m])) by (tier)
/
sum(rate(http_requests_total[5m])) by (tier)

# 按层级划分的延迟
histogram_quantile(0.95,
  rate(http_request_duration_seconds_bucket[5m])
) by (tier)

稀有度: 不常见
难度: 困难


容量规划

3. 详细介绍您为快速增长的服务制定的容量规划流程。

回答: 容量规划确保资源满足需求,同时优化成本:

容量规划框架:

Loading diagram...

1. 测量基线:

import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression

class CapacityPlanner:
    def __init__(self, metrics_data):
        self.data = pd.DataFrame(metrics_data)
    
    def analyze_trends(self, metric_name, days=30):
        """分析历史趋势"""
        metric_data = self.data[metric_name].tail(days * 24)  # 每小时数据
        
        # 计算增长率
        start_value = metric_data.iloc[0]
        end_value = metric_data.iloc[-1]
        growth_rate = ((end_value - start_value) / start_value) * 100
        
        # 识别峰值使用量
        peak_value = metric_data.max()
        peak_time = metric_data.idxmax()
        
        # 计算百分位数
        p50 = metric_data.quantile(0.50)
        p95 = metric_data.quantile(0.95)
        p99 = metric_data.quantile(0.99)
        
        return {
            'growth_rate': growth_rate,
            'peak_value': peak_value,
            'peak_time': peak_time,
            'p50': p50,
            'p95': p95,
            'p99': p99
        }
    
    def forecast_capacity(self, metric_name, months_ahead=3):
        """预测未来容量需求"""
        # 准备数据
        df = self.data[[metric_name]].reset_index()
        df['days'] = (df.index / 24).astype(int)  # 将小时转换为天数
        
        # 训练模型
        X = df[['days']].values
        y = df[metric_name].values
        
        model = LinearRegression()
        model.fit(X, y)
        
        # 预测
        future_days = np.array([[df['days'].max() + (30 * months_ahead)]])
        forecast = model.predict(future_days)[0]
        
        # 添加安全边际 (20%)
        forecast_with_margin = forecast * 1.2
        
        return {
            'forecast': forecast,
            'with_margin': forecast_with_margin,
            'current': y[-1],
            'growth_factor': forecast / y[-1]
        }
    
    def calculate_resource_needs(self, requests_per_second, 
                                 requests_per_instance=100,
                                 headroom=0.3):
        """计算所需的实例数"""
        # 基本容量
        base_instances = np.ceil(requests_per_second / requests_per_instance)
        
        # 为峰值和维护添加预留空间
        total_instances = np.ceil(base_instances * (1 + headroom))
        
        return {
            'base_instances': int(base_instances),
            'total_instances': int(total_instances),
            'headroom_instances': int(total_instances - base_instances)
        }

# 示例用法
metrics = {
    'requests_per_second': [100, 105, 110, 115, 120, ...],  # 历史数据
    'cpu_usage': [45, 48, 50, 52, 55, ...],
    'memory_usage': [60, 62, 65, 67, 70, ...]
}

planner = CapacityPlanner(metrics)

# 分析趋势
trends = planner.analyze_trends('requests_per_second', days=30)
print(f"增长率: {trends['growth_rate']:.2f}%")
print(f"峰值 RPS: {trends['peak_value']}")

# 预测容量
forecast = planner.forecast_capacity('requests_per_second', months_ahead=3)
print(f"3 个月后的预测 RPS: {forecast['forecast']:.0f}")
print(f"带安全边际: {forecast['with_margin']:.0f}")

# 计算资源需求
resources = planner.calculate_resource_needs(
    requests_per_second=forecast['with_margin'],
    requests_per_instance=100,
    headroom=0.3
)
print(f"所需的实例数: {resources['total_instances']}")

2. 考虑增长驱动因素:

  • 用户增长率
  • 功能发布
  • 季节性模式
  • 营销活动
  • 地理扩张

3. 规划预留空间:

  • N+1: 应对一个实例故障
  • N+2: 应对两个故障或一个区域中断
  • 流量峰值: 正常容量的 2-3 倍
  • 维护窗口: 20-30% 的开销

4. 成本优化:

def optimize_instance_mix(workload_profile):
    """
    优化实例类型以降低成本
    """
    # 实例类型的组合
    instance_types = {
        'on_demand': {
            'cost_per_hour': 0.10,
            'reliability': 1.0,
            'percentage': 0.3  # 30% 按需用于基线
        },
        'spot': {
            'cost_per_hour': 0.03,
            'reliability': 0.95,
            'percentage': 0.5  # 50% 竞价型用于节省成本
        },
        'reserved': {
            'cost_per_hour': 0.06,
            'reliability': 1.0,
            'percentage': 0.2  # 20% 预留用于可预测的负载
        }
    }
    
    total_instances = workload_profile['total_instances']
    
    allocation = {}
    for instance_type, config in instance_types.items():
        count = int(total_instances * config['percentage'])
        allocation[instance_type] = {
            'count': count,
            'monthly_cost': count * config['cost_per_hour'] * 730
        }
    
    return allocation

稀有度: 非常常见
难度: 困难


混沌工程

4. 如何在生产环境中实施混沌工程?

回答: 混沌工程通过注入故障主动测试系统弹性:

混沌工程原则:

  1. 围绕稳定状态构建假设
  2. 模拟真实世界事件
  3. 在生产环境中运行实验
  4. 自动化实验
  5. 最小化爆炸半径

实施:

# 混沌实验框架
from dataclasses import dataclass
from enum import Enum
import random
import time

class ExperimentStatus(Enum):
    PLANNED = "planned"
    RUNNING = "running"
    COMPLETED = "completed"
    ABORTED = "aborted"

@dataclass
class ChaosExperiment:
    name: str
    hypothesis: str
    blast_radius: float  # 受影响的流量百分比
    duration_seconds: int
    rollback_criteria: dict
    
    def __post_init__(self):
        self.status = ExperimentStatus.PLANNED
        self.metrics_before = {}
        self.metrics_during = {}
        self.metrics_after = {}

class ChaosRunner:
    def __init__(self, monitoring_client):
        self.monitoring = monitoring_client
        self.experiments = []
    
    def run_experiment(self, experiment: ChaosExperiment):
        """执行带有安全检查的混沌实验"""
        print(f"开始实验: {experiment.name}")
        
        # 1. 测量基线
        experiment.metrics_before = self.measure_metrics()
        print(f"基线指标: {experiment.metrics_before}")
        
        # 2. 验证系统是否健康
        if not self.is_system_healthy(experiment.metrics_before):
            print("系统不健康,中止实验")
            return False
        
        # 3. 注入故障
        experiment.status = ExperimentStatus.RUNNING
        failure_injection = self.inject_failure(experiment)
        
        try:
            # 4. 在实验期间监控
            start_time = time.time()
            while time.time() - start_time < experiment.duration_seconds:
                experiment.metrics_during = self.measure_metrics()
                
                # 检查回滚标准
                if self.should_rollback(experiment):
                    print("满足回滚标准,停止实验")
                    self.rollback(failure_injection)
                    experiment.status = ExperimentStatus.ABORTED
                    return False
                
                time.sleep(10)  # 每 10 秒检查一次
            
            # 5. 回滚故障注入
            self.rollback(failure_injection)
            
            # 6. 测量恢复情况
            time.sleep(60)  # 等待系统稳定
            experiment.metrics_after = self.measure_metrics()
            
            # 7. 分析结果
            experiment.status = ExperimentStatus.COMPLETED
            return self.analyze_results(experiment)
            
        except Exception as e:
            print(f"实验失败: {e}")
            self.rollback(failure_injection)
            experiment.status = ExperimentStatus.ABORTED
            return False
    
    def inject_failure(self, experiment):
        """注入特定故障类型"""
        # 实现取决于故障类型
        pass
    
    def measure_metrics(self):
        """测量关键系统指标"""
        return {
            'error_rate': self.monitoring.get_error_rate(),
            'latency_p95': self.monitoring.get_latency_p95(),
            'requests_per_second': self.monitoring.get_rps(),
            'availability': self.monitoring.get_availability()
        }
    
    def is_system_healthy(self, metrics):
        """检查系统是否满足 SLO"""
        return (
            metrics['error_rate'] < 0.01 and  # < 1% 错误
            metrics['latency_p95'] < 1.0 and  # < 1 秒延迟
            metrics['availability'] > 0.999   # > 99.9% 可用
        )
    
    def should_rollback(self, experiment):
        """检查是否应该中止实验"""
        current = experiment.metrics_during
        criteria = experiment.rollback_criteria
        
        return (
            current['error_rate'] > criteria.get('max_error_rate', 0.05) or
            current['latency_p95'] > criteria.get('max_latency', 5.0) or
            current['availability'] < criteria.get('min_availability', 0.99)
        )
    
    def rollback(self, failure_injection):
        """移除故障注入"""
        print("回滚故障注入")
        # 实现取决于故障类型
    
    def analyze_results(self, experiment):
        """分析实验结果"""
        before = experiment.metrics_before
        during = experiment.metrics_during
        after = experiment.metrics_after
        
        print(f"\n实验结果: {experiment.name}")
        print(f"假设: {experiment.hypothesis}")
        print(f"\n指标:")
        print(f"  之前: {before}")
        print(f"  期间: {during}")
        print(f"  之后: {after}")
        
        # 确定假设是否得到验证
        hypothesis_validated = (
            during['availability'] >= experiment.rollback_criteria['min_availability']
        )
        
        return hypothesis_validated

# 示例实验
experiment = ChaosExperiment(
    name="数据库故障转移测试",
    hypothesis="系统在数据库故障转移期间保持可用",
    blast_radius=0.1,  # 10% 的流量
    duration_seconds=300,  # 5 分钟
    rollback_criteria={
        'max_error_rate': 0.05,
        'max_latency': 5.0,
        'min_availability': 0.99
    }
)

常见的混沌实验:

1. 网络延迟:

# 使用 tc (traffic control) 添加延迟
tc qdisc add dev eth0 root netem delay 100ms 20ms

# 回滚
tc qdisc del dev eth0 root

2. Pod 故障 (Kubernetes):

# 杀死随机 Pod
kubectl delete pod -l app=myapp --random=1

# 使用 Chaos Mesh
kubectl apply -f - <<EOF
apiVersion: chaos-mesh.org/v1alpha1
kind: PodChaos
metadata:
  name: pod-failure
spec:
  action: pod-failure
  mode: one
  selector:
    namespaces:
      - production
    labelSelectors:
      app: myapp
  duration: "30s"
EOF

3. 资源耗尽:

# CPU 压力
stress-ng --cpu 4 --timeout 60s

# 内存压力
stress-ng --vm 2 --vm-bytes 2G --timeout 60s

稀有度: 常见
难度: 困难


事件领导力

5. 如何领导从检测到事后分析的高危事件?

回答: 高级 SRE 通常担任关键中断的事件指挥官:

事件指挥结构:

Loading diagram...

事件指挥官的职责:

1. 初始响应(0-5 分钟):

## 事件指挥官检查清单

### 立即行动
- [ ] 确认事件
- [ ] 评估严重性(SEV-1、SEV-2、SEV-3)
- [ ] 创建事件频道(#incident-YYYY-MM-DD-NNN)
- [ ] 通知适当的团队
- [ ] 指定角色:
  - [ ] 技术负责人
  - [ ] 沟通负责人
  - [ ] 记录员(记录时间线)

### 严重性评估
**SEV-1(严重):**
- 完全服务中断
- 数据丢失
- 安全漏洞
- > 50% 的用户受到影响

**SEV-2(高):**
- 部分中断
- 性能下降
- 10-50% 的用户受到影响

**SEV-3(中):**
- 轻微退化
- < 10% 的用户受到影响
- 提供解决方法

2. 调查阶段:

class IncidentCommander:
    def __init__(self, incident_id):
        self.incident_id = incident_id
        self.timeline = []
        self.status_updates = []
        self.action_items = []
    
    def coordinate_investigation(self):
        """协调技术调查"""
        # 并行调查轨道
        tracks = {
            'recent_changes': self.check_recent_deployments(),
            'infrastructure': self.check_infrastructure_health(),
            'dependencies': self.check_external_dependencies(),
            'metrics': self.analyze_metrics_anomalies()
        }
        
        return tracks
    
    def make_decision(self, options, deadline_minutes=5):
        """做出限时决策"""
        print(f"需要在 {deadline_minutes} 分钟内做出决定")
        print(f"选项: {options}")
        
        # 从技术负责人那里收集意见
        # 根据以下因素做出决定:
        # - 用户影响
        # - 每个选项的风险
        # - 实施时间
        # - 可逆性
        
        return selected_option
    
    def communicate_status(self, interval_minutes=15):
        """定期状态更新"""
        update = {
            'timestamp': datetime.now(),
            'status': self.get_current_status(),
            'impact': self.assess_user_impact(),
            'eta': self.estimate_resolution_time(),
            'next_update': datetime.now() + timedelta(minutes=interval_minutes)
        }
        
        # 发送给利益相关者
        self.send_status_update(update)
        self.status_updates.append(update)
    
    def log_timeline_event(self, event, timestamp=None):
        """记录事件时间线"""
        self.timeline.append({
            'timestamp': timestamp or datetime.now(),
            'event': event,
            'logged_by': self.get_current_user()
        })

3. 缓解策略:

def evaluate_mitigation_options():
    """评估和优先排序缓解选项"""
    options = [
        {
            'action': '回滚部署',
            'time_to_implement': 5,  # 分钟
            'risk': 'low',
            'effectiveness': 'high',
            'reversible': True
        },
        {
            'action': '扩展资源',
            'time_to_implement': 2,
            'risk': 'low',
            'effectiveness': 'medium',
            'reversible': True
        },
        {
            'action': '禁用功能标志',
            'time_to_implement': 1,
            'risk': 'low',
            'effectiveness': 'high',
            'reversible': True
        },
        {
            'action': '数据库故障转移',
            'time_to_implement': 10,
            'risk': 'medium',
            'effectiveness': 'high',
            'reversible': False
        }
    ]
    
    # 按以下因素排序:低风险、高效率、快速实施
    sorted_options = sorted(
        options,
        key=lambda x: (
            x['risk'] == 'low',
            x['effectiveness'] == 'high',
            -x['time_to_implement']
        ),
        reverse=True
    )
    
    return sorted_options

4. 事后分析(无责):

# 事件事后分析:API 中断

**日期:** 2024-11-25  
**持续时间:** 45 分钟  
**严重性:** SEV-1  
**事件指挥官:** Alice  
**技术负责人:** Bob

## 执行摘要
由于数据库连接池耗尽,导致所有用户受到影响的完整 API 中断。

## 影响
- **受影响的用户:** 100%(所有用户)
- **持续时间:** 45 分钟
- **收入影响:** ~$50,000
- **SLO 影响:** 消耗了 75% 的每月错误预算

## 根本原因
由于在事件发生前 2 小时部署的新功能中存在连接泄漏,导致数据库连接池耗尽。

## 时间线
| 时间 | 事件 |
|------|-------|
| 14:00 | 部署 v2.5.0 |
| 15:45 | 首次收到延迟增加的警报 |
| 15:50 | 完整的 API 中断 |
| 15:52 | 声明事件 (SEV-1) |
| 15:55 | 确定数据库为瓶颈 |
| 16:05 | 决定回滚部署 |
| 16:15 | 回滚完成 |
| 16:20 | 服务恢复 |
| 16:35 | 确认完全恢复 |

## 进展顺利
- 快速检测(从首次症状出现后 5 分钟)
- 清晰的事件指挥结构
- 快速决定回滚
- 与利益相关者进行良好的沟通

## 出现问题
- 连接泄漏未在测试中捕获
- 没有连接池监控
- 在高峰时段部署

## 行动项目
| 行动 | 负责人 | 截止日期 | 状态 |
|--------|-------|----------|--------|
| 添加连接池监控 | Alice | 2024-12-01 | 开放 |
| 在测试中实施连接泄漏检测 | Bob | 2024-12-05 | 开放 |
| 更新部署策略(避免高峰时段) | Charlie | 2024-11-30 | 开放 |
| 为数据库连接添加断路器 | David | 2024-12-10 | 开放 |

## 经验教训
- 监控差距可能会隐藏关键问题
- 部署时间很重要
- 需要更好的集成测试来检测资源泄漏

稀有度: 非常常见
难度: 困难


分布式系统可靠性

6. 如何确保分布式微服务架构中的可靠性?

回答: 分布式系统引入了独特的可靠性挑战:

关键模式:

1. 用于弹性的服务网格:

# Istio 示例:断路和重试
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: api-service
spec:
  host: api-service
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 100
      http:
        http1MaxPendingRequests: 50
        http2MaxRequests: 100
        maxRequestsPerConnection: 2
    outlierDetection:
      consecutiveErrors: 5
      interval: 30s
      baseEjectionTime: 30s
      maxEjectionPercent: 50
      minHealthPercent: 40
    loadBalancer:
      simple: LEAST_REQUEST
---
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: api-service
spec:
  hosts:
  - api-service
  http:
  - retries:
      attempts: 3
      perTryTimeout: 2s
      retryOn: 5xx,reset,connect-failure,refused-stream
    timeout: 10s
    route:
    - destination:
        host: api-service

2. 分布式跟踪:

from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.requests import RequestsInstrumentor

# 设置跟踪
trace.set_tracer_provider(TracerProvider())
jaeger_exporter = JaegerExporter(
    agent_host_name="jaeger",
    agent_port=6831,
)
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(jaeger_exporter)
)

# 仪器化请求库
RequestsInstrumentor().instrument()

# 在代码中使用
tracer = trace.get_tracer(__name__)

def process_order(order_id):
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)
        
        # 调用支付服务
        with tracer.start_as_current_span("call_payment_service"):
            payment_result = call_payment_service(order_id)
        
        # 调用库存服务
        with tracer.start_as_current_span("call_inventory_service"):
            inventory_result = call_inventory_service(order_id)
        
        return combine_results(payment_result, inventory_result)

3. 舱壁模式:

import asyncio
from asyncio import Semaphore

class BulkheadExecutor:
    def __init__(self, max_concurrent=10):
        self.semaphore = Semaphore(max_concurrent)
        self.active_requests = 0
    
    async def execute(self, func, *args, **kwargs):
        """执行具有并发限制的函数"""
        async with self.semaphore:
            self.active_requests += 1
            try:
                result = await func(*args, **kwargs)
                return result
            finally:
                self.active_requests -= 1

# 为不同的服务分隔舱壁
payment_bulkhead = BulkheadExecutor(max_concurrent=20)
inventory_bulkhead = BulkheadExecutor(max_concurrent=10)
notification_bulkhead = BulkheadExecutor(max_concurrent=5)

# 用法
async def process_order(order_id):
    # 每个服务都有隔离的资源池
    payment = await payment_bulkhead.execute(call_payment_service, order_id)
    inventory = await inventory_bulkhead.execute(call_inventory_service, order_id)
    notification = await notification_bulkhead.execute(send_notification, order_id)

4. 用于分布式事务的 Saga 模式:

class SagaOrchestrator:
    def __init__(self):
        self.steps = []
        self.compensations = []
    
    def add_step(self, action, compensation):
        """添加带有补偿的步骤"""
        self.steps.append(action)
        self.compensations.append(compensation)
    
    async def execute(self):
        """执行 Saga,并在失败时自动补偿"""
        completed_steps = []
        
        try:
            for step in self.steps:
                result = await step()
                completed_steps.append(result)
            return completed_steps
            
        except Exception as e:
            # 按相反的顺序补偿
            print(f"Saga 失败: {e}. 开始补偿...")
            for i in range(len(completed_steps) - 1, -1, -1):
                try:
                    await self.compensations[i](completed_steps[i])
                except Exception as comp_error:
                    print(f"补偿失败: {comp_error}")
            raise

# 示例:订单处理 Saga
async def process_order_saga(order_id):
    saga = SagaOrchestrator()
    
    # 步骤 1:预留库存
    saga.

结论

高级 SRE 面试中,最有说服力的回答听起来像真实生产判断,而不是背诵定义。练习说明你如何从用户旅程设定 SLO,如何用错误预算判断发布风险,如何用真实负载数据验证容量,如何运行受控的混沌实验,如何用清晰角色指挥事故,以及如何在不掩盖风险的前提下降低 toil。

面试前准备两到三个具体故事:一次你主导的事故,一次你影响过的可靠性取舍,一次改变团队行为的自动化或可观测性改进。每个故事都要能说清楚信号、决策、取舍和后续改进。

Newsletter subscription

真正有效的每周职业建议

将最新见解直接发送到您的收件箱

您的下一次面试只差一份简历

在几分钟内创建一份专业、优化的简历。无需设计技能——只有经过验证的结果。

创建我的简历

分享这篇文章

让面试回访翻倍

根据职位描述定制简历的候选人获得的面试机会是其他人的2.5倍。使用我们的AI为每一份申请即时自动定制您的简历。