dicembre 21, 2025
15 min di lettura

Domande per il colloquio di Senior Site Reliability Engineer: Guida Completa

interview
career-advice
job-search
Domande per il colloquio di Senior Site Reliability Engineer: Guida Completa
MB

Milad Bonakdar

Autore

Padroneggia concetti SRE avanzati con domande d'intervista complete che coprono la pianificazione della capacità, l'ingegneria del caos, i sistemi distribuiti, la progettazione di SLO, la leadership negli incident e le pratiche SRE organizzative per ruoli senior.


Introduzione

Ci si aspetta che i Senior Site Reliability Engineer progettino sistemi affidabili su larga scala, guidino le risposte agli incidenti, promuovano la cultura SRE e prendano decisioni strategiche sugli investimenti in affidabilità. Questo ruolo richiede una profonda competenza tecnica, capacità di leadership e la capacità di bilanciare l'affidabilità con la velocità del business.

Questa guida completa copre le domande essenziali per i colloqui per SRE senior, concentrandosi su concetti avanzati, progettazione del sistema e impatto organizzativo. Ogni domanda include spiegazioni dettagliate ed esempi pratici.


Progettazione Avanzata degli SLO

1. Come progetteresti SLI e SLO per un nuovo servizio con dati limitati?

Risposta: Progettare SLO per nuovi servizi richiede di bilanciare l'ambizione con la realizzabilità:

Approccio:

1. Inizia con la mappatura del percorso utente:

# Identifica i percorsi utente critici
user_journeys = {
    'search_product': {
        'steps': ['search_query', 'results_display', 'product_click'],
        'criticality': 'high',
        'expected_latency': '< 500ms'
    },
    'checkout': {
        'steps': ['add_to_cart', 'payment', 'confirmation'],
        'criticality': 'critical',
        'expected_latency': '< 2s'
    },
    'browse_recommendations': {
        'steps': ['load_page', 'fetch_recommendations'],
        'criticality': 'medium',
        'expected_latency': '< 1s'
    }
}

2. Definisci gli SLI in base all'esperienza utente:

# Specifica SLI
slis:
  availability:
    description: "Percentuale di richieste andate a buon fine"
    measurement: "count(http_status < 500) / count(http_requests)"
    
  latency:
    description: "Latenza della richiesta al 95° percentile"
    measurement: "histogram_quantile(0.95, http_request_duration_seconds)"
    
  correctness:
    description: "Percentuale di richieste che restituiscono dati corretti"
    measurement: "count(validation_passed) / count(requests)"

3. Imposta gli SLO iniziali in modo conservativo:

def calculate_initial_slo(service_type, criticality):
    """
    Calcola l'SLO iniziale in base alle caratteristiche del servizio
    """
    base_slos = {
        'critical': {
            'availability': 0.999,  # 99.9%
            'latency_p95': 1.0,     # 1 secondo
            'latency_p99': 2.0      # 2 secondi
        },
        'high': {
            'availability': 0.995,  # 99.5%
            'latency_p95': 2.0,
            'latency_p99': 5.0
        },
        'medium': {
            'availability': 0.99,   # 99%
            'latency_p95': 5.0,
            'latency_p99': 10.0
        }
    }
    
    return base_slos.get(criticality, base_slos['medium'])

# Esempio
checkout_slo = calculate_initial_slo('payment', 'critical')
print(f"Checkout SLO: {checkout_slo}")

4. Pianifica l'iterazione:

  • Inizia con una finestra di misurazione di 4 settimane
  • Rivedi le prestazioni degli SLO settimanalmente
  • Regola in base alle prestazioni effettive e al feedback degli utenti
  • Rendi gli SLO più stringenti man mano che il sistema matura

5. Documenta le ipotesi:

## Ipotesi SLO (Iniziali)

### Disponibilità: 99,9%
- Presuppone: Affidabilità standard dell'infrastruttura cloud
- Budget di errore: 43 minuti/mese
- Revisione: Dopo 3 mesi di dati

