November 25, 2025
25 min read

Senior Site Reliability Engineer Interview Questions: Complete Guide

interview
career-advice
job-search

Milad Bonakdar

Master advanced SRE concepts with comprehensive interview questions covering capacity planning, chaos engineering, distributed systems, SLO design, incident leadership, and organizational SRE practices for senior roles.


Introduction

Senior Site Reliability Engineers are expected to architect reliable systems at scale, lead incident responses, drive SRE culture, and make strategic decisions about reliability investments. This role demands deep technical expertise, leadership skills, and the ability to balance reliability with business velocity.

This comprehensive guide covers essential interview questions for senior SREs, focusing on advanced concepts, system design, and organizational impact. Each question includes detailed explanations and practical examples.


Advanced SLO Design

1. How do you design SLIs and SLOs for a new service with limited data?

Answer: Designing SLOs for new services requires balancing ambition with achievability:

Approach:

1. Start with user journey mapping:

# Identify critical user journeys
user_journeys = {
    'search_product': {
        'steps': ['search_query', 'results_display', 'product_click'],
        'criticality': 'high',
        'expected_latency': '< 500ms'
    },
    'checkout': {
        'steps': ['add_to_cart', 'payment', 'confirmation'],
        'criticality': 'critical',
        'expected_latency': '< 2s'
    },
    'browse_recommendations': {
        'steps': ['load_page', 'fetch_recommendations'],
        'criticality': 'medium',
        'expected_latency': '< 1s'
    }
}

2. Define SLIs based on user experience:

# SLI Specification
slis:
  availability:
    description: "Percentage of successful requests"
    measurement: "count(http_status < 500) / count(http_requests)"
    
  latency:
    description: "95th percentile request latency"
    measurement: "histogram_quantile(0.95, http_request_duration_seconds)"
    
  correctness:
    description: "Percentage of requests returning correct data"
    measurement: "count(validation_passed) / count(requests)"

3. Set initial SLOs conservatively:

def calculate_initial_slo(criticality):
    """
    Calculate an initial SLO based on service criticality
    """
    base_slos = {
        'critical': {
            'availability': 0.999,  # 99.9%
            'latency_p95': 1.0,     # 1 second
            'latency_p99': 2.0      # 2 seconds
        },
        'high': {
            'availability': 0.995,  # 99.5%
            'latency_p95': 2.0,
            'latency_p99': 5.0
        },
        'medium': {
            'availability': 0.99,   # 99%
            'latency_p95': 5.0,
            'latency_p99': 10.0
        }
    }

    return base_slos.get(criticality, base_slos['medium'])

# Example
checkout_slo = calculate_initial_slo('critical')
print(f"Checkout SLO: {checkout_slo}")

4. Plan for iteration:

  • Start with 4-week measurement window
  • Review SLO performance weekly
  • Adjust based on actual performance and user feedback
  • Tighten SLOs as system matures
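
When reviewing weekly, it helps to translate the candidate SLO into concrete downtime numbers for the measurement window. A quick sketch:

```python
def error_budget_minutes(slo_availability: float, window_days: int = 28) -> float:
    """Minutes of allowed downtime for a given availability SLO and window."""
    total_minutes = window_days * 24 * 60
    return (1 - slo_availability) * total_minutes

# 99.9% over the 4-week measurement window allows ~40 minutes of downtime
budget = error_budget_minutes(0.999, window_days=28)
print(f"Error budget: {budget:.1f} minutes")
```

If the team burned more than this in the first review cycle, the SLO is likely too ambitious for the service's current maturity.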

5. Document assumptions:

## SLO Assumptions (Initial)

### Availability: 99.9%
- Assumes: Standard cloud infrastructure reliability
- Error budget: 43 minutes/month
- Review: After 3 months of data

### Latency (p95): < 1s
- Assumes: Database queries < 100ms
- Assumes: No complex computations
- Review: If query patterns change

### Dependencies
- External API availability: 99.95%
- Database availability: 99.99%
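
The dependency assumptions above put a hard ceiling on achievable availability: if the service hard-depends on both, and they fail independently, the best case is the product of their availabilities. A quick feasibility check for the 99.9% target:

```python
def serial_availability(*dependency_availabilities: float) -> float:
    """Best-case availability of a service whose hard dependencies
    fail independently: the product of their availabilities."""
    result = 1.0
    for a in dependency_availabilities:
        result *= a
    return result

# External API at 99.95%, database at 99.99%
ceiling = serial_availability(0.9995, 0.9999)
print(f"Availability ceiling: {ceiling:.5%}")  # above the 99.9% target
```

Since the ceiling sits above 99.9%, the stated SLO is feasible; had it fallen below the target, the SLO or the dependency set would need revisiting.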

Rarity: Common
Difficulty: Hard


2. How do you handle conflicting SLOs across different user segments?

Answer: Different user segments often have different reliability needs:

Strategy: Multi-tier SLOs

class SLOTier:
    def __init__(self, name, availability, latency_p95, latency_p99):
        self.name = name
        self.availability = availability
        self.latency_p95 = latency_p95
        self.latency_p99 = latency_p99
        self.error_budget = 1 - availability

# Define tiers
tiers = {
    'premium': SLOTier(
        name='Premium',
        availability=0.9999,  # 99.99% - 4.3 min/month
        latency_p95=0.5,
        latency_p99=1.0
    ),
    'standard': SLOTier(
        name='Standard',
        availability=0.999,   # 99.9% - 43 min/month
        latency_p95=1.0,
        latency_p99=2.0
    ),
    'free': SLOTier(
        name='Free',
        availability=0.99,    # 99% - 7.2 hours/month
        latency_p95=2.0,
        latency_p99=5.0
    )
}

# Route requests based on tier
def get_user_tier(user_id):
    # Lookup user's subscription tier
    return user_subscription_tier(user_id)

def apply_slo_policy(user_id, request):
    tier = get_user_tier(user_id)
    slo = tiers[tier]
    
    # Apply tier-specific policies
    request.timeout = slo.latency_p99
    request.priority = tier  # For queue prioritization
    request.retry_budget = calculate_retry_budget(slo.error_budget)
    
    return request
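
The `calculate_retry_budget` helper referenced above is not defined in this guide; a minimal sketch, assuming retries are capped at a fixed fraction of the tier's error budget (the 10% default is an illustrative choice, not a standard):

```python
def calculate_retry_budget(error_budget: float, retry_ratio: float = 0.1) -> float:
    """Hypothetical helper: fraction of requests allowed to be retries.

    Caps retry amplification at retry_ratio of the tier's error budget;
    both the formula and the 10% default are illustrative assumptions.
    """
    return error_budget * retry_ratio

# Standard tier (99.9% -> 0.001 error budget) gets a 0.0001 retry budget
print(calculate_retry_budget(0.001))
```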

Implementation with Traffic Routing:

# Kubernetes example: Separate deployments per tier
apiVersion: apps/v1
kind: Deployment
metadata:
  name: api-premium
spec:
  replicas: 10
  template:
    spec:
      containers:
      - name: api
        resources:
          requests:
            cpu: "2"
            memory: "4Gi"
          limits:
            cpu: "4"
            memory: "8Gi"
      priorityClassName: high-priority
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: api-standard
spec:
  replicas: 5
  template:
    spec:
      containers:
      - name: api
        resources:
          requests:
            cpu: "1"
            memory: "2Gi"

Monitoring per tier:

# Availability by tier
sum(rate(http_requests_total{status!~"5.."}[5m])) by (tier)
/
sum(rate(http_requests_total[5m])) by (tier)

# Latency by tier (histogram_quantile needs the le label preserved)
histogram_quantile(0.95,
  sum(rate(http_request_duration_seconds_bucket[5m])) by (le, tier)
)
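
These per-tier queries feed naturally into error-budget burn-rate alerting. A sketch in Python, with the 14.4x threshold borrowed from the common multi-window, multi-burn-rate pattern (window choices and thresholds here are illustrative):

```python
def burn_rate(error_rate: float, slo_availability: float) -> float:
    """How fast the error budget is burning relative to the SLO allowance.

    1.0 means the budget is consumed exactly over the SLO window;
    14.4 sustained for 1h burns roughly 2% of a 30-day budget.
    """
    budget = 1 - slo_availability
    return error_rate / budget

def should_page(error_rate_1h: float, error_rate_5m: float,
                slo_availability: float, threshold: float = 14.4) -> bool:
    """Page only when both the long and short windows exceed the threshold,
    so recovered incidents stop paging quickly."""
    return (burn_rate(error_rate_1h, slo_availability) >= threshold and
            burn_rate(error_rate_5m, slo_availability) >= threshold)

# Standard tier (99.9%): a sustained 2% error rate burns 20x budget -> page
print(should_page(0.02, 0.02, 0.999))
```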

Rarity: Uncommon
Difficulty: Hard


Capacity Planning

3. Walk through your capacity planning process for a rapidly growing service.

Answer: Capacity planning ensures resources meet demand while optimizing costs:

Capacity Planning Framework:


1. Measure baseline:

import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression

class CapacityPlanner:
    def __init__(self, metrics_data):
        self.data = pd.DataFrame(metrics_data)
    
    def analyze_trends(self, metric_name, days=30):
        """Analyze historical trends"""
        metric_data = self.data[metric_name].tail(days * 24)  # Hourly data
        
        # Calculate growth rate
        start_value = metric_data.iloc[0]
        end_value = metric_data.iloc[-1]
        growth_rate = ((end_value - start_value) / start_value) * 100
        
        # Identify peak usage
        peak_value = metric_data.max()
        peak_time = metric_data.idxmax()
        
        # Calculate percentiles
        p50 = metric_data.quantile(0.50)
        p95 = metric_data.quantile(0.95)
        p99 = metric_data.quantile(0.99)
        
        return {
            'growth_rate': growth_rate,
            'peak_value': peak_value,
            'peak_time': peak_time,
            'p50': p50,
            'p95': p95,
            'p99': p99
        }
    
    def forecast_capacity(self, metric_name, months_ahead=3):
        """Forecast future capacity needs"""
        # Prepare data
        df = self.data[[metric_name]].reset_index()
        df['days'] = (df.index / 24).astype(int)  # Convert hours to days
        
        # Train model
        X = df[['days']].values
        y = df[metric_name].values
        
        model = LinearRegression()
        model.fit(X, y)
        
        # Forecast
        future_days = np.array([[df['days'].max() + (30 * months_ahead)]])
        forecast = model.predict(future_days)[0]
        
        # Add safety margin (20%)
        forecast_with_margin = forecast * 1.2
        
        return {
            'forecast': forecast,
            'with_margin': forecast_with_margin,
            'current': y[-1],
            'growth_factor': forecast / y[-1]
        }
    
    def calculate_resource_needs(self, requests_per_second, 
                                 requests_per_instance=100,
                                 headroom=0.3):
        """Calculate required instances"""
        # Base capacity
        base_instances = np.ceil(requests_per_second / requests_per_instance)
        
        # Add headroom for spikes and maintenance
        total_instances = np.ceil(base_instances * (1 + headroom))
        
        return {
            'base_instances': int(base_instances),
            'total_instances': int(total_instances),
            'headroom_instances': int(total_instances - base_instances)
        }

# Example usage
metrics = {
    'requests_per_second': [100, 105, 110, 115, 120, ...],  # Historical data
    'cpu_usage': [45, 48, 50, 52, 55, ...],
    'memory_usage': [60, 62, 65, 67, 70, ...]
}

planner = CapacityPlanner(metrics)

# Analyze trends
trends = planner.analyze_trends('requests_per_second', days=30)
print(f"Growth rate: {trends['growth_rate']:.2f}%")
print(f"Peak RPS: {trends['peak_value']}")

# Forecast capacity
forecast = planner.forecast_capacity('requests_per_second', months_ahead=3)
print(f"Forecasted RPS in 3 months: {forecast['forecast']:.0f}")
print(f"With safety margin: {forecast['with_margin']:.0f}")

# Calculate resource needs
resources = planner.calculate_resource_needs(
    requests_per_second=forecast['with_margin'],
    requests_per_instance=100,
    headroom=0.3
)
print(f"Required instances: {resources['total_instances']}")

2. Consider growth drivers:

  • User growth rate
  • Feature launches
  • Seasonal patterns
  • Marketing campaigns
  • Geographic expansion
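
These drivers can be folded into a simple projection: compound the organic growth rate, then multiply in one-off events such as launches or campaigns. All numbers below are illustrative:

```python
def project_peak_rps(current_peak: float, monthly_growth: float,
                     months: int, event_multiplier: float = 1.0) -> float:
    """Project peak RPS from organic growth plus one-off drivers.

    monthly_growth is organic user growth (e.g. 0.08 for 8%/month);
    event_multiplier covers launches, campaigns, or geographic expansion
    (e.g. 1.5 for an expected marketing push).
    """
    return current_peak * (1 + monthly_growth) ** months * event_multiplier

# 120 RPS peak today, 8%/month organic growth, 3 months out, 1.5x campaign
projected = project_peak_rps(120, 0.08, 3, event_multiplier=1.5)
print(f"Projected peak: {projected:.0f} RPS")
```

This complements the regression-based forecast above: the model captures the trend, while explicit drivers capture step changes the trend cannot see.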

3. Plan for headroom:

  • N+1: Survive one instance failure
  • N+2: Survive two failures or one zone outage
  • Traffic spikes: 2-3x normal capacity
  • Maintenance windows: 20-30% overhead
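
The headroom rules above stack multiplicatively, not additively. A sketch of the combined sizing, using factors at the aggressive end of the ranges listed (2x spike, 25% maintenance, N+2):

```python
import math

def size_with_redundancy(base_instances: int, redundancy: int = 1,
                         spike_factor: float = 2.0,
                         maintenance_overhead: float = 0.25) -> int:
    """Apply spike headroom, then maintenance overhead, then N+redundancy."""
    spiked = math.ceil(base_instances * spike_factor)
    with_maintenance = math.ceil(spiked * (1 + maintenance_overhead))
    return with_maintenance + redundancy

# 10 base instances -> 20 after spike headroom -> 25 after maintenance -> 27 at N+2
print(size_with_redundancy(10, redundancy=2))
```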

4. Cost optimization:

def optimize_instance_mix(workload_profile):
    """
    Optimize instance types for cost
    """
    # Mix of instance types
    instance_types = {
        'on_demand': {
            'cost_per_hour': 0.10,
            'reliability': 1.0,
            'percentage': 0.3  # 30% on-demand for baseline
        },
        'spot': {
            'cost_per_hour': 0.03,
            'reliability': 0.95,
            'percentage': 0.5  # 50% spot for cost savings
        },
        'reserved': {
            'cost_per_hour': 0.06,
            'reliability': 1.0,
            'percentage': 0.2  # 20% reserved for predictable load
        }
    }
    
    total_instances = workload_profile['total_instances']
    
    allocation = {}
    for instance_type, config in instance_types.items():
        count = int(total_instances * config['percentage'])
        allocation[instance_type] = {
            'count': count,
            'monthly_cost': count * config['cost_per_hour'] * 730
        }
    
    return allocation

Rarity: Very Common
Difficulty: Hard


Chaos Engineering

4. How do you implement chaos engineering in production?

Answer: Chaos engineering proactively tests system resilience by injecting failures:

Chaos Engineering Principles:

  1. Build hypothesis around steady state
  2. Vary real-world events
  3. Run experiments in production
  4. Automate experiments
  5. Minimize blast radius

Implementation:

# Chaos experiment framework
from dataclasses import dataclass
from enum import Enum
import random
import time

class ExperimentStatus(Enum):
    PLANNED = "planned"
    RUNNING = "running"
    COMPLETED = "completed"
    ABORTED = "aborted"

@dataclass
class ChaosExperiment:
    name: str
    hypothesis: str
    blast_radius: float  # Percentage of traffic affected
    duration_seconds: int
    rollback_criteria: dict
    
    def __post_init__(self):
        self.status = ExperimentStatus.PLANNED
        self.metrics_before = {}
        self.metrics_during = {}
        self.metrics_after = {}

class ChaosRunner:
    def __init__(self, monitoring_client):
        self.monitoring = monitoring_client
        self.experiments = []
    
    def run_experiment(self, experiment: ChaosExperiment):
        """Execute chaos experiment with safety checks"""
        print(f"Starting experiment: {experiment.name}")
        
        # 1. Measure baseline
        experiment.metrics_before = self.measure_metrics()
        print(f"Baseline metrics: {experiment.metrics_before}")
        
        # 2. Verify system is healthy
        if not self.is_system_healthy(experiment.metrics_before):
            print("System unhealthy, aborting experiment")
            return False
        
        # 3. Inject failure
        experiment.status = ExperimentStatus.RUNNING
        failure_injection = self.inject_failure(experiment)
        
        try:
            # 4. Monitor during experiment
            start_time = time.time()
            while time.time() - start_time < experiment.duration_seconds:
                experiment.metrics_during = self.measure_metrics()
                
                # Check rollback criteria
                if self.should_rollback(experiment):
                    print("Rollback criteria met, stopping experiment")
                    self.rollback(failure_injection)
                    experiment.status = ExperimentStatus.ABORTED
                    return False
                
                time.sleep(10)  # Check every 10 seconds
            
            # 5. Rollback failure injection
            self.rollback(failure_injection)
            
            # 6. Measure recovery
            time.sleep(60)  # Wait for system to stabilize
            experiment.metrics_after = self.measure_metrics()
            
            # 7. Analyze results
            experiment.status = ExperimentStatus.COMPLETED
            return self.analyze_results(experiment)
            
        except Exception as e:
            print(f"Experiment failed: {e}")
            self.rollback(failure_injection)
            experiment.status = ExperimentStatus.ABORTED
            return False
    
    def inject_failure(self, experiment):
        """Inject specific failure type"""
        # Implementation depends on failure type
        pass
    
    def measure_metrics(self):
        """Measure key system metrics"""
        return {
            'error_rate': self.monitoring.get_error_rate(),
            'latency_p95': self.monitoring.get_latency_p95(),
            'requests_per_second': self.monitoring.get_rps(),
            'availability': self.monitoring.get_availability()
        }
    
    def is_system_healthy(self, metrics):
        """Check if system meets SLOs"""
        return (
            metrics['error_rate'] < 0.01 and  # < 1% errors
            metrics['latency_p95'] < 1.0 and  # < 1s latency
            metrics['availability'] > 0.999   # > 99.9% available
        )
    
    def should_rollback(self, experiment):
        """Check if experiment should be aborted"""
        current = experiment.metrics_during
        criteria = experiment.rollback_criteria
        
        return (
            current['error_rate'] > criteria.get('max_error_rate', 0.05) or
            current['latency_p95'] > criteria.get('max_latency', 5.0) or
            current['availability'] < criteria.get('min_availability', 0.99)
        )
    
    def rollback(self, failure_injection):
        """Remove failure injection"""
        print("Rolling back failure injection")
        # Implementation depends on failure type
    
    def analyze_results(self, experiment):
        """Analyze experiment results"""
        before = experiment.metrics_before
        during = experiment.metrics_during
        after = experiment.metrics_after
        
        print(f"\nExperiment Results: {experiment.name}")
        print(f"Hypothesis: {experiment.hypothesis}")
        print(f"\nMetrics:")
        print(f"  Before: {before}")
        print(f"  During: {during}")
        print(f"  After: {after}")
        
        # Determine if hypothesis was validated
        hypothesis_validated = (
            during['availability'] >= experiment.rollback_criteria['min_availability']
        )
        
        return hypothesis_validated

# Example experiment
experiment = ChaosExperiment(
    name="Database Failover Test",
    hypothesis="System remains available during database failover",
    blast_radius=0.1,  # 10% of traffic
    duration_seconds=300,  # 5 minutes
    rollback_criteria={
        'max_error_rate': 0.05,
        'max_latency': 5.0,
        'min_availability': 0.99
    }
)

Common Chaos Experiments:

1. Network Latency:

# Using tc (traffic control) to add latency
tc qdisc add dev eth0 root netem delay 100ms 20ms

# Rollback
tc qdisc del dev eth0 root

2. Pod Failure (Kubernetes):

# Kill a random pod (kubectl has no --random flag; pick one with shuf)
kubectl get pods -l app=myapp -o name | shuf -n 1 | xargs kubectl delete

# Using Chaos Mesh
kubectl apply -f - <<EOF
apiVersion: chaos-mesh.org/v1alpha1
kind: PodChaos
metadata:
  name: pod-failure
spec:
  action: pod-failure
  mode: one
  selector:
    namespaces:
      - production
    labelSelectors:
      app: myapp
  duration: "30s"
EOF

3. Resource Exhaustion:

# CPU stress
stress-ng --cpu 4 --timeout 60s

# Memory stress
stress-ng --vm 2 --vm-bytes 2G --timeout 60s

Rarity: Common
Difficulty: Hard


Incident Leadership

5. How do you lead a high-severity incident from detection through postmortem?

Answer: Senior SREs often serve as incident commanders for critical outages:

Incident Command Structure:


Incident Commander Responsibilities:

1. Initial Response (0-5 minutes):

## Incident Commander Checklist

### Immediate Actions
- [ ] Acknowledge incident
- [ ] Assess severity (SEV-1, SEV-2, SEV-3)
- [ ] Create incident channel (#incident-YYYY-MM-DD-NNN)
- [ ] Page appropriate teams
- [ ] Designate roles:
  - [ ] Technical Lead
  - [ ] Communications Lead
  - [ ] Scribe (document timeline)

### Severity Assessment
**SEV-1 (Critical):**
- Complete service outage
- Data loss
- Security breach
- More than 50% of users affected

**SEV-2 (High):**
- Partial outage
- Degraded performance
- 10-50% users affected

**SEV-3 (Medium):**
- Minor degradation
- < 10% users affected
- Workaround available
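
The rubric above can be encoded so that automation and humans agree on severity at declaration time. A minimal sketch using the thresholds from the checklist:

```python
def assess_severity(pct_users_affected: float, data_loss: bool = False,
                    security_breach: bool = False) -> str:
    """Map incident impact to a severity level per the rubric above.

    Complete outage, data loss, security breach, or >50% of users -> SEV-1;
    10-50% of users -> SEV-2; otherwise SEV-3 (workarounds may further
    justify the lower rating).
    """
    if data_loss or security_breach or pct_users_affected > 50:
        return "SEV-1"
    if pct_users_affected >= 10:
        return "SEV-2"
    return "SEV-3"

# 30% of users affected, no data loss or breach -> SEV-2
print(assess_severity(30))
```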

2. Investigation Phase:

from datetime import datetime, timedelta

class IncidentCommander:
    def __init__(self, incident_id):
        self.incident_id = incident_id
        self.timeline = []
        self.status_updates = []
        self.action_items = []
    
    def coordinate_investigation(self):
        """Coordinate technical investigation"""
        # Parallel investigation tracks
        tracks = {
            'recent_changes': self.check_recent_deployments(),
            'infrastructure': self.check_infrastructure_health(),
            'dependencies': self.check_external_dependencies(),
            'metrics': self.analyze_metrics_anomalies()
        }
        
        return tracks
    
    def make_decision(self, options, deadline_minutes=5):
        """Make time-boxed decision"""
        print(f"Decision needed within {deadline_minutes} minutes")
        print(f"Options: {options}")
        
        # Gather input from technical leads
        # Decide based on:
        # - User impact
        # - Risk of each option
        # - Time to implement
        # - Reversibility
        selected_option = options[0]  # Placeholder: top-ranked option wins

        return selected_option
    
    def communicate_status(self, interval_minutes=15):
        """Regular status updates"""
        update = {
            'timestamp': datetime.now(),
            'status': self.get_current_status(),
            'impact': self.assess_user_impact(),
            'eta': self.estimate_resolution_time(),
            'next_update': datetime.now() + timedelta(minutes=interval_minutes)
        }
        
        # Send to stakeholders
        self.send_status_update(update)
        self.status_updates.append(update)
    
    def log_timeline_event(self, event, timestamp=None):
        """Document incident timeline"""
        self.timeline.append({
            'timestamp': timestamp or datetime.now(),
            'event': event,
            'logged_by': self.get_current_user()
        })

3. Mitigation Strategies:

def evaluate_mitigation_options():
    """Evaluate and prioritize mitigation options"""
    options = [
        {
            'action': 'Rollback deployment',
            'time_to_implement': 5,  # minutes
            'risk': 'low',
            'effectiveness': 'high',
            'reversible': True
        },
        {
            'action': 'Scale up resources',
            'time_to_implement': 2,
            'risk': 'low',
            'effectiveness': 'medium',
            'reversible': True
        },
        {
            'action': 'Disable feature flag',
            'time_to_implement': 1,
            'risk': 'low',
            'effectiveness': 'high',
            'reversible': True
        },
        {
            'action': 'Database failover',
            'time_to_implement': 10,
            'risk': 'medium',
            'effectiveness': 'high',
            'reversible': False
        }
    ]
    
    # Sort by: low risk, high effectiveness, fast implementation
    sorted_options = sorted(
        options,
        key=lambda x: (
            x['risk'] == 'low',
            x['effectiveness'] == 'high',
            -x['time_to_implement']
        ),
        reverse=True
    )
    
    return sorted_options

4. Postmortem (Blameless):

# Incident Postmortem: API Outage

**Date:** 2024-11-25  
**Duration:** 45 minutes  
**Severity:** SEV-1  
**Incident Commander:** Alice  
**Technical Lead:** Bob

## Executive Summary
Complete API outage affecting all users due to database connection pool exhaustion.

## Impact
- **Users affected:** 100% (all users)
- **Duration:** 45 minutes
- **Revenue impact:** ~$50,000
- **SLO impact:** Consumed 75% of monthly error budget

## Root Cause
Database connection pool exhausted due to connection leak in new feature deployed 2 hours before incident.

## Timeline
| Time | Event |
|------|-------|
| 14:00 | Deployment of v2.5.0 |
| 15:45 | First alerts for increased latency |
| 15:50 | Complete API outage |
| 15:52 | Incident declared (SEV-1) |
| 15:55 | Identified database as bottleneck |
| 16:05 | Decision to rollback deployment |
| 16:15 | Rollback completed |
| 16:20 | Service recovering |
| 16:35 | Full recovery confirmed |

## What Went Well
- Quick detection (5 minutes from first symptom)
- Clear incident command structure
- Fast decision to rollback
- Good communication with stakeholders

## What Went Wrong
- Connection leak not caught in testing
- No connection pool monitoring
- Deployment during peak hours

## Action Items
| Action | Owner | Due Date | Status |
|--------|-------|----------|--------|
| Add connection pool monitoring | Alice | 2024-12-01 | Open |
| Implement connection leak detection in tests | Bob | 2024-12-05 | Open |
| Update deployment policy (avoid peak hours) | Charlie | 2024-11-30 | Open |
| Add circuit breaker for database connections | David | 2024-12-10 | Open |

## Lessons Learned
- Monitoring gaps can hide critical issues
- Deployment timing matters
- Need better integration testing for resource leaks

Rarity: Very Common
Difficulty: Hard


Distributed Systems Reliability

6. How do you ensure reliability in a distributed microservices architecture?

Answer: Distributed systems introduce unique reliability challenges:

Key Patterns:

1. Service Mesh for Resilience:

# Istio example: Circuit breaking and retries
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: api-service
spec:
  host: api-service
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 100
      http:
        http1MaxPendingRequests: 50
        http2MaxRequests: 100
        maxRequestsPerConnection: 2
    outlierDetection:
      consecutiveErrors: 5
      interval: 30s
      baseEjectionTime: 30s
      maxEjectionPercent: 50
      minHealthPercent: 40
    loadBalancer:
      simple: LEAST_REQUEST
---
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: api-service
spec:
  hosts:
  - api-service
  http:
  - retries:
      attempts: 3
      perTryTimeout: 2s
      retryOn: 5xx,reset,connect-failure,refused-stream
    timeout: 10s
    route:
    - destination:
        host: api-service

2. Distributed Tracing:

from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.requests import RequestsInstrumentor

# Setup tracing
trace.set_tracer_provider(TracerProvider())
jaeger_exporter = JaegerExporter(
    agent_host_name="jaeger",
    agent_port=6831,
)
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(jaeger_exporter)
)

# Instrument requests library
RequestsInstrumentor().instrument()

# Use in code
tracer = trace.get_tracer(__name__)

def process_order(order_id):
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)
        
        # Call payment service
        with tracer.start_as_current_span("call_payment_service"):
            payment_result = call_payment_service(order_id)
        
        # Call inventory service
        with tracer.start_as_current_span("call_inventory_service"):
            inventory_result = call_inventory_service(order_id)
        
        return combine_results(payment_result, inventory_result)

3. Bulkhead Pattern:

import asyncio
from asyncio import Semaphore

class BulkheadExecutor:
    def __init__(self, max_concurrent=10):
        self.semaphore = Semaphore(max_concurrent)
        self.active_requests = 0
    
    async def execute(self, func, *args, **kwargs):
        """Execute function with concurrency limit"""
        async with self.semaphore:
            self.active_requests += 1
            try:
                result = await func(*args, **kwargs)
                return result
            finally:
                self.active_requests -= 1

# Separate bulkheads for different services
payment_bulkhead = BulkheadExecutor(max_concurrent=20)
inventory_bulkhead = BulkheadExecutor(max_concurrent=10)
notification_bulkhead = BulkheadExecutor(max_concurrent=5)

# Usage
async def process_order(order_id):
    # Each service call draws from its own isolated resource pool
    payment = await payment_bulkhead.execute(call_payment_service, order_id)
    inventory = await inventory_bulkhead.execute(call_inventory_service, order_id)
    await notification_bulkhead.execute(send_notification, order_id)
    return payment, inventory

4. Saga Pattern for Distributed Transactions:

class SagaOrchestrator:
    def __init__(self):
        self.steps = []
        self.compensations = []
    
    def add_step(self, action, compensation):
        """Add step with compensation"""
        self.steps.append(action)
        self.compensations.append(compensation)
    
    async def execute(self):
        """Execute saga with automatic compensation on failure"""
        completed_steps = []
        
        try:
            for step in self.steps:
                result = await step()
                completed_steps.append(result)
            return completed_steps
            
        except Exception as e:
            # Compensate in reverse order
            print(f"Saga failed: {e}. Starting compensation...")
            for i in range(len(completed_steps) - 1, -1, -1):
                try:
                    await self.compensations[i](completed_steps[i])
                except Exception as comp_error:
                    print(f"Compensation failed: {comp_error}")
            raise

# Example: Order processing saga
async def process_order_saga(order_id):
    saga = SagaOrchestrator()
    
    # Step 1: Reserve inventory
    saga.add_step(
        action=lambda: reserve_inventory(order_id),
        compensation=lambda result: release_inventory(result['reservation_id'])
    )
    
    # Step 2: Process payment
    saga.add_step(
        action=lambda: process_payment(order_id),
        compensation=lambda result: refund_payment(result['transaction_id'])
    )
    
    # Step 3: Create shipment
    saga.add_step(
        action=lambda: create_shipment(order_id),
        compensation=lambda result: cancel_shipment(result['shipment_id'])
    )
    
    return await saga.execute()

Rarity: Common
Difficulty: Hard


Performance Optimization

7. How do you approach performance optimization for a production system?

Answer: Systematic performance optimization requires data-driven decisions:

Performance Optimization Framework:


1. Establish Baseline:

import time
import statistics
from dataclasses import dataclass
from typing import List

@dataclass
class PerformanceMetrics:
    latency_p50: float
    latency_p95: float
    latency_p99: float
    throughput_rps: float
    error_rate: float
    cpu_usage: float
    memory_usage: float

class PerformanceProfiler:
    def __init__(self):
        self.measurements = []
    
    def measure_endpoint(self, endpoint_func, iterations=1000):
        """Measure endpoint performance"""
        latencies = []
        errors = 0
        
        start_time = time.time()
        
        for _ in range(iterations):
            request_start = time.time()
            try:
                endpoint_func()
                latency = (time.time() - request_start) * 1000  # ms
                latencies.append(latency)
            except Exception as e:
                errors += 1
        
        duration = time.time() - start_time
        
        return PerformanceMetrics(
            latency_p50=statistics.quantiles(latencies, n=100)[49],
            latency_p95=statistics.quantiles(latencies, n=100)[94],
            latency_p99=statistics.quantiles(latencies, n=100)[98],
            throughput_rps=iterations / duration,
            error_rate=errors / iterations,
            cpu_usage=self.get_cpu_usage(),
            memory_usage=self.get_memory_usage()
        )
    
    def compare_performance(self, before: PerformanceMetrics, 
                          after: PerformanceMetrics):
        """Compare performance improvements"""
        improvements = {
            'latency_p95': ((before.latency_p95 - after.latency_p95) / 
                           before.latency_p95 * 100),
            'throughput': ((after.throughput_rps - before.throughput_rps) / 
                          before.throughput_rps * 100),
            'error_rate': ((before.error_rate - after.error_rate) / 
                          before.error_rate * 100) if before.error_rate > 0 else 0
        }
        
        print(f"Performance Improvements:")
        print(f"  Latency (p95): {improvements['latency_p95']:.1f}% faster")
        print(f"  Throughput: {improvements['throughput']:.1f}% increase")
        print(f"  Error Rate: {improvements['error_rate']:.1f}% reduction")
        
        return improvements

2. Identify Bottlenecks:

import cProfile
import pstats
from functools import wraps

def profile_function(func):
    """Decorator to profile function performance"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        profiler = cProfile.Profile()
        profiler.enable()
        
        result = func(*args, **kwargs)
        
        profiler.disable()
        stats = pstats.Stats(profiler)
        stats.sort_stats('cumulative')
        stats.print_stats(20)  # Top 20 functions
        
        return result
    return wrapper

# Database query optimization
def analyze_slow_queries():
    """Identify slow database queries via pg_stat_statements"""
    slow_queries = """
    SELECT
        query,
        calls,
        total_time,
        mean_time,
        max_time
    FROM pg_stat_statements
    WHERE mean_time > 100  -- queries averaging > 100ms
    ORDER BY total_time DESC
    LIMIT 20;
    """
    # Optimization steps for the offenders found:
    # 1. Add indexes for frequently filtered columns
    # 2. Optimize JOIN operations
    # 3. Use EXPLAIN ANALYZE to understand query plans
    # 4. Consider query result caching
    # 5. Denormalize if necessary
    return db.execute(slow_queries)

# Example optimization
import json

@profile_function
def optimized_user_search(query):
    """Optimized user search with caching and indexing"""
    # Check cache first
    cache_key = f"user_search:{query}"
    cached_result = redis_client.get(cache_key)
    if cached_result:
        return json.loads(cached_result)
    
    # Use indexed full-text query
    results = db.execute("""
        SELECT id, name, email
        FROM users
        WHERE search_vector @@ to_tsquery(%s)
        LIMIT 100
    """, (query,))
    
    # Cache serialized results (assumes rows are JSON-serializable)
    redis_client.setex(cache_key, 300, json.dumps(results))  # 5 min TTL
    
    return results

3. Common Optimizations:

Database:

-- Add indexes
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_orders_user_created ON orders(user_id, created_at);

-- Use connection pooling
-- Implement read replicas for read-heavy workloads
-- Use materialized views for complex aggregations

-- Example: Materialized view for dashboard
CREATE MATERIALIZED VIEW daily_stats AS
SELECT 
    date_trunc('day', created_at) as day,
    COUNT(*) as order_count,
    SUM(total) as revenue
FROM orders
GROUP BY day;

-- Refresh periodically
REFRESH MATERIALIZED VIEW CONCURRENTLY daily_stats;

Caching Strategy:

from functools import lru_cache
import redis
import pickle

class CacheStrategy:
    def __init__(self):
        self.redis_client = redis.Redis()
        self.local_cache = {}
    
    def multi_level_cache(self, key, fetch_func, ttl=300):
        """Multi-level caching: memory -> Redis -> database"""
        # Level 1: In-memory cache
        if key in self.local_cache:
            return self.local_cache[key]
        
        # Level 2: Redis cache
        cached = self.redis_client.get(key)
        if cached:
            value = pickle.loads(cached)
            self.local_cache[key] = value
            return value
        
        # Level 3: Fetch from source
        value = fetch_func()
        
        # Populate caches
        self.redis_client.setex(key, ttl, pickle.dumps(value))
        self.local_cache[key] = value
        
        return value
    
    @lru_cache(maxsize=1000)
    def cached_computation(self, input_data):
        """Cache expensive computations.
        Caveat: lru_cache on an instance method also caches `self`,
        keeping the object alive; prefer a module-level function for
        long-lived classes."""
        return expensive_computation(input_data)
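One caveat on the sketch above: `local_cache` is a plain dict that grows without bound. A minimal TTL-aware LRU (a standalone sketch, class name hypothetical) keeps the in-memory level bounded:

```python
import time
from collections import OrderedDict

class TTLLRUCache:
    """Small in-memory cache with a max size and per-entry TTL."""

    def __init__(self, max_size=1000, ttl=300):
        self.max_size = max_size
        self.ttl = ttl
        self._store = OrderedDict()  # key -> (expires_at, value)

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if time.monotonic() > expires_at:
            del self._store[key]  # expired entry
            return None
        self._store.move_to_end(key)  # mark as recently used
        return value

    def set(self, key, value):
        self._store[key] = (time.monotonic() + self.ttl, value)
        self._store.move_to_end(key)
        if len(self._store) > self.max_size:
            self._store.popitem(last=False)  # evict least recently used
```

Swapping this in for the plain dict gives the in-memory tier both an eviction policy and expiry, so it cannot drift arbitrarily far from Redis.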

Async Processing:

import asyncio
import aiohttp

async def fetch_multiple_apis(urls):
    """Fetch multiple APIs concurrently"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.json()

# Before: Sequential (slow) -- uses the blocking `requests` library
import requests

def sequential_fetch(urls):
    results = []
    for url in urls:
        results.append(requests.get(url).json())
    return results  # Takes N * avg_latency

# After: Concurrent (fast)
async def concurrent_fetch(urls):
    return await fetch_multiple_apis(urls)  # Takes max(latencies)
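One refinement worth noting: `gather` over hundreds of URLs opens hundreds of simultaneous connections. A sketch that bounds concurrency with a semaphore (`fake_fetch` is a stand-in for the aiohttp call above):

```python
import asyncio

async def fetch_with_limit(urls, fetch, max_concurrency=10):
    """Run fetch(url) for every url, at most max_concurrency at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded(url):
        async with semaphore:
            return await fetch(url)

    # gather preserves input order regardless of completion order
    return await asyncio.gather(*(bounded(u) for u in urls))

async def fake_fetch(url):
    # Stand-in for a real HTTP request
    await asyncio.sleep(0)
    return f"response:{url}"

results = asyncio.run(
    fetch_with_limit(["a", "b", "c"], fake_fetch, max_concurrency=2)
)
```

This keeps the latency win of concurrency while protecting the downstream service from a connection flood.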

4. Load Testing:

# Using Locust for load testing
from locust import HttpUser, task, between

class APIUser(HttpUser):
    wait_time = between(1, 3)
    
    @task(3)  # Weight: 3x more common
    def get_products(self):
        self.client.get("/api/products")
    
    @task(1)
    def create_order(self):
        self.client.post("/api/orders", json={
            "product_id": 123,
            "quantity": 1
        })
    
    def on_start(self):
        """Login once per user"""
        self.client.post("/api/login", json={
            "username": "test",
            "password": "test"
        })

# Run: locust -f loadtest.py --host=https://api.example.com
# Test with: 100 users, 10 users/sec spawn rate

5. Monitoring Performance:

# Track latency improvements
histogram_quantile(0.95,
  sum by (le) (rate(http_request_duration_seconds_bucket[5m]))
)

# Monitor cache hit rate
sum(rate(cache_hits_total[5m])) /
sum(rate(cache_requests_total[5m]))

# Database query performance
rate(pg_stat_statements_total_time[5m]) /
rate(pg_stat_statements_calls[5m])

Rarity: Very Common
Difficulty: Hard


On-Call & Operational Excellence

8. How do you design an effective on-call rotation and reduce on-call burden?

Answer: Sustainable on-call requires balancing coverage with engineer wellbeing:

On-Call Structure:

1. Rotation Design:

from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import List

@dataclass
class OnCallShift:
    engineer: str
    start_time: datetime
    end_time: datetime
    shift_type: str  # 'primary', 'secondary', 'escalation'

class OnCallScheduler:
    def __init__(self, engineers: List[str]):
        self.engineers = engineers
        self.shifts = []
    
    def create_rotation(self, weeks=4):
        """Create balanced on-call rotation"""
        shift_duration = timedelta(days=7)  # Weekly rotation
        current_date = datetime.now()
        
        for week in range(weeks):
            # Rotate through engineers
            primary = self.engineers[week % len(self.engineers)]
            secondary = self.engineers[(week + 1) % len(self.engineers)]
            
            shift_start = current_date + (week * shift_duration)
            shift_end = shift_start + shift_duration
            
            self.shifts.append(OnCallShift(
                engineer=primary,
                start_time=shift_start,
                end_time=shift_end,
                shift_type='primary'
            ))
            
            self.shifts.append(OnCallShift(
                engineer=secondary,
                start_time=shift_start,
                end_time=shift_end,
                shift_type='secondary'
            ))
        
        return self.shifts
    
    def calculate_on_call_load(self, engineer: str, days=30):
        """Calculate on-call burden per engineer"""
        total_hours = 0
        incident_count = 0
        
        for shift in self.shifts:
            if shift.engineer == engineer:
                hours = (shift.end_time - shift.start_time).total_seconds() / 3600
                total_hours += hours
                incident_count += self.get_incidents_during_shift(shift)
        
        return {
            'engineer': engineer,
            'total_hours': total_hours,
            'incident_count': incident_count,
            'avg_incidents_per_week': incident_count / (days / 7),
            'burnout_risk': 'high' if incident_count > 10 else 'normal'
        }
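To verify a generated rotation is actually fair, a small standalone check helps (re-implemented here over plain tuples so it does not depend on the scheduler class; function names are hypothetical):

```python
from collections import Counter

def primary_shift_counts(shifts):
    """Count primary shifts per engineer to spot unfair rotations."""
    return Counter(
        engineer for engineer, shift_type in shifts
        if shift_type == "primary"
    )

def round_robin(engineers, weeks):
    """Simple weekly round-robin: one primary engineer per week."""
    return [(engineers[w % len(engineers)], "primary") for w in range(weeks)]

counts = primary_shift_counts(round_robin(["ana", "ben", "cho"], weeks=6))
```

Running this over the real schedule before publishing it catches the common failure mode where holidays or swaps quietly concentrate shifts on one person.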

2. Reduce Alert Fatigue:

# Alert severity classification
alerts:
  critical:  # Page immediately
    - service_down
    - data_loss
    - security_breach
    - slo_burn_rate_critical
    
  warning:  # Ticket during business hours
    - high_latency
    - elevated_error_rate
    - disk_space_low
    
  info:  # Dashboard only
    - deployment_completed
    - cache_miss_rate_elevated
    - background_job_slow

# Alert tuning principles
alert_best_practices:
  - name: "Avoid flapping"
    rule: "Use 'for: 5m' to require sustained condition"
  
  - name: "Alert on symptoms, not causes"
    bad: "High CPU usage"
    good: "User-facing latency > SLO"
  
  - name: "Include runbook link"
    example: "annotations.runbook: https://wiki/runbooks/high-latency"
  
  - name: "Make alerts actionable"
    bad: "Database slow"
    good: "Database connection pool 90% full - scale or investigate slow queries"
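The "avoid flapping" principle can also be applied in application-level health checks; a minimal sketch that mirrors Prometheus's `for:` clause by requiring N consecutive failing observations before firing:

```python
class SustainedCondition:
    """Fire only after a condition has held for `required` consecutive
    checks, mimicking Prometheus's `for:` clause."""

    def __init__(self, required=5):
        self.required = required
        self.streak = 0

    def observe(self, condition_is_true):
        """Record one check; return True when the alert should fire."""
        self.streak = self.streak + 1 if condition_is_true else 0
        return self.streak >= self.required
```

A single transient spike resets nothing downstream: the alert only pages once the condition has been continuously true for the configured number of checks.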

3. Incident Response Automation:

class IncidentAutomation:
    def __init__(self):
        self.slack_client = SlackClient()
        self.pagerduty_client = PagerDutyClient()
    
    def auto_create_incident(self, alert):
        """Automatically create incident from alert"""
        # 1. Create incident channel
        channel = self.slack_client.create_channel(
            name=f"incident-{alert['id']}",
            topic=alert['summary']
        )
        
        # 2. Page on-call engineer
        on_call = self.pagerduty_client.get_on_call('production')
        self.pagerduty_client.create_incident(
            title=alert['summary'],
            service='production-api',
            assigned_to=on_call
        )
        
        # 3. Post initial context
        self.slack_client.post_message(
            channel=channel,
            message=f"""
            🚨 **Incident Created**
            
            **Alert:** {alert['name']}
            **Severity:** {alert['severity']}
            **On-Call:** @{on_call}
            
            **Quick Links:**
            - [Runbook]({alert['runbook_url']})
            - [Dashboard]({alert['dashboard_url']})
            - [Logs]({alert['logs_url']})
            
            **Next Steps:**
            1. Acknowledge incident
            2. Assess impact
            3. Begin investigation
            """
        )
        
        # 4. Auto-gather diagnostics
        diagnostics = self.gather_diagnostics(alert)
        self.slack_client.post_message(
            channel=channel,
            message=f"**Auto-Diagnostics:**\n```{diagnostics}```"
        )
    
    def gather_diagnostics(self, alert):
        """Automatically gather relevant diagnostics"""
        return {
            'recent_deployments': self.get_recent_deployments(),
            'error_rate': self.get_current_error_rate(),
            'latency_p95': self.get_current_latency(),
            'active_alerts': self.get_active_alerts(),
            'resource_usage': self.get_resource_usage()
        }

4. On-Call Metrics:

def calculate_on_call_metrics(team, period_days=30):
    """Track on-call health metrics"""
    metrics = {
        'total_incidents': count_incidents(period_days),
        'mttr': calculate_mttr(period_days),  # Mean time to resolution
        'incidents_per_week': count_incidents(period_days) / (period_days / 7),
        'after_hours_incidents': count_after_hours_incidents(period_days),
        'false_positive_rate': count_false_positives(period_days) / count_incidents(period_days),
        'auto_resolved': count_auto_resolved(period_days),
        'escalations': count_escalations(period_days)
    }
    
    # Health indicators
    health = {
        'alert_quality': 1 - metrics['false_positive_rate'],
        'automation_effectiveness': metrics['auto_resolved'] / metrics['total_incidents'],
        'on_call_burden': 'high' if metrics['after_hours_incidents'] > 5 else 'normal'
    }
    
    return metrics, health

5. Improve On-Call Experience:

  • Reduce toil: Automate common responses
  • Better runbooks: Clear, tested procedures
  • Post-incident reviews: Learn and improve
  • Compensation: Time off after incidents
  • Rotation fairness: Balance load across team
  • Escalation paths: Clear escalation procedures

Rarity: Very Common
Difficulty: Hard


SRE Metrics & Organizational Impact

9. How do you measure and demonstrate the value of SRE work?

Answer: Quantifying SRE impact requires tracking both technical and business metrics:

SRE Success Metrics:

1. Reliability Metrics:

from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class ReliabilityMetrics:
    # Service Level Indicators
    availability: float  # 99.9%
    latency_p95: float   # milliseconds
    error_rate: float    # percentage
    
    # Incident Metrics
    mttr: float          # Mean time to resolution (minutes)
    mtbf: float          # Mean time between failures (hours)
    incident_count: int
    
    # Error Budget
    error_budget_remaining: float  # percentage
    error_budget_burn_rate: float  # per day

class SREMetricsCalculator:
    def calculate_quarterly_metrics(self, quarter_start, quarter_end):
        """Calculate SRE metrics for quarterly review"""
        
        # Reliability improvements
        reliability = {
            'availability_improvement': self.compare_availability(
                quarter_start, quarter_end
            ),
            'latency_improvement': self.compare_latency(
                quarter_start, quarter_end
            ),
            'error_rate_reduction': self.compare_error_rate(
                quarter_start, quarter_end
            )
        }
        
        # Operational efficiency
        efficiency = {
            'mttr_improvement': self.compare_mttr(
                quarter_start, quarter_end
            ),
            'incident_reduction': self.compare_incident_count(
                quarter_start, quarter_end
            ),
            'toil_reduction': self.calculate_toil_reduction(
                quarter_start, quarter_end
            ),
            'automation_coverage': self.calculate_automation_coverage()
        }
        
        # Business impact
        business_impact = {
            'revenue_protected': self.calculate_revenue_protected(),
            'cost_savings': self.calculate_cost_savings(),
            'deployment_frequency': self.calculate_deployment_frequency(),
            'change_failure_rate': self.calculate_change_failure_rate()
        }
        
        return {
            'reliability': reliability,
            'efficiency': efficiency,
            'business_impact': business_impact
        }
    
    def calculate_revenue_protected(self):
        """Calculate revenue protected by reliability improvements"""
        # Revenue per minute of uptime
        revenue_per_minute = 1000  # $1000/min
        
        # Downtime prevented
        baseline_downtime = 100  # minutes (previous quarter)
        current_downtime = 20    # minutes (current quarter)
        downtime_prevented = baseline_downtime - current_downtime
        
        revenue_protected = downtime_prevented * revenue_per_minute
        
        return {
            'downtime_prevented_minutes': downtime_prevented,
            'revenue_protected': revenue_protected,
            'downtime_reduction_percent': (
                (downtime_prevented / baseline_downtime) * 100
            )
        }
    
    def calculate_toil_reduction(self, start_date, end_date):
        """Calculate time saved through automation"""
        # Manual tasks automated
        automated_tasks = [
            {'name': 'Service restarts', 'time_saved_per_week': 5},
            {'name': 'Log analysis', 'time_saved_per_week': 10},
            {'name': 'Deployment rollbacks', 'time_saved_per_week': 3},
            {'name': 'Certificate rotation', 'time_saved_per_week': 2}
        ]
        
        weeks = (end_date - start_date).days / 7
        total_hours_saved = sum(
            task['time_saved_per_week'] * weeks 
            for task in automated_tasks
        )
        
        # Convert to engineering capacity
        engineer_cost_per_hour = 100
        cost_savings = total_hours_saved * engineer_cost_per_hour
        
        return {
            'hours_saved': total_hours_saved,
            'cost_savings': cost_savings,
            'tasks_automated': len(automated_tasks),
            'engineering_capacity_freed': total_hours_saved / 40  # weeks
        }

2. DORA Metrics (DevOps Research and Assessment):

import statistics

class DORAMetrics:
    """Track the four key DORA metrics"""
    
    def deployment_frequency(self, days=30):
        """How often we deploy to production"""
        deployments = self.get_deployments(days)
        return len(deployments) / days
    
    def lead_time_for_changes(self, days=30):
        """Time from commit to production"""
        changes = self.get_changes(days)
        lead_times = [
            (c.deployed_at - c.committed_at).total_seconds() / 3600
            for c in changes
        ]
        return statistics.median(lead_times)
    
    def change_failure_rate(self, days=30):
        """Percentage of deployments causing incidents"""
        deployments = self.get_deployments(days)
        if not deployments:
            return 0.0  # avoid division by zero when nothing shipped
        failed_deployments = [
            d for d in deployments 
            if self.caused_incident(d)
        ]
        return len(failed_deployments) / len(deployments)
    
    def time_to_restore_service(self, days=30):
        """MTTR - how quickly we recover from incidents"""
        incidents = self.get_incidents(days)
        resolution_times = [
            (i.resolved_at - i.started_at).total_seconds() / 60
            for i in incidents
        ]
        return statistics.median(resolution_times)
    
    def get_dora_level(self):
        """Classify team performance (Elite/High/Medium/Low)"""
        df = self.deployment_frequency(30)
        lt = self.lead_time_for_changes(30)
        cfr = self.change_failure_rate(30)
        mttr = self.time_to_restore_service(30)
        
        # Elite performance criteria
        if (df > 1 and          # Multiple deploys per day
            lt < 1 and          # < 1 hour lead time
            cfr < 0.15 and      # < 15% failure rate
            mttr < 60):         # < 1 hour MTTR
            return "Elite"
        
        # High performance
        elif (df > 0.14 and     # Weekly deploys
              lt < 24 and       # < 1 day lead time
              cfr < 0.20 and    # < 20% failure rate
              mttr < 1440):     # < 1 day MTTR
            return "High"
        
        # Medium performance
        elif (df > 0.03 and     # Monthly deploys
              lt < 168 and      # < 1 week lead time
              cfr < 0.30 and    # < 30% failure rate
              mttr < 10080):    # < 1 week MTTR
            return "Medium"
        
        else:
            return "Low"
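For quick reporting, the same cut-offs can be extracted into a pure function that takes the four numbers directly, which is easier to unit-test than the data-fetching class:

```python
def dora_level(deploys_per_day, lead_time_hours,
               change_failure_rate, mttr_minutes):
    """Classify DORA performance using the thresholds from the class above."""
    if (deploys_per_day > 1 and lead_time_hours < 1
            and change_failure_rate < 0.15 and mttr_minutes < 60):
        return "Elite"
    if (deploys_per_day > 0.14 and lead_time_hours < 24
            and change_failure_rate < 0.20 and mttr_minutes < 1440):
        return "High"
    if (deploys_per_day > 0.03 and lead_time_hours < 168
            and change_failure_rate < 0.30 and mttr_minutes < 10080):
        return "Medium"
    return "Low"
```

Separating the classification from the data sources also makes it trivial to replay historical quarters through the same thresholds.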

3. Executive Dashboard:

def generate_executive_report(quarter):
    """Generate executive-friendly SRE report"""
    
    report = {
        'headline_metrics': {
            'availability': '99.95%',
            'availability_vs_target': '+0.05%',
            'incidents': 12,
            'incidents_vs_last_quarter': '-40%',
            'mttr': '15 minutes',
            'mttr_vs_last_quarter': '-50%'
        },
        
        'business_impact': {
            'revenue_protected': '$500,000',
            'cost_savings': '$200,000',
            'engineering_capacity_freed': '8 engineer-weeks',
            'customer_satisfaction': '+15%'
        },
        
        'key_achievements': [
            'Automated incident response (50% faster resolution)',
            'Implemented chaos engineering (prevented 3 major outages)',
            'Reduced deployment time from 2 hours to 15 minutes',
            'Achieved 99.95% availability (exceeded 99.9% SLO)'
        ],
        
        'investments': [
            'Observability platform: $50k',
            'Chaos engineering tools: $20k',
            'Training and certifications: $15k'
        ],
        
        'roi': {
            'total_investment': '$85,000',
            'total_value': '$700,000',
            'roi_multiple': '8.2x'
        }
    }
    
    return report

4. Tracking Toil:

def track_toil_percentage(team_members, weeks=4):
    """Ensure toil stays below 50% per SRE principle"""
    
    toil_data = {}
    
    for engineer in team_members:
        hours = {
            'total_hours': 40 * weeks,
            'toil_hours': 0,
            'project_hours': 0,
            'on_call_hours': 0
        }
        
        # Categorize activities
        activities = get_engineer_activities(engineer, weeks)
        
        for activity in activities:
            if activity.is_toil():
                hours['toil_hours'] += activity.duration
            elif activity.is_on_call():
                hours['on_call_hours'] += activity.duration
            else:
                hours['project_hours'] += activity.duration
        
        toil_percentage = (hours['toil_hours'] / hours['total_hours']) * 100
        
        toil_data[engineer] = {
            'toil_percentage': toil_percentage,
            'status': 'healthy' if toil_percentage < 50 else 'over_limit',
            'breakdown': hours
        }
    
    return toil_data

Key Metrics to Track:

  • Reliability: Availability, latency, error rate
  • Efficiency: MTTR, deployment frequency, toil percentage
  • Business: Revenue protected, cost savings, customer satisfaction
  • Team Health: On-call burden, burnout indicators, toil levels

Rarity: Common
Difficulty: Hard


Conclusion

Senior SRE interviews require deep technical expertise, leadership experience, and strategic thinking. Key areas to master:

  1. Advanced SLO Design: Multi-tier SLOs, new service SLOs, balancing reliability and velocity
  2. Capacity Planning: Forecasting, resource optimization, cost management
  3. Chaos Engineering: Production testing, blast radius control, automated experiments
  4. Incident Leadership: Command structure, decision-making, blameless postmortems
  5. Distributed Systems: Service mesh, tracing, resilience patterns, saga transactions
  6. Organizational Impact: Driving SRE culture, mentoring, strategic planning

Focus on real-world experience, architectural decisions, and demonstrating how you've improved reliability at scale. Be prepared to discuss trade-offs, failures you've learned from, and your approach to building reliable systems. Good luck!
