Senior Site Reliability Engineer Interview Questions: Complete Guide

Milad Bonakdar
Master advanced SRE concepts with comprehensive interview questions covering capacity planning, chaos engineering, distributed systems, SLO design, incident leadership, and organizational SRE practices for senior roles.
Introduction
Senior Site Reliability Engineers are expected to architect reliable systems at scale, lead incident responses, drive SRE culture, and make strategic decisions about reliability investments. This role demands deep technical expertise, leadership skills, and the ability to balance reliability with business velocity.
This comprehensive guide covers essential interview questions for senior SREs, focusing on advanced concepts, system design, and organizational impact. Each question includes detailed explanations and practical examples.
Advanced SLO Design
1. How do you design SLIs and SLOs for a new service with limited data?
Answer: Designing SLOs for new services requires balancing ambition with achievability:
Approach:
1. Start with user journey mapping:
# Identify critical user journeys
user_journeys = {
    'search_product': {
        'steps': ['search_query', 'results_display', 'product_click'],
        'criticality': 'high',
        'expected_latency': '< 500ms'
    },
    'checkout': {
        'steps': ['add_to_cart', 'payment', 'confirmation'],
        'criticality': 'critical',
        'expected_latency': '< 2s'
    },
    'browse_recommendations': {
        'steps': ['load_page', 'fetch_recommendations'],
        'criticality': 'medium',
        'expected_latency': '< 1s'
    }
}
2. Define SLIs based on user experience:
# SLI specification
slis:
  availability:
    description: "Percentage of successful requests"
    measurement: "count(http_status < 500) / count(http_requests)"
  latency:
    description: "95th percentile request latency"
    measurement: "histogram_quantile(0.95, http_request_duration_seconds)"
  correctness:
    description: "Percentage of requests returning correct data"
    measurement: "count(validation_passed) / count(requests)"
3. Set initial SLOs conservatively:
def calculate_initial_slo(service_type, criticality):
    """Calculate an initial SLO based on service characteristics."""
    base_slos = {
        'critical': {
            'availability': 0.999,  # 99.9%
            'latency_p95': 1.0,     # 1 second
            'latency_p99': 2.0      # 2 seconds
        },
        'high': {
            'availability': 0.995,  # 99.5%
            'latency_p95': 2.0,
            'latency_p99': 5.0
        },
        'medium': {
            'availability': 0.99,   # 99%
            'latency_p95': 5.0,
            'latency_p99': 10.0
        }
    }
    return base_slos.get(criticality, base_slos['medium'])

# Example
checkout_slo = calculate_initial_slo('payment', 'critical')
print(f"Checkout SLO: {checkout_slo}")
4. Plan for iteration:
- Start with a 4-week measurement window
- Review SLO performance weekly
- Adjust based on actual performance and user feedback
- Tighten SLOs as the system matures
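The error-budget figures quoted in the assumptions below fall directly out of the availability target and the measurement window. A minimal sketch of that conversion:
def error_budget_minutes(availability_target, window_days=30):
    """Convert an availability SLO into an error budget for the window."""
    total_minutes = window_days * 24 * 60
    return (1 - availability_target) * total_minutes

print(error_budget_minutes(0.999))   # ~43.2 minutes per 30-day window
print(error_budget_minutes(0.9999))  # ~4.3 minutes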
5. Document assumptions:
## SLO Assumptions (Initial)
### Availability: 99.9%
- Assumes: Standard cloud infrastructure reliability
- Error budget: 43 minutes/month
- Review: After 3 months of data
### Latency (p95): < 1s
- Assumes: Database queries < 100ms
- Assumes: No complex computations
- Review: If query patterns change
### Dependencies
- External API availability: 99.95%
- Database availability: 99.99%
Rarity: Common
Difficulty: Hard
2. How do you handle conflicting SLOs across different user segments?
Answer: Different user segments often have different reliability needs:
Strategy: Multi-tier SLOs
class SLOTier:
    def __init__(self, name, availability, latency_p95, latency_p99):
        self.name = name
        self.availability = availability
        self.latency_p95 = latency_p95
        self.latency_p99 = latency_p99
        self.error_budget = 1 - availability

# Define tiers
tiers = {
    'premium': SLOTier(
        name='Premium',
        availability=0.9999,  # 99.99% - 4.3 min/month
        latency_p95=0.5,
        latency_p99=1.0
    ),
    'standard': SLOTier(
        name='Standard',
        availability=0.999,   # 99.9% - 43 min/month
        latency_p95=1.0,
        latency_p99=2.0
    ),
    'free': SLOTier(
        name='Free',
        availability=0.99,    # 99% - 7.2 hours/month
        latency_p95=2.0,
        latency_p99=5.0
    )
}

# Route requests based on tier
def get_user_tier(user_id):
    # Look up the user's subscription tier
    return user_subscription_tier(user_id)

def apply_slo_policy(user_id, request):
    tier = get_user_tier(user_id)
    slo = tiers[tier]
    # Apply tier-specific policies
    request.timeout = slo.latency_p99
    request.priority = tier  # For queue prioritization
    request.retry_budget = calculate_retry_budget(slo.error_budget)
    return request
Implementation with Traffic Routing:
# Kubernetes example: Separate deployments per tier
apiVersion: apps/v1
kind: Deployment
metadata:
  name: api-premium
spec:
  replicas: 10
  template:
    spec:
      containers:
        - name: api
          resources:
            requests:
              cpu: "2"
              memory: "4Gi"
            limits:
              cpu: "4"
              memory: "8Gi"
      priorityClassName: high-priority
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: api-standard
spec:
  replicas: 5
  template:
    spec:
      containers:
        - name: api
          resources:
            requests:
              cpu: "1"
              memory: "2Gi"
Monitoring per tier:
# Availability by tier
sum(rate(http_requests_total{status!~"5.."}[5m])) by (tier)
/
sum(rate(http_requests_total[5m])) by (tier)

# Latency by tier (keep the 'le' label so the quantile can be computed)
histogram_quantile(0.95,
  sum(rate(http_request_duration_seconds_bucket[5m])) by (le, tier)
)
Rarity: Uncommon
Difficulty: Hard
Capacity Planning
3. Walk through your capacity planning process for a rapidly growing service.
Answer: Capacity planning ensures resources meet demand while optimizing costs:
Capacity Planning Framework:
1. Measure baseline:
import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression

class CapacityPlanner:
    def __init__(self, metrics_data):
        self.data = pd.DataFrame(metrics_data)

    def analyze_trends(self, metric_name, days=30):
        """Analyze historical trends"""
        metric_data = self.data[metric_name].tail(days * 24)  # Hourly data
        # Calculate growth rate
        start_value = metric_data.iloc[0]
        end_value = metric_data.iloc[-1]
        growth_rate = ((end_value - start_value) / start_value) * 100
        # Identify peak usage
        peak_value = metric_data.max()
        peak_time = metric_data.idxmax()
        # Calculate percentiles
        p50 = metric_data.quantile(0.50)
        p95 = metric_data.quantile(0.95)
        p99 = metric_data.quantile(0.99)
        return {
            'growth_rate': growth_rate,
            'peak_value': peak_value,
            'peak_time': peak_time,
            'p50': p50,
            'p95': p95,
            'p99': p99
        }

    def forecast_capacity(self, metric_name, months_ahead=3):
        """Forecast future capacity needs"""
        # Prepare data
        df = self.data[[metric_name]].reset_index()
        df['days'] = (df.index / 24).astype(int)  # Convert hours to days
        # Train model
        X = df[['days']].values
        y = df[metric_name].values
        model = LinearRegression()
        model.fit(X, y)
        # Forecast
        future_days = np.array([[df['days'].max() + (30 * months_ahead)]])
        forecast = model.predict(future_days)[0]
        # Add safety margin (20%)
        forecast_with_margin = forecast * 1.2
        return {
            'forecast': forecast,
            'with_margin': forecast_with_margin,
            'current': y[-1],
            'growth_factor': forecast / y[-1]
        }

    def calculate_resource_needs(self, requests_per_second,
                                 requests_per_instance=100,
                                 headroom=0.3):
        """Calculate required instances"""
        # Base capacity
        base_instances = np.ceil(requests_per_second / requests_per_instance)
        # Add headroom for spikes and maintenance
        total_instances = np.ceil(base_instances * (1 + headroom))
        return {
            'base_instances': int(base_instances),
            'total_instances': int(total_instances),
            'headroom_instances': int(total_instances - base_instances)
        }

# Example usage
metrics = {
    'requests_per_second': [100, 105, 110, 115, 120, ...],  # Historical data
    'cpu_usage': [45, 48, 50, 52, 55, ...],
    'memory_usage': [60, 62, 65, 67, 70, ...]
}
planner = CapacityPlanner(metrics)

# Analyze trends
trends = planner.analyze_trends('requests_per_second', days=30)
print(f"Growth rate: {trends['growth_rate']:.2f}%")
print(f"Peak RPS: {trends['peak_value']}")

# Forecast capacity
forecast = planner.forecast_capacity('requests_per_second', months_ahead=3)
print(f"Forecasted RPS in 3 months: {forecast['forecast']:.0f}")
print(f"With safety margin: {forecast['with_margin']:.0f}")

# Calculate resource needs
resources = planner.calculate_resource_needs(
    requests_per_second=forecast['with_margin'],
    requests_per_instance=100,
    headroom=0.3
)
print(f"Required instances: {resources['total_instances']}")
2. Consider growth drivers:
- User growth rate
- Feature launches
- Seasonal patterns
- Marketing campaigns
- Geographic expansion
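These drivers rarely show up in a pure trend line, so one common approach is to layer multiplicative adjustments on top of the regression forecast. A minimal sketch; the factor values here are illustrative assumptions, not benchmarks:
def adjust_forecast(baseline_rps, drivers):
    """Apply multiplicative growth drivers to a trend-based forecast."""
    adjusted = baseline_rps
    for name, factor in drivers.items():
        adjusted *= factor
    return adjusted

# Hypothetical adjustments for the next quarter
drivers = {
    'feature_launch': 1.15,       # expected +15% traffic
    'holiday_seasonality': 1.30,  # +30% at seasonal peak
    'eu_expansion': 1.10          # +10% from the new region
}
print(adjust_forecast(1000, drivers))  # ~1644 RPS planning target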
3. Plan for headroom:
- N+1: Survive one instance failure
- N+2: Survive two failures or one zone outage
- Traffic spikes: 2-3x normal capacity
- Maintenance windows: 20-30% overhead
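N+1 and N+2 become concrete once you size the fleet from per-instance capacity and then add failure and spike allowances. A sketch under those assumptions:
import math

def required_instances(peak_rps, rps_per_instance,
                       failures_tolerated=1, spike_factor=2.0):
    """Size a fleet for peak load, traffic spikes, and N+k failure tolerance."""
    base = math.ceil((peak_rps * spike_factor) / rps_per_instance)
    return base + failures_tolerated  # N+k: survive k instance failures

print(required_instances(1500, 100))                        # 31 (N+1)
print(required_instances(1500, 100, failures_tolerated=2))  # 32 (N+2)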
4. Cost optimization:
def optimize_instance_mix(workload_profile):
    """Optimize the instance-type mix for cost."""
    # Mix of instance types
    instance_types = {
        'on_demand': {
            'cost_per_hour': 0.10,
            'reliability': 1.0,
            'percentage': 0.3   # 30% on-demand for baseline
        },
        'spot': {
            'cost_per_hour': 0.03,
            'reliability': 0.95,
            'percentage': 0.5   # 50% spot for cost savings
        },
        'reserved': {
            'cost_per_hour': 0.06,
            'reliability': 1.0,
            'percentage': 0.2   # 20% reserved for predictable load
        }
    }
    total_instances = workload_profile['total_instances']
    allocation = {}
    for instance_type, config in instance_types.items():
        count = int(total_instances * config['percentage'])
        allocation[instance_type] = {
            'count': count,
            'monthly_cost': count * config['cost_per_hour'] * 730
        }
    return allocation
Rarity: Very Common
Difficulty: Hard
Chaos Engineering
4. How do you implement chaos engineering in production?
Answer: Chaos engineering proactively tests system resilience by injecting failures:
Chaos Engineering Principles:
- Build hypothesis around steady state
- Vary real-world events
- Run experiments in production
- Automate experiments
- Minimize blast radius
Implementation:
# Chaos experiment framework
from dataclasses import dataclass
from enum import Enum
import time

class ExperimentStatus(Enum):
    PLANNED = "planned"
    RUNNING = "running"
    COMPLETED = "completed"
    ABORTED = "aborted"

@dataclass
class ChaosExperiment:
    name: str
    hypothesis: str
    blast_radius: float  # Percentage of traffic affected
    duration_seconds: int
    rollback_criteria: dict

    def __post_init__(self):
        self.status = ExperimentStatus.PLANNED
        self.metrics_before = {}
        self.metrics_during = {}
        self.metrics_after = {}

class ChaosRunner:
    def __init__(self, monitoring_client):
        self.monitoring = monitoring_client
        self.experiments = []

    def run_experiment(self, experiment: ChaosExperiment):
        """Execute chaos experiment with safety checks"""
        print(f"Starting experiment: {experiment.name}")
        # 1. Measure baseline
        experiment.metrics_before = self.measure_metrics()
        print(f"Baseline metrics: {experiment.metrics_before}")
        # 2. Verify system is healthy
        if not self.is_system_healthy(experiment.metrics_before):
            print("System unhealthy, aborting experiment")
            return False
        # 3. Inject failure
        experiment.status = ExperimentStatus.RUNNING
        failure_injection = self.inject_failure(experiment)
        try:
            # 4. Monitor during experiment
            start_time = time.time()
            while time.time() - start_time < experiment.duration_seconds:
                experiment.metrics_during = self.measure_metrics()
                # Check rollback criteria
                if self.should_rollback(experiment):
                    print("Rollback criteria met, stopping experiment")
                    self.rollback(failure_injection)
                    experiment.status = ExperimentStatus.ABORTED
                    return False
                time.sleep(10)  # Check every 10 seconds
            # 5. Rollback failure injection
            self.rollback(failure_injection)
            # 6. Measure recovery
            time.sleep(60)  # Wait for system to stabilize
            experiment.metrics_after = self.measure_metrics()
            # 7. Analyze results
            experiment.status = ExperimentStatus.COMPLETED
            return self.analyze_results(experiment)
        except Exception as e:
            print(f"Experiment failed: {e}")
            self.rollback(failure_injection)
            experiment.status = ExperimentStatus.ABORTED
            return False

    def inject_failure(self, experiment):
        """Inject specific failure type"""
        # Implementation depends on failure type
        pass

    def measure_metrics(self):
        """Measure key system metrics"""
        return {
            'error_rate': self.monitoring.get_error_rate(),
            'latency_p95': self.monitoring.get_latency_p95(),
            'requests_per_second': self.monitoring.get_rps(),
            'availability': self.monitoring.get_availability()
        }

    def is_system_healthy(self, metrics):
        """Check if system meets SLOs"""
        return (
            metrics['error_rate'] < 0.01 and   # < 1% errors
            metrics['latency_p95'] < 1.0 and   # < 1s latency
            metrics['availability'] > 0.999    # > 99.9% available
        )

    def should_rollback(self, experiment):
        """Check if experiment should be aborted"""
        current = experiment.metrics_during
        criteria = experiment.rollback_criteria
        return (
            current['error_rate'] > criteria.get('max_error_rate', 0.05) or
            current['latency_p95'] > criteria.get('max_latency', 5.0) or
            current['availability'] < criteria.get('min_availability', 0.99)
        )

    def rollback(self, failure_injection):
        """Remove failure injection"""
        print("Rolling back failure injection")
        # Implementation depends on failure type

    def analyze_results(self, experiment):
        """Analyze experiment results"""
        before = experiment.metrics_before
        during = experiment.metrics_during
        after = experiment.metrics_after
        print(f"\nExperiment Results: {experiment.name}")
        print(f"Hypothesis: {experiment.hypothesis}")
        print("\nMetrics:")
        print(f"  Before: {before}")
        print(f"  During: {during}")
        print(f"  After: {after}")
        # Determine if hypothesis was validated
        hypothesis_validated = (
            during['availability'] >= experiment.rollback_criteria['min_availability']
        )
        return hypothesis_validated

# Example experiment
experiment = ChaosExperiment(
    name="Database Failover Test",
    hypothesis="System remains available during database failover",
    blast_radius=0.1,       # 10% of traffic
    duration_seconds=300,   # 5 minutes
    rollback_criteria={
        'max_error_rate': 0.05,
        'max_latency': 5.0,
        'min_availability': 0.99
    }
)
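Wiring the runner to a real monitoring backend is then a one-liner. A hypothetical example (PrometheusMetricsClient is an assumed adapter; any object exposing the four getter methods used above will do):
# PrometheusMetricsClient is hypothetical; substitute your own metrics adapter
runner = ChaosRunner(monitoring_client=PrometheusMetricsClient())
validated = runner.run_experiment(experiment)
print("Hypothesis validated" if validated else "Hypothesis rejected")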
Common Chaos Experiments:
1. Network Latency:
# Using tc (traffic control) to add 100ms +/- 20ms of latency (requires root)
tc qdisc add dev eth0 root netem delay 100ms 20ms
# Rollback
tc qdisc del dev eth0 root
2. Pod Failure (Kubernetes):
# Kill a random pod (kubectl has no built-in random selector)
kubectl get pods -l app=myapp -o name | shuf -n 1 | xargs kubectl delete

# Using Chaos Mesh
kubectl apply -f - <<EOF
apiVersion: chaos-mesh.org/v1alpha1
kind: PodChaos
metadata:
  name: pod-failure
spec:
  action: pod-failure
  mode: one
  selector:
    namespaces:
      - production
    labelSelectors:
      app: myapp
  duration: "30s"
EOF
3. Resource Exhaustion:
# CPU stress
stress-ng --cpu 4 --timeout 60s
# Memory stress
stress-ng --vm 2 --vm-bytes 2G --timeout 60s
Rarity: Common
Difficulty: Hard
Incident Leadership
5. How do you lead a high-severity incident from detection through postmortem?
Answer: Senior SREs often serve as incident commanders for critical outages:
Incident Command Structure:
Incident Commander Responsibilities:
1. Initial Response (0-5 minutes):
## Incident Commander Checklist
### Immediate Actions
- [ ] Acknowledge incident
- [ ] Assess severity (SEV-1, SEV-2, SEV-3)
- [ ] Create incident channel (#incident-YYYY-MM-DD-NNN)
- [ ] Page appropriate teams
- [ ] Designate roles:
- [ ] Technical Lead
- [ ] Communications Lead
- [ ] Scribe (document timeline)
### Severity Assessment
**SEV-1 (Critical):**
- Complete service outage
- Data loss
- Security breach
- > 50% users affected
**SEV-2 (High):**
- Partial outage
- Degraded performance
- 10-50% users affected
**SEV-3 (Medium):**
- Minor degradation
- < 10% users affected
- Workaround available
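The severity table above can be encoded so the initial assessment is consistent rather than made from memory under pressure. A simplified sketch whose thresholds mirror the definitions above:
def classify_severity(pct_users_affected, data_loss=False,
                      security_breach=False, full_outage=False):
    """Map impact signals to the SEV levels defined above (simplified)."""
    if full_outage or data_loss or security_breach or pct_users_affected > 50:
        return "SEV-1"
    if pct_users_affected >= 10:
        return "SEV-2"
    return "SEV-3"

print(classify_severity(pct_users_affected=30))  # SEV-2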
2. Investigation Phase:
from datetime import datetime, timedelta

class IncidentCommander:
    def __init__(self, incident_id):
        self.incident_id = incident_id
        self.timeline = []
        self.status_updates = []
        self.action_items = []

    def coordinate_investigation(self):
        """Coordinate technical investigation"""
        # Parallel investigation tracks
        tracks = {
            'recent_changes': self.check_recent_deployments(),
            'infrastructure': self.check_infrastructure_health(),
            'dependencies': self.check_external_dependencies(),
            'metrics': self.analyze_metrics_anomalies()
        }
        return tracks

    def make_decision(self, options, deadline_minutes=5):
        """Make time-boxed decision"""
        print(f"Decision needed within {deadline_minutes} minutes")
        print(f"Options: {options}")
        # Gather input from technical leads, then decide based on:
        # - User impact
        # - Risk of each option
        # - Time to implement
        # - Reversibility
        # Placeholder: in practice the IC weighs the factors above and decides
        selected_option = options[0]
        return selected_option

    def communicate_status(self, interval_minutes=15):
        """Regular status updates"""
        update = {
            'timestamp': datetime.now(),
            'status': self.get_current_status(),
            'impact': self.assess_user_impact(),
            'eta': self.estimate_resolution_time(),
            'next_update': datetime.now() + timedelta(minutes=interval_minutes)
        }
        # Send to stakeholders
        self.send_status_update(update)
        self.status_updates.append(update)

    def log_timeline_event(self, event, timestamp=None):
        """Document incident timeline"""
        self.timeline.append({
            'timestamp': timestamp or datetime.now(),
            'event': event,
            'logged_by': self.get_current_user()
        })
3. Mitigation Strategies:
def evaluate_mitigation_options():
    """Evaluate and prioritize mitigation options"""
    options = [
        {
            'action': 'Rollback deployment',
            'time_to_implement': 5,  # minutes
            'risk': 'low',
            'effectiveness': 'high',
            'reversible': True
        },
        {
            'action': 'Scale up resources',
            'time_to_implement': 2,
            'risk': 'low',
            'effectiveness': 'medium',
            'reversible': True
        },
        {
            'action': 'Disable feature flag',
            'time_to_implement': 1,
            'risk': 'low',
            'effectiveness': 'high',
            'reversible': True
        },
        {
            'action': 'Database failover',
            'time_to_implement': 10,
            'risk': 'medium',
            'effectiveness': 'high',
            'reversible': False
        }
    ]
    # Sort by: low risk first, then high effectiveness, then fastest to implement
    sorted_options = sorted(
        options,
        key=lambda x: (
            x['risk'] == 'low',
            x['effectiveness'] == 'high',
            -x['time_to_implement']
        ),
        reverse=True
    )
    return sorted_options
4. Postmortem (Blameless):
# Incident Postmortem: API Outage
**Date:** 2024-11-25
**Duration:** 45 minutes
**Severity:** SEV-1
**Incident Commander:** Alice
**Technical Lead:** Bob
## Executive Summary
Complete API outage affecting all users due to database connection pool exhaustion.
## Impact
- **Users affected:** 100% (all users)
- **Duration:** 45 minutes
- **Revenue impact:** ~$50,000
- **SLO impact:** Consumed 75% of monthly error budget
## Root Cause
Database connection pool exhausted due to connection leak in new feature deployed 2 hours before incident.
## Timeline
| Time | Event |
|------|-------|
| 14:00 | Deployment of v2.5.0 |
| 15:45 | First alerts for increased latency |
| 15:50 | Complete API outage |
| 15:52 | Incident declared (SEV-1) |
| 15:55 | Identified database as bottleneck |
| 16:05 | Decision to rollback deployment |
| 16:15 | Rollback completed |
| 16:20 | Service recovering |
| 16:35 | Full recovery confirmed |
## What Went Well
- Quick detection (5 minutes from first symptom)
- Clear incident command structure
- Fast decision to rollback
- Good communication with stakeholders
## What Went Wrong
- Connection leak not caught in testing
- No connection pool monitoring
- Deployment during peak hours
## Action Items
| Action | Owner | Due Date | Status |
|--------|-------|----------|--------|
| Add connection pool monitoring | Alice | 2024-12-01 | Open |
| Implement connection leak detection in tests | Bob | 2024-12-05 | Open |
| Update deployment policy (avoid peak hours) | Charlie | 2024-11-30 | Open |
| Add circuit breaker for database connections | David | 2024-12-10 | Open |
## Lessons Learned
- Monitoring gaps can hide critical issues
- Deployment timing matters
- Need better integration testing for resource leaks
Rarity: Very Common
Difficulty: Hard
Distributed Systems Reliability
6. How do you ensure reliability in a distributed microservices architecture?
Answer: Distributed systems introduce unique reliability challenges:
Key Patterns:
1. Service Mesh for Resilience:
# Istio example: Circuit breaking and retries
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: api-service
spec:
  host: api-service
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 100
      http:
        http1MaxPendingRequests: 50
        http2MaxRequests: 100
        maxRequestsPerConnection: 2
    outlierDetection:
      consecutiveErrors: 5
      interval: 30s
      baseEjectionTime: 30s
      maxEjectionPercent: 50
      minHealthPercent: 40
    loadBalancer:
      simple: LEAST_REQUEST
---
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: api-service
spec:
  hosts:
    - api-service
  http:
    - retries:
        attempts: 3
        perTryTimeout: 2s
        retryOn: 5xx,reset,connect-failure,refused-stream
      timeout: 10s
      route:
        - destination:
            host: api-service
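A design note on these settings: retries amplify load on an already struggling upstream (3 attempts can triple traffic during an incident), which is why they are paired here with circuit breaking and an overall 10s request timeout. Keeping perTryTimeout times attempts (2s x 3 = 6s) inside the outer timeout also avoids scheduling retries that can never complete.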
2. Distributed Tracing:
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.requests import RequestsInstrumentor

# Setup tracing
trace.set_tracer_provider(TracerProvider())
jaeger_exporter = JaegerExporter(
    agent_host_name="jaeger",
    agent_port=6831,
)
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(jaeger_exporter)
)

# Instrument requests library
RequestsInstrumentor().instrument()

# Use in code
tracer = trace.get_tracer(__name__)

def process_order(order_id):
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)
        # Call payment service
        with tracer.start_as_current_span("call_payment_service"):
            payment_result = call_payment_service(order_id)
        # Call inventory service
        with tracer.start_as_current_span("call_inventory_service"):
            inventory_result = call_inventory_service(order_id)
        return combine_results(payment_result, inventory_result)
3. Bulkhead Pattern:
import asyncio
from asyncio import Semaphore

class BulkheadExecutor:
    def __init__(self, max_concurrent=10):
        self.semaphore = Semaphore(max_concurrent)
        self.active_requests = 0

    async def execute(self, func, *args, **kwargs):
        """Execute function with a concurrency limit"""
        async with self.semaphore:
            self.active_requests += 1
            try:
                result = await func(*args, **kwargs)
                return result
            finally:
                self.active_requests -= 1

# Separate bulkheads for different services
payment_bulkhead = BulkheadExecutor(max_concurrent=20)
inventory_bulkhead = BulkheadExecutor(max_concurrent=10)
notification_bulkhead = BulkheadExecutor(max_concurrent=5)

# Usage
async def process_order(order_id):
    # Each call draws from an isolated concurrency pool, so one slow
    # dependency cannot exhaust capacity needed by the others
    payment = await payment_bulkhead.execute(call_payment_service, order_id)
    inventory = await inventory_bulkhead.execute(call_inventory_service, order_id)
    notification = await notification_bulkhead.execute(send_notification, order_id)
    return payment, inventory, notification
4. Saga Pattern for Distributed Transactions:
class SagaOrchestrator:
    def __init__(self):
        self.steps = []
        self.compensations = []

    def add_step(self, action, compensation):
        """Add step with compensation"""
        self.steps.append(action)
        self.compensations.append(compensation)

    async def execute(self):
        """Execute saga with automatic compensation on failure"""
        completed_steps = []
        try:
            for step in self.steps:
                result = await step()
                completed_steps.append(result)
            return completed_steps
        except Exception as e:
            # Compensate in reverse order
            print(f"Saga failed: {e}. Starting compensation...")
            for i in range(len(completed_steps) - 1, -1, -1):
                try:
                    await self.compensations[i](completed_steps[i])
                except Exception as comp_error:
                    print(f"Compensation failed: {comp_error}")
            raise

# Example: Order processing saga
async def process_order_saga(order_id):
    saga = SagaOrchestrator()
    # Step 1: Reserve inventory
    saga.add_step(
        action=lambda: reserve_inventory(order_id),
        compensation=lambda result: release_inventory(result['reservation_id'])
    )
    # Step 2: Process payment
    saga.add_step(
        action=lambda: process_payment(order_id),
        compensation=lambda result: refund_payment(result['transaction_id'])
    )
    # Step 3: Create shipment
    saga.add_step(
        action=lambda: create_shipment(order_id),
        compensation=lambda result: cancel_shipment(result['shipment_id'])
    )
    return await saga.execute()
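Calling the saga then looks like any other coroutine. A usage sketch, assuming the step functions above are async and return the IDs their compensations need:
async def main():
    try:
        results = await process_order_saga("order-123")
        print(f"Order completed: {results}")
    except Exception:
        print("Order failed; completed steps were compensated")

asyncio.run(main())  # asyncio imported in the bulkhead example above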
Rarity: Common
Difficulty: Hard
Performance Optimization
7. How do you approach performance optimization for a production system?
Answer: Systematic performance optimization requires data-driven decisions:
Performance Optimization Framework:
1. Establish Baseline:
import time
import statistics
from dataclasses import dataclass
from typing import List

@dataclass
class PerformanceMetrics:
    latency_p50: float
    latency_p95: float
    latency_p99: float
    throughput_rps: float
    error_rate: float
    cpu_usage: float
    memory_usage: float

class PerformanceProfiler:
    def __init__(self):
        self.measurements = []

    def measure_endpoint(self, endpoint_func, iterations=1000):
        """Measure endpoint performance"""
        latencies = []
        errors = 0
        start_time = time.time()
        for _ in range(iterations):
            request_start = time.time()
            try:
                endpoint_func()
                latency = (time.time() - request_start) * 1000  # ms
                latencies.append(latency)
            except Exception:
                errors += 1
        duration = time.time() - start_time
        return PerformanceMetrics(
            latency_p50=statistics.quantiles(latencies, n=100)[49],
            latency_p95=statistics.quantiles(latencies, n=100)[94],
            latency_p99=statistics.quantiles(latencies, n=100)[98],
            throughput_rps=iterations / duration,
            error_rate=errors / iterations,
            cpu_usage=self.get_cpu_usage(),
            memory_usage=self.get_memory_usage()
        )

    def compare_performance(self, before: PerformanceMetrics,
                            after: PerformanceMetrics):
        """Compare performance improvements"""
        improvements = {
            'latency_p95': ((before.latency_p95 - after.latency_p95) /
                            before.latency_p95 * 100),
            'throughput': ((after.throughput_rps - before.throughput_rps) /
                           before.throughput_rps * 100),
            'error_rate': ((before.error_rate - after.error_rate) /
                           before.error_rate * 100) if before.error_rate > 0 else 0
        }
        print("Performance Improvements:")
        print(f"  Latency (p95): {improvements['latency_p95']:.1f}% faster")
        print(f"  Throughput: {improvements['throughput']:.1f}% increase")
        print(f"  Error Rate: {improvements['error_rate']:.1f}% reduction")
        return improvements
2. Identify Bottlenecks:
import cProfile
import pstats
from functools import wraps

def profile_function(func):
    """Decorator to profile function performance"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        profiler = cProfile.Profile()
        profiler.enable()
        result = func(*args, **kwargs)
        profiler.disable()
        stats = pstats.Stats(profiler)
        stats.sort_stats('cumulative')
        stats.print_stats(20)  # Top 20 functions
        return result
    return wrapper

# Database query optimization
def analyze_slow_queries():
    """Identify slow database queries (PostgreSQL, pg_stat_statements)"""
    slow_queries = """
        SELECT
            query,
            calls,
            total_time,
            mean_time,
            max_time
        FROM pg_stat_statements
        WHERE mean_time > 100  -- queries averaging > 100ms
        ORDER BY total_time DESC
        LIMIT 20;
    """
    # Analysis steps:
    # 1. Add indexes for frequently filtered columns
    # 2. Optimize JOIN operations
    # 3. Use EXPLAIN ANALYZE to understand query plans
    # 4. Consider query result caching
    # 5. Denormalize if necessary
    return slow_queries

# Example optimization (redis_client and db are assumed to be configured elsewhere)
@profile_function
def optimized_user_search(query):
    """Optimized user search with caching and indexing"""
    # Check cache first
    cache_key = f"user_search:{query}"
    cached_result = redis_client.get(cache_key)
    if cached_result:
        return cached_result
    # Use indexed full-text query
    results = db.execute("""
        SELECT id, name, email
        FROM users
        WHERE search_vector @@ to_tsquery(%s)
        LIMIT 100
    """, (query,))
    # Cache results
    redis_client.setex(cache_key, 300, results)  # 5 min TTL
    return results
3. Common Optimizations:
Database:
-- Add indexes
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_orders_user_created ON orders(user_id, created_at);

-- Use connection pooling
-- Implement read replicas for read-heavy workloads
-- Use materialized views for complex aggregations

-- Example: Materialized view for dashboard
CREATE MATERIALIZED VIEW daily_stats AS
SELECT
    date_trunc('day', created_at) AS day,
    COUNT(*) AS order_count,
    SUM(total) AS revenue
FROM orders
GROUP BY day;

-- Refresh periodically (CONCURRENTLY requires a unique index on the view)
REFRESH MATERIALIZED VIEW CONCURRENTLY daily_stats;
Caching Strategy:
from functools import lru_cache
import pickle

import redis

class CacheStrategy:
    def __init__(self):
        self.redis_client = redis.Redis()
        self.local_cache = {}

    def multi_level_cache(self, key, fetch_func, ttl=300):
        """Multi-level caching: memory -> Redis -> database"""
        # Level 1: In-memory cache
        if key in self.local_cache:
            return self.local_cache[key]
        # Level 2: Redis cache
        cached = self.redis_client.get(key)
        if cached:
            value = pickle.loads(cached)
            self.local_cache[key] = value
            return value
        # Level 3: Fetch from source
        value = fetch_func()
        # Populate caches
        self.redis_client.setex(key, ttl, pickle.dumps(value))
        self.local_cache[key] = value
        return value

    @lru_cache(maxsize=1000)
    def cached_computation(self, input_data):
        """Cache expensive computations (input_data must be hashable)"""
        return expensive_computation(input_data)
Async Processing:
import asyncio
import aiohttp
import requests

async def fetch_multiple_apis(urls):
    """Fetch multiple APIs concurrently"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.json()

# Before: Sequential (slow)
def sequential_fetch(urls):
    results = []
    for url in urls:
        results.append(requests.get(url).json())
    return results  # Takes N * avg_latency

# After: Concurrent (fast)
async def concurrent_fetch(urls):
    return await fetch_multiple_apis(urls)  # Takes roughly max(latencies)
4. Load Testing:
# Using Locust for load testing
from locust import HttpUser, task, between

class APIUser(HttpUser):
    wait_time = between(1, 3)

    @task(3)  # Weight: 3x more common
    def get_products(self):
        self.client.get("/api/products")

    @task(1)
    def create_order(self):
        self.client.post("/api/orders", json={
            "product_id": 123,
            "quantity": 1
        })

    def on_start(self):
        """Login once per simulated user"""
        self.client.post("/api/login", json={
            "username": "test",
            "password": "test"
        })

# Run: locust -f loadtest.py --host=https://api.example.com
# Test with: 100 users, 10 users/sec spawn rate
5. Monitoring Performance:
# Track latency improvements
histogram_quantile(0.95,
  sum(rate(http_request_duration_seconds_bucket[5m])) by (le)
)

# Monitor cache hit rate
sum(rate(cache_hits_total[5m])) /
sum(rate(cache_requests_total[5m]))

# Database query performance (mean query time via postgres exporter)
rate(pg_stat_statements_total_time[5m]) /
rate(pg_stat_statements_calls[5m])
Rarity: Very Common
Difficulty: Hard
On-Call & Operational Excellence
8. How do you design an effective on-call rotation and reduce on-call burden?
Answer: Sustainable on-call requires balancing coverage with engineer wellbeing:
On-Call Structure:
1. Rotation Design:
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import List

@dataclass
class OnCallShift:
    engineer: str
    start_time: datetime
    end_time: datetime
    shift_type: str  # 'primary', 'secondary', 'escalation'

class OnCallScheduler:
    def __init__(self, engineers: List[str]):
        self.engineers = engineers
        self.shifts = []

    def create_rotation(self, weeks=4):
        """Create balanced on-call rotation"""
        shift_duration = timedelta(days=7)  # Weekly rotation
        current_date = datetime.now()
        for week in range(weeks):
            # Rotate through engineers
            primary = self.engineers[week % len(self.engineers)]
            secondary = self.engineers[(week + 1) % len(self.engineers)]
            shift_start = current_date + (week * shift_duration)
            shift_end = shift_start + shift_duration
            self.shifts.append(OnCallShift(
                engineer=primary,
                start_time=shift_start,
                end_time=shift_end,
                shift_type='primary'
            ))
            self.shifts.append(OnCallShift(
                engineer=secondary,
                start_time=shift_start,
                end_time=shift_end,
                shift_type='secondary'
            ))
        return self.shifts

    def calculate_on_call_load(self, engineer: str, days=30):
        """Calculate on-call burden per engineer"""
        total_hours = 0
        incident_count = 0
        for shift in self.shifts:
            if shift.engineer == engineer:
                hours = (shift.end_time - shift.start_time).total_seconds() / 3600
                total_hours += hours
                incident_count += self.get_incidents_during_shift(shift)
        return {
            'engineer': engineer,
            'total_hours': total_hours,
            'incident_count': incident_count,
            'avg_incidents_per_week': incident_count / (days / 7),
            'burnout_risk': 'high' if incident_count > 10 else 'normal'
        }
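Generating and auditing a schedule is then a few lines. A usage sketch, assuming get_incidents_during_shift is implemented against your paging tool:
scheduler = OnCallScheduler(['alice', 'bob', 'carol', 'dave'])
shifts = scheduler.create_rotation(weeks=8)
for engineer in scheduler.engineers:
    load = scheduler.calculate_on_call_load(engineer)
    print(engineer, load['total_hours'], load['burnout_risk'])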
2. Reduce Alert Fatigue:
# Alert severity classification
alerts:
  critical:  # Page immediately
    - service_down
    - data_loss
    - security_breach
    - slo_burn_rate_critical
  warning:   # Ticket during business hours
    - high_latency
    - elevated_error_rate
    - disk_space_low
  info:      # Dashboard only
    - deployment_completed
    - cache_miss_rate_elevated
    - background_job_slow

# Alert tuning principles
alert_best_practices:
  - name: "Avoid flapping"
    rule: "Use 'for: 5m' to require a sustained condition"
  - name: "Alert on symptoms, not causes"
    bad: "High CPU usage"
    good: "User-facing latency > SLO"
  - name: "Include runbook link"
    example: "annotations.runbook: https://wiki/runbooks/high-latency"
  - name: "Make alerts actionable"
    bad: "Database slow"
    good: "Database connection pool 90% full - scale or investigate slow queries"
3. Incident Response Automation:
class IncidentAutomation:
    def __init__(self):
        # Thin wrappers around the Slack and PagerDuty APIs (assumed to exist)
        self.slack_client = SlackClient()
        self.pagerduty_client = PagerDutyClient()

    def auto_create_incident(self, alert):
        """Automatically create incident from alert"""
        # 1. Create incident channel
        channel = self.slack_client.create_channel(
            name=f"incident-{alert['id']}",
            topic=alert['summary']
        )
        # 2. Page on-call engineer
        on_call = self.pagerduty_client.get_on_call('production')
        self.pagerduty_client.create_incident(
            title=alert['summary'],
            service='production-api',
            assigned_to=on_call
        )
        # 3. Post initial context
        self.slack_client.post_message(
            channel=channel,
            message=f"""
🚨 **Incident Created**
**Alert:** {alert['name']}
**Severity:** {alert['severity']}
**On-Call:** @{on_call}
**Quick Links:**
- [Runbook]({alert['runbook_url']})
- [Dashboard]({alert['dashboard_url']})
- [Logs]({alert['logs_url']})
**Next Steps:**
1. Acknowledge incident
2. Assess impact
3. Begin investigation
"""
        )
        # 4. Auto-gather diagnostics
        diagnostics = self.gather_diagnostics(alert)
        self.slack_client.post_message(
            channel=channel,
            message=f"**Auto-Diagnostics:**\n```{diagnostics}```"
        )

    def gather_diagnostics(self, alert):
        """Automatically gather relevant diagnostics"""
        return {
            'recent_deployments': self.get_recent_deployments(),
            'error_rate': self.get_current_error_rate(),
            'latency_p95': self.get_current_latency(),
            'active_alerts': self.get_active_alerts(),
            'resource_usage': self.get_resource_usage()
        }
4. On-Call Metrics:
def calculate_on_call_metrics(team, period_days=30):
    """Track on-call health metrics"""
    metrics = {
        'total_incidents': count_incidents(period_days),
        'mttr': calculate_mttr(period_days),  # Mean time to resolution
        'incidents_per_week': count_incidents(period_days) / (period_days / 7),
        'after_hours_incidents': count_after_hours_incidents(period_days),
        'false_positive_rate': count_false_positives(period_days) / count_incidents(period_days),
        'auto_resolved': count_auto_resolved(period_days),
        'escalations': count_escalations(period_days)
    }
    # Health indicators
    health = {
        'alert_quality': 1 - metrics['false_positive_rate'],
        'automation_effectiveness': metrics['auto_resolved'] / metrics['total_incidents'],
        'on_call_burden': 'high' if metrics['after_hours_incidents'] > 5 else 'normal'
    }
    return metrics, health
5. Improve On-Call Experience:
- Reduce toil: Automate common responses
- Better runbooks: Clear, tested procedures
- Post-incident reviews: Learn and improve
- Compensation: Time off after incidents
- Rotation fairness: Balance load across team
- Escalation paths: Clear escalation procedures
Rarity: Very Common
Difficulty: Hard
SRE Metrics & Organizational Impact
9. How do you measure and demonstrate the value of SRE work?
Answer: Quantifying SRE impact requires tracking both technical and business metrics:
SRE Success Metrics:
1. Reliability Metrics:
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class ReliabilityMetrics:
    # Service Level Indicators
    availability: float   # e.g. 99.9%
    latency_p95: float    # milliseconds
    error_rate: float     # percentage
    # Incident Metrics
    mttr: float           # Mean time to resolution (minutes)
    mtbf: float           # Mean time between failures (hours)
    incident_count: int
    # Error Budget
    error_budget_remaining: float  # percentage
    error_budget_burn_rate: float  # per day

class SREMetricsCalculator:
    def calculate_quarterly_metrics(self, quarter_start, quarter_end):
        """Calculate SRE metrics for quarterly review"""
        # Reliability improvements
        reliability = {
            'availability_improvement': self.compare_availability(
                quarter_start, quarter_end
            ),
            'latency_improvement': self.compare_latency(
                quarter_start, quarter_end
            ),
            'error_rate_reduction': self.compare_error_rate(
                quarter_start, quarter_end
            )
        }
        # Operational efficiency
        efficiency = {
            'mttr_improvement': self.compare_mttr(
                quarter_start, quarter_end
            ),
            'incident_reduction': self.compare_incident_count(
                quarter_start, quarter_end
            ),
            'toil_reduction': self.calculate_toil_reduction(
                quarter_start, quarter_end
            ),
            'automation_coverage': self.calculate_automation_coverage()
        }
        # Business impact
        business_impact = {
            'revenue_protected': self.calculate_revenue_protected(),
            'cost_savings': self.calculate_cost_savings(),
            'deployment_frequency': self.calculate_deployment_frequency(),
            'change_failure_rate': self.calculate_change_failure_rate()
        }
        return {
            'reliability': reliability,
            'efficiency': efficiency,
            'business_impact': business_impact
        }

    def calculate_revenue_protected(self):
        """Calculate revenue protected by reliability improvements"""
        # Revenue per minute of uptime
        revenue_per_minute = 1000  # $1000/min
        # Downtime prevented
        baseline_downtime = 100  # minutes (previous quarter)
        current_downtime = 20    # minutes (current quarter)
        downtime_prevented = baseline_downtime - current_downtime
        revenue_protected = downtime_prevented * revenue_per_minute
        return {
            'downtime_prevented_minutes': downtime_prevented,
            'revenue_protected': revenue_protected,
            'availability_improvement': (
                (downtime_prevented / baseline_downtime) * 100
            )
        }

    def calculate_toil_reduction(self, start_date, end_date):
        """Calculate time saved through automation"""
        # Manual tasks automated (hours saved per week)
        automated_tasks = [
            {'name': 'Service restarts', 'time_saved_per_week': 5},
            {'name': 'Log analysis', 'time_saved_per_week': 10},
            {'name': 'Deployment rollbacks', 'time_saved_per_week': 3},
            {'name': 'Certificate rotation', 'time_saved_per_week': 2}
        ]
        weeks = (end_date - start_date).days / 7
        total_hours_saved = sum(
            task['time_saved_per_week'] * weeks
            for task in automated_tasks
        )
        # Convert to engineering capacity
        engineer_cost_per_hour = 100
        cost_savings = total_hours_saved * engineer_cost_per_hour
        return {
            'hours_saved': total_hours_saved,
            'cost_savings': cost_savings,
            'tasks_automated': len(automated_tasks),
            'engineering_capacity_freed': total_hours_saved / 40  # weeks
        }
2. DORA Metrics (DevOps Research and Assessment):
import statistics

class DORAMetrics:
    """Track the four key DORA metrics"""

    def deployment_frequency(self, days=30):
        """How often we deploy to production (deploys/day)"""
        deployments = self.get_deployments(days)
        return len(deployments) / days

    def lead_time_for_changes(self, days=30):
        """Median time from commit to production (hours)"""
        changes = self.get_changes(days)
        lead_times = [
            (c.deployed_at - c.committed_at).total_seconds() / 3600
            for c in changes
        ]
        return statistics.median(lead_times)

    def change_failure_rate(self, days=30):
        """Percentage of deployments causing incidents"""
        deployments = self.get_deployments(days)
        failed_deployments = [
            d for d in deployments
            if self.caused_incident(d)
        ]
        return len(failed_deployments) / len(deployments)

    def time_to_restore_service(self, days=30):
        """MTTR - median minutes to recover from incidents"""
        incidents = self.get_incidents(days)
        resolution_times = [
            (i.resolved_at - i.started_at).total_seconds() / 60
            for i in incidents
        ]
        return statistics.median(resolution_times)

    def get_dora_level(self):
        """Classify team performance (Elite/High/Medium/Low)"""
        df = self.deployment_frequency(30)
        lt = self.lead_time_for_changes(30)
        cfr = self.change_failure_rate(30)
        mttr = self.time_to_restore_service(30)
        # Elite performance criteria
        if (df > 1 and          # Multiple deploys per day
                lt < 1 and      # < 1 hour lead time
                cfr < 0.15 and  # < 15% failure rate
                mttr < 60):     # < 1 hour MTTR
            return "Elite"
        # High performance
        elif (df > 0.14 and     # At least weekly deploys
                lt < 24 and     # < 1 day lead time
                cfr < 0.20 and  # < 20% failure rate
                mttr < 1440):   # < 1 day MTTR
            return "High"
        # Medium performance
        elif (df > 0.03 and     # At least monthly deploys
                lt < 168 and    # < 1 week lead time
                cfr < 0.30 and  # < 30% failure rate
                mttr < 10080):  # < 1 week MTTR
            return "Medium"
        else:
            return "Low"
3. Executive Dashboard:
def generate_executive_report(quarter):
    """Generate executive-friendly SRE report"""
    report = {
        'headline_metrics': {
            'availability': '99.95%',
            'availability_vs_target': '+0.05%',
            'incidents': 12,
            'incidents_vs_last_quarter': '-40%',
            'mttr': '15 minutes',
            'mttr_vs_last_quarter': '-50%'
        },
        'business_impact': {
            'revenue_protected': '$500,000',
            'cost_savings': '$200,000',
            'engineering_capacity_freed': '8 engineer-weeks',
            'customer_satisfaction': '+15%'
        },
        'key_achievements': [
            'Automated incident response (50% faster resolution)',
            'Implemented chaos engineering (prevented 3 major outages)',
            'Reduced deployment time from 2 hours to 15 minutes',
            'Achieved 99.95% availability (exceeded 99.9% SLO)'
        ],
        'investments': [
            'Observability platform: $50k',
            'Chaos engineering tools: $20k',
            'Training and certifications: $15k'
        ],
        'roi': {
            'total_investment': '$85,000',
            'total_value': '$700,000',
            'roi_multiple': '8.2x'
        }
    }
    return report
4. Tracking Toil:
def track_toil_percentage(team_members, weeks=4):
    """Ensure toil stays below 50%, per the SRE principle"""
    toil_data = {}
    for engineer in team_members:
        hours = {
            'total_hours': 40 * weeks,
            'toil_hours': 0,
            'project_hours': 0,
            'on_call_hours': 0
        }
        # Categorize activities
        activities = get_engineer_activities(engineer, weeks)
        for activity in activities:
            if activity.is_toil():
                hours['toil_hours'] += activity.duration
            elif activity.is_on_call():
                hours['on_call_hours'] += activity.duration
            else:
                hours['project_hours'] += activity.duration
        toil_percentage = (hours['toil_hours'] / hours['total_hours']) * 100
        toil_data[engineer] = {
            'toil_percentage': toil_percentage,
            'status': 'healthy' if toil_percentage < 50 else 'over_limit',
            'breakdown': hours
        }
    return toil_data
Key Metrics to Track:
- Reliability: Availability, latency, error rate
- Efficiency: MTTR, deployment frequency, toil percentage
- Business: Revenue protected, cost savings, customer satisfaction
- Team Health: On-call burden, burnout indicators, toil levels
Rarity: Common
Difficulty: Hard
Conclusion
Senior SRE interviews require deep technical expertise, leadership experience, and strategic thinking. Key areas to master:
- Advanced SLO Design: Multi-tier SLOs, new service SLOs, balancing reliability and velocity
- Capacity Planning: Forecasting, resource optimization, cost management
- Chaos Engineering: Production testing, blast radius control, automated experiments
- Incident Leadership: Command structure, decision-making, blameless postmortems
- Distributed Systems: Service mesh, tracing, resilience patterns, saga transactions
- Organizational Impact: Driving SRE culture, mentoring, strategic planning
Focus on real-world experience, architectural decisions, and demonstrating how you've improved reliability at scale. Be prepared to discuss trade-offs, failures you've learned from, and your approach to building reliable systems. Good luck!