### Latenza (p95): < 1s
- Presuppone: Query al database < 100ms
- Presuppone: Nessun calcolo complesso
- Revisione: Se i modelli di query cambiano

### Dipendenze
- Disponibilità API esterna: 99,95%
- Disponibilità del database: 99,99%

Rarità: Comune Difficoltà: Difficile


2. Come gestisci gli SLO in conflitto tra diversi segmenti di utenti?

Risposta: Segmenti di utenti diversi hanno spesso esigenze di affidabilità diverse:

Strategia: SLO multi-livello

class SLOTier:
    def __init__(self, name, availability, latency_p95, latency_p99):
        self.name = name
        self.availability = availability
        self.latency_p95 = latency_p95
        self.latency_p99 = latency_p99
        self.error_budget = 1 - availability

# Definisci i livelli
tiers = {
    'premium': SLOTier(
        name='Premium',
        availability=0.9999,  # 99.99% - 4.3 min/mese
        latency_p95=0.5,
        latency_p99=1.0
    ),
    'standard': SLOTier(
        name='Standard',
        availability=0.999,   # 99.9% - 43 min/mese
        latency_p95=1.0,
        latency_p99=2.0
    ),
    'free': SLOTier(
        name='Free',
        availability=0.99,    # 99% - 7.2 ore/mese
        latency_p95=2.0,
        latency_p99=5.0
    )
}

# Instrada le richieste in base al livello
def get_user_tier(user_id):
    # Cerca il livello di abbonamento dell'utente
    return user_subscription_tier(user_id)

def apply_slo_policy(user_id, request):
    tier = get_user_tier(user_id)
    slo = tiers[tier]
    
    # Applica policy specifiche per livello
    request.timeout = slo.latency_p99
    request.priority = tier  # Per la priorità della coda
    request.retry_budget = calculate_retry_budget(slo.error_budget)
    
    return request

Implementazione con Routing del Traffico:

# Esempio Kubernetes: Deployments separati per livello
apiVersion: apps/v1
kind: Deployment
metadata:
  name: api-premium
spec:
  replicas: 10
  template:
    spec:
      containers:
      - name: api
        resources:
          requests:
            cpu: "2"
            memory: "4Gi"
          limits:
            cpu: "4"
            memory: "8Gi"
      priorityClassName: high-priority
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: api-standard
spec:
  replicas: 5
  template:
    spec:
      containers:
      - name: api
        resources:
          requests:
            cpu: "1"
            memory: "2Gi"

Monitoraggio per livello:

# Disponibilità per livello
sum(rate(http_requests_total{status!~"5.."}[5m])) by (tier)
/
sum(rate(http_requests_total[5m])) by (tier)

# Latenza per livello
histogram_quantile(0.95,
  rate(http_request_duration_seconds_bucket[5m])
) by (tier)

Rarità: Non comune Difficoltà: Difficile


Pianificazione della Capacità

3. Descrivi il tuo processo di pianificazione della capacità per un servizio in rapida crescita.

Risposta: La pianificazione della capacità garantisce che le risorse soddisfino la domanda ottimizzando i costi:

Framework di Pianificazione della Capacità:

Loading diagram...

1. Misura la baseline:

import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression

