🍄 Fungi

Nature's data processors: streaming that grows on you.

Fungi is an AI-native Rust streaming engine with Flink compatibility.

Why Fungi?

| Feature | Fungi | Arroyo | Flink (JVM) |
|---|---|---|---|
| Flink API | ✅ Full | ❌ | ✅ Full |
| AI / MCP | ✅ Native | ❌ | ❌ |
| GC pauses | ✅ None | ✅ None | ❌ Yes |
| Language | Rust | Rust | Java |
| Startup | Instant | Fast | Slow |

Three Differentiators

  1. Flink API Compatibility: Drop-in replacement for Flink DataStream and SQL jobs.
  2. High Performance: Zero GC, predictable latency, low memory.
  3. AI Native: Built-in MCP server for natural-language pipeline management.

Hello World

fungi server &
fungi sql --query "SELECT 1 as id, 'Alice' as name"
id | name
---+------
1  | Alice

Quickstart

5 minutes to your first Fungi pipeline.

1. Install

git clone https://github.com/yourusername/fungi.git
cd fungi
cargo build --release
mkdir -p ~/.local/bin
ln -sf "$(pwd)/target/release/fungi" ~/.local/bin/
ln -sf "$(pwd)/target/release/fungi-server" ~/.local/bin/

See Deployment / Local for full install options.

2. Start Server

fungi server
🍄 Fungi server started on 0.0.0.0:50051

3. Run SQL

fungi sql --query "SELECT 1 as id, 'Alice' as name"
id | name
---+------
1  | Alice

4. Run a Pipeline

fungi pipeline sample --output my-pipeline.yaml
fungi pipeline run --config my-pipeline.yaml

See Guides / Pipelines for full config reference.

Local Mode (no server)

fungi sql --query "SELECT 1" --local

Local Deployment

Build and run Fungi on your machine.

Prerequisites

  • Rust 1.95+
  • Cargo
  • Git

Build from Source

git clone https://github.com/yourusername/fungi.git
cd fungi
cargo build --release

Install Binaries

Option A: Symlink (recommended)

mkdir -p ~/.local/bin
ln -sf "$(pwd)/target/release/fungi" ~/.local/bin/
ln -sf "$(pwd)/target/release/fungi-server" ~/.local/bin/

# Ensure ~/.local/bin is on PATH
echo 'export PATH="$HOME/.local/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc

Option B: Cargo install

cargo install --path crates/fungi-cli
cargo install --path crates/fungi-server

Run

fungi server                    # default 0.0.0.0:50051
fungi server --port 8080
fungi sql --query "SELECT 1"

Run Tests

cargo test --workspace

Docker Deployment

Run Fungi in a container.

Build Image

# Local build (requires x86_64)
docker build -t fungi-server:latest .

# Remote build (for ARM hosts)
./scripts/build-remote.sh user@x86-server

# Fast build with persistent cache (95s with warm cache)
./scripts/build-fast.sh

Run Container

docker run -d \
  --name fungi-server \
  -p 50051:50051 \
  -p 8080:8080 \
  -v /data/fungi:/data \
  -e FUNGI_AUTH_ENABLED=true \
  -e FUNGI_ADMIN_USERNAME=admin \
  -e FUNGI_ADMIN_PASSWORD=admin123 \
  fungi-server:latest

Verify

curl http://localhost:8080/health

Environment Variables

See Configuration for the full list.

Next Steps

  • Kubernetes: orchestrated production deployment

Kubernetes Deployment

Deploy Fungi to a Kubernetes cluster, either single-node or distributed.

Prerequisites

  • Kubernetes 1.24+
  • kubectl configured
  • A storage class for PVC

Single-Node Deployment

Runs fungi-server with embedded JobManager.

kubectl apply -f k8s/namespace.yaml
kubectl apply -f k8s/secret.yaml
kubectl apply -f k8s/pvc.yaml
kubectl apply -f k8s/deployment.yaml
kubectl apply -f k8s/service.yaml

kubectl get pods -n fungi

Distributed Deployment

Runs separate JobManager and TaskManager pods with HPA autoscaling.

kubectl apply -f k8s/namespace.yaml
kubectl apply -f k8s/secret.yaml
kubectl apply -f k8s/pvc.yaml
kubectl apply -f k8s/distributed-deployment.yaml

This creates:

  • fungi-jobmanager (1 replica): scheduling + checkpoint coordination
  • fungi-taskmanager (3 replicas, HPA 1–10): task execution

See Concepts / Distributed for the architecture.

Verify

kubectl get pods -n fungi
kubectl logs -f deployment/fungi-server -n fungi
kubectl run curl-test --rm -i --restart=Never --image=curlimages/curl -n fungi \
  -- curl -s http://fungi-service:50051/health

Expose Externally

NodePort

kubectl patch svc fungi-service -n fungi -p '{"spec":{"type":"NodePort"}}'
kubectl get svc fungi-service -n fungi

LoadBalancer

kubectl apply -f k8s/service-lb.yaml
kubectl get svc fungi-service-lb -n fungi -w

Dashboard

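Port-forward the service in one terminal, then run the dashboard locally in another:

# Terminal 1: forward the API port (blocks until interrupted)
kubectl port-forward svc/fungi-service 50051:50051 -n fungi

# Terminal 2: dashboard
cd crates/fungi-dashboard && trunk serve --port 8082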

Manifests

| File | Purpose |
|---|---|
| k8s/namespace.yaml | Namespace |
| k8s/secret.yaml | JWT + admin credentials |
| k8s/pvc.yaml | Persistent storage |
| k8s/deployment.yaml | Single-node server |
| k8s/distributed-deployment.yaml | JM + TM + HPA |
| k8s/service.yaml | ClusterIP |
| k8s/service-lb.yaml | LoadBalancer |

Undeploy

kubectl delete namespace fungi

Troubleshooting

| Issue | Check |
|---|---|
| Pod Pending | kubectl describe pvc -n fungi (storage class, size, zone) |
| Health probe fails | kubectl logs deployment/fungi-server -n fungi (port binding) |
| Cannot access externally | Cloud firewall / security group for NodePort range 30000–32767 |
| Dashboard can't reach API | Check Trunk.toml proxy backend URL |

Configuration

All env vars and config file options.

Environment Variables

Server

| Variable | Default | Description |
|---|---|---|
| FUNGI_PORT | 50051 | gRPC/API port |
| FUNGI_HTTP_PORT | 8080 | HTTP port |
| FUNGI_LOG_LEVEL | info | trace / debug / info / warn / error |
| FUNGI_DATA_DIR | /data | Data directory |

Auth

| Variable | Default | Description |
|---|---|---|
| FUNGI_AUTH_ENABLED | false | Enable auth |
| FUNGI_JWT_SECRET | (random) | JWT signing secret |
| FUNGI_ADMIN_USERNAME | "" | Bootstrap admin |
| FUNGI_ADMIN_PASSWORD | "" | Bootstrap admin |

Cluster

| Variable | Default | Description |
|---|---|---|
| FUNGI_CLUSTER_MODE | false | Enable distributed mode |
| FUNGI_JOB_MANAGER_HOST | 0.0.0.0 | JobManager address |
| FUNGI_JOB_MANAGER_PORT | 50052 | JobManager port |
| FUNGI_TASK_MANAGER_SLOTS | 4 | Slots per TaskManager |

Kafka

| Variable | Default | Description |
|---|---|---|
| KAFKA_BOOTSTRAP_SERVERS | localhost:9092 | Kafka brokers |

Enterprise

| Variable | Default | Description |
|---|---|---|
| FUNGI_LICENSE_KEY | (none) | Enterprise license key (Ed25519-signed JWT) |

License modes:

| Mode | Key | Behavior |
|---|---|---|
| Personal | (none) | Enterprise features disabled, info log at startup |
| Enterprise | Valid key | All features enabled |
| Enterprise | Expired | Warn for 7 days, then disable |
| Enterprise | Invalid | Fail fast |
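
The license modes above amount to a small decision table. The sketch below is one way to read it, not Fungi's actual startup code: the function name, the enum, and the exact boundary of the 7-day grace period (inclusive here) are assumptions made for illustration.

```python
from enum import Enum


class LicenseAction(Enum):
    PERSONAL = "personal"   # no key: enterprise features off, info log
    ENABLED = "enabled"     # valid key: all features on
    GRACE = "grace"         # expired, within 7 days: warn only
    DISABLED = "disabled"   # expired, past the grace period
    FAIL = "fail"           # invalid key: refuse to start


GRACE_DAYS = 7  # taken from the table; inclusive boundary is an assumption


def resolve_license(key_present, signature_valid=False, days_past_expiry=0):
    """Map the state of FUNGI_LICENSE_KEY to a startup action (hypothetical helper)."""
    if not key_present:
        return LicenseAction.PERSONAL
    if not signature_valid:
        return LicenseAction.FAIL
    if days_past_expiry <= 0:
        return LicenseAction.ENABLED
    if days_past_expiry <= GRACE_DAYS:
        return LicenseAction.GRACE
    return LicenseAction.DISABLED
```

For example, `resolve_license(True, signature_valid=True, days_past_expiry=3)` falls in the warning window, while the same key ten days past expiry is disabled.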

Generate test license (dev only):

cargo run -p fungi-enterprise --example generate_test_license

Config File

fungi.toml:

[server]
host = "0.0.0.0"
port = 50051

[checkpoint]
interval_ms = 60000
timeout_ms = 600000
max_concurrent = 1

[state_backend]
type = "memory"               # or "rocksdb"

[state_backend.rocksdb]
path = "/var/lib/fungi/state"

[kafka]
bootstrap_servers = "localhost:9092"
group_id = "fungi-default"

[logging]
level = "info"
format = "pretty"             # or "json"

[cluster]
mode = false
[cluster.job_manager]
host = "0.0.0.0"
port = 50052
[cluster.task_manager]
slots = 4
memory_mb = 4096

Logging

export RUST_LOG=fungi=debug
export RUST_LOG=fungi_server=info,fungi_sql=debug   # per-crate

Performance Tuning

| Setting | Where | Notes |
|---|---|---|
| Parallelism | env.set_parallelism(4) | per-job |
| Channel capacity | executor.create_channels(1000) | backpressure |
| Checkpoint interval | fungi.toml [checkpoint] | trade latency vs durability |
| Kafka batch | producer/consumer config | throughput vs latency |

Architecture

How Fungi's components fit together.

System Overview

┌─────────────────────────────────────────────┐
│                gRPC / REST API              │
├─────────────────────────────────────────────┤
│    SQL (DataFusion)  │  DataStream API      │
├─────────────────────────────────────────────┤
│         Streaming Execution Engine          │
│  ┌─────────┐ ┌──────────┐ ┌─────────────┐   │
│  │Operator │ │  State   │ │  Watermark  │   │
│  │  Graph  │ │ Manager  │ │  & Windows  │   │
│  └─────────┘ └──────────┘ └─────────────┘   │
├─────────────────────────────────────────────┤
│   Kafka  │  State Backend  │  Checkpoints   │
└─────────────────────────────────────────────┘

Crates

| Crate | Purpose |
|---|---|
| fungi-common | Shared types, errors, config |
| fungi-state | State backends (Memory, RocksDB) |
| fungi-streaming | Watermarks, windows, timers |
| fungi-execution | Operator graph, task executor, Python UDF |
| fungi-checkpoint | Checkpoint coordinator |
| fungi-sql | DataFusion integration |
| fungi-flink | Flink DataStream API |
| fungi-connector | Kafka source/sink |
| fungi-server | gRPC/HTTP server |
| fungi-cli | Command-line interface |
| fungi-cluster | Distributed protocol |
| fungi-scheduler | Job/task scheduling |
| fungi-reliability | DLQ, retries, circuit breaker |
| fungi-security | Auth, RBAC, audit |
| fungi-tenancy | Multi-tenant isolation |
| fungi-dashboard | Leptos WASM dashboard |
| fungi-mcp | MCP server for AI agents |

Data Flow

Source → Deserialize → Timestamp → KeyBy → Window → Aggregate → Serialize → Sink

  1. Source reads from Kafka / file / API
  2. Deserialize → Arrow RecordBatch
  3. Timestamp assigns event time
  4. KeyBy partitions for parallelism
  5. Window groups by time (tumbling / sliding / session)
  6. Aggregate computes (sum / count / avg / custom)
  7. Serialize → bytes
  8. Sink writes to Kafka / DB / file

All internal data flows use Arrow RecordBatch, which is zero-copy and SIMD-friendly.
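
To make the stage order concrete, here is a minimal single-process sketch of the same flow in plain Python. It is illustrative only: it works record by record on JSON instead of Arrow batches, ignores watermarks and late data, and the field names (`user_id`, `amount`, `ts_ms`) and the 60-second tumbling window are assumptions, not part of Fungi's API.

```python
import json
from collections import defaultdict

WINDOW_MS = 60_000  # tumbling window size, chosen for illustration


def run_pipeline(raw_messages):
    """Source -> Deserialize -> Timestamp -> KeyBy -> Window -> Aggregate -> Serialize."""
    sums = defaultdict(float)
    for raw in raw_messages:                           # 1. source
        event = json.loads(raw)                        # 2. deserialize
        ts = event["ts_ms"]                            # 3. event-time timestamp
        key = event["user_id"]                         # 4. key by
        window_start = ts - ts % WINDOW_MS             # 5. tumbling window
        sums[(key, window_start)] += event["amount"]   # 6. aggregate (sum)
    # 7. serialize; sorted so the output order is deterministic
    return [
        json.dumps({"user_id": k, "window_start": w, "total": t}).encode()
        for (k, w), t in sorted(sums.items())
    ]
```

The returned byte strings stand in for what a sink would write; events with timestamps 1000 and 59999 land in the window starting at 0, while 60000 starts the next window.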

Execution Modes

| Mode | Use Case |
|---|---|
| Embedded | Single process, library API, CLI --local |
| Server | Single node, gRPC API, dashboard |
| Distributed | JobManager + N TaskManagers, K8s |

See Concepts / Distributed for the multi-node model.

Distributed Streaming

How Fungi runs across multiple nodes.

For how to deploy, see Deployment / Kubernetes.

Topology

                ┌─────────────────┐
                │   JobManager    │
                │  (in server)    │
                └────────┬────────┘
                         │ gRPC
          ┌──────────────┼──────────────┐
          ↓              ↓              ↓
    ┌──────────┐   ┌──────────┐   ┌──────────┐
    │ TaskMgr  │   │ TaskMgr  │   │ TaskMgr  │
    │ Slot 1   │   │ Slot 1   │   │ Slot 1   │
    │ Slot 2   │   │ Slot 2   │   │ Slot 2   │
    └──────────┘   └──────────┘   └──────────┘
          │              │              │
          └──────────────┼──────────────┘
                         ↓
                  Object Storage
                  (checkpoints)

Components

| Component | Where | Role |
|---|---|---|
| JobManager | Embedded in fungi-server | Schedules tasks, coordinates checkpoints |
| TaskManager | Separate process | Executes tasks, sends heartbeats |
| Scheduler | In JobManager | Assigns tasks to slots |

Control Plane (gRPC)

| RPC | Purpose |
|---|---|
| RegisterTaskManager | TaskManager joins cluster |
| Heartbeat | Liveness check |
| DeployTask / CancelTask | Task lifecycle |
| TriggerCheckpoint / AcknowledgeCheckpoint | Checkpoint coordination |

Data Plane (gRPC streaming)

| Channel | Use Case |
|---|---|
| LocalChannel | Same TaskManager (in-process) |
| RemoteChannel | Cross-TaskManager (gRPC) |

| Partitioner | Use Case |
|---|---|
| RoundRobin | Load balancing |
| KeyBy | Group by key columns |
| Broadcast | Send to all (joins) |
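
The three partitioners differ only in how they pick target channels for a record. The sketch below shows the routing rule for each; the function names and the use of CRC32 as the key hash are illustrative assumptions, not Fungi's implementation.

```python
import zlib


def round_robin(counter, num_channels):
    """Spread records evenly: the n-th record goes to channel n mod N."""
    return [counter % num_channels]


def key_by(key, num_channels):
    """Send every record with the same key to the same channel.

    CRC32 is used because it is stable across processes, unlike Python's
    salted built-in hash(); a real engine would choose its own hash.
    """
    return [zlib.crc32(key.encode("utf-8")) % num_channels]


def broadcast(num_channels):
    """Send the record to every channel, for example the small side of a join."""
    return list(range(num_channels))
```

Each function returns the list of channel indexes that should receive the record, so a single dispatch loop can serve all three strategies.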

Checkpointing

Barriers flow inline with data:

Source-1 ──[data][barrier][data]──→ Operator
Source-2 ──[data][data][barrier]──→ Operator
                                     ↓
                          Wait for ALL barriers
                          Snapshot state
                          Forward barrier

States: Triggered → WaitingForAck → Committing → Completed
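
The "wait for all barriers, snapshot, forward" step can be sketched as a small operator that tracks which inputs have delivered their barrier. This is an illustration of aligned barriers in general, under the assumption that records arriving on an already-barriered input are buffered until alignment completes; the class and method names are hypothetical, and the running sum stands in for real operator state.

```python
class AligningOperator:
    def __init__(self, inputs):
        self.inputs = set(inputs)
        self.state = 0        # running sum, stands in for operator state
        self.blocked = {}     # input -> records buffered while aligning
        self.snapshots = {}   # checkpoint id -> copy of state

    def on_record(self, source, value):
        if source in self.blocked:
            # This input already delivered its barrier: hold the record back
            # so it is not included in the snapshot being taken.
            self.blocked[source].append(value)
        else:
            self.state += value

    def on_barrier(self, source, checkpoint_id):
        """Return the checkpoint id once the barrier is forwarded, else None."""
        self.blocked.setdefault(source, [])
        if set(self.blocked) != self.inputs:
            return None                               # still waiting for other inputs
        self.snapshots[checkpoint_id] = self.state    # snapshot state
        buffered, self.blocked = self.blocked, {}
        for values in buffered.values():              # replay held-back records
            for value in values:
                self.state += value
        return checkpoint_id                          # forward barrier downstream
```

Feeding it the two streams from the diagram shows that the snapshot contains exactly the records that preceded each source's barrier, while the post-barrier record from Source-1 is applied only afterwards.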

Exactly-Once

Two-phase commit for sinks:

  1. Prepare: buffer data
  2. Commit: make the data visible after the checkpoint completes
  3. Abort: discard the data if the checkpoint fails
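
A toy in-memory sink makes the three phases explicit. This is a sketch of the protocol shape only; the class is hypothetical, and a real sink would stage data in a transactional external system instead of Python lists.

```python
class TwoPhaseSink:
    def __init__(self):
        self.pending = {}   # checkpoint id -> records buffered but not yet visible
        self.visible = []   # records downstream readers can see

    def prepare(self, checkpoint_id, records):
        """Phase 1: buffer output produced since the previous checkpoint."""
        self.pending.setdefault(checkpoint_id, []).extend(records)

    def commit(self, checkpoint_id):
        """Phase 2: publish the buffered output once the checkpoint completed."""
        self.visible.extend(self.pending.pop(checkpoint_id, []))

    def abort(self, checkpoint_id):
        """Drop the buffered output if the checkpoint failed."""
        self.pending.pop(checkpoint_id, None)
```

Because `commit` removes the buffered entry, calling it twice for the same checkpoint publishes the data only once, which is the property a retried commit relies on.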

Recovery:

  1. JobManager detects failure (missed heartbeats)
  2. Reload state from last checkpoint
  3. Resume from checkpoint position

API

use fungi_cluster::checkpoint::CheckpointCoordinator;
// PartitionType is assumed to be exported by fungi_scheduler alongside JobGraph.
use fungi_scheduler::{JobGraph, JobVertex, PartitionType, Scheduler};

// Describe the job as a graph of vertices connected by edges.
let mut graph = JobGraph::new("my-job");
graph.add_vertex(JobVertex::new("source", "kafka_source", 2));
graph.add_vertex(JobVertex::new("sink", "kafka_sink", 1));
graph.add_edge("source", "sink", PartitionType::Forward);

// Register a TaskManager, submit the job, and compute task placements.
let mut scheduler = Scheduler::new();
scheduler.register_task_manager("tm-1", 4, 4096, 4);
scheduler.submit_job(graph)?;
let deployments = scheduler.schedule()?;

// `storage` is a checkpoint storage handle created elsewhere.
let coordinator = CheckpointCoordinator::new(storage, 60_000);
let checkpoint_id = coordinator.trigger_checkpoint("job-1", 3).await;

Troubleshooting

| Issue | Check |
|---|---|
| TaskManager not registering | JobManager reachable? port 50052 open? |
| Checkpoints fail | Storage reachable? Disk space? |
| Tasks not scheduling | Free slots? Job graph valid? |

Reliability

Production-grade reliability primitives in fungi-reliability (14 tests).

Features

| Feature | Purpose |
|---|---|
| Deduplication | TTL-based dedup store, drop duplicate messages |
| Dead-Letter Queue | Retry with backoff, then park failures in DLQ |
| Circuit Breaker | Stop calling failing dependencies |
| Checkpointing | Persist offsets + state for recovery |

Configuration

reliability:
  mode: at-least-once
  deduplication:
    enabled: true
    ttl_seconds: 3600
  error_handling:
    max_retries: 3
    retry_backoff_ms: 1000
    retry_backoff_multiplier: 2.0
    max_backoff_ms: 30000
    dead_letter_topic: "{topic}-dlq"
  circuit_breaker:
    enabled: true
    failure_threshold: 5
    recovery_timeout_ms: 30000
    half_open_max_requests: 3

Deduplication

Each message is hashed on (key, topic, partition, offset) and the hash is recorded in a TTL store. Duplicates seen within the TTL are dropped.

use fungi_reliability::DeduplicationStore;
use std::time::Duration;

// Remember message IDs for one hour.
let store = DeduplicationStore::new(Duration::from_secs(3600));
if !store.is_duplicate("msg-123").await { /* process */ }
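
The idea behind a TTL-based dedup store can be sketched in a few lines of Python. This is not the `DeduplicationStore` implementation: the class name, the check-and-record behavior of `is_duplicate`, and the injectable clock are assumptions made so the example is self-contained and testable.

```python
import time


class TtlDedupStore:
    def __init__(self, ttl_seconds, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock   # injectable so tests can control time
        self.seen = {}       # message id -> expiry time

    def is_duplicate(self, message_id):
        """Return True if the id was seen within the TTL; otherwise record it."""
        now = self.clock()
        # Drop expired entries. This is O(n) per call, which is fine for a
        # sketch; a production store would expire entries incrementally.
        self.seen = {k: exp for k, exp in self.seen.items() if exp > now}
        if message_id in self.seen:
            return True
        self.seen[message_id] = now + self.ttl
        return False
```

With a one-hour TTL, a redelivery of the same id after a few seconds is dropped, while the same id arriving after the TTL has elapsed is treated as new.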

Dead-Letter Queue

Failed messages are retried with exponential backoff. After max_retries attempts, the message is sent to the DLQ topic.

use fungi_reliability::{ErrorAction, ErrorHandler, ErrorHandlingConfig};

let handler = ErrorHandler::new(ErrorHandlingConfig::default());
// Report the failure; the handler decides whether to retry or dead-letter.
match handler.handle_failure("msg-123", "parse error", 0).await {
    ErrorAction::Retry => { /* wait and retry */ }
    ErrorAction::DeadLettered => { /* parked in DLQ */ }
}

Backoff: 1000ms → 2000ms → 4000ms → …, capped at 30000ms.
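
The schedule follows directly from retry_backoff_ms, retry_backoff_multiplier, and max_backoff_ms in the configuration. A minimal sketch of the arithmetic, assuming the attempt counter starts at 0 (the function name is hypothetical):

```python
def backoff_ms(attempt, base_ms=1000, multiplier=2.0, max_ms=30000):
    """Delay before retry number `attempt` (0-based): base * multiplier^attempt, capped."""
    return min(int(base_ms * multiplier ** attempt), max_ms)


# First six delays with the defaults from the configuration section.
schedule = [backoff_ms(n) for n in range(6)]
```

Here `schedule` is `[1000, 2000, 4000, 8000, 16000, 30000]`: the sixth delay would be 32000ms uncapped, so the cap takes over.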

DLQ payload:

{ "original_id": "msg-123", "error": "...", "failed_at": "...", "retry_count": 3 }

Circuit Breaker

   CLOSED ── failures ≥ threshold ──→ OPEN
     ↑                                  │
     │                          recovery_timeout
     │                                  ↓
   success ←── HALF_OPEN ──────────  one call

use fungi_reliability::{CircuitBreaker, CircuitBreakerConfig};

let mut cb = CircuitBreaker::new(CircuitBreakerConfig::default());
// Only call the dependency while the breaker lets requests through.
if cb.should_allow_request() {
    match call_dependency().await {
        Ok(_) => cb.record_success().await,
        Err(_) => cb.record_failure().await,
    }
}

| State | Behavior |
|---|---|
| CLOSED | All calls pass through |
| OPEN | Fail fast, no calls made |
| HALF_OPEN | Limited probes, success → CLOSED |
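
The state machine is small enough to sketch in full. The version below mirrors the configuration keys (failure_threshold, recovery_timeout_ms, half_open_max_requests) but is not the fungi-reliability implementation; in particular, reopening the breaker on a failed probe in HALF_OPEN is an assumption, since the diagram only shows the success path.

```python
import time


class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout_s=30.0,
                 half_open_max_requests=3, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout_s = recovery_timeout_s
        self.half_open_max_requests = half_open_max_requests
        self.clock = clock
        self.state = "CLOSED"
        self.failures = 0
        self.opened_at = 0.0
        self.probes = 0

    def allow_request(self):
        if self.state == "OPEN":
            if self.clock() - self.opened_at < self.recovery_timeout_s:
                return False                      # fail fast
            self.state, self.probes = "HALF_OPEN", 0
        if self.state == "HALF_OPEN":
            if self.probes >= self.half_open_max_requests:
                return False                      # probe budget used up
            self.probes += 1
        return True

    def record_success(self):
        self.state, self.failures = "CLOSED", 0

    def record_failure(self):
        self.failures += 1
        # Assumption: a failed probe in HALF_OPEN reopens the breaker at once.
        if self.state == "HALF_OPEN" or self.failures >= self.failure_threshold:
            self.state, self.opened_at = "OPEN", self.clock()
```

Callers wrap each dependency call in `allow_request()` and report the outcome with `record_success()` or `record_failure()`, the same shape as the Rust snippet above.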

Checkpointing

use fungi_reliability::{CheckpointManager, InMemoryCheckpointStore};
use std::collections::HashMap;

let mgr = CheckpointManager::new(Box::new(InMemoryCheckpointStore::new()));

// Persist the consumed offset per topic together with the serialized state.
// `state_bytes` is produced elsewhere.
let mut offsets = HashMap::new();
offsets.insert("input-topic".to_string(), 1000);
let id = mgr.checkpoint(offsets, state_bytes).await?;

// On restart, resume from the most recent checkpoint if one exists.
if let Some(cp) = mgr.latest().await? {
    // resume from cp.offsets + cp.state
}

See Concepts / Distributed for barrier protocol in distributed mode.

Security

Enterprise features: RBAC, audit→DB, Google SSO, LDAP, and DB-backed user store require a valid license key. In personal mode, all permissions are allowed and audit logs go to stdout only.

Authentication, authorization, and audit in fungi-security (10 tests).

Features

| Feature | Purpose |
|---|---|
| API Key Auth | Long-lived keys for CLI / automation |
| JWT Auth | Short-lived tokens for web UI |
| RBAC | Role-based access control with wildcards |
| Audit Logging | Track every authenticated action |

Configuration

security:
  auth:
    api_keys:
      - key: "fungi-admin-xxxx"
        name: "admin"
        tenant: "team-a"
        permissions: ["*"]
      - key: "fungi-readonly-xxxx"
        name: "viewer"
        tenant: "team-a"
        permissions: ["pipeline:read", "metrics:read"]
    jwt:
      secret: "${JWT_SECRET}"
      expiry_hours: 24
      issuer: "fungi"
  rbac:
    roles:
      admin: { permissions: ["*"] }
      operator:
        permissions:
          - "pipeline:create"
          - "pipeline:read"
          - "pipeline:update"
          - "pipeline:start"
          - "pipeline:stop"
      viewer:
        permissions: ["pipeline:read", "metrics:read"]
  audit:
    enabled: true
    store: stdout

Bootstrap (Env)

export FUNGI_AUTH_ENABLED=true
export FUNGI_JWT_SECRET="your-secret"
export FUNGI_ADMIN_USERNAME=admin
export FUNGI_ADMIN_PASSWORD=admin123
fungi server

API Keys (CLI / Automation)

curl -H "X-API-Key: fungi-admin-xxxx" http://localhost:8080/api/jobs
fungi --api-key fungi-admin-xxxx job list

JWT (Web UI)

TOKEN=$(curl -X POST http://localhost:8080/auth/login \
  -H "Content-Type: application/json" \
  -d '{"user":"admin","key":"fungi-admin-xxxx"}' | jq -r .token)

curl -H "Authorization: Bearer $TOKEN" http://localhost:8080/api/jobs

The dashboard stores the JWT in localStorage and attaches it to every request.

RBAC

Permissions are resource:action. * matches anything.

| Role | Permissions |
|---|---|
| admin | * |
| operator | pipeline:create, pipeline:read, pipeline:update, pipeline:start, pipeline:stop |
| viewer | pipeline:read, metrics:read |

Custom roles in YAML:

rbac:
  roles:
    data-engineer:
      permissions: ["pipeline:create", "pipeline:read", "pipeline:update", "topic:read"]
    sre:
      permissions: ["pipeline:*", "metrics:*", "audit:read"]
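
Wildcard matching on resource:action strings can be sketched with the standard library's glob matcher. The role table below copies the roles shown in this section; the function name is hypothetical, and using `fnmatchcase` is an illustrative choice, not a statement about how fungi-security evaluates permissions.

```python
from fnmatch import fnmatchcase

ROLES = {
    "admin": ["*"],
    "operator": ["pipeline:create", "pipeline:read", "pipeline:update",
                 "pipeline:start", "pipeline:stop"],
    "viewer": ["pipeline:read", "metrics:read"],
    "sre": ["pipeline:*", "metrics:*", "audit:read"],
}


def is_allowed(role, permission):
    """True if any pattern granted to the role matches the requested permission."""
    return any(fnmatchcase(permission, pattern) for pattern in ROLES.get(role, []))
```

With this table, `sre` may start pipelines through `pipeline:*` but cannot create topics, and an unknown role is denied everything.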

Audit Log

{
  "timestamp": "2026-05-27T10:00:00Z",
  "user": "alice",
  "tenant": "team-a",
  "action": "pipeline:start",
  "resource": "analytics-pipeline",
  "ip": "10.0.0.1",
  "success": true
}

| Store | Status |
|---|---|
| stdout | Available |
| file | Planned |
| database | Planned |

Best Practices

  • API keys for machines, JWT for humans
  • Rotate FUNGI_JWT_SECRET regularly
  • Store secrets in K8s Secret
  • Pipe audit logs to a SIEM
  • Grant least privilege; start with viewer

Multi-Tenancy

Enterprise feature: Multi-tenancy requires a valid license key. In personal mode, all requests use the "default" tenant.

Logical isolation and resource quotas in fungi-tenancy (11 tests).

Features

| Feature | Purpose |
|---|---|
| Tenant Namespaces | Logical isolation of pipelines and resources |
| Resource Quotas | Per-tenant limits on pipelines, throughput, state, topics |
| Tenant Context | Automatic tenant extraction from request headers |

Isolation is logical: a single binary with separate namespaces. For physical isolation, deploy one Fungi instance per tenant in a K8s namespace.

Configuration

tenancy:
  enabled: true
  default_tenant: "default"
  tenants:
    team-a:
      name: "Team Alpha"
      quotas:
        max_pipelines: 10
        max_throughput_mbps: 100
        max_state_size_gb: 10
        max_topics: 20
    team-b:
      name: "Team Beta"
      quotas:
        max_pipelines: 5
        max_throughput_mbps: 50
        max_state_size_gb: 5
        max_topics: 10

Tenant Context

Pass tenant via header or CLI flag:

curl -H "X-Tenant-ID: team-a" http://localhost:8080/api/jobs
fungi --tenant team-a job list

Scope a pipeline to a tenant:

name: team-a-pipeline
tenant: team-a
source:
  connector: kafka
  bootstrap_servers: 127.0.0.1:9092
  topic: team-a-events

Quotas

| Quota | Default | Description |
|---|---|---|
| max_pipelines | 10 | Max pipelines per tenant |
| max_throughput_mbps | 100 | Max aggregate throughput |
| max_state_size_gb | 10 | Max state store size |
| max_topics | 20 | Max Kafka topics |

A quota breach returns 429 Too Many Requests.
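
A quota check of this kind compares the usage a request would result in against the tenant's limits. The sketch below uses the default limits from the table; the function name, the usage keys (`pipelines`, `topics`, and so on), and the choice to treat "usage greater than limit" as the breach condition are assumptions for illustration.

```python
DEFAULT_QUOTAS = {
    "max_pipelines": 10,
    "max_throughput_mbps": 100,
    "max_state_size_gb": 10,
    "max_topics": 20,
}


def check_quota(usage, quotas=DEFAULT_QUOTAS):
    """Return (http_status, breached) for the usage a request would produce.

    `usage` maps names without the "max_" prefix to values, for example
    {"pipelines": 11} when a tenant with 10 pipelines creates another one.
    """
    breached = [
        name for name, limit in quotas.items()
        if usage.get(name[len("max_"):], 0) > limit
    ]
    return (429 if breached else 200), breached
```

Returning the names of the breached quotas alongside the status makes it straightforward to put a useful message in the 429 response body.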

Audit per Tenant

Audit log entries include tenant:

{ "timestamp": "...", "tenant": "team-a", "user": "alice", "action": "pipeline:run" }

Best Practices

  • One tenant per team / product line
  • Start with conservative quotas; raise on demand
  • Combine with RBAC for fine-grained control
  • For hard isolation, deploy one Fungi per tenant in separate K8s namespaces

Pipelines

Build data pipelines from a YAML config.

CLI

fungi pipeline sample --output my-pipeline.yaml   # generate template
fungi pipeline validate --config my-pipeline.yaml # check syntax
fungi pipeline run --config my-pipeline.yaml      # execute
fungi pipeline list                                # show running

YAML Reference

name: my-pipeline

source:
  connector: kafka
  bootstrap_servers: 127.0.0.1:9092
  topic: input-topic
  group_id: my-group

transforms:
  - transform_type: filter
    filter: "amount > 100"
  - transform_type: map
    map: "SELECT user_id, amount * 1.1 AS adjusted FROM events"
  - transform_type: sql
    sql: "SELECT user_id, SUM(amount) AS total FROM events GROUP BY user_id"

sink:
  connector: kafka
  bootstrap_servers: 127.0.0.1:9092
  topic: output-topic

JSON format is also accepted.

Transform Types

| Type | Purpose | Example |
|---|---|---|
| filter | Drop rows by predicate | "amount > 100" |
| map | Per-row SQL expression | "SELECT id, amount * 1.1 FROM events" |
| sql | Arbitrary SQL (aggregations) | "SELECT user_id, SUM(amount) FROM events GROUP BY user_id" |
| python | Custom Python script | see Python UDF |

Example: Order Processing

name: order-processing
source:
  connector: kafka
  bootstrap_servers: 127.0.0.1:9092
  topic: orders
  group_id: order-processor
transforms:
  - transform_type: filter
    filter: "status = 'completed'"
  - transform_type: map
    map: "SELECT order_id, customer_id, total * 1.08 AS total_with_tax FROM orders"
sink:
  connector: kafka
  bootstrap_servers: 127.0.0.1:9092
  topic: processed-orders

Best Practices

  • Use unique group_id per pipeline to avoid offset conflicts
  • Always validate before run
  • Use 127.0.0.1 (not localhost) for Kafka on macOS to force IPv4
  • Monitor via the dashboard

Troubleshooting

| Symptom | Check |
|---|---|
| Won't start | fungi pipeline validate --config … |
| No messages | Topic exists? Consumer group not stuck? |
| Transform error | SQL syntax? Column names match? |

Kafka Integration

Native Kafka source and sink.

Setup Kafka (Local)

docker run -d --name kafka -p 9092:9092 \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_PROCESS_ROLES=broker,controller \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  apache/kafka:latest

Create topics:

docker exec kafka /opt/kafka/bin/kafka-topics.sh \
  --create --topic events --bootstrap-server localhost:9092 \
  --partitions 1 --replication-factor 1

Pipeline (YAML)

name: kafka-pipeline
source:
  connector: kafka
  bootstrap_servers: 127.0.0.1:9092
  topic: events
  group_id: my-group
sink:
  connector: kafka
  bootstrap_servers: 127.0.0.1:9092
  topic: processed-events

Use 127.0.0.1 (not localhost) on macOS to force IPv4.

Rust API

Source

use fungi_connector::kafka::{KafkaSource, KafkaSourceConfig};

let config = KafkaSourceConfig {
    bootstrap_servers: "127.0.0.1:9092".into(),
    topic: "events".into(),
    group_id: "my-group".into(),
    auto_offset_reset: "earliest".into(),
};

// Open the consumer, then poll for the next event.
let mut source = KafkaSource::new(config);
source.open(None).await?;
let event = source.poll().await?;

Sink

use fungi_connector::kafka::{KafkaSink, KafkaSinkConfig};

let config = KafkaSinkConfig {
    bootstrap_servers: "127.0.0.1:9092".into(),
    topic: "processed-events".into(),
};

// Open the producer, then write an Arrow RecordBatch built elsewhere.
let mut sink = KafkaSink::new(config);
sink.open().await?;
sink.write_batch(record_batch).await?;

Config Reference

Source

| Field | Default | Description |
|---|---|---|
| bootstrap_servers | (required) | Kafka brokers |
| topic | (required) | Topic to consume |
| group_id | (required) | Consumer group |
| auto_offset_reset | earliest | earliest / latest |

Sink

| Field | Default | Description |
|---|---|---|
| bootstrap_servers | (required) | Kafka brokers |
| topic | (required) | Topic to produce |

Common Gotchas

| Issue | Fix |
|---|---|
| Connection refused on macOS | Use 127.0.0.1, not localhost |
| Stale offsets | Use a fresh group_id (timestamp-based) |
| Slow startup | Add broker.address.family=v4 |
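
For the stale-offsets fix, a timestamp-based group_id can be generated when the pipeline config is rendered. A minimal sketch, where the helper name and the `prefix-<unix seconds>` format are only one possible convention:

```python
import time


def fresh_group_id(prefix="fungi", now=None):
    """Build a consumer group id that has no committed offsets yet."""
    ts = int(time.time() if now is None else now)
    return f"{prefix}-{ts}"
```

A new consumer group starts from auto_offset_reset, so this is suited to tests and replays; a long-running pipeline would normally keep a stable group_id so it resumes from its committed offsets.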

Examples

cargo run -p fungi-examples --example kafka_test
cargo run -p fungi-examples --example kafka_sql
cargo run -p fungi-examples --example kafka_pipeline

PostgreSQL Connector

Read from and write to PostgreSQL tables.

Quick Start

name: pg-to-pg
source:
  connector: postgres
  url: postgres://user:pass@localhost:5432/mydb
  table: events
  filter: "created_at > now() - interval '1 hour'"
  batch_size: 500

sink:
  connector: postgres
  url: postgres://user:pass@localhost:5432/mydb
  table: processed_events
  write_mode: upsert

Config Reference

Source

| Field | Default | Description |
|---|---|---|
| url | (required) | Postgres connection URL |
| table | (required) | Table name |
| schema | public | Schema name |
| columns | * | Columns to select |
| filter | (none) | WHERE clause |
| batch_size | 1000 | Rows per batch |

Sink

| Field | Default | Description |
|---|---|---|
| url | (required) | Postgres connection URL |
| table | (required) | Table name |
| schema | public | Schema name |
| write_mode | insert | insert or upsert |

Type Mapping

| Postgres | Arrow |
|---|---|
| int2, int4 | Int32 |
| int8 | Int64 |
| float4, float8 | Float64 |
| text, varchar | Utf8 |
| bool | Boolean |

Feature Flag

cargo build --features postgres
cargo run -p fungi-examples --example postgres_pipeline --features postgres

Pipeline Examples

Kafka → SQL → Postgres

name: kafka-to-pg
source:
  connector: kafka
  bootstrap_servers: 127.0.0.1:9092
  topic: events
  group_id: pg-sink
transforms:
  - transform_type: filter
    filter: "amount > 100"
sink:
  connector: postgres
  url: postgres://user:pass@localhost:5432/mydb
  table: high_value_events

Postgres → Transform → Kafka

name: pg-to-kafka
source:
  connector: postgres
  url: postgres://user:pass@localhost:5432/mydb
  table: orders
  filter: "status = 'pending'"
  batch_size: 500
transforms:
  - transform_type: map
    map: "SELECT id, customer_id, total * 1.08 AS total_with_tax FROM orders"
sink:
  connector: kafka
  bootstrap_servers: 127.0.0.1:9092
  topic: processed-orders

Python UDF

Run custom Python code as a pipeline operator.

How It Works

Fungi Operator ──[Arrow IPC stdin]──→ Python subprocess
                                        │ (your script)
Fungi Operator ←─[Arrow IPC stdout]─────┘

  • Subprocess stays alive between batches (stateful UDAF works)
  • Auto-restart on crash (configurable)
  • Zero-copy Arrow RecordBatch exchange

Quick Start

Script (transform.py)

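import os
import sys

import pyarrow as pa
import pyarrow.compute as pc  # not loaded by "import pyarrow" alone

# THRESHOLD comes from the env block of the pipeline config; default to 100.
threshold = int(os.environ.get("THRESHOLD", "100"))

reader = pa.ipc.open_stream(sys.stdin.buffer)
writer = None
for batch in reader:
    # Keep only rows whose amount exceeds the threshold.
    result = batch.filter(pc.greater(batch['amount'], threshold))
    if writer is None:
        writer = pa.ipc.new_stream(sys.stdout.buffer, result.schema)
    writer.write_batch(result)
    sys.stdout.buffer.flush()  # do not wait for the pipe buffer to fill
if writer is not None:
    writer.close()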

Pipeline config

transform:
  type: python
  source: transform.py
  env:
    THRESHOLD: "100"

Multi-File Projects (Zip)

Package as my_transform.zip:

my_transform/
├── __init__.py
├── main.py          # entry point
└── utils.py

zip -r my_transform.zip my_transform/

transform:
  type: python
  source: my_transform.zip
  entry: main.py
Fungi extracts to a temp dir, sets PYTHONPATH, runs entry.
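
The extract, set PYTHONPATH, run sequence can be approximated with the standard library. This is a sketch of the general technique, not Fungi's loader: the function name is hypothetical, the entry path is resolved relative to the extraction root (an assumption), and the process runs once to completion instead of staying alive between batches.

```python
import os
import subprocess
import sys
import tempfile
import zipfile


def run_zip_entry(zip_path, entry, python=sys.executable, stdin=b""):
    """Extract the archive to a temp dir and run `entry` with PYTHONPATH set."""
    with tempfile.TemporaryDirectory() as workdir:
        with zipfile.ZipFile(zip_path) as archive:
            archive.extractall(workdir)
        env = dict(os.environ)
        # Put the extraction root first so sibling modules are importable.
        paths = [workdir]
        if env.get("PYTHONPATH"):
            paths.append(env["PYTHONPATH"])
        env["PYTHONPATH"] = os.pathsep.join(paths)
        result = subprocess.run(
            [python, os.path.join(workdir, entry)],
            input=stdin, env=env, capture_output=True, check=True,
        )
        return result.stdout
```

`check=True` turns a non-zero exit code into an exception, which is where a supervising process could apply a restart policy.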

UDAF (Stateful)

The subprocess stays alive between batches, so state can be kept in memory:

import pyarrow as pa
import sys

class RunningAverage:
    def __init__(self): self.total, self.count = 0, 0
    def accumulate(self, batch):
        self.total += sum(batch['value'].to_pylist())
        self.count += len(batch)
    def result(self): return self.total / max(self.count, 1)

udaf = RunningAverage()
reader = pa.ipc.open_stream(sys.stdin.buffer)
writer = None
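for batch in reader:
    udaf.accumulate(batch)
    out = pa.RecordBatch.from_pydict({'avg': [udaf.result()]})
    if writer is None:
        writer = pa.ipc.new_stream(sys.stdout.buffer, out.schema)
    writer.write_batch(out)
    sys.stdout.buffer.flush()  # emit the running average right away
if writer is not None:
    writer.close()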

Config Reference

transform:
  type: python
  source: transform.py | my_transform.zip
  entry: main.py            # required for zip
  python: python3           # python executable
  env:
    KEY: "value"

Error Handling

| Scenario | Behavior |
|---|---|
| Script not found | Fail fast |
| Script crash | Auto-restart (max 3) |
| Invalid output | Log + skip batch |
| Python missing | Fail fast |

Prerequisites

pip install pyarrow      # Python 3.8+

Rust API

use fungi_execution::python_udf::{PythonUdfConfig, PythonUdfStreamingOperator};

let config = PythonUdfConfig::from_file("transform.py")
    .with_env("THRESHOLD", "100");

// or zip:
let config = PythonUdfConfig::from_zip("my_transform.zip", "main.py");

// Open the operator, process one batch, then shut it down.
// `batch` and `watermark` are provided by the surrounding pipeline.
let mut op = PythonUdfStreamingOperator::new("my-udf", config);
op.open().await?;
let out = op.process_element(batch, watermark).await?;
op.close().await?;

CLI Reference

Commands

| Command | Purpose |
|---|---|
| fungi server | Start server |
| fungi sql | Execute SQL |
| fungi job | Manage jobs |
| fungi pipeline | Manage pipelines |
| fungi mcp | MCP server / tools |
| fungi --help | Show help |
| fungi --version | Show version |

Global Options

| Option | Default | Description |
|---|---|---|
| --host | localhost | Server host |
| --port | 50051 | Server port |
| --local | false | Run embedded, no server |

server

fungi server                          # default 0.0.0.0:50051
fungi server --port 8080
fungi server --host 0.0.0.0 --port 50051

sql

fungi sql --query "SELECT 1"
fungi sql --query "SELECT 1" --local
fungi sql --file query.sql
fungi sql --query "SELECT 1" --host myserver --port 50051

job

fungi job list
fungi job status --id job-123
fungi job cancel --id job-123

pipeline

fungi pipeline sample --output my-pipeline.yaml
fungi pipeline validate --config my-pipeline.yaml
fungi pipeline run --config my-pipeline.yaml
fungi pipeline list

See Guides / Pipelines for config reference.

mcp

fungi mcp serve                       # start MCP server
fungi mcp tools                       # list tools
fungi mcp call <tool> --args '{...}'  # invoke a tool

See Operations / MCP.

Examples

$ fungi sql --query "SELECT 1 as id, 'Alice' as name"
id | name
---+------
1  | Alice

$ fungi sql --query "SELECT user_id, SUM(amount) AS total FROM events GROUP BY user_id"
user_id | total
--------+------
user-1  | 300
user-2  | 300

Dashboard

Leptos WASM web dashboard for monitoring and managing pipelines.

Setup

cargo install trunk
rustup target add wasm32-unknown-unknown

Run

# Terminal 1: API server
fungi server

# Terminal 2: dashboard
cd crates/fungi-dashboard
trunk serve --port 8082

Open http://localhost:8082.

Login

If FUNGI_AUTH_ENABLED=true, log in with the bootstrap admin credentials. The JWT is stored in localStorage and attached to every API call.

Pages

| Page | Shows |
|---|---|
| Dashboard | Status, version, uptime, JM/TM counts, recent jobs |
| Pipelines | All pipelines with status, throughput, errors |
| SQL IDE | Interactive query editor |
| Settings | Server info, API keys, tenants |

API Endpoints (consumed)

| Endpoint | Purpose |
|---|---|
| GET /health | Status, version, uptime, cluster info |
| GET /api/jobs | List jobs |
| POST /api/sql | Run SQL |
| POST /api/auth/login | Login |

Trunk Proxy

crates/fungi-dashboard/Trunk.toml forwards /api/* and /health to the backend (default localhost:8080).

[[proxy]]
backend = "http://localhost:8080/api"
[[proxy]]
backend = "http://localhost:8080/health"

For remote clusters:

[[proxy]]
backend = "http://101.200.74.232:30051/api"

Production Build

cd crates/fungi-dashboard
trunk build --release
# Static files are written to dist/; serve them with nginx, Axum, or any static server

Tech Stack

Component        | Purpose
-----------------+--------------------------
Leptos 0.6 (CSR) | Reactive Rust UI
Trunk            | WASM bundler / dev server
Tailwind CSS     | Styling
reqwasm          | HTTP client

Customization

  • API base: edit the [[proxy]] blocks in Trunk.toml
  • Theme: edit the Tailwind config / classes in index.html
  • Pages: src/pages/, with components in src/components/

Troubleshooting

Symptom              | Fix
---------------------+---------------------------------------------------------------
Cannot reach server  | curl http://localhost:8080/health, then check Trunk.toml
401 on every request | Token expired: log out and back in
Dashboard empty      | Server has no jobs/pipelines yet: run fungi pipeline run

MCP Integration

Model Context Protocol server for AI agents.

What MCP Enables

AI agents (Claude, GPT, custom) can:

  • Create pipelines from natural language
  • Query pipeline state and metrics
  • Diagnose issues
  • Suggest optimizations

Start the Server

fungi mcp serve

CLI Tools

fungi mcp tools                     # list available tools
fungi mcp call <tool> --args '{...}' # invoke a tool

Available Tools

Tool            | Purpose
----------------+------------------------------------
create-pipeline | Build a pipeline from a description
query-state     | Get pipeline metrics / state
optimize        | Suggest pipeline improvements
debug           | Diagnose pipeline issues
deploy          | Deploy to a cluster
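
MCP clients invoke tools with JSON-RPC 2.0 `tools/call` requests. As a rough sketch of what an agent might send for the query-state tool, the snippet below assembles such a request body by hand. The `pipeline_id` argument name is hypothetical; the real argument schema is whatever `fungi mcp tools` reports for each tool.

```rust
/// Builds a JSON-RPC 2.0 `tools/call` request body.
/// `arguments_json` must already be a valid JSON object string, and `tool`
/// is assumed to need no JSON escaping (true for the names in the table).
fn tools_call_request(id: u64, tool: &str, arguments_json: &str) -> String {
    format!(
        r#"{{"jsonrpc":"2.0","id":{id},"method":"tools/call","params":{{"name":"{tool}","arguments":{arguments_json}}}}}"#
    )
}

fn main() {
    // `pipeline_id` is a hypothetical argument name used for illustration only.
    let body = tools_call_request(1, "query-state", r#"{"pipeline_id":"pipeline-abc123"}"#);
    println!("{body}");
}
```

Building the string by hand keeps the sketch dependency-free; a real client would more likely use a JSON library so that names and arguments are escaped properly.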

Configuration

# fungi.toml
[mcp]
enabled = true
host = "0.0.0.0"
port = 3000

[mcp.skills]
enabled = ["create-pipeline", "query-state", "optimize", "debug", "deploy"]

Connect Claude

fungi mcp install --agent claude

This registers Fungi as an MCP server in Claude's config.

Custom Agent (Python)

import fungi_mcp

client = fungi_mcp.Client("http://localhost:3000")

pipeline = client.create_pipeline(
    source="kafka://events",
    filter="amount > 100",
    aggregate="GROUP BY user_id, SUM(amount)",
    sink="kafka://processed-events",
)

state = client.query_state(pipeline.id)
print(state.throughput)

Example Conversation

User: Filter purchases over $100 from Kafka and aggregate by user.

Agent → create-pipeline → returns pipeline-abc123
       → query-state    → throughput, errors, lag

  • Operations / CLI: fungi mcp commands
  • ~/.pi/agent/skills/fungi/SKILL.md: Pi agent skill bundled with this repo

Rust SDK

Rust APIs for embedding Fungi or building operators.

use fungi_flink::{DataStream, StreamExecutionEnvironment};
use fungi_streaming::TumblingWindowAssigner;

#[tokio::main]
async fn main() {
    let env = StreamExecutionEnvironment::new();
    let stream: DataStream<Event> = env.from_source("events");

    stream
        .filter(|e| e.amount > 100)
        .map(|e| (e.user_id, e.amount))
        .key_by(|(uid, _)| uid.clone())
        .window(Box::new(TumblingWindowAssigner::new(5000)))
        .aggregate(SumAggregator)
        .sink("output");

    env.execute("my-pipeline").await.unwrap();
}
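
To show what the chain above computes, here is a std-only Rust sketch of the same steps (filter, key by user, tumbling window, sum) applied to an in-memory batch. It is not the Fungi runtime: there are no watermarks or late-data handling, the `Event` fields are hypothetical, and it assumes the `5000` passed to the window assigner is a size in milliseconds.

```rust
use std::collections::BTreeMap;

// Hypothetical event shape; the real `Event` type is not shown in this guide.
struct Event {
    user_id: String,
    amount: i64,
    timestamp_ms: u64,
}

/// Keeps events with amount > 100, assigns each to a tumbling window of
/// `window_ms`, and sums amounts per (window start, user).
fn windowed_sums(events: &[Event], window_ms: u64) -> BTreeMap<(u64, String), i64> {
    let mut sums = BTreeMap::new();
    for e in events.iter().filter(|e| e.amount > 100) {
        // Align the timestamp down to the start of its window.
        let window_start = e.timestamp_ms - e.timestamp_ms % window_ms;
        *sums.entry((window_start, e.user_id.clone())).or_insert(0) += e.amount;
    }
    sums
}

fn main() {
    let events = vec![
        Event { user_id: "user-1".into(), amount: 150, timestamp_ms: 1_000 },
        Event { user_id: "user-1".into(), amount: 50, timestamp_ms: 2_000 }, // filtered out
        Event { user_id: "user-1".into(), amount: 200, timestamp_ms: 4_999 },
        Event { user_id: "user-2".into(), amount: 300, timestamp_ms: 6_000 }, // next window
    ];
    for ((start, user), total) in windowed_sums(&events, 5_000) {
        println!("window [{start}, {}) {user}: {total}", start + 5_000);
    }
}
```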

SQL

use fungi_sql::FungiSqlContext;

let ctx = FungiSqlContext::new();
ctx.register_table("events", table).await?;
ctx.sql("SELECT user_id, SUM(amount) FROM events GROUP BY user_id").await?;

State

use fungi_state::{InMemoryStateBackend, StateBackend};

let backend = InMemoryStateBackend::new();
let mut state = backend
    .create_keyed_state::<String, i64>("op-1", "counts").await?;

state.put("key".into(), 42).await?;
let v = state.get(&"key".into()).await?;   // Some(42)

RocksDB backend:

let backend = RocksDBStateBackend::new("/var/lib/fungi/state")?;
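
Conceptually, keyed state is a key/value map scoped by operator id and state name. The toy below sketches that idea, plus a snapshot, with std collections only. It is synchronous and in-memory, and its type and method names are illustrative assumptions rather than the fungi_state interface.

```rust
use std::collections::HashMap;

/// A toy in-memory store: (operator id, state name) -> key/value map.
#[derive(Default, Clone)]
struct ToyStateBackend {
    states: HashMap<(String, String), HashMap<String, i64>>,
}

impl ToyStateBackend {
    fn put(&mut self, op: &str, name: &str, key: &str, value: i64) {
        self.states
            .entry((op.to_string(), name.to_string()))
            .or_default()
            .insert(key.to_string(), value);
    }

    fn get(&self, op: &str, name: &str, key: &str) -> Option<i64> {
        self.states
            .get(&(op.to_string(), name.to_string()))?
            .get(key)
            .copied()
    }

    /// A snapshot here is just a deep copy; a durable backend would
    /// serialize the state to storage instead.
    fn snapshot(&self) -> ToyStateBackend {
        self.clone()
    }
}

fn main() {
    let mut backend = ToyStateBackend::default();
    backend.put("op-1", "counts", "key", 42);
    let snap = backend.snapshot();
    backend.put("op-1", "counts", "key", 43); // update made after the snapshot

    println!("live: {:?}", backend.get("op-1", "counts", "key"));
    println!("snapshot: {:?}", snap.get("op-1", "counts", "key"));
}
```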

Execution Graph

use fungi_execution::{ExecutionGraph, OperatorId};

let mut graph = ExecutionGraph::new();
let src = OperatorId("source".into());
let snk = OperatorId("sink".into());
graph.add_operator(src.clone());
graph.add_operator(snk.clone());
graph.add_edge(src, snk);

let order = graph.topological_order();
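
For readers curious how a topological order can be derived from such a graph, the following is a self-contained sketch using Kahn's algorithm. It is one standard way to do it and is not necessarily how fungi_execution implements `topological_order`; operators are plain strings here for brevity.

```rust
use std::collections::{BTreeMap, BTreeSet, VecDeque};

/// Kahn's algorithm. Returns `None` if the graph contains a cycle.
fn topological_order(nodes: &[&str], edges: &[(&str, &str)]) -> Option<Vec<String>> {
    let mut indegree: BTreeMap<&str, usize> = nodes.iter().map(|n| (*n, 0)).collect();
    let mut downstream: BTreeMap<&str, BTreeSet<&str>> = BTreeMap::new();
    for (from, to) in edges {
        // Count each distinct edge once, even if it is listed twice.
        if downstream.entry(*from).or_default().insert(*to) {
            *indegree.entry(*to).or_insert(0) += 1;
        }
        indegree.entry(*from).or_insert(0);
    }

    // Start with operators that have no inputs (sources).
    let mut ready: VecDeque<&str> = indegree
        .iter()
        .filter(|(_, d)| **d == 0)
        .map(|(n, _)| *n)
        .collect();
    let mut order = Vec::new();
    while let Some(node) = ready.pop_front() {
        order.push(node.to_string());
        for next in downstream.get(node).into_iter().flatten() {
            let d = indegree.get_mut(next).expect("edge target is registered");
            *d -= 1;
            if *d == 0 {
                ready.push_back(*next);
            }
        }
    }
    // If some operator was never emitted, it sits on a cycle.
    (order.len() == indegree.len()).then_some(order)
}

fn main() {
    let order = topological_order(
        &["source", "filter", "sink"],
        &[("source", "filter"), ("filter", "sink")],
    );
    println!("{order:?}");
}
```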

Checkpointing

use fungi_checkpoint::{CheckpointCoordinator, InMemoryCheckpointStore};
use std::sync::Arc;

let store = Arc::new(InMemoryCheckpointStore::new());
let coord = CheckpointCoordinator::new(store, 60_000, 1);

let id = coord.trigger_checkpoint().await?;
coord.report_operator_checkpoint(id, "op-1".into(), vec![1, 2, 3]).await?;
coord.complete_checkpoint(id).await?;
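
The trigger / report / complete sequence above can be pictured as a small bookkeeping problem: a checkpoint is complete only once every operator has reported its state. The toy coordinator below sketches that rule under stated assumptions. It is synchronous, keeps everything in memory, and its names and error handling are illustrative rather than the fungi_checkpoint API.

```rust
use std::collections::{BTreeSet, HashMap};

/// Tracks which operators have acknowledged each in-flight checkpoint.
struct ToyCoordinator {
    operators: BTreeSet<String>,
    next_id: u64,
    pending: HashMap<u64, HashMap<String, Vec<u8>>>,
}

impl ToyCoordinator {
    fn new(operators: &[&str]) -> Self {
        Self {
            operators: operators.iter().map(|o| o.to_string()).collect(),
            next_id: 1,
            pending: HashMap::new(),
        }
    }

    /// Starts a checkpoint and returns its id.
    fn trigger(&mut self) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.pending.insert(id, HashMap::new());
        id
    }

    /// Records one operator's state snapshot for a pending checkpoint.
    fn report(&mut self, id: u64, operator: &str, state: Vec<u8>) -> Result<(), String> {
        let acks = self.pending.get_mut(&id).ok_or(format!("unknown checkpoint {id}"))?;
        acks.insert(operator.to_string(), state);
        Ok(())
    }

    /// Completes the checkpoint only if every operator has reported.
    fn complete(&mut self, id: u64) -> Result<HashMap<String, Vec<u8>>, String> {
        let acks = self.pending.get(&id).ok_or(format!("unknown checkpoint {id}"))?;
        let missing: Vec<&String> =
            self.operators.iter().filter(|o| !acks.contains_key(*o)).collect();
        if !missing.is_empty() {
            return Err(format!("checkpoint {id} is missing acks from {missing:?}"));
        }
        Ok(self.pending.remove(&id).expect("presence checked above"))
    }
}

fn main() {
    let mut coord = ToyCoordinator::new(&["op-1", "op-2"]);
    let id = coord.trigger();
    coord.report(id, "op-1", vec![1, 2, 3]).unwrap();
    println!("{:?}", coord.complete(id)); // op-2 has not reported yet
    coord.report(id, "op-2", vec![4]).unwrap();
    println!("{:?}", coord.complete(id).map(|states| states.len()));
}
```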

Watermarks & Windows

use fungi_streaming::{BoundedOutOfOrderness, TumblingWindowAssigner, WatermarkStrategy, WindowAssigner};

let wm = BoundedOutOfOrderness::new(5000);
let watermark = wm.generate_watermark(100_000);            // 95000

let win = TumblingWindowAssigner::new(1000);
let windows = win.assign_windows(2500);                    // [2000..3000]
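
The two commented results above follow from simple arithmetic, sketched here with plain functions. The watermark is the largest timestamp seen minus the allowed out-of-orderness, and a tumbling window starts at the timestamp rounded down to a multiple of the window size. Treating the window end as exclusive is an assumption based on the usual convention. Note that Flink's own bounded-out-of-orderness generator subtracts one further millisecond; the sketch follows the value shown in the comment above.

```rust
/// Watermark for bounded out-of-orderness: the largest timestamp seen so far
/// minus the allowed out-of-orderness (clamped at zero).
fn watermark(max_timestamp: u64, max_out_of_orderness: u64) -> u64 {
    max_timestamp.saturating_sub(max_out_of_orderness)
}

/// Tumbling window containing `timestamp`, as (start, end) with an exclusive end.
fn tumbling_window(timestamp: u64, size: u64) -> (u64, u64) {
    let start = timestamp - timestamp % size;
    (start, start + size)
}

fn main() {
    println!("watermark = {}", watermark(100_000, 5_000));
    let (start, end) = tumbling_window(2_500, 1_000);
    println!("window = [{start}..{end})");
}
```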

Python UDF

use fungi_execution::python_udf::{PythonUdfConfig, PythonUdfStreamingOperator};

let cfg = PythonUdfConfig::from_zip("transform.zip", "main.py")
    .with_env("THRESHOLD", "100");

let mut op = PythonUdfStreamingOperator::new("py-udf", cfg);
op.open().await?;
let out = op.process_element(batch, watermark).await?;
op.close().await?;

See Reference / Examples for runnable demos.

Examples

Runnable examples in crates/fungi-examples/.

Run

cargo run -p fungi-examples --example <name>

List

Example              | Demonstrates
---------------------+------------------------------------
state_backend        | Keyed state, snapshot/restore
streaming_primitives | Watermarks, windows, timers
execution_graph      | DAG construction + topological sort
checkpointing        | Coordinator, barrier, store
kafka_test           | Kafka connectivity
kafka_streaming      | Streaming from Kafka
kafka_pipeline       | Pipeline with transforms
kafka_sql            | Kafka → SQL → Kafka

kafka_sql (Full Pipeline)

# Setup Kafka
docker run -d --name kafka -p 9092:9092 apache/kafka:latest
docker exec kafka /opt/kafka/bin/kafka-topics.sh \
  --create --topic events --bootstrap-server localhost:9092 \
  --partitions 1 --replication-factor 1

# Run
cargo run -p fungi-examples --example kafka_sql

Output:

SQL Query Results:
  Total amount by user:
    user-1: 300
    user-2: 300
  Pipeline: Kafka → SQL → Kafka ✅

See Guides / Kafka for full Kafka setup.

Code Locations

Example              | File
---------------------+------------------------------------------------------
state_backend        | crates/fungi-examples/examples/state_backend.rs
streaming_primitives | crates/fungi-examples/examples/streaming_primitives.rs
execution_graph      | crates/fungi-examples/examples/execution_graph.rs
checkpointing        | crates/fungi-examples/examples/checkpointing.rs
kafka_*              | crates/fungi-examples/examples/kafka_*.rs

Changelog

[Unreleased]

Added

  • MCP server integration for AI agents
  • More SQL functions
  • Performance optimizations

[0.1.0] - 2026-05-27

Added

  • Core streaming engine
  • SQL support via DataFusion
  • Kafka source/sink
  • State management (Memory, RocksDB)
  • Checkpointing
  • CLI tool
  • gRPC server
  • Flink DataStream API compatibility
  • Examples and documentation

Architecture

  • fungi-common: Shared types, errors, configuration
  • fungi-state: State backends (Memory, RocksDB)
  • fungi-streaming: Watermarks, windows, timers
  • fungi-execution: Operator graph, task executor
  • fungi-checkpoint: Checkpoint coordinator
  • fungi-sql: DataFusion integration
  • fungi-flink: Flink DataStream API compatibility
  • fungi-server: gRPC server with REST API
  • fungi-runtime: Embedded and server modes
  • fungi-cli: Command-line interface
  • fungi-connector: Kafka source/sink

Features

  • SQL query execution
  • Kafka integration
  • State management
  • Checkpointing
  • CLI tool
  • gRPC API
  • Flink-compatible DataStream API

Performance

  • Zero GC pauses
  • Predictable latency
  • Low memory footprint
  • SIMD-optimized (via Arrow)

Documentation

  • Quickstart guide
  • Architecture documentation
  • CLI reference
  • SDK reference
  • Kafka integration guide
  • MCP integration guide
  • Examples

[0.0.1] - 2026-05-20

Added

  • Initial project setup
  • Basic crate structure

Contributing

Getting Started

  1. Fork the repository
  2. Clone your fork
  3. Create a feature branch
  4. Make your changes
  5. Run tests
  6. Submit a pull request

Development Setup

git clone https://github.com/yourusername/fungi.git
cd fungi
cargo build
cargo test --workspace

Code Style

# Format code
cargo fmt

# Check formatting
cargo fmt --check

# Run lints
cargo clippy --workspace -- -D warnings

Testing

# Run all tests
cargo test --workspace

# Run specific crate tests
cargo test -p fungi-state

# Run with output
cargo test --workspace -- --nocapture

Documentation

# Generate docs
cargo doc --workspace --open

# Build mdbook
mdbook build

# Serve mdbook locally
mdbook serve

Commit Messages

Format: <type>: <description>

Types:

  • feat: New feature
  • fix: Bug fix
  • docs: Documentation
  • test: Tests
  • refactor: Code refactoring
  • chore: Maintenance tasks

Example: feat: add Kafka source implementation
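
As a quick illustration of the format, the sketch below checks a commit subject line against the `<type>: <description>` shape and the type list above. It is a hypothetical helper for explanation only; the repository is not stated to ship or enforce such a check.

```rust
/// Commit types listed in this guide.
const TYPES: [&str; 6] = ["feat", "fix", "docs", "test", "refactor", "chore"];

/// Checks the `<type>: <description>` shape and returns (type, description).
fn parse_commit_subject(subject: &str) -> Result<(&str, &str), String> {
    let (kind, description) = subject
        .split_once(": ")
        .ok_or_else(|| "expected `<type>: <description>`".to_string())?;
    if !TYPES.contains(&kind) {
        return Err(format!("unknown type `{kind}`"));
    }
    if description.trim().is_empty() {
        return Err("description is empty".to_string());
    }
    Ok((kind, description))
}

fn main() {
    println!("{:?}", parse_commit_subject("feat: add Kafka source implementation"));
    println!("{:?}", parse_commit_subject("update stuff"));
}
```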

Pull Requests

  1. Keep PRs focused on a single feature/fix
  2. Add tests for new functionality
  3. Update documentation
  4. Ensure all tests pass
  5. Follow existing code style

Architecture

See Architecture for system design.

License

By contributing, you agree that your contributions will be licensed under the Apache License 2.0.