Rust SDK
Rust APIs for embedding Fungi or building operators.
DataStream (Flink-compatible)
use fungi_flink::{DataStream, StreamExecutionEnvironment}; use fungi_streaming::TumblingWindowAssigner; #[tokio::main] async fn main() { let env = StreamExecutionEnvironment::new(); let stream: DataStream<Event> = env.from_source("events"); stream .filter(|e| e.amount > 100) .map(|e| (e.user_id, e.amount)) .key_by(|(uid, _)| uid.clone()) .window(Box::new(TumblingWindowAssigner::new(5000))) .aggregate(SumAggregator) .sink("output"); env.execute("my-pipeline").await.unwrap(); }
SQL
#![allow(unused)] fn main() { use fungi_sql::FungiSqlContext; let ctx = FungiSqlContext::new(); ctx.register_table("events", table).await?; ctx.sql("SELECT user_id, SUM(amount) FROM events GROUP BY user_id").await?; }
State
#![allow(unused)] fn main() { use fungi_state::{InMemoryStateBackend, StateBackend}; let backend = InMemoryStateBackend::new(); let mut state = backend .create_keyed_state::<String, i64>("op-1", "counts").await?; state.put("key".into(), 42).await?; let v = state.get(&"key".into()).await?; // Some(42) }
RocksDB backend:
#![allow(unused)] fn main() { let backend = RocksDBStateBackend::new("/var/lib/fungi/state")?; }
Execution Graph
#![allow(unused)] fn main() { use fungi_execution::{ExecutionGraph, OperatorId}; let mut graph = ExecutionGraph::new(); let src = OperatorId("source".into()); let snk = OperatorId("sink".into()); graph.add_operator(src.clone()); graph.add_operator(snk.clone()); graph.add_edge(src, snk); let order = graph.topological_order(); }
Checkpointing
#![allow(unused)] fn main() { use fungi_checkpoint::{CheckpointCoordinator, InMemoryCheckpointStore}; use std::sync::Arc; let store = Arc::new(InMemoryCheckpointStore::new()); let coord = CheckpointCoordinator::new(store, 60_000, 1); let id = coord.trigger_checkpoint().await?; coord.report_operator_checkpoint(id, "op-1".into(), vec![1, 2, 3]).await?; coord.complete_checkpoint(id).await?; }
Watermarks & Windows
#![allow(unused)] fn main() { use fungi_streaming::{BoundedOutOfOrderness, TumblingWindowAssigner, WatermarkStrategy, WindowAssigner}; let wm = BoundedOutOfOrderness::new(5000); let watermark = wm.generate_watermark(100_000); // 95000 let win = TumblingWindowAssigner::new(1000); let windows = win.assign_windows(2500); // [2000..3000] }
Python UDF
#![allow(unused)] fn main() { use fungi_execution::python_udf::{PythonUdfConfig, PythonUdfStreamingOperator}; let cfg = PythonUdfConfig::from_zip("transform.zip", "main.py") .with_env("THRESHOLD", "100"); let mut op = PythonUdfStreamingOperator::new("py-udf", cfg); op.open().await?; let out = op.process_element(batch, watermark).await?; op.close().await?; }
See Reference / Examples for runnable demos.