class CapacityPlanner:
    def __init__(self, metrics_data):
        self.data = pd.DataFrame(metrics_data)
    
    def analyze_trends(self, metric_name, days=30):
        """Analizza i trend storici"""
        metric_data = self.data[metric_name].tail(days * 24)  # Dati orari
        
        # Calcola il tasso di crescita
        start_value = metric_data.iloc[0]
        end_value = metric_data.iloc[-1]
        growth_rate = ((end_value - start_value) / start_value) * 100
        
        # Identifica l'utilizzo di picco
        peak_value = metric_data.max()
        peak_time = metric_data.idxmax()
        
        # Calcola i percentili
        p50 = metric_data.quantile(0.50)
        p95 = metric_data.quantile(0.95)
        p99 = metric_data.quantile(0.99)
        
        return {
            'growth_rate': growth_rate,
            'peak_value': peak_value,
            'peak_time': peak_time,
            'p50': p50,
            'p95': p95,
            'p99': p99
        }
    
    def forecast_capacity(self, metric_name, months_ahead=3):
        """Prevedi le future esigenze di capacità"""
        # Prepara i dati
        df = self.data[[metric_name]].reset_index()
        df['days'] = (df.index / 24).astype(int)  # Converti le ore in giorni
        
        # Addestra il modello
        X = df[['days']].values
        y = df[metric_name].values
        
        model = LinearRegression()
        model.fit(X, y)
        
        # Prevedi
        future_days = np.array([[df['days'].max() + (30 * months_ahead)]])
        forecast = model.predict(future_days)[0]
        
        # Aggiungi un margine di sicurezza (20%)
        forecast_with_margin = forecast * 1.2
        
        return {
            'forecast': forecast,
            'with_margin': forecast_with_margin,
            'current': y[-1],
            'growth_factor': forecast / y[-1]
        }
    
    def calculate_resource_needs(self, requests_per_second, 
                                 requests_per_instance=100,
                                 headroom=0.3):
        """Calcola le istanze richieste"""
        # Capacità di base
        base_instances = np.ceil(requests_per_second / requests_per_instance)
        
        # Aggiungi margine di sicurezza per picchi e manutenzione
        total_instances = np.ceil(base_instances * (1 + headroom))
        
        return {
            'base_instances': int(base_instances),
            'total_instances': int(total_instances),
            'headroom_instances': int(total_instances - base_instances)
        }

# Esempio di utilizzo
metrics = {
    'requests_per_second': [100, 105, 110, 115, 120, ...],  # Dati storici
    'cpu_usage': [45, 48, 50, 52, 55, ...],
    'memory_usage': [60, 62, 65, 67, 70, ...]
}

planner = CapacityPlanner(metrics)

# Analizza i trend
trends = planner.analyze_trends('requests_per_second', days=30)
print(f"Tasso di crescita: {trends['growth_rate']:.2f}%")
print(f"Picco RPS: {trends['peak_value']}")

# Prevedi la capacità
forecast = planner.forecast_capacity('requests_per_second', months_ahead=3)
print(f"RPS previsto tra 3 mesi: {forecast['forecast']:.0f}")
print(f"Con margine di sicurezza: {forecast['with_margin']:.0f}")

# Calcola le esigenze di risorse
resources = planner.calculate_resource_needs(
    requests_per_second=forecast['with_margin'],
    requests_per_instance=100,
    headroom=0.3
)
print(f"Istanze richieste: {resources['total_instances']}")

2. Considera i fattori di crescita:

  • Tasso di crescita degli utenti
  • Lancio di nuove funzionalità
  • Schemi stagionali
  • Campagne di marketing
  • Espansione geografica

3. Pianifica il margine di sicurezza:

  • N+1: Sopravvivi al guasto di un'istanza
  • N+2: Sopravvivi a due guasti o a un'interruzione di zona
  • Picchi di traffico: 2-3 volte la capacità normale
  • Finestre di manutenzione: 20-30% di overhead

4. Ottimizzazione dei costi:

def optimize_instance_mix(workload_profile):
    """
    Ottimizza i tipi di istanza per il costo
    """
    # Mix di tipi di istanza
    instance_types = {
        'on_demand': {
            'cost_per_hour': 0.10,
            'reliability': 1.0,
            'percentage': 0.3  # 30% on-demand per la baseline
        },
        'spot': {
            'cost_per_hour': 0.03,
            'reliability': 0.95,
            'percentage': 0.5  # 50% spot per risparmiare sui costi
        },
        'reserved': {
            'cost_per_hour': 0.06,
            'reliability': 1.0,
            'percentage': 0.2  # 20% reserved per carichi prevedibili
        }
    }
    
    total_instances = workload_profile['total_instances']
    
    allocation = {}
    for instance_type, config in instance_types.items():
        count = int(total_instances * config['percentage'])
        allocation[instance_type] = {
            'count': count,
            'monthly_cost': count * config['cost_per_hour'] * 730
        }
    
    return allocation

