Fungi
Nature's data processors: streaming that grows on you.
Fungi is an AI-native Rust streaming engine with Flink compatibility.
Why Fungi?
| Feature | Fungi | Arroyo | Flink (JVM) |
|---|---|---|---|
| Flink API | Full | No | Full |
| AI / MCP | Native | No | No |
| GC pauses | None | None | Yes |
| Language | Rust | Rust | Java |
| Startup | Instant | Fast | Slow |
Three Differentiators
- Flink API Compatibility: Drop-in replacement for Flink DataStream and SQL jobs.
- High Performance: Zero GC, predictable latency, low memory.
- AI Native: Built-in MCP server for natural-language pipeline management.
Hello World
fungi server &
fungi sql --query "SELECT 1 as id, 'Alice' as name"
id | name
---+------
1 | Alice
Next Steps
- Quickstart: 5-minute walkthrough
- Deployment / Local: install and run
- Concepts / Architecture: how it works
Quickstart
5 minutes to your first Fungi pipeline.
1. Install
git clone https://github.com/yourusername/fungi.git
cd fungi
cargo build --release
mkdir -p ~/.local/bin
ln -sf "$(pwd)/target/release/fungi" ~/.local/bin/
ln -sf "$(pwd)/target/release/fungi-server" ~/.local/bin/
See Deployment / Local for full install options.
2. Start Server
fungi server
Fungi server started on 0.0.0.0:50051
3. Run SQL
fungi sql --query "SELECT 1 as id, 'Alice' as name"
id | name
---+------
1 | Alice
4. Run a Pipeline
fungi pipeline sample --output my-pipeline.yaml
fungi pipeline run --config my-pipeline.yaml
See Guides / Pipelines for full config reference.
Local Mode (no server)
fungi sql --query "SELECT 1" --local
Next Steps
- Guides / Pipelines: build a real pipeline
- Guides / Kafka: stream from Kafka
- Guides / Python UDF: custom Python logic
- Operations / Dashboard: monitor visually
Local Deployment
Build and run Fungi on your machine.
Prerequisites
- Rust 1.95+
- Cargo
- Git
Build from Source
git clone https://github.com/yourusername/fungi.git
cd fungi
cargo build --release
Install Binaries
Option A: Symlink (recommended)
mkdir -p ~/.local/bin
ln -sf "$(pwd)/target/release/fungi" ~/.local/bin/
ln -sf "$(pwd)/target/release/fungi-server" ~/.local/bin/
# Ensure ~/.local/bin is on your PATH
echo 'export PATH="$HOME/.local/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc
Option B: Cargo install
cargo install --path crates/fungi-cli
cargo install --path crates/fungi-server
Run
fungi server # default 0.0.0.0:50051
fungi server --port 8080
fungi sql --query "SELECT 1"
Run Tests
cargo test --workspace
Next Steps
- Docker: containerized deployment
- Kubernetes: production cluster
- Configuration: env vars and config file
Docker Deployment
Run Fungi in a container.
Build Image
# Local build (requires x86_64)
docker build -t fungi-server:latest .
# Remote build (for ARM hosts)
./scripts/build-remote.sh user@x86-server
# Fast build with persistent cache (95s with warm cache)
./scripts/build-fast.sh
Run Container
docker run -d \
--name fungi-server \
-p 50051:50051 \
-p 8080:8080 \
-v /data/fungi:/data \
-e FUNGI_AUTH_ENABLED=true \
-e FUNGI_ADMIN_USERNAME=admin \
-e FUNGI_ADMIN_PASSWORD=admin123 \
fungi-server:latest
Verify
curl http://localhost:8080/health
Environment Variables
See Configuration for the full list.
Next Steps
- Kubernetes: orchestrated production deployment
Kubernetes Deployment
Deploy Fungi to a Kubernetes cluster, either single-node or distributed.
Prerequisites
- Kubernetes 1.24+
- kubectl configured
- A storage class for PVC
Single-Node Deployment
Runs fungi-server with an embedded JobManager.
kubectl apply -f k8s/namespace.yaml
kubectl apply -f k8s/secret.yaml
kubectl apply -f k8s/pvc.yaml
kubectl apply -f k8s/deployment.yaml
kubectl apply -f k8s/service.yaml
kubectl get pods -n fungi
Distributed Deployment
Runs separate JobManager and TaskManager pods with HPA autoscaling.
kubectl apply -f k8s/namespace.yaml
kubectl apply -f k8s/secret.yaml
kubectl apply -f k8s/pvc.yaml
kubectl apply -f k8s/distributed-deployment.yaml
This creates:
- fungi-jobmanager (1 replica): scheduling + checkpoint coordination
- fungi-taskmanager (3 replicas, HPA 1 to 10): task execution
See Concepts / Distributed for the architecture.
Verify
kubectl get pods -n fungi
kubectl logs -f deployment/fungi-server -n fungi
kubectl run curl-test --rm -i --restart=Never --image=curlimages/curl -n fungi \
-- curl -s http://fungi-service:50051/health
Expose Externally
NodePort
kubectl patch svc fungi-service -n fungi -p '{"spec":{"type":"NodePort"}}'
kubectl get svc fungi-service -n fungi
LoadBalancer
kubectl apply -f k8s/service-lb.yaml
kubectl get svc fungi-service-lb -n fungi -w
Dashboard
Port-forward the service, then run the dashboard locally:
kubectl port-forward svc/fungi-service 50051:50051 -n fungi
cd crates/fungi-dashboard && trunk serve --port 8082
Manifests
| File | Purpose |
|---|---|
| k8s/namespace.yaml | Namespace |
| k8s/secret.yaml | JWT + admin credentials |
| k8s/pvc.yaml | Persistent storage |
| k8s/deployment.yaml | Single-node server |
| k8s/distributed-deployment.yaml | JM + TM + HPA |
| k8s/service.yaml | ClusterIP |
| k8s/service-lb.yaml | LoadBalancer |
Undeploy
kubectl delete namespace fungi
Troubleshooting
| Issue | Check |
|---|---|
| Pod Pending | kubectl describe pvc -n fungi (storage class, size, zone) |
| Health probe fails | kubectl logs deployment/fungi-server -n fungi (port binding) |
| Cannot access externally | Cloud firewall / security group for NodePort range 30000-32767 |
| Dashboard can't reach API | Check Trunk.toml proxy backend URL |
Configuration
All env vars and config file options.
Environment Variables
Server
| Variable | Default | Description |
|---|---|---|
| FUNGI_PORT | 50051 | gRPC/API port |
| FUNGI_HTTP_PORT | 8080 | HTTP port |
| FUNGI_LOG_LEVEL | info | trace / debug / info / warn / error |
| FUNGI_DATA_DIR | /data | Data directory |
Auth
| Variable | Default | Description |
|---|---|---|
| FUNGI_AUTH_ENABLED | false | Enable auth |
| FUNGI_JWT_SECRET | (random) | JWT signing secret |
| FUNGI_ADMIN_USERNAME | "" | Bootstrap admin username |
| FUNGI_ADMIN_PASSWORD | "" | Bootstrap admin password |
Cluster
| Variable | Default | Description |
|---|---|---|
| FUNGI_CLUSTER_MODE | false | Enable distributed mode |
| FUNGI_JOB_MANAGER_HOST | 0.0.0.0 | JobManager address |
| FUNGI_JOB_MANAGER_PORT | 50052 | JobManager port |
| FUNGI_TASK_MANAGER_SLOTS | 4 | Slots per TaskManager |
Kafka
| Variable | Default | Description |
|---|---|---|
| KAFKA_BOOTSTRAP_SERVERS | localhost:9092 | Kafka brokers |
Enterprise
| Variable | Default | Description |
|---|---|---|
| FUNGI_LICENSE_KEY | (none) | Enterprise license key (Ed25519-signed JWT) |
License modes:
| Mode | Key | Behavior |
|---|---|---|
| Personal | (none) | Enterprise features disabled, info log at startup |
| Enterprise | Valid key | All features enabled |
| Enterprise | Expired | Warn for 7 days, then disable |
| Enterprise | Invalid | Fail fast |
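The license table can be read as a small decision function. Below is a std-only Rust sketch of that logic. The `LicenseKey` and `LicenseMode` types and `resolve_license` are hypothetical names, not Fungi's API. The sketch assumes Ed25519 signature verification has already happened elsewhere and models only the outcomes: no key, valid key, expired within the 7-day warning window, expired beyond it, and invalid key.

```rust
use std::time::Duration;

/// Hypothetical result of checking FUNGI_LICENSE_KEY (not Fungi's API).
#[derive(Debug)]
pub enum LicenseKey {
    /// No key configured.
    Missing,
    /// Signature or format check failed.
    Invalid,
    /// Signature OK; `expired_for` is how long ago it expired (zero if still valid).
    Verified { expired_for: Duration },
}

#[derive(Debug, PartialEq)]
pub enum LicenseMode {
    Personal,        // enterprise features disabled, info log at startup
    Enterprise,      // all features enabled
    EnterpriseGrace, // expired: still enabled, but warn
    Disabled,        // expired past the 7-day window: enterprise features off
}

const GRACE: Duration = Duration::from_secs(7 * 24 * 60 * 60);

/// Map a checked key to a mode. An invalid key is an error so startup can fail fast.
pub fn resolve_license(key: &LicenseKey) -> Result<LicenseMode, &'static str> {
    match key {
        LicenseKey::Missing => Ok(LicenseMode::Personal),
        LicenseKey::Invalid => Err("invalid license key"),
        LicenseKey::Verified { expired_for } if expired_for.is_zero() => Ok(LicenseMode::Enterprise),
        LicenseKey::Verified { expired_for } if *expired_for <= GRACE => Ok(LicenseMode::EnterpriseGrace),
        LicenseKey::Verified { .. } => Ok(LicenseMode::Disabled),
    }
}
```

Returning an error for an invalid key, rather than silently falling back to personal mode, matches the "Fail fast" row.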
Generate test license (dev only):
cargo run -p fungi-enterprise --example generate_test_license
Config File
fungi.toml:
[server]
host = "0.0.0.0"
port = 50051
[checkpoint]
interval_ms = 60000
timeout_ms = 600000
max_concurrent = 1
[state_backend]
type = "memory" # or "rocksdb"
[state_backend.rocksdb]
path = "/var/lib/fungi/state"
[kafka]
bootstrap_servers = "localhost:9092"
group_id = "fungi-default"
[logging]
level = "info"
format = "pretty" # or "json"
[cluster]
mode = false
[cluster.job_manager]
host = "0.0.0.0"
port = 50052
[cluster.task_manager]
slots = 4
memory_mb = 4096
Logging
export RUST_LOG=fungi=debug
export RUST_LOG=fungi_server=info,fungi_sql=debug # per-crate
Performance Tuning
| Setting | Where | Notes |
|---|---|---|
| Parallelism | env.set_parallelism(4) | per-job |
| Channel capacity | executor.create_channels(1000) | backpressure |
| Checkpoint interval | fungi.toml [checkpoint] | trade latency vs durability |
| Kafka batch | producer/consumer config | throughput vs latency |
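Channel capacity governs backpressure. Once a bounded channel between two operators is full, the upstream operator has to wait. The sketch below makes that visible. It uses `std::sync::mpsc::sync_channel` as a stand-in for Fungi's internal channels, which is an assumption: it does not show how `executor.create_channels` is implemented. `bounded_edge` and `try_push` are hypothetical helper names.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical bounded edge between two operators; `capacity` plays the
/// role of the channel-capacity setting.
pub fn bounded_edge<T>(capacity: usize) -> (SyncSender<T>, Receiver<T>) {
    sync_channel(capacity)
}

/// Push without blocking. Returns false when the downstream buffer is full,
/// i.e. exactly when a blocking producer would be backpressured.
pub fn try_push<T>(tx: &SyncSender<T>, item: T) -> bool {
    match tx.try_send(item) {
        Ok(()) => true,
        Err(TrySendError::Full(_)) => false,         // consumer is behind
        Err(TrySendError::Disconnected(_)) => false, // consumer is gone
    }
}
```

A larger capacity absorbs bursts but holds more in-flight data in memory and can add queuing latency.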
Architecture
How Fungi's components fit together.
System Overview
┌─────────────────────────────────────────────┐
│               gRPC / REST API               │
├─────────────────────────────────────────────┤
│ SQL (DataFusion)     │ DataStream API       │
├─────────────────────────────────────────────┤
│         Streaming Execution Engine          │
│ ┌──────────┐ ┌───────────┐ ┌──────────────┐ │
│ │ Operator │ │   State   │ │  Watermark   │ │
│ │  Graph   │ │  Manager  │ │  & Windows   │ │
│ └──────────┘ └───────────┘ └──────────────┘ │
├─────────────────────────────────────────────┤
│ Kafka        │ State Backend │ Checkpoints  │
└─────────────────────────────────────────────┘
Crates
| Crate | Purpose |
|---|---|
| fungi-common | Shared types, errors, config |
| fungi-state | State backends (Memory, RocksDB) |
| fungi-streaming | Watermarks, windows, timers |
| fungi-execution | Operator graph, task executor, Python UDF |
| fungi-checkpoint | Checkpoint coordinator |
| fungi-sql | DataFusion integration |
| fungi-flink | Flink DataStream API |
| fungi-connector | Kafka source/sink |
| fungi-server | gRPC/HTTP server |
| fungi-cli | Command-line interface |
| fungi-cluster | Distributed protocol |
| fungi-scheduler | Job/task scheduling |
| fungi-reliability | DLQ, retries, circuit breaker |
| fungi-security | Auth, RBAC, audit |
| fungi-tenancy | Multi-tenant isolation |
| fungi-dashboard | Leptos WASM dashboard |
| fungi-mcp | MCP server for AI agents |
Data Flow
Source → Deserialize → Timestamp → KeyBy → Window → Aggregate → Serialize → Sink
- Source reads from Kafka / file / API
- Deserialize → Arrow RecordBatch
- Timestamp assigns event time
- KeyBy partitions for parallelism
- Window groups by time (tumbling / sliding / session)
- Aggregate computes (sum / count / avg / custom)
- Serialize → bytes
- Sink writes to Kafka / DB / file
All internal data flows use Arrow RecordBatch, which is zero-copy and SIMD-friendly.
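To make the middle stages concrete, here is a std-only Rust sketch of Filter → KeyBy → tumbling Window → Aggregate. It works over plain structs rather than Arrow RecordBatches. The `Event` struct and `tumbling_sum` are hypothetical, and windows are assumed to be aligned to timestamp 0.

```rust
use std::collections::BTreeMap;

/// Hypothetical decoded event.
#[derive(Clone, Debug)]
pub struct Event {
    pub ts_ms: u64,      // event time assigned by the Timestamp stage
    pub user_id: String, // key used by KeyBy
    pub amount: i64,
}

/// Filter by `min_amount`, key by user, assign tumbling windows of `size_ms`
/// (must be > 0), and sum amounts per (user, window start).
pub fn tumbling_sum(events: &[Event], min_amount: i64, size_ms: u64) -> BTreeMap<(String, u64), i64> {
    let mut out = BTreeMap::new();
    for e in events.iter().filter(|e| e.amount > min_amount) {
        // Window assignment: start of the window that contains ts_ms.
        let window_start = e.ts_ms - e.ts_ms % size_ms;
        *out.entry((e.user_id.clone(), window_start)).or_insert(0) += e.amount;
    }
    out
}
```

A streaming engine emits each window's result when the watermark passes the window end. This batch version just returns every window at once.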
Execution Modes
| Mode | Use Case |
|---|---|
| Embedded | Single process, library API, CLI --local |
| Server | Single node, gRPC API, dashboard |
| Distributed | JobManager + N TaskManagers, K8s |
See Concepts / Distributed for the multi-node model.
Related
- Concepts / Distributed
- Concepts / Reliability
- Concepts / Security
- Concepts / Multi-Tenancy
- Reference / SDK: code examples
Distributed Streaming
How Fungi runs across multiple nodes.
For how to deploy, see Deployment / Kubernetes.
Topology
           ┌─────────────────┐
           │   JobManager    │
           │   (in server)   │
           └────────┬────────┘
                    │ gRPC
     ┌──────────────┼──────────────┐
     │              │              │
┌──────────┐   ┌──────────┐   ┌──────────┐
│ TaskMgr  │   │ TaskMgr  │   │ TaskMgr  │
│ Slot 1   │   │ Slot 1   │   │ Slot 1   │
│ Slot 2   │   │ Slot 2   │   │ Slot 2   │
└──────────┘   └──────────┘   └──────────┘
     │              │              │
     └──────────────┼──────────────┘
                    │
             Object Storage
              (checkpoints)
Components
| Component | Where | Role |
|---|---|---|
| JobManager | Embedded in fungi-server | Schedules tasks, coordinates checkpoints |
| TaskManager | Separate process | Executes tasks, sends heartbeats |
| Scheduler | In JobManager | Assigns tasks to slots |
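The Scheduler's job of assigning tasks to slots can be sketched as a greedy round-robin placement over TaskManagers that still have free slots. This is a hypothetical illustration: `TaskManagerSlots` and `assign_slots` are not Fungi's API. Fungi's real scheduler may also weigh memory or locality.

```rust
/// Hypothetical view of one TaskManager's capacity.
pub struct TaskManagerSlots {
    pub id: String,
    pub free_slots: usize,
}

/// Place `num_tasks` tasks round-robin over TaskManagers with free slots.
/// Returns the chosen TaskManager id per task, or an error if slots run out.
pub fn assign_slots(tms: &mut [TaskManagerSlots], num_tasks: usize) -> Result<Vec<String>, String> {
    let total: usize = tms.iter().map(|tm| tm.free_slots).sum();
    if total < num_tasks {
        // Mirrors the "Tasks not scheduling: free slots?" troubleshooting case.
        return Err(format!("need {num_tasks} slots, only {total} free"));
    }
    let mut placements = Vec::with_capacity(num_tasks);
    let mut i = 0;
    // Terminates because total free slots >= num_tasks.
    while placements.len() < num_tasks {
        let tm = &mut tms[i % tms.len()];
        if tm.free_slots > 0 {
            tm.free_slots -= 1;
            placements.push(tm.id.clone());
        }
        i += 1;
    }
    Ok(placements)
}
```

Spreading tasks across TaskManagers, rather than filling one first, limits how many tasks a single TaskManager failure takes down.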
Control Plane (gRPC)
| RPC | Purpose |
|---|---|
| RegisterTaskManager | TaskManager joins cluster |
| Heartbeat | Liveness check |
| DeployTask / CancelTask | Task lifecycle |
| TriggerCheckpoint / AcknowledgeCheckpoint | Checkpoint coordination |
Data Plane (gRPC streaming)
| Channel | Use Case |
|---|---|
| LocalChannel | Same TaskManager (in-process) |
| RemoteChannel | Cross-TaskManager (gRPC) |
| Partitioner | Use Case |
|---|---|
| RoundRobin | Load balancing |
| KeyBy | Group by key columns |
| Broadcast | Send to all (joins) |
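The three partitioners differ only in which downstream channels receive a record. Below is a std-only Rust sketch with a hypothetical `Partitioner` enum, not Fungi's type. `KeyBy` here hashes with `DefaultHasher`, which is deterministic within one build but not guaranteed stable across Rust versions.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical partitioning strategies mirroring the table above.
pub enum Partitioner {
    RoundRobin { next: usize },
    KeyBy,
    Broadcast,
}

impl Partitioner {
    /// Downstream channel indices (0..n) that should receive a record with `key`.
    pub fn route(&mut self, key: &str, n: usize) -> Vec<usize> {
        assert!(n > 0, "need at least one downstream channel");
        match self {
            Partitioner::RoundRobin { next } => {
                // Cycle through channels to balance load; the key is ignored.
                let target = *next % n;
                *next += 1;
                vec![target]
            }
            Partitioner::KeyBy => {
                // The same key always lands on the same channel, so per-key state stays local.
                let mut h = DefaultHasher::new();
                key.hash(&mut h);
                vec![(h.finish() % n as u64) as usize]
            }
            // Every channel gets a copy (e.g. the small side of a join).
            Partitioner::Broadcast => (0..n).collect(),
        }
    }
}
```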
Checkpointing
Barriers flow inline with data:
Source-1 ──[data][barrier][data]──▶ Operator
Source-2 ──[data][data][barrier]──▶ Operator
                                       │
                                       ├─ Wait for ALL barriers
                                       ├─ Snapshot state
                                       └─ Forward barrier
States: Triggered → WaitingForAck → Committing → Completed
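Barrier alignment can be sketched as a set of inputs that have already delivered the current barrier. Those inputs are held back until every input has delivered it. Then the operator snapshots state and forwards the barrier. This std-only Rust sketch uses a hypothetical `BarrierAligner` and assumes one checkpoint in flight at a time (as with `max_concurrent = 1` in fungi.toml). It is a simplified illustration, not Fungi's implementation.

```rust
use std::collections::HashSet;

/// Hypothetical aligner for an operator with `num_inputs` input channels.
pub struct BarrierAligner {
    num_inputs: usize,
    arrived: HashSet<usize>, // inputs that already delivered the current barrier
}

impl BarrierAligner {
    pub fn new(num_inputs: usize) -> Self {
        Self { num_inputs, arrived: HashSet::new() }
    }

    /// Data from an input that already sent its barrier must be buffered
    /// (not processed) until alignment completes.
    pub fn is_blocked(&self, input: usize) -> bool {
        self.arrived.contains(&input)
    }

    /// Record a barrier from `input`. Returns Some(id) once ALL inputs have
    /// delivered it: the caller then snapshots state and forwards the barrier.
    pub fn on_barrier(&mut self, input: usize, checkpoint_id: u64) -> Option<u64> {
        self.arrived.insert(input);
        if self.arrived.len() == self.num_inputs {
            self.arrived.clear(); // unblock all inputs
            Some(checkpoint_id)
        } else {
            None
        }
    }
}
```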
Exactly-Once
Two-phase commit for sinks:
- Prepare: buffer data
- Commit: make visible after the checkpoint completes
- Abort: discard if the checkpoint fails
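The prepare/commit/abort steps map onto a sink that stages records per checkpoint and exposes them only on commit. Below is a std-only Rust sketch with a hypothetical `TwoPhaseSink`. A real transactional sink (for example a Kafka transactional producer) would stage data in the external system rather than in memory.

```rust
use std::collections::BTreeMap;

/// Hypothetical transactional sink (not Fungi's API).
#[derive(Default)]
pub struct TwoPhaseSink {
    pending: Vec<String>,                 // written since the last barrier
    prepared: BTreeMap<u64, Vec<String>>, // sealed, waiting for checkpoint completion
    pub committed: Vec<String>,           // visible to downstream readers
}

impl TwoPhaseSink {
    pub fn write(&mut self, record: &str) {
        self.pending.push(record.to_string());
    }

    /// Prepare: on a barrier, seal the pending buffer under the checkpoint id.
    pub fn prepare(&mut self, checkpoint_id: u64) {
        let batch = std::mem::take(&mut self.pending);
        self.prepared.insert(checkpoint_id, batch);
    }

    /// Commit: the checkpoint completed, so make its records visible.
    pub fn commit(&mut self, checkpoint_id: u64) {
        if let Some(batch) = self.prepared.remove(&checkpoint_id) {
            self.committed.extend(batch);
        }
    }

    /// Abort: the checkpoint failed, so drop its staged records.
    pub fn abort(&mut self, checkpoint_id: u64) {
        self.prepared.remove(&checkpoint_id);
    }
}
```

Discarding on abort is safe only because recovery replays the source from the last completed checkpoint, so aborted records are produced again.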
Recovery:
- JobManager detects failure (missed heartbeats)
- Reload state from last checkpoint
- Resume from checkpoint position
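Failure detection through missed heartbeats can be sketched as a map from TaskManager id to the time of its last heartbeat. Any entry older than a timeout counts as failed. `HeartbeatMonitor` and the timeout value are hypothetical, since the document does not state Fungi's heartbeat interval or timeout.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical JobManager-side liveness tracker.
pub struct HeartbeatMonitor {
    timeout: Duration,
    last_seen: HashMap<String, Instant>,
}

impl HeartbeatMonitor {
    pub fn new(timeout: Duration) -> Self {
        Self { timeout, last_seen: HashMap::new() }
    }

    /// Called on RegisterTaskManager and on every Heartbeat RPC.
    pub fn heartbeat(&mut self, tm: &str, now: Instant) {
        self.last_seen.insert(tm.to_string(), now);
    }

    /// TaskManagers silent for longer than `timeout`, sorted by id. Their
    /// tasks would be redeployed from the last completed checkpoint.
    pub fn failed(&self, now: Instant) -> Vec<String> {
        let mut dead: Vec<String> = self
            .last_seen
            .iter()
            .filter(|(_, seen)| now.saturating_duration_since(**seen) > self.timeout)
            .map(|(id, _)| id.clone())
            .collect();
        dead.sort();
        dead
    }
}
```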
API
use fungi_scheduler::{JobGraph, Scheduler, JobVertex};
use fungi_cluster::checkpoint::CheckpointCoordinator;

// Runs inside an async fn returning Result (it uses `?` and `.await`).
// `PartitionType` and `storage` must also be in scope; they are not shown here.

// Two-vertex job graph: source (parallelism 2) -> sink (parallelism 1).
let mut graph = JobGraph::new("my-job");
graph.add_vertex(JobVertex::new("source", "kafka_source", 2));
graph.add_vertex(JobVertex::new("sink", "kafka_sink", 1));
graph.add_edge("source", "sink", PartitionType::Forward);

// Register a TaskManager, submit the job, and compute task deployments.
let mut scheduler = Scheduler::new();
scheduler.register_task_manager("tm-1", 4, 4096, 4);
scheduler.submit_job(graph)?;
let deployments = scheduler.schedule()?;

// Coordinate checkpoints for the job.
let coordinator = CheckpointCoordinator::new(storage, 60_000);
let checkpoint_id = coordinator.trigger_checkpoint("job-1", 3).await;
Troubleshooting
| Issue | Check |
|---|---|
| TaskManager not registering | JobManager reachable? port 50052 open? |
| Checkpoints fail | Storage reachable? Disk space? |
| Tasks not scheduling | Free slots? Job graph valid? |
Reliability
Production-grade reliability primitives in fungi-reliability (14 tests).
Features
| Feature | Purpose |
|---|---|
| Deduplication | TTL-based dedup store, drop duplicate messages |
| Dead-Letter Queue | Retry with backoff, then park failures in DLQ |
| Circuit Breaker | Stop calling failing dependencies |
| Checkpointing | Persist offsets + state for recovery |
Configuration
reliability:
mode: at-least-once
deduplication:
enabled: true
ttl_seconds: 3600
error_handling:
max_retries: 3
retry_backoff_ms: 1000
retry_backoff_multiplier: 2.0
max_backoff_ms: 30000
dead_letter_topic: "{topic}-dlq"
circuit_breaker:
enabled: true
failure_threshold: 5
recovery_timeout_ms: 30000
half_open_max_requests: 3
Deduplication
Each message is hashed from (key, topic, partition, offset), and the hash is kept in a TTL store. Duplicates seen within the TTL are dropped.
use fungi_reliability::DeduplicationStore;
use std::time::Duration;

// Entries expire after one hour (ttl_seconds: 3600).
let store = DeduplicationStore::new(Duration::from_secs(3600));
if !store.is_duplicate("msg-123").await {
    // process the message
}
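To show the mechanism behind the store, here is a std-only, synchronous Rust sketch that fingerprints (key, topic, partition, offset) and remembers each fingerprint for the TTL. `TtlDedup` is hypothetical and is not `DeduplicationStore`. Evicting on every call is O(n) and is only for clarity; a production store would evict lazily or in the background.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::time::{Duration, Instant};

/// Hypothetical TTL dedup store keyed by a message fingerprint.
pub struct TtlDedup {
    ttl: Duration,
    seen: HashMap<u64, Instant>, // fingerprint -> first time seen
}

impl TtlDedup {
    pub fn new(ttl: Duration) -> Self {
        Self { ttl, seen: HashMap::new() }
    }

    fn fingerprint(key: &str, topic: &str, partition: i32, offset: i64) -> u64 {
        let mut h = DefaultHasher::new();
        (key, topic, partition, offset).hash(&mut h);
        h.finish()
    }

    /// True if this message was already seen within the TTL; otherwise records it.
    pub fn is_duplicate(&mut self, key: &str, topic: &str, partition: i32, offset: i64, now: Instant) -> bool {
        // Drop expired fingerprints so memory is bounded by the TTL window.
        let ttl = self.ttl;
        self.seen.retain(|_, &mut t| now.saturating_duration_since(t) < ttl);
        let fp = Self::fingerprint(key, topic, partition, offset);
        if self.seen.contains_key(&fp) {
            true
        } else {
            self.seen.insert(fp, now);
            false
        }
    }
}
```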
Dead-Letter Queue
Failed messages are retried with exponential backoff. After max_retries, the message is sent to the DLQ topic.
use fungi_reliability::{ErrorHandler, ErrorHandlingConfig, ErrorAction};

let handler = ErrorHandler::new(ErrorHandlingConfig::default());
match handler.handle_failure("msg-123", "parse error", 0).await {
    ErrorAction::Retry => { /* wait for the backoff delay, then retry */ }
    ErrorAction::DeadLettered => { /* parked in the DLQ */ }
}
Backoff: 1000 ms → 2000 ms → 4000 ms → …, capped at 30000 ms.
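The schedule follows from three config values: retry_backoff_ms, retry_backoff_multiplier and max_backoff_ms. A minimal Rust sketch of the formula follows. `backoff_ms` is a hypothetical helper, and the document does not say whether Fungi adds jitter, so none is applied here.

```rust
/// Delay before retry `attempt` (0-based): base * multiplier^attempt, capped at max.
pub fn backoff_ms(attempt: u32, base_ms: u64, multiplier: f64, max_ms: u64) -> u64 {
    let delay = base_ms as f64 * multiplier.powi(attempt as i32);
    delay.min(max_ms as f64) as u64
}
```

With the defaults from the configuration above, attempts 0 through 5 give 1000, 2000, 4000, 8000, 16000 and 30000 ms. The uncapped value at attempt 5 would be 32000 ms.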
DLQ payload:
{ "original_id": "msg-123", "error": "...", "failed_at": "...", "retry_count": 3 }
Circuit Breaker
CLOSED ── failures ≥ threshold ──▶ OPEN
   ▲                                 │
   │                          recovery_timeout
   │                                 ▼
   └──── success ◀───────────── HALF_OPEN (probe calls)
use fungi_reliability::{CircuitBreaker, CircuitBreakerConfig};

let mut cb = CircuitBreaker::new(CircuitBreakerConfig::default());
// Call the dependency only when the breaker allows it.
if cb.should_allow_request() {
    // call_dependency(): placeholder for your own async call
    match call_dependency().await {
        Ok(_) => cb.record_success().await,
        Err(_) => cb.record_failure().await,
    }
}
| State | Behavior |
|---|---|
| CLOSED | All calls pass through |
| OPEN | Fail fast, no calls made |
| HALF_OPEN | Limited probes; success → CLOSED |
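The three states and their transitions fit in a small state machine. Below is a std-only, synchronous Rust sketch using the config fields above (failure_threshold, recovery_timeout_ms, half_open_max_requests). `Breaker` is hypothetical and is not `fungi_reliability::CircuitBreaker`. One transition is an assumption based on the standard pattern, since the diagram does not show it: a failed probe in HALF_OPEN reopens the breaker.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq, Clone, Copy)]
pub enum State { Closed, Open, HalfOpen }

/// Hypothetical circuit breaker (not Fungi's API).
pub struct Breaker {
    pub state: State,
    failures: u32,
    failure_threshold: u32,
    recovery_timeout: Duration,
    half_open_max_requests: u32,
    half_open_in_flight: u32,
    opened_at: Option<Instant>,
}

impl Breaker {
    pub fn new(failure_threshold: u32, recovery_timeout: Duration, half_open_max_requests: u32) -> Self {
        Self { state: State::Closed, failures: 0, failure_threshold, recovery_timeout,
               half_open_max_requests, half_open_in_flight: 0, opened_at: None }
    }

    /// Decide whether a call may go through right now.
    pub fn allow(&mut self, now: Instant) -> bool {
        match self.state {
            State::Closed => true,
            State::Open => {
                // After recovery_timeout, move to HALF_OPEN and admit one probe.
                let opened = self.opened_at.unwrap_or(now);
                if now.saturating_duration_since(opened) >= self.recovery_timeout {
                    self.state = State::HalfOpen;
                    self.half_open_in_flight = 1;
                    true
                } else {
                    false // fail fast
                }
            }
            State::HalfOpen => {
                // Admit at most half_open_max_requests probes.
                if self.half_open_in_flight < self.half_open_max_requests {
                    self.half_open_in_flight += 1;
                    true
                } else {
                    false
                }
            }
        }
    }

    pub fn record_success(&mut self) {
        // A success closes the breaker and resets counters.
        self.state = State::Closed;
        self.failures = 0;
        self.half_open_in_flight = 0;
    }

    pub fn record_failure(&mut self, now: Instant) {
        self.failures += 1;
        if self.state == State::HalfOpen || self.failures >= self.failure_threshold {
            self.state = State::Open;
            self.opened_at = Some(now);
            self.half_open_in_flight = 0;
        }
    }
}
```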
Checkpointing
use fungi_reliability::{CheckpointManager, InMemoryCheckpointStore};
use std::collections::HashMap;

let mgr = CheckpointManager::new(Box::new(InMemoryCheckpointStore::new()));

// Persist the current offset per topic together with serialized state.
let mut offsets = HashMap::new();
offsets.insert("input-topic".to_string(), 1000);
let id = mgr.checkpoint(offsets, state_bytes).await?; // state_bytes: your serialized state

// On restart, resume from the most recent checkpoint.
if let Some(cp) = mgr.latest().await? {
    // resume from cp.offsets + cp.state
}
See Concepts / Distributed for barrier protocol in distributed mode.
Security
Enterprise features: RBAC, audit logging to a database, Google SSO, LDAP, and a DB-backed user store require a valid license key. In personal mode, all permissions are allowed and audit logs go to stdout only.
Authentication, authorization, and audit in fungi-security (10 tests).
Features
| Feature | Purpose |
|---|---|
| API Key Auth | Long-lived keys for CLI / automation |
| JWT Auth | Short-lived tokens for web UI |
| RBAC | Role-based access control with wildcards |
| Audit Logging | Track every authenticated action |
Configuration
security:
auth:
api_keys:
- key: "fungi-admin-xxxx"
name: "admin"
tenant: "team-a"
permissions: ["*"]
- key: "fungi-readonly-xxxx"
name: "viewer"
tenant: "team-a"
permissions: ["pipeline:read", "metrics:read"]
jwt:
secret: "${JWT_SECRET}"
expiry_hours: 24
issuer: "fungi"
rbac:
roles:
admin: { permissions: ["*"] }
operator:
permissions:
- "pipeline:create"
- "pipeline:read"
- "pipeline:update"
- "pipeline:start"
- "pipeline:stop"
viewer:
permissions: ["pipeline:read", "metrics:read"]
audit:
enabled: true
store: stdout
Bootstrap (Env)
export FUNGI_AUTH_ENABLED=true
export FUNGI_JWT_SECRET="your-secret"
export FUNGI_ADMIN_USERNAME=admin
export FUNGI_ADMIN_PASSWORD=admin123
fungi server
API Keys (CLI / Automation)
curl -H "X-API-Key: fungi-admin-xxxx" http://localhost:8080/api/jobs
fungi --api-key fungi-admin-xxxx job list
JWT (Web UI)
TOKEN=$(curl -X POST http://localhost:8080/auth/login \
-H "Content-Type: application/json" \
-d '{"user":"admin","key":"fungi-admin-xxxx"}' | jq -r .token)
curl -H "Authorization: Bearer $TOKEN" http://localhost:8080/api/jobs
The dashboard stores the JWT in localStorage and attaches it to every request.
RBAC
Permissions are resource:action. * matches anything.
| Role | Permissions |
|---|---|
| admin | * |
| operator | pipeline:create, pipeline:read, pipeline:update, pipeline:start, pipeline:stop |
| viewer | pipeline:read, metrics:read |
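A permission check reduces to matching a requested `resource:action` against a role's patterns. Below is a std-only Rust sketch with hypothetical helpers `permits` and `role_allows`. The document states only that `*` matches anything. Treating `resource:*` as "any action on that resource" is an assumption drawn from the `sre` role example.

```rust
/// Does a granted pattern cover the requested `resource:action`?
pub fn permits(granted: &str, requested: &str) -> bool {
    if granted == "*" || granted == requested {
        return true;
    }
    // Assumed semantics: "pipeline:*" matches any action on "pipeline".
    match (granted.split_once(':'), requested.split_once(':')) {
        (Some((g_res, "*")), Some((r_res, _))) => g_res == r_res,
        _ => false,
    }
}

/// A role allows a request if any of its permission patterns match.
pub fn role_allows(permissions: &[&str], requested: &str) -> bool {
    permissions.iter().any(|p| permits(p, requested))
}
```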
Custom roles in YAML:
rbac:
roles:
data-engineer:
permissions: ["pipeline:create", "pipeline:read", "pipeline:update", "topic:read"]
sre:
permissions: ["pipeline:*", "metrics:*", "audit:read"]
Audit Log
{
"timestamp": "2026-05-27T10:00:00Z",
"user": "alice",
"tenant": "team-a",
"action": "pipeline:start",
"resource": "analytics-pipeline",
"ip": "10.0.0.1",
"success": true
}
| Store | Status |
|---|---|
| stdout | Available |
| file | Planned |
| database | Planned |
Best Practices
- API keys for machines, JWT for humans
- Rotate FUNGI_JWT_SECRET regularly
- Store secrets in a K8s Secret
- Pipe audit logs to a SIEM
- Grant least privilege; start with viewer
Multi-Tenancy
Enterprise feature: Multi-tenancy requires a valid license key. In personal mode, all requests use the "default" tenant.
Logical isolation and resource quotas in fungi-tenancy (11 tests).
Features
| Feature | Purpose |
|---|---|
| Tenant Namespaces | Logical isolation of pipelines and resources |
| Resource Quotas | Per-tenant limits on pipelines, throughput, state, topics |
| Tenant Context | Automatic tenant extraction from request headers |
Isolation is logical: a single binary serves all tenants, each in its own namespace. For physical isolation, deploy one Fungi instance per tenant in its own K8s namespace.
Configuration
tenancy:
enabled: true
default_tenant: "default"
tenants:
team-a:
name: "Team Alpha"
quotas:
max_pipelines: 10
max_throughput_mbps: 100
max_state_size_gb: 10
max_topics: 20
team-b:
name: "Team Beta"
quotas:
max_pipelines: 5
max_throughput_mbps: 50
max_state_size_gb: 5
max_topics: 10
Tenant Context
Pass tenant via header or CLI flag:
curl -H "X-Tenant-ID: team-a" http://localhost:8080/api/jobs
fungi --tenant team-a job list
Scope a pipeline to a tenant:
name: team-a-pipeline
tenant: team-a
source:
connector: kafka
bootstrap_servers: 127.0.0.1:9092
topic: team-a-events
Quotas
| Quota | Default | Description |
|---|---|---|
| max_pipelines | 10 | Max pipelines per tenant |
| max_throughput_mbps | 100 | Max aggregate throughput |
| max_state_size_gb | 10 | Max state store size |
| max_topics | 20 | Max Kafka topics |
A quota breach returns 429 Too Many Requests.
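An admission check for a new pipeline compares current usage against the tenant's limits and rejects with 429 on a breach. Below is a std-only Rust sketch covering the pipeline and topic quotas. `Quotas`, `Usage` and `admit_pipeline` are hypothetical names, and the throughput and state-size quotas are left out.

```rust
/// Hypothetical per-tenant limits, mirroring the YAML keys above.
pub struct Quotas {
    pub max_pipelines: u32,
    pub max_topics: u32,
}

/// Hypothetical current usage for one tenant.
pub struct Usage {
    pub pipelines: u32,
    pub topics: u32,
}

/// Admit one more pipeline that needs `new_topics` additional topics.
/// Err(429) means "reject with 429 Too Many Requests".
pub fn admit_pipeline(q: &Quotas, used: &Usage, new_topics: u32) -> Result<(), u16> {
    let pipelines_ok = used.pipelines < q.max_pipelines;
    let topics_ok = used.topics + new_topics <= q.max_topics;
    if pipelines_ok && topics_ok { Ok(()) } else { Err(429) }
}
```

With team-b's limits (5 pipelines, 10 topics), a tenant running 4 pipelines on 8 topics can add one more pipeline with 2 topics. A sixth pipeline is rejected.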
Audit per Tenant
Audit log entries include tenant:
{ "timestamp": "...", "tenant": "team-a", "user": "alice", "action": "pipeline:run" }
Best Practices
- One tenant per team / product line
- Start with conservative quotas; raise on demand
- Combine with RBAC for fine-grained control
- For hard isolation, deploy one Fungi per tenant in separate K8s namespaces
Pipelines
Build data pipelines from a YAML config.
CLI
fungi pipeline sample --output my-pipeline.yaml # generate template
fungi pipeline validate --config my-pipeline.yaml # check syntax
fungi pipeline run --config my-pipeline.yaml # execute
fungi pipeline list # show running
YAML Reference
name: my-pipeline
source:
connector: kafka
bootstrap_servers: 127.0.0.1:9092
topic: input-topic
group_id: my-group
transforms:
- transform_type: filter
filter: "amount > 100"
- transform_type: map
map: "SELECT user_id, amount * 1.1 AS adjusted FROM events"
- transform_type: sql
sql: "SELECT user_id, SUM(amount) AS total FROM events GROUP BY user_id"
sink:
connector: kafka
bootstrap_servers: 127.0.0.1:9092
topic: output-topic
JSON format is also accepted.
Transform Types
| Type | Purpose | Example |
|---|---|---|
| filter | Drop rows by predicate | "amount > 100" |
| map | Per-row SQL expression | "SELECT id, amount * 1.1 FROM events" |
| sql | Arbitrary SQL (aggregations) | "SELECT user_id, SUM(amount) FROM events GROUP BY user_id" |
| python | Custom Python script | see Python UDF |
Example: Order Processing
name: order-processing
source:
connector: kafka
bootstrap_servers: 127.0.0.1:9092
topic: orders
group_id: order-processor
transforms:
- transform_type: filter
filter: "status = 'completed'"
- transform_type: map
map: "SELECT order_id, customer_id, total * 1.08 AS total_with_tax FROM orders"
sink:
connector: kafka
bootstrap_servers: 127.0.0.1:9092
topic: processed-orders
Best Practices
- Use a unique group_id per pipeline to avoid offset conflicts
- Always validate before run
- Use 127.0.0.1 (not localhost) for Kafka on macOS to force IPv4
- Monitor via the dashboard
Troubleshooting
| Symptom | Check |
|---|---|
| Won't start | fungi pipeline validate --config … |
| No messages | Topic exists? Consumer group not stuck? |
| Transform error | SQL syntax? Column names match? |
Related
- Guides / Kafka: Kafka source/sink reference
- Guides / Python UDF: custom Python transforms
- Operations / CLI: full CLI reference
Kafka Integration
Native Kafka source and sink.
Setup Kafka (Local)
docker run -d --name kafka -p 9092:9092 \
-e KAFKA_NODE_ID=1 \
-e KAFKA_PROCESS_ROLES=broker,controller \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
-e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
apache/kafka:latest
Create topics:
docker exec kafka /opt/kafka/bin/kafka-topics.sh \
--create --topic events --bootstrap-server localhost:9092 \
--partitions 1 --replication-factor 1
Pipeline (YAML)
name: kafka-pipeline
source:
connector: kafka
bootstrap_servers: 127.0.0.1:9092
topic: events
group_id: my-group
sink:
connector: kafka
bootstrap_servers: 127.0.0.1:9092
topic: processed-events
Use 127.0.0.1 (not localhost) on macOS to force IPv4.
Rust API
Source
use fungi_connector::kafka::{KafkaSource, KafkaSourceConfig};

let config = KafkaSourceConfig {
    bootstrap_servers: "127.0.0.1:9092".into(),
    topic: "events".into(),
    group_id: "my-group".into(),
    auto_offset_reset: "earliest".into(),
};
let mut source = KafkaSource::new(config);
source.open(None).await?;
let event = source.poll().await?;
Sink
use fungi_connector::kafka::{KafkaSink, KafkaSinkConfig};

let config = KafkaSinkConfig {
    bootstrap_servers: "127.0.0.1:9092".into(),
    topic: "processed-events".into(),
};
let mut sink = KafkaSink::new(config);
sink.open().await?;
sink.write_batch(record_batch).await?; // record_batch: an Arrow RecordBatch from upstream
Config Reference
Source
| Field | Default | Description |
|---|---|---|
| bootstrap_servers | (required) | Kafka brokers |
| topic | (required) | Topic to consume |
| group_id | (required) | Consumer group |
| auto_offset_reset | earliest | earliest / latest |
Sink
| Field | Default | Description |
|---|---|---|
| bootstrap_servers | (required) | Kafka brokers |
| topic | (required) | Topic to produce |
Common Gotchas
| Issue | Fix |
|---|---|
| Connection refused on macOS | Use 127.0.0.1, not localhost |
| Stale offsets | Use a fresh group_id (timestamp-based) |
| Slow startup | Add broker.address.family=v4 |
Examples
cargo run -p fungi-examples --example kafka_test
cargo run -p fungi-examples --example kafka_sql
cargo run -p fungi-examples --example kafka_pipeline
PostgreSQL Connector
Read from and write to PostgreSQL tables.
Quick Start
name: pg-to-pg
source:
connector: postgres
url: postgres://user:pass@localhost:5432/mydb
table: events
filter: "created_at > now() - interval '1 hour'"
batch_size: 500
sink:
connector: postgres
url: postgres://user:pass@localhost:5432/mydb
table: processed_events
write_mode: upsert
Config Reference
Source
| Field | Default | Description |
|---|---|---|
| url | (required) | Postgres connection URL |
| table | (required) | Table name |
| schema | public | Schema name |
| columns | * | Columns to select |
| filter | (none) | WHERE clause |
| batch_size | 1000 | Rows per batch |
Sink
| Field | Default | Description |
|---|---|---|
| url | (required) | Postgres connection URL |
| table | (required) | Table name |
| schema | public | Schema name |
| write_mode | insert | insert or upsert |
Type Mapping
| Postgres | Arrow |
|---|---|
| int2, int4 | Int32 |
| int8 | Int64 |
| float4, float8 | Float64 |
| text, varchar | Utf8 |
| bool | Boolean |
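The type mapping amounts to a lookup from Postgres type names to Arrow types. Below is a minimal Rust sketch using type names as strings, to avoid depending on the arrow crate. `pg_to_arrow` is a hypothetical helper. The document does not say how unlisted types (timestamp, numeric, jsonb) are handled, so the sketch returns None for them.

```rust
/// Map a Postgres type name to the Arrow type named in the table above.
/// Returns None for types the table does not cover.
pub fn pg_to_arrow(pg_type: &str) -> Option<&'static str> {
    match pg_type {
        "int2" | "int4" => Some("Int32"),
        "int8" => Some("Int64"),
        "float4" | "float8" => Some("Float64"),
        "text" | "varchar" => Some("Utf8"),
        "bool" => Some("Boolean"),
        _ => None,
    }
}
```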
Feature Flag
cargo build --features postgres
cargo run -p fungi-examples --example postgres_pipeline --features postgres
Pipeline Examples
Kafka → SQL → Postgres
name: kafka-to-pg
source:
connector: kafka
bootstrap_servers: 127.0.0.1:9092
topic: events
group_id: pg-sink
transforms:
- transform_type: filter
filter: "amount > 100"
sink:
connector: postgres
url: postgres://user:pass@localhost:5432/mydb
table: high_value_events
Postgres → Transform → Kafka
name: pg-to-kafka
source:
connector: postgres
url: postgres://user:pass@localhost:5432/mydb
table: orders
filter: "status = 'pending'"
batch_size: 500
transforms:
- transform_type: map
map: "SELECT id, customer_id, total * 1.08 AS total_with_tax FROM orders"
sink:
connector: kafka
bootstrap_servers: 127.0.0.1:9092
topic: processed-orders
Python UDF
Run custom Python code as a pipeline operator.
How It Works
Fungi Operator ──[Arrow IPC stdin]──▶ Python subprocess
                                       │ (your script)
Fungi Operator ◀──[Arrow IPC stdout]───┘
- The subprocess stays alive between batches, so stateful UDAFs work
- Auto-restart on crash (configurable)
- RecordBatches are exchanged in Arrow IPC format (columnar, no row-by-row conversion)
Quick Start
Script (transform.py)
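import os
import sys
import pyarrow as pa
import pyarrow.compute as pc
# THRESHOLD is set via the env block in the pipeline config.
threshold = float(os.environ.get("THRESHOLD", "100"))
reader = pa.ipc.open_stream(sys.stdin.buffer)
writer = None
for batch in reader:
    # Keep only rows whose amount exceeds the threshold.
    result = batch.filter(pc.greater(batch.column("amount"), threshold))
    if writer is None:
        writer = pa.ipc.new_stream(sys.stdout.buffer, result.schema)
    writer.write_batch(result)
    sys.stdout.buffer.flush()  # the process is long-lived, so emit each batch now
if writer is not None:
    writer.close()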
Pipeline config
transform:
type: python
source: transform.py
env:
THRESHOLD: "100"
Multi-File Projects (Zip)
Package as my_transform.zip:
my_transform/
├── __init__.py
├── main.py      # entry point
└── utils.py
zip -r my_transform.zip my_transform/
transform:
type: python
source: my_transform.zip
entry: main.py
Fungi extracts the archive to a temporary directory, sets PYTHONPATH, and runs the entry script.
UDAF (Stateful)
The subprocess stays alive β keep state in memory:
import sys
import pyarrow as pa
class RunningAverage:
    def __init__(self): self.total, self.count = 0, 0
    def accumulate(self, batch):
        self.total += sum(batch.column('value').to_pylist())
        self.count += batch.num_rows
    def result(self): return self.total / max(self.count, 1)
# State lives in this object for as long as the subprocess runs.
udaf = RunningAverage()
reader = pa.ipc.open_stream(sys.stdin.buffer)
writer = None
for batch in reader:
    udaf.accumulate(batch)
    out = pa.RecordBatch.from_pydict({'avg': [udaf.result()]})
    if writer is None:
        writer = pa.ipc.new_stream(sys.stdout.buffer, out.schema)
    writer.write_batch(out)
    sys.stdout.buffer.flush()  # emit each running result immediately
if writer is not None:
    writer.close()
Config Reference
transform:
type: python
  source: transform.py    # or my_transform.zip
entry: main.py # required for zip
python: python3 # python executable
env:
KEY: "value"
Error Handling
| Scenario | Behavior |
|---|---|
| Script not found | Fail fast |
| Script crash | Auto-restart (max 3) |
| Invalid output | Log + skip batch |
| Python missing | Fail fast |
Prerequisites
pip install pyarrow # Python 3.8+
Rust API
use fungi_execution::python_udf::{PythonUdfConfig, PythonUdfStreamingOperator};

// Single-file script, with an environment variable passed to the subprocess.
let config = PythonUdfConfig::from_file("transform.py")
    .with_env("THRESHOLD", "100");
// or zip:
// let config = PythonUdfConfig::from_zip("my_transform.zip", "main.py");

let mut op = PythonUdfStreamingOperator::new("my-udf", config);
op.open().await?;
let out = op.process_element(batch, watermark).await?;
op.close().await?;
Related
- Guides / Pipelines: embed a UDF in a pipeline
- Reference / SDK: full Rust API
CLI Reference
Commands
| Command | Purpose |
|---|---|
| fungi server | Start server |
| fungi sql | Execute SQL |
| fungi job | Manage jobs |
| fungi pipeline | Manage pipelines |
| fungi mcp | MCP server / tools |
| fungi --help | Show help |
| fungi --version | Show version |
Global Options
| Option | Default | Description |
|---|---|---|
| --host | localhost | Server host |
| --port | 50051 | Server port |
| --local | false | Run embedded, no server |
server
fungi server # default 0.0.0.0:50051
fungi server --port 8080
fungi server --host 0.0.0.0 --port 50051
sql
fungi sql --query "SELECT 1"
fungi sql --query "SELECT 1" --local
fungi sql --file query.sql
fungi sql --query "SELECT 1" --host myserver --port 50051
job
fungi job list
fungi job status --id job-123
fungi job cancel --id job-123
pipeline
fungi pipeline sample --output my-pipeline.yaml
fungi pipeline validate --config my-pipeline.yaml
fungi pipeline run --config my-pipeline.yaml
fungi pipeline list
See Guides / Pipelines for config reference.
mcp
fungi mcp serve # start MCP server
fungi mcp tools # list tools
fungi mcp call <tool> --args '{...}' # invoke a tool
See Operations / MCP.
Examples
$ fungi sql --query "SELECT 1 as id, 'Alice' as name"
id | name
---+------
1 | Alice
$ fungi sql --query "SELECT user_id, SUM(amount) AS total FROM events GROUP BY user_id"
user_id | total
--------+------
user-1 | 300
user-2 | 300
Dashboard
Leptos WASM web dashboard for monitoring and managing pipelines.
Setup
cargo install trunk
rustup target add wasm32-unknown-unknown
Run
# Terminal 1: API server
fungi server
# Terminal 2: dashboard
cd crates/fungi-dashboard
trunk serve --port 8082
Open http://localhost:8082.
Login
If FUNGI_AUTH_ENABLED=true, log in with the bootstrap admin credentials. The JWT is stored in localStorage and attached to every API call.
Pages
| Page | Shows |
|---|---|
| Dashboard | Status, version, uptime, JM/TM counts, recent jobs |
| Pipelines | All pipelines with status, throughput, errors |
| SQL IDE | Interactive query editor |
| Settings | Server info, API keys, tenants |
API Endpoints (consumed)
| Endpoint | Purpose |
|---|---|
| GET /health | Status, version, uptime, cluster info |
| GET /api/jobs | List jobs |
| POST /api/sql | Run SQL |
| POST /api/auth/login | Login |
Trunk Proxy
crates/fungi-dashboard/Trunk.toml forwards /api/* and /health to the backend (default localhost:8080).
[[proxy]]
backend = "http://localhost:8080/api"
[[proxy]]
backend = "http://localhost:8080/health"
For remote clusters:
[[proxy]]
backend = "http://101.200.74.232:30051/api"
Production Build
cd crates/fungi-dashboard
trunk build --release
# Static files are written to dist/; serve them via nginx, Axum, or any static server
Tech Stack
| Component | Purpose |
|---|---|
| Leptos 0.6 (CSR) | Reactive Rust UI |
| Trunk | WASM bundler / dev server |
| Tailwind CSS | Styling |
| reqwasm | HTTP client |
Customization
- API base: edit the Trunk.toml [[proxy]] blocks
- Theme: edit the Tailwind config / classes in index.html
- Pages: src/pages/; components: src/components/
Troubleshooting
| Symptom | Fix |
|---|---|
| Cannot reach server | curl http://localhost:8080/health then check Trunk.toml |
| 401 on every request | Token expired: log out and log back in |
| Dashboard empty | Server has no jobs/pipelines yet; run fungi pipeline run |
MCP Integration
Model Context Protocol server for AI agents.
What MCP Enables
AI agents (Claude, GPT, custom) can:
- Create pipelines from natural language
- Query pipeline state and metrics
- Diagnose issues
- Suggest optimizations
Start the Server
fungi mcp serve
CLI Tools
fungi mcp tools # list available tools
fungi mcp call <tool> --args '{...}' # invoke a tool
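MCP messages are JSON-RPC 2.0 requests, and tool discovery and invocation use the standard tools/list and tools/call methods. The sketch below builds those two request bodies by hand. Whether fungi mcp call sends exactly this envelope is an assumption, and the argument name pipeline_id in the usage example is hypothetical because the argument schemas of Fungi's tools are not documented here.

```rust
/// JSON-RPC 2.0 request for MCP's tools/list method.
fn tools_list_request(id: u64) -> String {
    format!(r#"{{"jsonrpc":"2.0","id":{id},"method":"tools/list"}}"#)
}

/// JSON-RPC 2.0 request for MCP's tools/call method.
/// `args_json` must already be a JSON object; `tool` is assumed to need no escaping.
fn tools_call_request(id: u64, tool: &str, args_json: &str) -> String {
    format!(
        r#"{{"jsonrpc":"2.0","id":{id},"method":"tools/call","params":{{"name":"{tool}","arguments":{args_json}}}}}"#
    )
}
```

A real MCP client also performs the initialize handshake before calling tools; this sketch covers only the request bodies.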
Available Tools
| Tool | Purpose |
|---|---|
| create-pipeline | Build a pipeline from a description |
| query-state | Get pipeline metrics / state |
| optimize | Suggest pipeline improvements |
| debug | Diagnose pipeline issues |
| deploy | Deploy to a cluster |
Configuration
# fungi.toml
[mcp]
enabled = true
host = "0.0.0.0"
port = 3000
[mcp.skills]
enabled = ["create-pipeline", "query-state", "optimize", "debug", "deploy"]
Connect Claude
fungi mcp install --agent claude
This registers Fungi as an MCP server in Claude's config.
Custom Agent (Python)
import fungi_mcp
client = fungi_mcp.Client("http://localhost:3000")
pipeline = client.create_pipeline(
source="kafka://events",
filter="amount > 100",
aggregate="GROUP BY user_id, SUM(amount)",
sink="kafka://processed-events",
)
state = client.query_state(pipeline.id)
print(state.throughput)
Example Conversation
User: Filter purchases over $100 from Kafka and aggregate by user.
Agent → create-pipeline → returns pipeline-abc123
      → query-state → throughput, errors, lag
Related
- Operations / CLI: fungi mcp commands
- ~/.pi/agent/skills/fungi/SKILL.md: Pi agent skill bundled with this repo
Rust SDK
Rust APIs for embedding Fungi or building operators.
DataStream (Flink-compatible)
use fungi_flink::{DataStream, StreamExecutionEnvironment};
use fungi_streaming::TumblingWindowAssigner;

// Event and SumAggregator are user-defined types (not shown).
#[tokio::main]
async fn main() {
    let env = StreamExecutionEnvironment::new();
    let stream: DataStream<Event> = env.from_source("events");
    stream
        .filter(|e| e.amount > 100)
        .map(|e| (e.user_id, e.amount))
        .key_by(|(uid, _)| uid.clone())
        .window(Box::new(TumblingWindowAssigner::new(5000)))
        .aggregate(SumAggregator)
        .sink("output");
    env.execute("my-pipeline").await.unwrap();
}
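As a rough model of what the pipeline above computes, the sketch below evaluates the same filter, key, window, and sum steps over a finite batch using only the standard library. It is not Fungi's API. The Event fields, including timestamp_ms, are hypothetical. It also assumes that the window size 5000 is in milliseconds and that tumbling windows align to multiples of their size, as Flink's do by default.

```rust
use std::collections::BTreeMap;

/// Hypothetical event shape; the docs do not define Event.
struct Event {
    user_id: String,
    amount: i64,
    timestamp_ms: i64,
}

/// filter(amount > 100) -> key by user -> tumbling window -> sum.
/// Returns ((window_start_ms, user_id) -> total).
fn windowed_sums(events: &[Event], window_ms: i64) -> BTreeMap<(i64, String), i64> {
    let mut sums = BTreeMap::new();
    for e in events.iter().filter(|e| e.amount > 100) {
        // Tumbling windows are aligned to multiples of the window size.
        let window_start = e.timestamp_ms - e.timestamp_ms.rem_euclid(window_ms);
        *sums.entry((window_start, e.user_id.clone())).or_insert(0) += e.amount;
    }
    sums
}
```

A streaming engine emits each window's sums once the watermark passes the window end. This batch version returns all windows at once.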
SQL
use fungi_sql::FungiSqlContext;

// Runs inside an async fn that returns a Result;
// `table` is an existing table (construction not shown).
let ctx = FungiSqlContext::new();
ctx.register_table("events", table).await?;
ctx.sql("SELECT user_id, SUM(amount) FROM events GROUP BY user_id").await?;
State
use fungi_state::{InMemoryStateBackend, StateBackend};

// Runs inside an async fn that returns a Result.
let backend = InMemoryStateBackend::new();
let mut state = backend
    .create_keyed_state::<String, i64>("op-1", "counts")
    .await?;
state.put("key".into(), 42).await?;
let v = state.get(&"key".into()).await?; // Some(42)
RocksDB backend:
let backend = RocksDBStateBackend::new("/var/lib/fungi/state")?;
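To illustrate the keyed-state model used above (state addressed by operator id, state name, and key) together with the snapshot/restore behavior exercised by the state_backend example, here is a synchronous, std-only stand-in. It is a sketch for intuition, not fungi_state's implementation. The KeyedStore type and its methods are hypothetical.

```rust
use std::collections::HashMap;
use std::hash::Hash;

type Tables<K, V> = HashMap<(String, String), HashMap<K, V>>;

/// Minimal in-memory keyed state: one map per (operator id, state name).
struct KeyedStore<K, V> {
    tables: Tables<K, V>,
}

impl<K: Eq + Hash + Clone, V: Clone> KeyedStore<K, V> {
    fn new() -> Self {
        KeyedStore { tables: HashMap::new() }
    }

    fn put(&mut self, op: &str, name: &str, key: K, value: V) {
        self.tables
            .entry((op.to_string(), name.to_string()))
            .or_default()
            .insert(key, value);
    }

    fn get(&self, op: &str, name: &str, key: &K) -> Option<V> {
        self.tables.get(&(op.to_string(), name.to_string()))?.get(key).cloned()
    }

    /// A snapshot is a deep copy, so later writes do not leak into it.
    fn snapshot(&self) -> Tables<K, V> {
        self.tables.clone()
    }

    /// Restore replaces all current state with the snapshot.
    fn restore(&mut self, snapshot: Tables<K, V>) {
        self.tables = snapshot;
    }
}
```

A disk-backed backend such as RocksDB keeps the same logical interface but persists the maps, which is why it takes a directory path.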
Execution Graph
use fungi_execution::{ExecutionGraph, OperatorId};

let mut graph = ExecutionGraph::new();
let src = OperatorId("source".into());
let snk = OperatorId("sink".into());
graph.add_operator(src.clone());
graph.add_operator(snk.clone());
graph.add_edge(src, snk); // source -> sink
let order = graph.topological_order();
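topological_order returns operators so that every operator comes after its upstream inputs. One standard way to compute such an order is Kahn's algorithm, shown below as a std-only sketch over string operator names. The docs do not say which algorithm ExecutionGraph uses, and this version's cycle handling (returning None) is an assumption.

```rust
use std::collections::{HashMap, VecDeque};

/// Kahn's algorithm: repeatedly emit operators whose inputs have all been emitted.
/// Returns None if the graph contains a cycle.
fn topological_order(nodes: &[&str], edges: &[(&str, &str)]) -> Option<Vec<String>> {
    let mut indegree: HashMap<&str, usize> = nodes.iter().map(|n| (*n, 0)).collect();
    let mut downstream: HashMap<&str, Vec<&str>> = HashMap::new();
    for (from, to) in edges {
        downstream.entry(*from).or_default().push(*to);
        *indegree.entry(*to).or_insert(0) += 1;
    }
    // Seed with sources in declaration order so the result is deterministic.
    let mut ready: VecDeque<&str> = nodes.iter().copied().filter(|n| indegree[n] == 0).collect();
    let mut order = Vec::new();
    while let Some(n) = ready.pop_front() {
        order.push(n.to_string());
        for &next in downstream.get(n).into_iter().flatten() {
            let d = indegree.get_mut(next).expect("every edge target has an entry");
            *d -= 1;
            if *d == 0 {
                ready.push_back(next);
            }
        }
    }
    // Operators that were never emitted sit on a cycle.
    (order.len() == indegree.len()).then_some(order)
}
```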
Checkpointing
use fungi_checkpoint::{CheckpointCoordinator, InMemoryCheckpointStore};
use std::sync::Arc;

// Runs inside an async fn that returns a Result.
let store = Arc::new(InMemoryCheckpointStore::new());
let coord = CheckpointCoordinator::new(store, 60_000, 1);
let id = coord.trigger_checkpoint().await?;
coord.report_operator_checkpoint(id, "op-1".into(), vec![1, 2, 3]).await?;
coord.complete_checkpoint(id).await?;
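The calls above follow a trigger, report, complete lifecycle. The toy coordinator below models one plausible set of rules for it: a checkpoint may complete only after every registered operator has reported its snapshot. This mirrors how Flink treats checkpoint acknowledgements. It is a hypothetical std-only sketch that ignores the store, the 60_000 and 1 arguments, and async execution. It is not fungi_checkpoint's implementation.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum CheckpointError {
    Unknown(u64),
    Incomplete { missing: Vec<String> },
}

/// Toy coordinator: a checkpoint completes only after all operators report.
struct Coordinator {
    operators: Vec<String>,
    next_id: u64,
    pending: HashMap<u64, HashMap<String, Vec<u8>>>,
    completed: HashMap<u64, HashMap<String, Vec<u8>>>,
}

impl Coordinator {
    fn new(operators: &[&str]) -> Self {
        Coordinator {
            operators: operators.iter().map(|s| s.to_string()).collect(),
            next_id: 1,
            pending: HashMap::new(),
            completed: HashMap::new(),
        }
    }

    /// Starts a checkpoint (in a real engine, barriers are injected here).
    fn trigger(&mut self) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.pending.insert(id, HashMap::new());
        id
    }

    /// Records one operator's snapshot bytes for checkpoint `id`.
    fn report(&mut self, id: u64, op: &str, snapshot: Vec<u8>) -> Result<(), CheckpointError> {
        let acks = self.pending.get_mut(&id).ok_or(CheckpointError::Unknown(id))?;
        acks.insert(op.to_string(), snapshot);
        Ok(())
    }

    /// Moves checkpoint `id` to completed if every operator has reported.
    fn complete(&mut self, id: u64) -> Result<(), CheckpointError> {
        let acks = self.pending.get(&id).ok_or(CheckpointError::Unknown(id))?;
        let missing: Vec<String> = self
            .operators
            .iter()
            .filter(|op| !acks.contains_key(*op))
            .cloned()
            .collect();
        if !missing.is_empty() {
            return Err(CheckpointError::Incomplete { missing });
        }
        let acks = self.pending.remove(&id).expect("checked above");
        self.completed.insert(id, acks);
        Ok(())
    }

    /// The checkpoint a restart would restore from.
    fn latest_completed(&self) -> Option<u64> {
        self.completed.keys().max().copied()
    }
}
```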
Watermarks & Windows
use fungi_streaming::{BoundedOutOfOrderness, TumblingWindowAssigner, WatermarkStrategy, WindowAssigner};

let wm = BoundedOutOfOrderness::new(5000);
let watermark = wm.generate_watermark(100_000); // 95000
let win = TumblingWindowAssigner::new(1000);
let windows = win.assign_windows(2500); // [2000..3000]
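The numbers in the comments above follow from two small formulas. The bounded-out-of-orderness watermark is the largest timestamp seen minus the bound (100_000 - 5000 = 95000), and a tumbling window starts at the timestamp rounded down to a multiple of the window size (2500 falls in [2000, 3000)). The std-only sketch below reproduces both. The type and function names are hypothetical, not fungi_streaming's.

```rust
/// Watermark = max timestamp seen so far minus the out-of-orderness bound.
struct BoundedOutOfOrderness {
    bound_ms: i64,
    max_ts: i64,
}

impl BoundedOutOfOrderness {
    fn new(bound_ms: i64) -> Self {
        Self { bound_ms, max_ts: i64::MIN }
    }

    /// Late events never move the watermark backwards.
    fn on_event(&mut self, ts: i64) {
        self.max_ts = self.max_ts.max(ts);
    }

    fn current_watermark(&self) -> i64 {
        self.max_ts.saturating_sub(self.bound_ms)
    }
}

/// [start, end) of the tumbling window containing `ts`;
/// rem_euclid keeps negative timestamps in the right window.
fn tumbling_window(ts: i64, size_ms: i64) -> (i64, i64) {
    let start = ts - ts.rem_euclid(size_ms);
    (start, start + size_ms)
}
```

Flink's own bounded-out-of-orderness generator subtracts one extra millisecond (max - bound - 1). The sketch follows the 95000 value shown above instead.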
Python UDF
use fungi_execution::python_udf::{PythonUdfConfig, PythonUdfStreamingOperator};

// Runs inside an async fn that returns a Result; `batch` and `watermark`
// come from the surrounding operator context (not shown).
let cfg = PythonUdfConfig::from_zip("transform.zip", "main.py")
    .with_env("THRESHOLD", "100");
let mut op = PythonUdfStreamingOperator::new("py-udf", cfg);
op.open().await?;
let out = op.process_element(batch, watermark).await?;
op.close().await?;
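The Python UDF operator follows an open, process, close lifecycle, with configuration such as THRESHOLD supplied through an environment map. The std-only sketch below models that lifecycle with a plain Rust filter so the ordering rules are visible. Configuration is read in open(), and processing before open() fails. The Operator trait and ThresholdFilter are hypothetical stand-ins. They do not run Python and are not fungi_execution's types.

```rust
use std::collections::HashMap;

/// Hypothetical lifecycle: open() once, process_element() per batch, close() once.
trait Operator {
    fn open(&mut self) -> Result<(), String>;
    fn process_element(&mut self, batch: &[i64]) -> Result<Vec<i64>, String>;
    fn close(&mut self) -> Result<(), String>;
}

/// Keeps values greater than THRESHOLD, read from the env map at open().
struct ThresholdFilter {
    env: HashMap<String, String>,
    threshold: Option<i64>, // None until open() succeeds
}

impl ThresholdFilter {
    fn new() -> Self {
        ThresholdFilter { env: HashMap::new(), threshold: None }
    }

    fn with_env(mut self, key: &str, value: &str) -> Self {
        self.env.insert(key.to_string(), value.to_string());
        self
    }
}

impl Operator for ThresholdFilter {
    fn open(&mut self) -> Result<(), String> {
        let raw = self.env.get("THRESHOLD").ok_or("THRESHOLD not set")?;
        let t = raw.parse::<i64>().map_err(|e| format!("bad THRESHOLD: {e}"))?;
        self.threshold = Some(t);
        Ok(())
    }

    fn process_element(&mut self, batch: &[i64]) -> Result<Vec<i64>, String> {
        let t = self.threshold.ok_or("process_element called before open")?;
        Ok(batch.iter().copied().filter(|v| *v > t).collect())
    }

    fn close(&mut self) -> Result<(), String> {
        self.threshold = None; // release per-run state
        Ok(())
    }
}
```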
See Reference / Examples for runnable demos.
Examples
Runnable examples in crates/fungi-examples/.
Run
cargo run -p fungi-examples --example <name>
List
| Example | Demonstrates |
|---|---|
| state_backend | Keyed state, snapshot/restore |
| streaming_primitives | Watermarks, windows, timers |
| execution_graph | DAG construction + topological sort |
| checkpointing | Coordinator, barrier, store |
| kafka_test | Kafka connectivity |
| kafka_streaming | Streaming from Kafka |
| kafka_pipeline | Pipeline with transforms |
| kafka_sql | Kafka → SQL → Kafka |
kafka_sql (Full Pipeline)
# Setup Kafka
docker run -d --name kafka -p 9092:9092 apache/kafka:latest
docker exec kafka /opt/kafka/bin/kafka-topics.sh \
--create --topic events --bootstrap-server localhost:9092 \
--partitions 1 --replication-factor 1
# Run
cargo run -p fungi-examples --example kafka_sql
Output:
SQL Query Results:
Total amount by user:
user-1: 300
user-2: 300
Pipeline: Kafka → SQL → Kafka ✅
See Guides / Kafka for full Kafka setup.
Code Locations
| Example | File |
|---|---|
| state_backend | crates/fungi-examples/examples/state_backend.rs |
| streaming_primitives | crates/fungi-examples/examples/streaming_primitives.rs |
| execution_graph | crates/fungi-examples/examples/execution_graph.rs |
| checkpointing | crates/fungi-examples/examples/checkpointing.rs |
| kafka_* | crates/fungi-examples/examples/kafka_*.rs |
Changelog
[Unreleased]
Added
- MCP server integration for AI agents
- More SQL functions
- Performance optimizations
[0.1.0] - 2026-05-27
Added
- Core streaming engine
- SQL support via DataFusion
- Kafka source/sink
- State management (Memory, RocksDB)
- Checkpointing
- CLI tool
- gRPC server
- Flink DataStream API compatibility
- Examples and documentation
Architecture
- fungi-common: Shared types, errors, configuration
- fungi-state: State backends (Memory, RocksDB)
- fungi-streaming: Watermarks, windows, timers
- fungi-execution: Operator graph, task executor
- fungi-checkpoint: Checkpoint coordinator
- fungi-sql: DataFusion integration
- fungi-flink: Flink DataStream API compatibility
- fungi-server: gRPC server with REST API
- fungi-runtime: Embedded and server modes
- fungi-cli: Command-line interface
- fungi-connector: Kafka source/sink
Features
- SQL query execution
- Kafka integration
- State management
- Checkpointing
- CLI tool
- gRPC API
- Flink-compatible DataStream API
Performance
- Zero GC pauses
- Predictable latency
- Low memory footprint
- SIMD-optimized (via Arrow)
Documentation
- Quickstart guide
- Architecture documentation
- CLI reference
- SDK reference
- Kafka integration guide
- MCP integration guide
- Examples
[0.0.1] - 2026-05-20
Added
- Initial project setup
- Basic crate structure
Contributing
Getting Started
- Fork the repository
- Clone your fork
- Create a feature branch
- Make your changes
- Run tests
- Submit a pull request
Development Setup
git clone https://github.com/yourusername/fungi.git
cd fungi
cargo build
cargo test --workspace
Code Style
# Format code
cargo fmt
# Check formatting
cargo fmt --check
# Run lints
cargo clippy --workspace -- -D warnings
Testing
# Run all tests
cargo test --workspace
# Run specific crate tests
cargo test -p fungi-state
# Run with output
cargo test --workspace -- --nocapture
Documentation
# Generate docs
cargo doc --workspace --open
# Build mdbook
mdbook build
# Serve mdbook locally
mdbook serve
Commit Messages
Format: <type>: <description>
Types:
- feat: New feature
- fix: Bug fix
- docs: Documentation
- test: Tests
- refactor: Code refactoring
- chore: Maintenance tasks
Example: feat: add Kafka source implementation
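If you want to check a subject line against this convention before committing (for example from a local commit-msg hook), the std-only sketch below is one way to do it. The repository does not state that it enforces this format with tooling. The function name is hypothetical, and treating ": " (colon plus space) as the required separator is an assumption based on the example.

```rust
const TYPES: [&str; 6] = ["feat", "fix", "docs", "test", "refactor", "chore"];

/// Checks a commit subject against the `<type>: <description>` convention.
fn check_commit_subject(subject: &str) -> Result<(), String> {
    let (kind, desc) = subject
        .split_once(": ")
        .ok_or("expected `<type>: <description>`")?;
    if !TYPES.contains(&kind) {
        return Err(format!("unknown type `{kind}`; expected one of {}", TYPES.join(", ")));
    }
    if desc.trim().is_empty() {
        return Err("description is empty".to_string());
    }
    Ok(())
}
```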
Pull Requests
- Keep PRs focused on a single feature/fix
- Add tests for new functionality
- Update documentation
- Ensure all tests pass
- Follow existing code style
Architecture
See Architecture for system design.
License
By contributing, you agree that your contributions will be licensed under the Apache License 2.0.