Distributed Streaming
How Fungi runs across multiple nodes.
For deployment instructions, see Deployment / Kubernetes.
Topology
                ┌─────────────────┐
                │   JobManager    │
                │   (in server)   │
                └────────┬────────┘
                         │ gRPC
          ┌──────────────┼──────────────┐
          ↓              ↓              ↓
    ┌──────────┐   ┌──────────┐   ┌──────────┐
    │ TaskMgr  │   │ TaskMgr  │   │ TaskMgr  │
    │ Slot 1   │   │ Slot 1   │   │ Slot 1   │
    │ Slot 2   │   │ Slot 2   │   │ Slot 2   │
    └──────────┘   └──────────┘   └──────────┘
          │              │              │
          └──────────────┼──────────────┘
                         ↓
                  Object Storage
                  (checkpoints)
Components
| Component | Where | Role |
|---|---|---|
| JobManager | Embedded in fungi-server | Schedules tasks, coordinates checkpoints |
| TaskManager | Separate process | Executes tasks, sends heartbeats |
| Scheduler | In JobManager | Assigns tasks to slots |
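To make the Scheduler's role (assigning tasks to slots) more concrete, here is a minimal sketch of slot bookkeeping. It is not Fungi's actual `Scheduler`: the `SlotPool` and `SlotId` types and the "most free slots first" policy are assumptions chosen for illustration.

```rust
use std::collections::BTreeMap;

/// Hypothetical identifier for one slot on one TaskManager.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SlotId {
    pub task_manager: String,
    pub slot: usize,
}

/// Tracks free slots per TaskManager (illustrative only).
#[derive(Default)]
pub struct SlotPool {
    // BTreeMap keeps iteration order deterministic.
    free: BTreeMap<String, Vec<usize>>,
}

impl SlotPool {
    /// Registers a TaskManager with `slots` free slots, numbered from 1.
    pub fn register(&mut self, task_manager: &str, slots: usize) {
        // Stored in reverse so that pop() hands out slot 1 first.
        self.free
            .insert(task_manager.to_string(), (1..=slots).rev().collect());
    }

    /// Assigns the next task to the TaskManager with the most free slots,
    /// which spreads load across the cluster. Returns None when no slot is free.
    pub fn assign(&mut self) -> Option<SlotId> {
        let (tm, slots) = self.free.iter_mut().max_by_key(|(_, s)| s.len())?;
        let slot = slots.pop()?;
        Some(SlotId {
            task_manager: tm.clone(),
            slot,
        })
    }
}
```

With two TaskManagers of two slots each, the first two assignments land on different TaskManagers and the fifth returns `None`. An empty result like that corresponds to the "Tasks not scheduling" case in Troubleshooting.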
Control Plane (gRPC)
| RPC | Purpose |
|---|---|
| RegisterTaskManager | TaskManager joins cluster |
| Heartbeat | Liveness check |
| DeployTask / CancelTask | Task lifecycle |
| TriggerCheckpoint / AcknowledgeCheckpoint | Checkpoint coordination |
Data Plane (gRPC streaming)
| Channel | Use Case |
|---|---|
| LocalChannel | Same TaskManager (in-process) |
| RemoteChannel | Cross-TaskManager (gRPC) |

| Partitioner | Use Case |
|---|---|
| RoundRobin | Load balancing |
| KeyBy | Group by key columns |
| Broadcast | Send to all (joins) |
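The three partitioners in the table above differ only in how they pick output channels for a record. The following sketch shows one way that selection could work. The `Partitioner` enum and its `select` method are hypothetical and are not taken from the Fungi codebase.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Illustrative partitioner; variant names mirror the table above.
pub enum Partitioner {
    RoundRobin { next: usize },
    KeyBy,
    Broadcast,
}

impl Partitioner {
    /// Returns the output channel indices that should receive one record.
    pub fn select<K: Hash>(&mut self, key: &K, channels: usize) -> Vec<usize> {
        assert!(channels > 0, "need at least one output channel");
        match self {
            // Cycle through the channels regardless of the key.
            Partitioner::RoundRobin { next } => {
                let target = *next % channels;
                *next = (*next + 1) % channels;
                vec![target]
            }
            // The same key always hashes to the same channel.
            Partitioner::KeyBy => {
                let mut hasher = DefaultHasher::new();
                key.hash(&mut hasher);
                vec![(hasher.finish() % channels as u64) as usize]
            }
            // Every channel receives a copy.
            Partitioner::Broadcast => (0..channels).collect(),
        }
    }
}
```

`DefaultHasher` is used here only to keep the sketch dependency-free. Its algorithm is not guaranteed to stay the same across Rust releases, so a cluster whose nodes must agree on key placement would need an explicitly specified hash function.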
Checkpointing
Barriers flow inline with data:
    Source-1 ──[data][barrier][data]──→ Operator
    Source-2 ──[data][data][barrier]──→ Operator
                                           ↓
                                    Wait for ALL barriers
                                    Snapshot state
                                    Forward barrier
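The alignment step in the diagram (wait for all barriers, snapshot, forward) can be sketched as a small state holder. This is a simplified illustration that assumes a single checkpoint in flight at a time. `BarrierAligner`, `Event`, and `Action` are hypothetical names, not Fungi types.

```rust
use std::collections::{HashSet, VecDeque};

/// An item arriving on an input channel: a record or a checkpoint barrier.
#[derive(Debug, Clone, PartialEq)]
pub enum Event {
    Data(String),
    Barrier(u64),
}

/// What the operator should do in response to an input event.
#[derive(Debug, PartialEq)]
pub enum Action {
    Process(String),
    /// All inputs delivered the barrier: snapshot state, then forward it.
    SnapshotAndForward(u64),
}

pub struct BarrierAligner {
    inputs: usize,
    arrived: HashSet<usize>,    // inputs whose barrier has already arrived
    buffered: VecDeque<String>, // records held back from those inputs
}

impl BarrierAligner {
    pub fn new(inputs: usize) -> Self {
        Self {
            inputs,
            arrived: HashSet::new(),
            buffered: VecDeque::new(),
        }
    }

    pub fn on_event(&mut self, input: usize, event: Event) -> Vec<Action> {
        match event {
            // This input is already past the barrier: its records belong to
            // the next checkpoint, so hold them until the snapshot is taken.
            Event::Data(d) if self.arrived.contains(&input) => {
                self.buffered.push_back(d);
                Vec::new()
            }
            Event::Data(d) => vec![Action::Process(d)],
            Event::Barrier(id) => {
                self.arrived.insert(input);
                if self.arrived.len() < self.inputs {
                    return Vec::new(); // still waiting for other inputs
                }
                // Aligned: snapshot first, then release the held records.
                self.arrived.clear();
                let mut actions = vec![Action::SnapshotAndForward(id)];
                actions.extend(self.buffered.drain(..).map(Action::Process));
                actions
            }
        }
    }
}
```

Holding back records from inputs that have already delivered their barrier keeps the snapshot consistent: it reflects exactly the records that preceded the barrier on every input.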
States: Triggered → WaitingForAck → Committing → Completed
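The state sequence above can be modelled as a small state machine on the coordinator side. The sketch below uses the four state names from this page. The events that drive each transition (barriers sent, all acknowledgements received, commit finished) are assumptions, and `PendingCheckpoint` is a hypothetical type.

```rust
use std::collections::HashSet;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CheckpointState {
    Triggered,
    WaitingForAck,
    Committing,
    Completed,
}

/// Tracks one in-flight checkpoint (illustrative only).
pub struct PendingCheckpoint {
    pub state: CheckpointState,
    expected_acks: usize,
    acked: HashSet<String>,
}

impl PendingCheckpoint {
    pub fn new(expected_acks: usize) -> Self {
        Self {
            state: CheckpointState::Triggered,
            expected_acks,
            acked: HashSet::new(),
        }
    }

    /// Assumed transition: barriers have been sent to the sources.
    pub fn barriers_sent(&mut self) {
        if self.state == CheckpointState::Triggered {
            self.state = CheckpointState::WaitingForAck;
        }
    }

    /// Records one task's acknowledgement; duplicates and acknowledgements
    /// arriving in the wrong state are ignored.
    pub fn acknowledge(&mut self, task: &str) {
        if self.state != CheckpointState::WaitingForAck {
            return;
        }
        self.acked.insert(task.to_string());
        if self.acked.len() >= self.expected_acks {
            self.state = CheckpointState::Committing;
        }
    }

    /// Assumed transition: sinks have finished committing.
    pub fn commit_finished(&mut self) {
        if self.state == CheckpointState::Committing {
            self.state = CheckpointState::Completed;
        }
    }
}
```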
Exactly-Once
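Sinks use a two-phase commit tied to the checkpoint lifecycle:
- Prepare: buffer data without making it visible
- Commit: make the buffered data visible once the checkpoint completes
- Abort: discard the buffered data if the checkpoint fails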
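The three sink operations listed above can be illustrated with an in-memory sink that stages records per checkpoint. This is a minimal sketch: `BufferingSink` and its method names are hypothetical, and a real sink would stage data in an external system (for example, an open transaction) rather than in a `Vec`.

```rust
use std::collections::HashMap;

/// Hypothetical two-phase-commit sink that stages records per checkpoint.
#[derive(Default)]
pub struct BufferingSink {
    current: Vec<String>,               // written since the last prepare
    pending: HashMap<u64, Vec<String>>, // prepared but not yet committed
    pub visible: Vec<String>,           // committed, visible output
}

impl BufferingSink {
    pub fn write(&mut self, record: &str) {
        self.current.push(record.to_string());
    }

    /// Phase 1: stage everything written so far under this checkpoint id.
    pub fn prepare(&mut self, checkpoint_id: u64) {
        let staged = std::mem::take(&mut self.current);
        self.pending.entry(checkpoint_id).or_default().extend(staged);
    }

    /// Phase 2: the checkpoint completed, so publish the staged records.
    /// Calling this twice for the same id is a no-op the second time.
    pub fn commit(&mut self, checkpoint_id: u64) {
        if let Some(staged) = self.pending.remove(&checkpoint_id) {
            self.visible.extend(staged);
        }
    }

    /// The checkpoint failed: drop the staged records.
    pub fn abort(&mut self, checkpoint_id: u64) {
        self.pending.remove(&checkpoint_id);
    }
}
```

Making `commit` idempotent matters because a commit notification may be retried after a failure, and replaying it must not publish the same records twice.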
Recovery:
- JobManager detects the failure (missed heartbeats)
- State is reloaded from the last checkpoint
- Processing resumes from the checkpoint position
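The first recovery step, detecting failure through missed heartbeats, can be sketched as a timeout check over last-seen timestamps. `HeartbeatMonitor` and the timeout value used in the example are assumptions for illustration. This page does not state Fungi's actual heartbeat interval or timeout.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Tracks the last heartbeat per TaskManager (illustrative only).
pub struct HeartbeatMonitor {
    timeout: Duration,
    last_seen: HashMap<String, Instant>,
}

impl HeartbeatMonitor {
    pub fn new(timeout: Duration) -> Self {
        Self {
            timeout,
            last_seen: HashMap::new(),
        }
    }

    /// Called whenever a Heartbeat RPC arrives from a TaskManager.
    pub fn record(&mut self, task_manager: &str, now: Instant) {
        self.last_seen.insert(task_manager.to_string(), now);
    }

    /// Returns the TaskManagers whose last heartbeat is older than the
    /// timeout, sorted by id for deterministic output.
    pub fn expired(&self, now: Instant) -> Vec<String> {
        let mut dead: Vec<String> = self
            .last_seen
            .iter()
            .filter(|(_, seen)| now.duration_since(**seen) > self.timeout)
            .map(|(id, _)| id.clone())
            .collect();
        dead.sort();
        dead
    }
}
```

Passing `now` in explicitly, instead of calling `Instant::now()` inside the monitor, keeps the check easy to test with synthetic timestamps.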
API
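    use fungi_scheduler::{JobGraph, JobVertex, Scheduler};
    use fungi_cluster::checkpoint::CheckpointCoordinator;

    // This snippet runs inside an async function that returns a Result
    // (it uses `?` and `.await`). `PartitionType` and `storage` must also be
    // in scope; the snippet does not show where they come from.

    // Describe the job as a graph of vertices connected by edges.
    let mut graph = JobGraph::new("my-job");
    graph.add_vertex(JobVertex::new("source", "kafka_source", 2));
    graph.add_vertex(JobVertex::new("sink", "kafka_sink", 1));
    graph.add_edge("source", "sink", PartitionType::Forward);

    // Register a TaskManager, submit the job, and compute task deployments.
    let mut scheduler = Scheduler::new();
    scheduler.register_task_manager("tm-1", 4, 4096, 4);
    scheduler.submit_job(graph)?;
    let deployments = scheduler.schedule()?;

    // Create a checkpoint coordinator and trigger a checkpoint.
    let coordinator = CheckpointCoordinator::new(storage, 60_000);
    let checkpoint_id = coordinator.trigger_checkpoint("job-1", 3).await;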
Troubleshooting
| Issue | Check |
|---|---|
| TaskManager not registering | Is the JobManager reachable? Is port 50052 open? |
| Checkpoints fail | Is object storage reachable? Is there enough disk space? |
| Tasks not scheduling | Are there free slots? Is the job graph valid? |