Reliability
Production-grade reliability primitives provided by the fungi-reliability crate (covered by 14 tests).
Features
| Feature | Purpose |
|---|---|
| Deduplication | TTL-based dedup store, drop duplicate messages |
| Dead-Letter Queue | Retry with backoff, then park failures in DLQ |
| Circuit Breaker | Stop calling failing dependencies |
| Checkpointing | Persist offsets + state for recovery |
Configuration
```yaml
reliability:
  mode: at-least-once
  deduplication:
    enabled: true
    ttl_seconds: 3600
  error_handling:
    max_retries: 3
    retry_backoff_ms: 1000
    retry_backoff_multiplier: 2.0
    max_backoff_ms: 30000
    dead_letter_topic: "{topic}-dlq"
  circuit_breaker:
    enabled: true
    failure_threshold: 5
    recovery_timeout_ms: 30000
    half_open_max_requests: 3
```
Deduplication
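Each message is fingerprinted by hashing (key, topic, partition, offset), and the hash is kept in a TTL store. A message whose hash is already present within the TTL window is dropped as a duplicate.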
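```rust
use fungi_reliability::DeduplicationStore;
use std::time::Duration;

// Remember message IDs for one hour (same value as `ttl_seconds: 3600` above).
let store = DeduplicationStore::new(Duration::from_secs(3600));

// `is_duplicate` is awaited, so this must run inside an async context.
if !store.is_duplicate("msg-123").await {
    // Not seen within the TTL window: process the message.
}
```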
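The following is a minimal single-threaded sketch of the TTL behavior that uses only the standard library. It is not `DeduplicationStore`'s implementation. `TtlDedup` and `message_fingerprint` are hypothetical names, and the fingerprint is assumed to be a 64-bit `DefaultHasher` hash of the tuple. Expired entries are evicted with a full scan on every call.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::time::{Duration, Instant};

/// Hypothetical fingerprint of a message: hash of (key, topic, partition, offset).
fn message_fingerprint(key: &str, topic: &str, partition: u32, offset: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    (key, topic, partition, offset).hash(&mut hasher);
    hasher.finish()
}

/// Hypothetical in-memory TTL dedup store (not the crate's DeduplicationStore).
struct TtlDedup {
    ttl: Duration,
    seen: HashMap<u64, Instant>, // fingerprint -> time it was first seen
}

impl TtlDedup {
    fn new(ttl: Duration) -> Self {
        Self { ttl, seen: HashMap::new() }
    }

    /// Returns true if `fp` was recorded within the TTL; otherwise records it and returns false.
    fn check_and_insert(&mut self, fp: u64, now: Instant) -> bool {
        // Drop entries older than the TTL so memory stays bounded by the window.
        let ttl = self.ttl;
        self.seen.retain(|_, first_seen| now.duration_since(*first_seen) < ttl);
        if self.seen.contains_key(&fp) {
            true
        } else {
            self.seen.insert(fp, now);
            false
        }
    }
}
```

When a fingerprint appears again inside the TTL, the store reports it as a duplicate. After the TTL has elapsed, the entry is evicted and the next delivery counts as new. `DefaultHasher`'s algorithm is not guaranteed to stay the same across Rust releases, so a store that persists fingerprints would need a fixed hash function.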
Dead-Letter Queue
Failed messages are retried with exponential backoff. After `max_retries` attempts, the message is sent to the DLQ topic configured by `dead_letter_topic`.
```rust
use fungi_reliability::{ErrorHandler, ErrorHandlingConfig, ErrorAction};

let handler = ErrorHandler::new(ErrorHandlingConfig::default());

match handler.handle_failure("msg-123", "parse error", 0).await {
    ErrorAction::Retry => { /* wait for the backoff delay, then retry */ }
    ErrorAction::DeadLettered => { /* message parked in the DLQ */ }
}
```
Backoff delays: 1000ms → 2000ms → 4000ms → …, capped at `max_backoff_ms` (30000ms).
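The schedule above appears to follow `retry_backoff_ms × retry_backoff_multiplier^n`, capped at `max_backoff_ms`. The sketch below implements that formula. It assumes `n` counts retries from 0, and `backoff_delay_ms` is a hypothetical helper, not part of the crate.

```rust
/// Delay before retry `attempt` (0-based): base * multiplier^attempt, capped at max.
fn backoff_delay_ms(attempt: u32, base_ms: u64, multiplier: f64, max_ms: u64) -> u64 {
    // Clamp the exponent into i32 range for powi; huge exponents just hit the cap.
    let exp = attempt.min(i32::MAX as u32) as i32;
    let raw = base_ms as f64 * multiplier.powi(exp);
    // Cap in floating point before converting, so large attempts cannot overflow u64.
    raw.min(max_ms as f64) as u64
}
```

With the configured values, retries 0 to 2 wait 1000, 2000 and 4000 ms. Retry 4 waits 16000 ms. From retry 5 onward, the uncapped delay (32000 ms) exceeds the limit, so the delay is held at 30000 ms.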
DLQ payload:
```json
{ "original_id": "msg-123", "error": "...", "failed_at": "...", "retry_count": 3 }
```
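The retry-or-park decision can be sketched as a pure function. The sketch assumes `retry_count` is the number of retries already attempted. With `max_retries: 3`, counts 0 to 2 are therefore retried and count 3 is dead-lettered, which matches `retry_count: 3` in the payload above. `on_failure`, `Action` and `DlqRecord` are hypothetical names. `failed_at` is simplified to Unix seconds, and the backoff constants are hard-coded to the configured 1000ms, ×2 and 30000ms cap.

```rust
/// Hypothetical outcome for a message that just failed processing.
#[derive(Debug, PartialEq)]
enum Action {
    /// Try again after `delay_ms`.
    Retry { delay_ms: u64 },
    /// Retries exhausted: publish this record to the DLQ topic.
    DeadLetter(DlqRecord),
}

/// Mirrors the DLQ payload fields shown above.
#[derive(Debug, PartialEq)]
struct DlqRecord {
    original_id: String,
    error: String,
    failed_at: u64, // simplified to Unix seconds for this sketch
    retry_count: u32,
}

const RETRY_BACKOFF_MS: u64 = 1_000;
const MAX_BACKOFF_MS: u64 = 30_000;

/// `retry_count` is the number of retries already attempted for this message.
fn on_failure(id: &str, error: &str, retry_count: u32, max_retries: u32, failed_at: u64) -> Action {
    if retry_count < max_retries {
        // Doubling backoff (multiplier 2.0), saturating instead of overflowing, then capped.
        let delay_ms = RETRY_BACKOFF_MS
            .saturating_mul(2u64.saturating_pow(retry_count))
            .min(MAX_BACKOFF_MS);
        Action::Retry { delay_ms }
    } else {
        Action::DeadLetter(DlqRecord {
            original_id: id.to_string(),
            error: error.to_string(),
            failed_at,
            retry_count,
        })
    }
}
```

The sketch omits serializing `DlqRecord` into the JSON shape above and publishing it to the `{topic}-dlq` topic.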
Circuit Breaker
```text
CLOSED ── failures ≥ threshold ──→ OPEN
  ↑                                 │
  │                          recovery_timeout
  │                                 ↓
success ←── HALF_OPEN ────────── one call
```
```rust
use fungi_reliability::{CircuitBreaker, CircuitBreakerConfig};

let mut cb = CircuitBreaker::new(CircuitBreakerConfig::default());

if cb.should_allow_request() {
    // `call_dependency` stands in for your own fallible async call.
    match call_dependency().await {
        Ok(_) => cb.record_success().await,
        Err(_) => cb.record_failure().await,
    }
}
```
| State | Behavior |
|---|---|
| CLOSED | All calls pass through |
| OPEN | Fail fast, no calls made |
| HALF_OPEN | Limited probes, success → CLOSED |
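The diagram and table describe a three-state machine. Below is a synchronous, single-threaded sketch of it. `Breaker` and its methods are hypothetical and are not the `CircuitBreaker` API. Two behaviors here are assumptions that the diagram does not show. First, failures are counted consecutively, so a success in CLOSED resets the count. Second, a failed probe in HALF_OPEN reopens the circuit immediately. HALF_OPEN admits at most `half_open_max_requests` probes.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq)]
enum State { Closed, Open, HalfOpen }

/// Hypothetical circuit breaker sketch.
struct Breaker {
    state: State,
    consecutive_failures: u32,
    failure_threshold: u32,
    recovery_timeout: Duration,
    opened_at: Option<Instant>,
    half_open_max_requests: u32,
    half_open_in_flight: u32,
}

impl Breaker {
    fn new(failure_threshold: u32, recovery_timeout: Duration, half_open_max_requests: u32) -> Self {
        Self {
            state: State::Closed,
            consecutive_failures: 0,
            failure_threshold,
            recovery_timeout,
            opened_at: None,
            half_open_max_requests,
            half_open_in_flight: 0,
        }
    }

    /// Decides whether a call may go through at time `now`.
    fn allow(&mut self, now: Instant) -> bool {
        match self.state {
            State::Closed => true,
            State::Open => {
                let opened = self.opened_at.expect("Open always records opened_at");
                if now.duration_since(opened) >= self.recovery_timeout {
                    // Recovery timeout elapsed: let the first probe through.
                    self.state = State::HalfOpen;
                    self.half_open_in_flight = 1;
                    true
                } else {
                    false // fail fast
                }
            }
            State::HalfOpen => {
                if self.half_open_in_flight < self.half_open_max_requests {
                    self.half_open_in_flight += 1;
                    true
                } else {
                    false
                }
            }
        }
    }

    fn record_success(&mut self) {
        // Any success closes the circuit and clears the failure count.
        self.state = State::Closed;
        self.consecutive_failures = 0;
        self.opened_at = None;
        self.half_open_in_flight = 0;
    }

    fn record_failure(&mut self, now: Instant) {
        self.consecutive_failures += 1;
        // Trip on reaching the threshold, or on any failed HALF_OPEN probe (assumption).
        if self.state == State::HalfOpen || self.consecutive_failures >= self.failure_threshold {
            self.state = State::Open;
            self.opened_at = Some(now);
            self.half_open_in_flight = 0;
        }
    }
}
```

The breaker receives `now` as an argument instead of calling `Instant::now()` itself. This keeps the transitions deterministic and easy to test.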
Checkpointing
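```rust
use fungi_reliability::{CheckpointManager, InMemoryCheckpointStore};
use std::collections::HashMap;

// Runs inside an async fn that returns a Result, so `?` can propagate errors.
let mgr = CheckpointManager::new(Box::new(InMemoryCheckpointStore::new()));

// Record the input offset reached so far...
let mut offsets = HashMap::new();
offsets.insert("input-topic".to_string(), 1000);

// ...together with the application's serialized state (`state_bytes`, not defined here).
let id = mgr.checkpoint(offsets, state_bytes).await?;

// On restart, load the most recent checkpoint, if any.
if let Some(cp) = mgr.latest().await? {
    // Resume from cp.offsets and restore cp.state.
}
```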
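The main job of checkpointing is to save offsets and state as one unit, so recovery always sees a consistent pair. Below is a minimal in-memory stand-in. The names `Checkpoint` and `MemoryCheckpoints` are hypothetical, and checkpoint IDs are assumed to be monotonically increasing `u64` values.

```rust
use std::collections::HashMap;

/// Hypothetical checkpoint: offsets and state captured together.
#[derive(Debug, Clone, PartialEq)]
struct Checkpoint {
    id: u64,
    offsets: HashMap<String, u64>,
    state: Vec<u8>,
}

/// Hypothetical in-memory checkpoint log standing in for a durable store.
#[derive(Default)]
struct MemoryCheckpoints {
    next_id: u64,
    history: Vec<Checkpoint>,
}

impl MemoryCheckpoints {
    /// Stores offsets and state in one record and returns its new ID.
    fn checkpoint(&mut self, offsets: HashMap<String, u64>, state: Vec<u8>) -> u64 {
        self.next_id += 1;
        let id = self.next_id;
        self.history.push(Checkpoint { id, offsets, state });
        id
    }

    /// Most recent checkpoint, or None if nothing has been saved yet.
    fn latest(&self) -> Option<&Checkpoint> {
        self.history.last()
    }
}
```

A durable store would need to write each checkpoint atomically before returning its ID, for example by writing a temporary file and then renaming it. This page does not say whether a stored offset means "last processed" or "next to read".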
See Concepts / Distributed for the barrier protocol used in distributed mode.