Rarità: Molto comune Difficoltà: Difficile


Chaos Engineering

4. Come implementeresti il chaos engineering in produzione?

Risposta: Il chaos engineering testa in modo proattivo la resilienza del sistema iniettando guasti:

Principi del Chaos Engineering:

  1. Costruisci ipotesi attorno allo stato stazionario
  2. Varia eventi del mondo reale
  3. Esegui esperimenti in produzione
  4. Automatizza gli esperimenti
  5. Minimizza il raggio d'azione

Implementazione:

# Framework per esperimenti di chaos
from dataclasses import dataclass
from enum import Enum
import random
import time

class ExperimentStatus(Enum):
    PLANNED = "pianificato"
    RUNNING = "in esecuzione"
    COMPLETED = "completato"
    ABORTED = "interrotto"

@dataclass
class ChaosExperiment:
    name: str
    hypothesis: str
    blast_radius: float  # Percentuale di traffico interessato
    duration_seconds: int
    rollback_criteria: dict
    
    def __post_init__(self):
        self.status = ExperimentStatus.PLANNED
        self.metrics_before = {}
        self.metrics_during = {}
        self.metrics_after = {}

class ChaosRunner:
    def __init__(self, monitoring_client):
        self.monitoring = monitoring_client
        self.experiments = []
    
    def run_experiment(self, experiment: ChaosExperiment):
        """Esegui l'esperimento di chaos con controlli di sicurezza"""
        print(f"Avvio dell'esperimento: {experiment.name}")
        
        # 1. Misura la baseline
        experiment.metrics_before = self.measure_metrics()
        print(f"Metriche di baseline: {experiment.metrics_before}")
        
        # 2. Verifica che il sistema sia in salute
        if not self.is_system_healthy(experiment.metrics_before):
            print("Sistema non in salute, interruzione dell'esperimento")
            return False
        
        # 3. Inietta il guasto
        experiment.status = ExperimentStatus.RUNNING
        failure_injection = self.inject_failure(experiment)
        
        try:
            # 4. Monitora durante l'esperimento
            start_time = time.time()
            while time.time() - start_time < experiment.duration_seconds:
                experiment.metrics_during = self.measure_metrics()
                
                # Controlla i criteri di rollback
                if self.should_rollback(experiment):
                    print("Criteri di rollback soddisfatti, arresto dell'esperimento")
                    self.rollback(failure_injection)
                    experiment.status = ExperimentStatus.ABORTED
                    return False
                
                time.sleep(10)  # Controlla ogni 10 secondi
            
            # 5. Esegui il rollback dell'iniezione di guasto
            self.rollback(failure_injection)
            
            # 6. Misura il ripristino
            time.sleep(60)  # Attendi che il sistema si stabilizzi
            experiment.metrics_after = self.measure_metrics()
            
            # 7. Analizza i risultati
            experiment.status = ExperimentStatus.COMPLETED
            return self.analyze_results(experiment)
            
        except Exception as e:
            print(f"Esperimento fallito: {e}")
            self.rollback(failure_injection)
            experiment.status = ExperimentStatus.ABORTED
            return False
    
    def inject_failure(self, experiment):
        """Inietta un tipo di guasto specifico"""
        # L'implementazione dipende dal tipo di guasto
        pass
    
    def measure_metrics(self):
        """Misura le metriche chiave del sistema"""
        return {
            'error_rate': self.monitoring.get_error_rate(),
            'latency_p95': self.monitoring.get_latency_p95(),
            'requests_per_second': self.monitoring.get_rps(),
            'availability': self.monitoring.get_availability()
        }
    
    def is_system_healthy(self, metrics):
        """Verifica se il sistema soddisfa gli SLO"""
        return (
            metrics['error_rate'] < 0.01 and  # < 1% di errori
            metrics['latency_p95'] < 1.0 and  # < 1s di latenza
            metrics['availability'] > 0.999   # > 99.9% di disponibilità
        )
    
    def should_rollback(self, experiment):
        """Verifica se l'esperimento deve essere interrotto"""
        current = experiment.metrics_during
        criteria = experiment.rollback_criteria
        
        return (
            current['error_rate'] > criteria.get('max_error_rate', 0.05) or
            current['latency_p95'] > criteria.get('max_latency', 5.0) or
            current['availability'] < criteria.get('min_availability', 0.99)
        )
    
    def rollback(self, failure_injection):
        """Rimuovi l'iniezione di guasto"""
        print("Esecuzione del rollback dell'iniezione di guasto")
        # L'implementazione dipende dal tipo di guasto
    
    def analyze_results(self, experiment):
        """Analizza i risultati dell'esperimento"""
        before = experiment.metrics_before
        during = experiment.metrics_during
        after = experiment.metrics_after
        
        print(f"\nRisultati dell'esperimento: {experiment.name}")
        print(f"Ipotesi: {experiment.hypothesis}")
        print(f"\nMetriche:")
        print(f"  Prima: {before}")
        print(f"  Durante: {during}")
        print(f"  Dopo: {after}")
        
        # Determina se l'ipotesi è stata convalidata
        hypothesis_validated = (
            during['availability'] >= experiment.rollback_criteria['min_availability']
        )
        
        return hypothesis_validated

