Reliability

The fungi-reliability crate provides production-grade reliability primitives (14 tests).

Features

| Feature | Purpose |
|---|---|
| Deduplication | TTL-based dedup store; drops duplicate messages |
| Dead-Letter Queue | Retry with backoff, then park failures in a DLQ |
| Circuit Breaker | Stop calling failing dependencies |
| Checkpointing | Persist offsets + state for recovery |

Configuration

reliability:
  mode: at-least-once
  deduplication:
    enabled: true
    ttl_seconds: 3600
  error_handling:
    max_retries: 3
    retry_backoff_ms: 1000
    retry_backoff_multiplier: 2.0
    max_backoff_ms: 30000
    dead_letter_topic: "{topic}-dlq"
  circuit_breaker:
    enabled: true
    failure_threshold: 5
    recovery_timeout_ms: 30000
    half_open_max_requests: 3

Deduplication

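Each message is identified by a hash of (key, topic, partition, offset), which is kept in a store with a TTL. A message whose hash is already in the store (that is, it was seen within the TTL window) is dropped as a duplicate.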

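use fungi_reliability::DeduplicationStore;
use std::time::Duration;

// is_duplicate is async, so main needs a runtime (Tokio assumed here).
#[tokio::main]
async fn main() {
    // Entries expire after one hour, matching ttl_seconds: 3600.
    let store = DeduplicationStore::new(Duration::from_secs(3600));
    if !store.is_duplicate("msg-123").await {
        /* not seen within the TTL window: process the message */
    }
}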
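The following minimal std-only sketch makes the hash-plus-TTL scheme concrete. `TtlDedup` and `check_and_record` are hypothetical names, not the fungi_reliability API. Scanning for expired entries on every call is an assumption chosen for brevity.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::time::{Duration, Instant};

/// Hypothetical std-only TTL dedup store (not the fungi_reliability API).
struct TtlDedup {
    ttl: Duration,
    /// Message fingerprint -> instant it was first seen.
    seen: HashMap<u64, Instant>,
}

impl TtlDedup {
    fn new(ttl: Duration) -> Self {
        TtlDedup { ttl, seen: HashMap::new() }
    }

    /// Hash (key, topic, partition, offset) into a 64-bit fingerprint.
    fn fingerprint(key: &str, topic: &str, partition: u32, offset: u64) -> u64 {
        let mut hasher = DefaultHasher::new();
        (key, topic, partition, offset).hash(&mut hasher);
        hasher.finish()
    }

    /// Returns true if the message was already seen within the TTL window;
    /// otherwise records it and returns false.
    fn check_and_record(&mut self, key: &str, topic: &str, partition: u32, offset: u64, now: Instant) -> bool {
        // Evict expired entries first so an expired message counts as new again.
        let ttl = self.ttl;
        self.seen.retain(|_, first_seen| now.duration_since(*first_seen) < ttl);

        let fp = Self::fingerprint(key, topic, partition, offset);
        if self.seen.contains_key(&fp) {
            return true;
        }
        self.seen.insert(fp, now);
        false
    }
}

fn main() {
    let t0 = Instant::now();
    let mut store = TtlDedup::new(Duration::from_secs(3600));

    let first = store.check_and_record("user-1", "orders", 0, 42, t0);
    let again = store.check_and_record("user-1", "orders", 0, 42, t0 + Duration::from_secs(10));
    let expired = store.check_and_record("user-1", "orders", 0, 42, t0 + Duration::from_secs(3601));
    println!("{} {} {}", first, again, expired); // false true false
}
```

In this sketch the window starts at the first sighting and is not refreshed by repeats. The full scan makes each call O(n); a production store would more likely expire entries from a time-ordered index or delegate TTLs to its backing store. Fingerprints are 64-bit hashes, so two distinct messages could in principle collide, and one of them would be dropped.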

Dead-Letter Queue

Failed messages are retried with exponential backoff. Once max_retries is exhausted, the message is sent to the DLQ topic configured in dead_letter_topic.

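use fungi_reliability::{ErrorHandler, ErrorHandlingConfig, ErrorAction};

// handle_failure is async, so main needs a runtime (Tokio assumed here).
#[tokio::main]
async fn main() {
    let handler = ErrorHandler::new(ErrorHandlingConfig::default());
    // 0 = retries made so far for this message (assumed meaning of the third argument)
    match handler.handle_failure("msg-123", "parse error", 0).await {
        ErrorAction::Retry => { /* wait for the backoff delay, then retry */ }
        ErrorAction::DeadLettered => { /* retries exhausted: parked in the DLQ */ }
    }
}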

Backoff: 1000ms → 2000ms → 4000ms → … capped at 30000ms.
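The schedule above fits delay = retry_backoff_ms × retry_backoff_multiplier^attempt, clamped to max_backoff_ms. The sketch below assumes that formula and a 0-based attempt index. `backoff_ms` is a hypothetical helper, and the library may count attempts differently or add jitter.

```rust
/// Delay before retry `attempt` (0-based), assuming
/// delay = base_ms * multiplier^attempt, capped at max_ms.
fn backoff_ms(attempt: u32, base_ms: u64, multiplier: f64, max_ms: u64) -> u64 {
    // Saturate huge attempt counts instead of wrapping to a negative exponent.
    let exp = i32::try_from(attempt).unwrap_or(i32::MAX);
    let raw = base_ms as f64 * multiplier.powi(exp);
    // Clamp in floating point first so large exponents cannot overflow u64.
    raw.min(max_ms as f64) as u64
}

fn main() {
    // retry_backoff_ms, retry_backoff_multiplier and max_backoff_ms from the config above.
    let delays: Vec<u64> = (0..7).map(|a| backoff_ms(a, 1000, 2.0, 30_000)).collect();
    println!("{:?}", delays); // [1000, 2000, 4000, 8000, 16000, 30000, 30000]
}
```

If the first retry uses attempt 0, then max_retries: 3 only ever uses the 1000, 2000 and 4000 ms delays before a message is dead-lettered. The 30000 ms cap only takes effect once max_retries is 6 or more.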

DLQ payload:

{ "original_id": "msg-123", "error": "...", "failed_at": "...", "retry_count": 3 }
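The retry-then-park decision can be sketched as follows. `RetryPolicy`, `Action` and `DlqRecord` are hypothetical types. Two details are assumptions: the `{topic}` substitution is how the dead_letter_topic template gets expanded, and retry_count counts retries already made. `failed_at` is passed in as a string to keep the example deterministic.

```rust
/// Hypothetical DLQ record mirroring the payload fields above.
#[derive(Debug, PartialEq)]
struct DlqRecord {
    original_id: String,
    error: String,
    failed_at: String,
    retry_count: u32,
}

#[derive(Debug, PartialEq)]
enum Action {
    /// Schedule another attempt.
    Retry { next_attempt: u32 },
    /// Retries exhausted: publish `record` to `topic`.
    DeadLetter { topic: String, record: DlqRecord },
}

struct RetryPolicy {
    max_retries: u32,
    /// Template such as "{topic}-dlq"; "{topic}" is replaced by the source topic (assumed).
    dead_letter_topic: String,
}

impl RetryPolicy {
    /// `retry_count` is the number of retries already made for this message.
    fn on_failure(&self, id: &str, source_topic: &str, error: &str, retry_count: u32, failed_at: &str) -> Action {
        if retry_count < self.max_retries {
            Action::Retry { next_attempt: retry_count + 1 }
        } else {
            Action::DeadLetter {
                topic: self.dead_letter_topic.replace("{topic}", source_topic),
                record: DlqRecord {
                    original_id: id.to_string(),
                    error: error.to_string(),
                    failed_at: failed_at.to_string(),
                    retry_count,
                },
            }
        }
    }
}

fn main() {
    let policy = RetryPolicy { max_retries: 3, dead_letter_topic: "{topic}-dlq".to_string() };
    for retry_count in 0..=3 {
        let action = policy.on_failure("msg-123", "orders", "parse error", retry_count, "2024-01-01T00:00:00Z");
        println!("{}: {:?}", retry_count, action);
    }
}
```

Under these assumptions, max_retries: 3 allows up to four attempts in total: the first try plus three retries. The parked record carries retry_count: 3, which matches the payload example above. A real implementation would serialize DlqRecord to JSON (for example with serde_json) before publishing it.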

Circuit Breaker

   CLOSED ── failures ≥ threshold ──→ OPEN
     ↑                                  │
     │                          recovery_timeout
     │                                  ↓
   success ←── HALF_OPEN ←───── limited probes
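
use fungi_reliability::{CircuitBreaker, CircuitBreakerConfig};

// Assumes a Tokio runtime; call_dependency() stands in for your own async call.
#[tokio::main]
async fn main() {
    let mut cb = CircuitBreaker::new(CircuitBreakerConfig::default());
    if cb.should_allow_request() {
        match call_dependency().await {
            Ok(_) => cb.record_success().await,
            Err(_) => cb.record_failure().await,
        }
    } else {
        /* request rejected: fail fast without calling the dependency */
    }
}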

| State | Behavior |
|---|---|
| CLOSED | All calls pass through |
| OPEN | Fail fast; no calls are made |
| HALF_OPEN | Limited probes; success → CLOSED |
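The state table can be modeled as a small synchronous state machine. `Breaker` and its methods are hypothetical. Two behaviors the table leaves open are assumptions here: failure_threshold counts consecutive failures, and a failed HALF_OPEN probe reopens the circuit immediately. The current time is passed in explicitly so transitions are deterministic and easy to test.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Closed,
    Open,
    HalfOpen,
}

/// Hypothetical breaker; not the fungi_reliability CircuitBreaker API.
struct Breaker {
    state: State,
    consecutive_failures: u32,
    failure_threshold: u32,
    recovery_timeout: Duration,
    half_open_max_requests: u32,
    probes_admitted: u32,
    opened_at: Option<Instant>,
}

impl Breaker {
    fn new(failure_threshold: u32, recovery_timeout: Duration, half_open_max_requests: u32) -> Self {
        Breaker {
            state: State::Closed,
            consecutive_failures: 0,
            failure_threshold,
            recovery_timeout,
            half_open_max_requests,
            probes_admitted: 0,
            opened_at: None,
        }
    }

    /// Decide whether a call may go through at time `now`.
    fn allow(&mut self, now: Instant) -> bool {
        if self.state == State::Open {
            match self.opened_at {
                // Recovery timeout elapsed: start probing in HALF_OPEN.
                Some(t) if now.duration_since(t) >= self.recovery_timeout => {
                    self.state = State::HalfOpen;
                    self.probes_admitted = 0;
                }
                _ => return false, // still OPEN: fail fast
            }
        }
        match self.state {
            State::Closed => true,
            // Admit at most half_open_max_requests probes.
            State::HalfOpen if self.probes_admitted < self.half_open_max_requests => {
                self.probes_admitted += 1;
                true
            }
            _ => false,
        }
    }

    fn record_success(&mut self) {
        // Assumption: one successful probe closes the circuit.
        self.state = State::Closed;
        self.consecutive_failures = 0;
        self.opened_at = None;
    }

    fn record_failure(&mut self, now: Instant) {
        match self.state {
            // Assumption: a failed probe reopens the circuit immediately.
            State::HalfOpen => self.trip(now),
            State::Closed => {
                self.consecutive_failures += 1;
                if self.consecutive_failures >= self.failure_threshold {
                    self.trip(now);
                }
            }
            State::Open => {} // no calls are made while OPEN
        }
    }

    fn trip(&mut self, now: Instant) {
        self.state = State::Open;
        self.opened_at = Some(now);
        self.consecutive_failures = 0;
    }
}

fn main() {
    let t0 = Instant::now();
    // failure_threshold, recovery_timeout_ms and half_open_max_requests from the config above.
    let mut cb = Breaker::new(5, Duration::from_millis(30_000), 3);
    for _ in 0..5 {
        if cb.allow(t0) {
            cb.record_failure(t0);
        }
    }
    println!("{:?}", cb.state); // Open
    println!("{}", cb.allow(t0 + Duration::from_secs(1))); // false
    let admitted = cb.allow(t0 + Duration::from_secs(30));
    println!("{} {:?}", admitted, cb.state); // true HalfOpen
    cb.record_success();
    println!("{:?}", cb.state); // Closed
}
```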

Checkpointing

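CheckpointManager persists input offsets together with state, so processing can resume from the latest checkpoint after a failure.

use fungi_reliability::{CheckpointManager, InMemoryCheckpointStore};
use std::collections::HashMap;

// Assumes a Tokio runtime; `?` requires main to return a Result.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mgr = CheckpointManager::new(Box::new(InMemoryCheckpointStore::new()));

    // Placeholder for your serialized operator state (hypothetical; type depends on the API).
    let state_bytes = Vec::new();

    // Per-topic input offsets to persist alongside the state.
    let mut offsets = HashMap::new();
    offsets.insert("input-topic".to_string(), 1000);
    let id = mgr.checkpoint(offsets, state_bytes).await?;

    // On restart, load the most recent checkpoint.
    if let Some(cp) = mgr.latest().await? {
        // resume from cp.offsets + cp.state
    }
    Ok(())
}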
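The following minimal in-memory sketch shows what a checkpoint bundles. `MemCheckpoints` and `Checkpoint` are hypothetical and synchronous, unlike the async CheckpointManager API above. The `u64` offset type is an assumption.

```rust
use std::collections::HashMap;

/// Hypothetical checkpoint record: input offsets plus opaque operator state.
#[derive(Debug, Clone, PartialEq)]
struct Checkpoint {
    id: u64,
    offsets: HashMap<String, u64>,
    state: Vec<u8>,
}

/// Hypothetical in-memory store; not fungi_reliability's InMemoryCheckpointStore.
#[derive(Default)]
struct MemCheckpoints {
    next_id: u64,
    history: Vec<Checkpoint>,
}

impl MemCheckpoints {
    /// Persist offsets and state together as one record and return its id.
    fn checkpoint(&mut self, offsets: HashMap<String, u64>, state: Vec<u8>) -> u64 {
        self.next_id += 1;
        let id = self.next_id;
        self.history.push(Checkpoint { id, offsets, state });
        id
    }

    /// Most recent checkpoint, if any.
    fn latest(&self) -> Option<&Checkpoint> {
        self.history.last()
    }
}

fn main() {
    let mut store = MemCheckpoints::default();

    let mut offsets = HashMap::new();
    offsets.insert("input-topic".to_string(), 1000);
    let id = store.checkpoint(offsets, b"count=7".to_vec());

    // After a restart: restore the state and resume each topic from its saved offset.
    if let Some(cp) = store.latest() {
        println!(
            "checkpoint {}: input-topic offset {}, {} state bytes",
            id, cp.offsets["input-topic"], cp.state.len()
        ); // checkpoint 1: input-topic offset 1000, 7 state bytes
    }
}
```

Offsets and state are stored in one record so they always describe the same point in the stream. On recovery, messages after the saved offsets are presumably read again. That is consistent with the at-least-once mode in the configuration, and it is where deduplication helps.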

See Concepts / Distributed for the barrier protocol used in distributed mode.