Distributed Streaming

How Fungi runs across multiple nodes.

For how to deploy, see Deployment / Kubernetes.

Topology

                ┌─────────────────┐
                │   JobManager    │
                │  (in server)    │
                └────────┬────────┘
                         │ gRPC
          ┌──────────────┼──────────────┐
          ↓              ↓              ↓
    ┌──────────┐   ┌──────────┐   ┌──────────┐
    │ TaskMgr  │   │ TaskMgr  │   │ TaskMgr  │
    │ Slot 1   │   │ Slot 1   │   │ Slot 1   │
    │ Slot 2   │   │ Slot 2   │   │ Slot 2   │
    └──────────┘   └──────────┘   └──────────┘
          │              │              │
          └──────────────┼──────────────┘
                         ↓
                  Object Storage
                  (checkpoints)

Components

| Component | Where | Role |
| --- | --- | --- |
| JobManager | Embedded in fungi-server | Schedules tasks, coordinates checkpoints |
| TaskManager | Separate process | Executes tasks, sends heartbeats |
| Scheduler | In JobManager | Assigns tasks to slots |
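
The Scheduler's role of assigning tasks to slots can be illustrated with a minimal sketch. The `SlotPool` type, its methods, and the "most free slots first" placement policy are assumptions made for illustration; they are not Fungi's actual scheduler API or strategy.

```rust
use std::collections::BTreeMap;

/// Hypothetical bookkeeping of free slots per TaskManager (not Fungi's real type).
#[derive(Debug, Default)]
pub struct SlotPool {
    free: BTreeMap<String, usize>,
}

impl SlotPool {
    /// Records a TaskManager and the number of slots it offers.
    pub fn register(&mut self, tm_id: &str, slots: usize) {
        self.free.insert(tm_id.to_string(), slots);
    }

    /// Places each task on the TaskManager that currently has the most free slots.
    /// Returns None, without changing anything, if there are fewer free slots than tasks.
    pub fn assign(&mut self, tasks: &[&str]) -> Option<Vec<(String, String)>> {
        let total: usize = self.free.values().sum();
        if total < tasks.len() {
            return None;
        }
        let mut placements = Vec::with_capacity(tasks.len());
        for task in tasks {
            let tm = self
                .free
                .iter()
                .filter(|(_, n)| **n > 0)
                .max_by_key(|(_, n)| **n)
                .map(|(id, _)| id.clone())?;
            *self.free.get_mut(&tm)? -= 1;
            placements.push((task.to_string(), tm));
        }
        Some(placements)
    }
}
```

Checking the total up front keeps the assignment all-or-nothing, so a job that does not fit leaves the pool untouched.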

Control Plane (gRPC)

| RPC | Purpose |
| --- | --- |
| RegisterTaskManager | TaskManager joins cluster |
| Heartbeat | Liveness check |
| DeployTask / CancelTask | Task lifecycle |
| TriggerCheckpoint / AcknowledgeCheckpoint | Checkpoint coordination |
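
To make the Heartbeat liveness check concrete, here is a minimal sketch of how a coordinator could track the last heartbeat per TaskManager. `HeartbeatMonitor`, its method names, and the fixed-timeout policy are hypothetical; the source does not specify how Fungi decides that heartbeats were missed.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical liveness tracker; the timeout policy is an assumption.
pub struct HeartbeatMonitor {
    timeout: Duration,
    last_seen: HashMap<String, Instant>,
}

impl HeartbeatMonitor {
    pub fn new(timeout: Duration) -> Self {
        Self { timeout, last_seen: HashMap::new() }
    }

    /// Called when a registration or heartbeat message arrives from a TaskManager.
    pub fn record(&mut self, tm_id: &str, now: Instant) {
        self.last_seen.insert(tm_id.to_string(), now);
    }

    /// Returns, in sorted order, the TaskManagers whose last heartbeat
    /// is older than the timeout as of `now`.
    pub fn expired(&self, now: Instant) -> Vec<String> {
        let mut dead: Vec<String> = self
            .last_seen
            .iter()
            .filter(|(_, seen)| now.saturating_duration_since(**seen) > self.timeout)
            .map(|(id, _)| id.clone())
            .collect();
        dead.sort();
        dead
    }
}
```

Passing `now` in explicitly, rather than reading the clock inside the monitor, keeps the logic easy to test.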

Data Plane (gRPC streaming)

| Channel | Use Case |
| --- | --- |
| LocalChannel | Same TaskManager (in-process) |
| RemoteChannel | Cross-TaskManager (gRPC) |

| Partitioner | Use Case |
| --- | --- |
| RoundRobin | Load balancing |
| KeyBy | Group by key columns |
| Broadcast | Send to all (joins) |
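
The three partitioning strategies can be sketched as a function from a record key to a set of output channel indices. The `Partitioner` enum and its `route` method below are illustrative names rather than Fungi's real types, and hashing the key with the standard library's `DefaultHasher` is an assumption about how KeyBy might pick a channel.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical partitioner mirroring the strategies listed above.
pub enum Partitioner {
    RoundRobin { next: usize },
    KeyBy,
    Broadcast,
}

impl Partitioner {
    /// Returns the indices of the output channels that should receive one record.
    pub fn route<K: Hash>(&mut self, key: &K, channels: usize) -> Vec<usize> {
        assert!(channels > 0, "need at least one output channel");
        match self {
            // Cycle through the channels regardless of the key.
            Partitioner::RoundRobin { next } => {
                let target = *next % channels;
                *next = (target + 1) % channels;
                vec![target]
            }
            // Equal keys hash to the same channel, so they are grouped together.
            Partitioner::KeyBy => {
                let mut hasher = DefaultHasher::new();
                key.hash(&mut hasher);
                vec![(hasher.finish() % channels as u64) as usize]
            }
            // Every channel receives a copy of the record.
            Partitioner::Broadcast => (0..channels).collect(),
        }
    }
}
```

Only RoundRobin carries state (the next channel index); KeyBy and Broadcast are pure functions of the key and the channel count.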

Checkpointing

Barriers flow inline with data:

Source-1 ──[data][barrier][data]──→ Operator
Source-2 ──[data][data][barrier]──→ Operator
                                     ↓
                          Wait for ALL barriers
                          Snapshot state
                          Forward barrier
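
The alignment step in the diagram can be sketched as a small state machine for an operator with several input channels. `BarrierAligner`, `Event`, and `Action` are hypothetical names. Buffering records that arrive on a channel after its barrier, until the remaining barriers catch up, is an assumption borrowed from the classic aligned-checkpoint approach; the source only states that the operator waits for all barriers.

```rust
use std::collections::HashSet;

#[derive(Debug, Clone, PartialEq)]
pub enum Event {
    Data(String),
    Barrier(u64),
}

/// What the operator should do in response to one incoming event.
#[derive(Debug, PartialEq)]
pub enum Action {
    Process(String),
    Buffered,
    Waiting,
    SnapshotAndForward { checkpoint_id: u64, replay: Vec<String> },
}

/// Hypothetical aligner for an operator with `inputs` upstream channels.
pub struct BarrierAligner {
    inputs: usize,
    arrived: HashSet<usize>, // channels whose barrier has already arrived
    buffered: Vec<String>,   // post-barrier records held back during alignment
}

impl BarrierAligner {
    pub fn new(inputs: usize) -> Self {
        Self { inputs, arrived: HashSet::new(), buffered: Vec::new() }
    }

    pub fn on_event(&mut self, channel: usize, event: Event) -> Action {
        match event {
            // Data behind a barrier belongs to the next checkpoint: hold it back.
            Event::Data(record) if self.arrived.contains(&channel) => {
                self.buffered.push(record);
                Action::Buffered
            }
            Event::Data(record) => Action::Process(record),
            Event::Barrier(checkpoint_id) => {
                self.arrived.insert(channel);
                if self.arrived.len() < self.inputs {
                    return Action::Waiting;
                }
                // All barriers are in: snapshot, forward, then replay held records.
                self.arrived.clear();
                Action::SnapshotAndForward {
                    checkpoint_id,
                    replay: std::mem::take(&mut self.buffered),
                }
            }
        }
    }
}
```

Holding back post-barrier records is what keeps the snapshot consistent: the saved state reflects exactly the records that preceded the barrier on every input.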

States: Triggered → WaitingForAck → Committing → Completed
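
The four checkpoint states (Triggered, WaitingForAck, Committing, Completed) form a linear lifecycle that can be sketched as follows. The `PendingCheckpoint` type and the events that drive each transition (barriers injected, acknowledgements counted, sinks committed) are assumptions; the source names only the states.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CheckpointState {
    Triggered,
    WaitingForAck,
    Committing,
    Completed,
}

/// Hypothetical coordinator-side record of one in-flight checkpoint.
pub struct PendingCheckpoint {
    pub state: CheckpointState,
    missing_acks: usize,
}

impl PendingCheckpoint {
    pub fn new(expected_acks: usize) -> Self {
        Self { state: CheckpointState::Triggered, missing_acks: expected_acks }
    }

    /// Triggered -> WaitingForAck (or straight to Committing if no acks are expected).
    pub fn barriers_injected(&mut self) {
        if self.state == CheckpointState::Triggered {
            self.state = if self.missing_acks == 0 {
                CheckpointState::Committing
            } else {
                CheckpointState::WaitingForAck
            };
        }
    }

    /// Counts one task acknowledgement; the last one moves the checkpoint to Committing.
    /// Acknowledgements that arrive in any other state are ignored.
    pub fn acknowledge(&mut self) {
        if self.state != CheckpointState::WaitingForAck {
            return;
        }
        self.missing_acks = self.missing_acks.saturating_sub(1);
        if self.missing_acks == 0 {
            self.state = CheckpointState::Committing;
        }
    }

    /// Committing -> Completed once the sinks have made their data visible.
    pub fn sinks_committed(&mut self) {
        if self.state == CheckpointState::Committing {
            self.state = CheckpointState::Completed;
        }
    }
}
```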

Exactly-Once

Two-phase commit for sinks:

  1. Prepare: buffer data without making it visible
  2. Commit: make the buffered data visible once the checkpoint completes
  3. Abort: discard the buffered data if the checkpoint fails
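
The prepare, commit, and abort steps can be sketched with an in-memory sink. `TwoPhaseSink` and its fields are hypothetical, and the `visible` vector stands in for whatever external system a real sink writes to; this is not Fungi's sink interface.

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory sink; `visible` stands in for the external system.
#[derive(Default)]
pub struct TwoPhaseSink {
    open: Vec<String>,                    // records written since the last barrier
    prepared: BTreeMap<u64, Vec<String>>, // sealed batches awaiting a checkpoint outcome
    pub visible: Vec<String>,             // records readers can see
}

impl TwoPhaseSink {
    /// Buffers a record; nothing becomes visible yet.
    pub fn write(&mut self, record: &str) {
        self.open.push(record.to_string());
    }

    /// Phase 1: seal the open buffer under the checkpoint ID.
    pub fn prepare(&mut self, checkpoint_id: u64) {
        let batch = std::mem::take(&mut self.open);
        self.prepared.insert(checkpoint_id, batch);
    }

    /// Phase 2: the checkpoint completed, so publish its batch.
    pub fn commit(&mut self, checkpoint_id: u64) {
        if let Some(batch) = self.prepared.remove(&checkpoint_id) {
            self.visible.extend(batch);
        }
    }

    /// The checkpoint failed: drop its batch without publishing it.
    pub fn abort(&mut self, checkpoint_id: u64) {
        self.prepared.remove(&checkpoint_id);
    }
}
```

Keying prepared batches by checkpoint ID makes both `commit` and `abort` idempotent: repeating either call for the same checkpoint has no further effect.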

Recovery:

  1. JobManager detects failure (missed heartbeats)
  2. Reload state from last checkpoint
  3. Resume from checkpoint position
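
Steps 2 and 3 can be illustrated with a toy task that keeps a running sum over a replayable source. The `Checkpoint` and `SumTask` types are hypothetical, and the sketch assumes a checkpoint stores the operator state together with the source offset it corresponds to.

```rust
/// Hypothetical checkpoint: operator state plus the matching source position.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub struct Checkpoint {
    pub offset: usize,
    pub sum: i64,
}

/// Toy task that keeps a running sum over a replayable source.
pub struct SumTask {
    offset: usize,
    sum: i64,
}

impl SumTask {
    /// Reloads state from a checkpoint; `Checkpoint::default()` is a fresh start.
    pub fn restore(cp: Checkpoint) -> Self {
        Self { offset: cp.offset, sum: cp.sum }
    }

    /// Processes up to `count` records, starting at the current offset.
    pub fn process(&mut self, source: &[i64], count: usize) {
        let start = self.offset.min(source.len());
        let end = source.len().min(start.saturating_add(count));
        for value in &source[start..end] {
            self.sum += value;
        }
        self.offset = end;
    }

    /// Captures the state and the position it corresponds to.
    pub fn snapshot(&self) -> Checkpoint {
        Checkpoint { offset: self.offset, sum: self.sum }
    }

    pub fn sum(&self) -> i64 {
        self.sum
    }
}
```

Because state and offset are captured together, records processed after the checkpoint but before the failure are simply replayed, and each record contributes to the restored state exactly once.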

API

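use fungi_scheduler::{JobGraph, JobVertex, PartitionType, Scheduler};
use fungi_cluster::checkpoint::CheckpointCoordinator;

// This snippet uses `?` and `.await`, so it must run inside an async
// function that returns a Result.
// Assumptions: `PartitionType` is exported by `fungi_scheduler`, and
// `storage` is a checkpoint storage handle created earlier.

// Describe the job: two vertices connected by a forward edge.
let mut graph = JobGraph::new("my-job");
graph.add_vertex(JobVertex::new("source", "kafka_source", 2));
graph.add_vertex(JobVertex::new("sink", "kafka_sink", 1));
graph.add_edge("source", "sink", PartitionType::Forward);

// Register a TaskManager, submit the job, and compute task deployments.
let mut scheduler = Scheduler::new();
scheduler.register_task_manager("tm-1", 4, 4096, 4);
scheduler.submit_job(graph)?;
let deployments = scheduler.schedule()?;

// Trigger a checkpoint for a job.
let coordinator = CheckpointCoordinator::new(storage, 60_000);
let checkpoint_id = coordinator.trigger_checkpoint("job-1", 3).await;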

Troubleshooting

| Issue | Check |
| --- | --- |
| TaskManager not registering | JobManager reachable? Port 50052 open? |
| Checkpoints fail | Storage reachable? Disk space? |
| Tasks not scheduling | Free slots? Job graph valid? |