# Esempio di esperimento
experiment = ChaosExperiment(
    name="Test di Failover del Database",
    hypothesis="Il sistema rimane disponibile durante il failover del database",
    blast_radius=0.1,  # 10% del traffico
    duration_seconds=300,  # 5 minuti
    rollback_criteria={
        'max_error_rate': 0.05,
        'max_latency': 5.0,
        'min_availability': 0.99
    }
)

Esperimenti di Chaos Comuni:

1. Latenza di Rete:

# Utilizzo di tc (traffic control) per aggiungere latenza
tc qdisc add dev eth0 root netem delay 100ms 20ms

# Rollback
tc qdisc del dev eth0 root

2. Guasto del Pod (Kubernetes):

# Uccidi pod casuali
kubectl delete pod -l app=myapp --random=1

# Utilizzo di Chaos Mesh
kubectl apply -f - <<EOF
apiVersion: chaos-mesh.org/v1alpha1
kind: PodChaos
metadata:
  name: pod-failure
spec:
  action: pod-failure
  mode: one
  selector:
    namespaces:
      - production
    labelSelectors:
      app: myapp
  duration: "30s"
EOF

3. Esaurimento delle Risorse:

# Stress della CPU
stress-ng --cpu 4 --timeout 60s

# Stress della memoria
stress-ng --vm 2 --vm-bytes 2G --timeout 60s

Rarità: Comune Difficoltà: Difficile


Leadership negli Incidenti

5. Come guideresti un incidente ad alta gravità dal rilevamento al postmortem?

Risposta: I Senior SRE spesso fungono da responsabili degli incidenti per interruzioni critiche:

Struttura di Comando dell'Incidente:

Loading diagram...

Responsabilità del Responsabile dell'Incidente:

1. Risposta Iniziale (0-5 minuti):

## Checklist del Responsabile dell'Incidente

### Azioni Immediate
- [ ] Riconosci l'incidente
- [ ] Valuta la gravità (SEV-1, SEV-2, SEV-3)
- [ ] Crea un canale per l'incidente (#incident-AAAA-MM-GG-NNN)
- [ ] Notifica ai team appropriati
- [ ] Designa i ruoli:
  - [ ] Responsabile Tecnico
  - [ ] Responsabile delle Comunicazioni
  - [ ] Scrivano (documenta la timeline)

### Valutazione della Gravità
**SEV-1 (Critico):**
- Interruzione completa del servizio
- Perdita di dati
- Violazione della sicurezza
- > 50% degli utenti interessati

**SEV-2 (Alta):**
- Interruzione parziale
- Prestazioni degradate
- 10-50% degli utenti interessati

