Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Rust SDK

Rust APIs for embedding Fungi or building operators.

use fungi_flink::{DataStream, StreamExecutionEnvironment};
use fungi_streaming::TumblingWindowAssigner;

// End-to-end pipeline example: reads the "events" source, filters and
// re-shapes the records, groups them per user, windows and aggregates them,
// writes to the "output" sink, and finally runs the job as "my-pipeline".
// `Event` and `SumAggregator` are not defined in this snippet and must be
// provided by the surrounding program.
#[tokio::main]
async fn main() {
    let exec_env = StreamExecutionEnvironment::new();
    let events: DataStream<Event> = exec_env.from_source("events");

    events
        // Keep only records whose amount is above 100.
        .filter(|event| event.amount > 100)
        // Project each record to a (user_id, amount) pair.
        .map(|event| (event.user_id, event.amount))
        // Key the pairs by a copy of the user id.
        .key_by(|(user, _)| user.clone())
        // Tumbling windows of size 5000 (presumably milliseconds; confirm).
        .window(Box::new(TumblingWindowAssigner::new(5000)))
        .aggregate(SumAggregator)
        .sink("output");

    // Nothing above is awaited; the job is started here, and a failed run
    // panics via `unwrap`.
    let run = exec_env.execute("my-pipeline");
    run.await.unwrap();
}

SQL

#![allow(unused)]
// SQL example: registers a table under the name "events" and runs an
// aggregate query against it.
//
// NOTE(review): this `fn main` wrapper looks like documentation-tool
// boilerplate. As written, `.await` and `?` appear inside a non-async
// function returning `()`, so the snippet does not compile on its own; it
// presumably needs an async context that returns a `Result` — confirm.
// `table` is not defined in this snippet and must come from the caller.
fn main() {
use fungi_sql::FungiSqlContext;

let ctx = FungiSqlContext::new();
// Make `table` queryable as "events".
ctx.register_table("events", table).await?;
// Sum `amount` per `user_id`; the query result is discarded here.
ctx.sql("SELECT user_id, SUM(amount) FROM events GROUP BY user_id").await?;
}

State

#![allow(unused)]
// Keyed-state example: creates a `String -> i64` keyed state on an in-memory
// backend, writes one entry, and reads it back.
//
// NOTE(review): this `fn main` wrapper looks like documentation-tool
// boilerplate. `.await` and `?` are used inside a non-async function
// returning `()`, so the snippet does not compile on its own — confirm the
// intended async/`Result` context.
fn main() {
use fungi_state::{InMemoryStateBackend, StateBackend};

let backend = InMemoryStateBackend::new();
// "op-1" and "counts" are presumably the operator id and the state name —
// TODO confirm against the `create_keyed_state` signature.
let mut state = backend
    .create_keyed_state::<String, i64>("op-1", "counts").await?;

// Store 42 under "key", then look the same key up again.
state.put("key".into(), 42).await?;
let v = state.get(&"key".into()).await?;   // Some(42)
}

RocksDB backend:

#![allow(unused)]
// RocksDB backend example: constructs the backend from the path
// "/var/lib/fungi/state".
//
// NOTE(review): `RocksDBStateBackend` is not imported in this snippet
// (presumably it lives in `fungi_state` — confirm), and `?` is used inside a
// `fn main` that returns `()`, so the snippet does not compile on its own.
fn main() {
let backend = RocksDBStateBackend::new("/var/lib/fungi/state")?;
}

Execution Graph

#![allow(unused)]
// Execution-graph example: registers a "source" and a "sink" operator,
// connects source -> sink, and asks the graph for its topological order.
fn main() {
use fungi_execution::{ExecutionGraph, OperatorId};

let mut dag = ExecutionGraph::new();

// Register each operator right after creating its id; the clones keep the
// ids available for the edge below.
let source_id = OperatorId("source".into());
dag.add_operator(source_id.clone());
let sink_id = OperatorId("sink".into());
dag.add_operator(sink_id.clone());

// Both ids are moved into the edge (source first, sink second).
dag.add_edge(source_id, sink_id);

let execution_order = dag.topological_order();
}

Checkpointing

#![allow(unused)]
// Checkpointing example: triggers a checkpoint, reports one operator's
// snapshot bytes for it, and then completes it.
//
// NOTE(review): this `fn main` wrapper looks like documentation-tool
// boilerplate. `.await` and `?` are used inside a non-async function
// returning `()`, so the snippet does not compile on its own — confirm the
// intended async/`Result` context.
fn main() {
use fungi_checkpoint::{CheckpointCoordinator, InMemoryCheckpointStore};
use std::sync::Arc;

// The store is wrapped in an `Arc` before being handed to the coordinator.
let store = Arc::new(InMemoryCheckpointStore::new());
// NOTE(review): the meaning of `60_000` and `1` is not visible here
// (presumably an interval in milliseconds and a concurrency limit) — confirm.
let coord = CheckpointCoordinator::new(store, 60_000, 1);

// The id returned by the trigger call ties the report and the completion
// to the same checkpoint.
let id = coord.trigger_checkpoint().await?;
// Report the bytes [1, 2, 3] on behalf of "op-1".
coord.report_operator_checkpoint(id, "op-1".into(), vec![1, 2, 3]).await?;
coord.complete_checkpoint(id).await?;
}

Watermarks & Windows

#![allow(unused)]
// Watermark and window example: derives a watermark from a timestamp with a
// bounded-out-of-orderness strategy, then assigns a timestamp to a tumbling
// window.
fn main() {
use fungi_streaming::{
    BoundedOutOfOrderness,
    TumblingWindowAssigner,
    WatermarkStrategy,
    WindowAssigner,
};

// Strategy built with a bound of 5000; a timestamp of 100_000 yields 95000.
let strategy = BoundedOutOfOrderness::new(5000);
let latest_timestamp = 100_000;
let watermark = strategy.generate_watermark(latest_timestamp); // 95000

// Tumbling windows of size 1000; timestamp 2500 lands in [2000..3000].
let assigner = TumblingWindowAssigner::new(1000);
let element_timestamp = 2500;
let windows = assigner.assign_windows(element_timestamp); // [2000..3000]
}

Python UDF

#![allow(unused)]
// Python UDF example: configures a Python UDF operator from a zip archive,
// opens it, processes one element, and closes it.
//
// NOTE(review): this `fn main` wrapper looks like documentation-tool
// boilerplate. `.await` and `?` are used inside a non-async function
// returning `()`, and `batch` / `watermark` are not defined in this snippet,
// so it does not compile on its own — confirm the intended context.
fn main() {
use fungi_execution::python_udf::{PythonUdfConfig, PythonUdfStreamingOperator};

// "transform.zip" / "main.py" are presumably the archive and the entry
// script inside it; `with_env` presumably sets an environment variable for
// the UDF — TODO confirm.
let cfg = PythonUdfConfig::from_zip("transform.zip", "main.py")
    .with_env("THRESHOLD", "100");

// Operator lifecycle: open, process, close.
let mut op = PythonUdfStreamingOperator::new("py-udf", cfg);
op.open().await?;
let out = op.process_element(batch, watermark).await?;
op.close().await?;
}

See Reference / Examples for runnable demos.