**SEV-3 (Media):**
- Degradazione minore
- < 10% degli utenti interessati
- Soluzione alternativa disponibile

2. Fase di Investigazione:

class IncidentCommander:
    def __init__(self, incident_id):
        self.incident_id = incident_id
        self.timeline = []
        self.status_updates = []
        self.action_items = []
    
    def coordinate_investigation(self):
        """Coordina l'investigazione tecnica"""
        # Tracce di investigazione parallele
        tracks = {
            'recent_changes': self.check_recent_deployments(),
            'infrastructure': self.check_infrastructure_health(),
            'dependencies': self.check_external_dependencies(),
            'metrics': self.analyze_metrics_anomalies()
        }
        
        return tracks
    
    def make_decision(self, options, deadline_minutes=5):
        """Prendi una decisione vincolata dal tempo"""
        print(f"Decisione necessaria entro {deadline_minutes} minuti")
        print(f"Opzioni: {options}")
        
        # Raccogli input dai responsabili tecnici
        # Prendi una decisione basata su:
        # - Impatto sull'utente
        # - Rischio di ogni opzione
        # - Tempo per l'implementazione
        # - Reversibilità
        
        return selected_option
    
    def communicate_status(self, interval_minutes=15):
        """Aggiornamenti di stato regolari"""
        update = {
            'timestamp': datetime.now(),
            'status': self.get_current_status(),
            'impact': self.assess_user_impact(),
            'eta': self.estimate_resolution_time(),
            'next_update': datetime.now() + timedelta(minutes=interval_minutes)
        }
        
        # Invia agli stakeholder
        self.send_status_update(update)
        self.status_updates.append(update)
    
    def log_timeline_event(self, event, timestamp=None):
        """Documenta la timeline dell'incidente"""
        self.timeline.append({
            'timestamp': timestamp or datetime.now(),
            'event': event,
            'logged_by': self.get_current_user()
        })

3. Strategie di Mitigazione:

def evaluate_mitigation_options():
    """Valuta e dai priorità alle opzioni di mitigazione"""
    options = [
        {
            'action': 'Rollback del deployment',
            'time_to_implement': 5,  # minuti
            'risk': 'low',
            'effectiveness': 'high',
            'reversible': True
        },
        {
            'action': 'Aumenta le risorse',
            'time_to_implement': 2,
            'risk': 'low',
            'effectiveness': 'medium',
            'reversible': True
        },
        {
            'action': 'Disabilita la feature flag',
            'time_to_implement': 1,
            'risk': 'low',
            'effectiveness': 'high',
            'reversible': True
        },
        {
            'action': 'Failover del database',
            'time_to_implement': 10,
            'risk': 'medium',
            'effectiveness': 'high',
            'reversible': False
        }
    ]
    
    # Ordina per: basso rischio, alta efficacia, implementazione rapida
    sorted_options = sorted(
        options,
        key=lambda x: (
            x['risk'] == 'low',
            x['effectiveness'] == 'high',
            -x['time_to_implement']
        ),
        reverse=True
    )
    
    return sorted_options

4. Postmortem (Senza Colpe):

# Postmortem dell'Incidente: Interruzione dell'API

**Data:** 2024-11-25
**Durata:** 45 minuti
**Gravità:** SEV-1
**Responsabile dell'Incidente:** Alice
**Responsabile Tecnico:** Bob

## Riepilogo Esecutivo
Interruzione completa dell'API che interessa tutti gli utenti a causa dell'esaurimento del pool di connessioni al database.

## Impatto
- **Utenti interessati:** 100% (tutti gli utenti)
- **Durata:** 45 minuti
- **Impatto sui ricavi:** ~$50.000
- **Impatto sugli SLO:** Consumato il 75% del budget di errore mensile

## Causa Principale
Pool di connessioni al database esaurito a causa di una perdita di connessione nella nuova funzionalità distribuita 2 ore prima dell'incidente.

## Timeline
| Ora | Evento |
|------|-------|
| 14:00 | Deployment di v2.5.0 |
| 15:45 | Primi avvisi per aumento della latenza |
| 15:50 | Interruzione completa dell'API |
| 15:52 | Incidente dichiarato (SEV-1) |
| 15:55 | Identificato il database come collo di bottiglia |
| 16:05 | Decisione di eseguire il rollback del deployment |
| 16:15 | Rollback completato |
| 16:20 | Servizio in ripresa |
| 16:35 | Ripristino completo confermato |

## Cosa è Andato Bene
- Rilevamento rapido (5 minuti dal primo sintomo)
- Struttura di comando dell'incidente chiara
- Decisione rapida di eseguire il rollback
- Buona comunicazione con gli stakeholder

## Cosa è Andato Storto
- Perdita di connessione non rilevata nei test
- Nessun monitoraggio del pool di connessioni
- Deployment durante le ore di punta

## Azioni da Intraprendere
| Azione | Proprietario | Data di Scadenza | Stato |
|--------|-------|----------|--------|
| Aggiungi monitoraggio del pool di connessioni | Alice | 2024-12-01 | Aperto |
| Implementa il rilevamento delle perdite di connessione nei test | Bob | 2024-12-05 | Aperto |
| Aggiorna la policy di deployment (evita le ore di punta) | Charlie | 2024-11-30 | Aperto |
| Aggiungi un circuit breaker per le connessioni al database | David | 2024-12-10 | Aperto |

## Lezioni Apprese
- Le lacune nel monitoraggio possono nascondere problemi critici
- Il tempismo del deployment è importante
- Necessità di una migliore integrazione dei test per le perdite di risorse

Rarità: Molto comune Difficoltà: Difficile


Affidabilità dei Sistemi Distribuiti

6. Come garantisci l'affidabilità in un'architettura a microservizi distribuita?

Risposta: I sistemi distribuiti introducono sfide di affidabilità uniche:

Modelli Chiave:

1. Service Mesh per la Resilienza:

# Esempio Istio: Circuit breaking e tentativi
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: api-service
spec:
  host: api-service
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 100
      http:
        http1MaxPendingRequests: 50
        http2MaxRequests: 100
        maxRequestsPerConnection: 2
    outlierDetection:
      consecutiveErrors: 5
      interval: 30s
      baseEjectionTime: 30s
      maxEjectionPercent: 50
      minHealthPercent: 40
    loadBalancer:
      simple: LEAST_REQUEST
---
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: api-service
spec:
  hosts:
  - api-service
  http:
  - retries:
      attempts: 3
      perTryTimeout: 2s
      retryOn: 5xx,reset,connect-failure,refused-stream
    timeout: 10s
    route:
    - destination:
        host: api-service

2. Distributed Tracing:

from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.requests import RequestsInstrumentor

# Imposta il tracing
trace.set_tracer_provider(TracerProvider())
jaeger_exporter = JaegerExporter(
    agent_host_name="jaeger",
    agent_port=6831,
)
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(jaeger_exporter)
)

# Strumenta la libreria requests
RequestsInstrumentor().instrument()

# Utilizza nel codice
tracer = trace.get_tracer(__name__)

def process_order(order_id):
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)
        
        # Chiama il servizio di pagamento
        with tracer.start_as_current_span("call_payment_service"):
Newsletter subscription

Consigli di carriera settimanali che funzionano davvero

Ricevi le ultime idee direttamente nella tua casella di posta

Decorative doodle

Distinguiti dai Reclutatori e Ottieni il Lavoro dei Tuoi Sogni

Unisciti a migliaia di persone che hanno trasformato la loro carriera con curriculum potenziati dall'IA che superano l'ATS e impressionano i responsabili delle assunzioni.

Inizia a creare ora

Condividi questo post

Fai Contare i Tuoi 6 Secondi

I reclutatori scansionano i curriculum per una media di soli 6-7 secondi. I nostri modelli comprovati sono progettati per catturare l'attenzione istantaneamente e farli continuare a leggere.