Python UDF
Run custom Python code as a pipeline operator.
How It Works
Fungi Operator ──[Arrow IPC stdin]──→ Python subprocess
                                        │ (your script)
Fungi Operator ←─[Arrow IPC stdout]─────┘
- Subprocess stays alive between batches (stateful UDAF works)
- Auto-restart on crash (configurable)
- Zero-copy Arrow RecordBatch exchange
Quick Start
Script (transform.py)
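import os
import sys
import pyarrow as pa
import pyarrow.compute as pc  # import explicitly; `pa.compute` is not guaranteed to be loaded

threshold = float(os.environ.get("THRESHOLD", "100"))  # set via `env` in the pipeline config
reader = pa.ipc.open_stream(sys.stdin.buffer)
writer = None
for batch in reader:
    # Keep only rows whose amount exceeds the threshold.
    result = batch.filter(pc.greater(batch["amount"], threshold))
    if writer is None:
        writer = pa.ipc.new_stream(sys.stdout.buffer, result.schema)
    writer.write_batch(result)
    sys.stdout.buffer.flush()  # push the batch to the operator promptly
if writer is not None:
    writer.close()  # write the end-of-stream marker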
Pipeline config
transform:
  type: python
  source: transform.py
  env:
    THRESHOLD: "100"
Multi-File Projects (Zip)
Package as my_transform.zip:
my_transform/
├── __init__.py
├── main.py # entry point
└── utils.py
zip -r my_transform.zip my_transform/
transform:
  type: python
  source: my_transform.zip
  entry: main.py
Fungi extracts the archive to a temporary directory, sets PYTHONPATH, and runs the entry script.
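The extract, set PYTHONPATH, run sequence can be approximated locally to check that a package imports cleanly before deploying it. This is a sketch under stated assumptions, not Fungi's implementation: `build_demo_zip` and `run_zip_entry` are hypothetical helpers, the entry file is located by searching the extracted tree for its name, and PYTHONPATH is set to the extraction root. The source does not say how Fungi resolves either.

```python
import os
import subprocess
import sys
import tempfile
import zipfile
from pathlib import Path


def build_demo_zip(zip_path):
    """Create a small archive with the layout shown above (demo content only)."""
    with zipfile.ZipFile(zip_path, "w") as zf:
        zf.writestr("my_transform/__init__.py", "")
        zf.writestr(
            "my_transform/utils.py",
            "def label():\n    return 'utils loaded'\n",
        )
        zf.writestr(
            "my_transform/main.py",
            "from my_transform.utils import label\nprint(label())\n",
        )


def run_zip_entry(zip_path, entry, python=sys.executable):
    """Extract the archive, put the extraction root on PYTHONPATH, run `entry`.

    Assumption: `entry` is matched by file name anywhere in the archive.
    """
    with tempfile.TemporaryDirectory() as tmp:
        with zipfile.ZipFile(zip_path) as zf:
            zf.extractall(tmp)
        matches = sorted(Path(tmp).rglob(entry))
        if not matches:
            raise FileNotFoundError(f"{entry} not found in {zip_path}")
        env = dict(os.environ, PYTHONPATH=tmp)
        done = subprocess.run(
            [python, str(matches[0])],
            env=env, capture_output=True, text=True, check=True,
        )
        return done.stdout


demo_dir = tempfile.mkdtemp()
demo_zip = os.path.join(demo_dir, "my_transform.zip")
build_demo_zip(demo_zip)
entry_output = run_zip_entry(demo_zip, "main.py")
```

If the entry script fails to import a sibling module here, it is worth checking the package layout before looking at the pipeline config.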
UDAF (Stateful)
The subprocess stays alive between batches, so state can be kept in memory:
import pyarrow as pa
import sys


class RunningAverage:
    def __init__(self):
        self.total, self.count = 0, 0

    def accumulate(self, batch):
        # Assumes the 'value' column contains no nulls.
        self.total += sum(batch['value'].to_pylist())
        self.count += len(batch)

    def result(self):
        return self.total / max(self.count, 1)


# Created once; the state survives across batches because the process stays alive.
udaf = RunningAverage()
reader = pa.ipc.open_stream(sys.stdin.buffer)
writer = None
for batch in reader:
    udaf.accumulate(batch)
    # Emit the running average after each input batch.
    out = pa.RecordBatch.from_pydict({'avg': [udaf.result()]})
    if writer is None:
        writer = pa.ipc.new_stream(sys.stdout.buffer, out.schema)
    writer.write_batch(out)
if writer is not None:
    writer.close()  # write the end-of-stream marker
Config Reference
transform:
  type: python
  source: transform.py      # or my_transform.zip
  entry: main.py            # required for zip
  python: python3           # python executable
  env:
    KEY: "value"
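The rules implied by this reference can be checked before a pipeline is submitted. The sketch below is illustrative only: `check_transform_config` is a hypothetical helper, and apart from "entry is required for zip", the checks (a `source` must be present, `env` values should be strings) are assumptions rather than documented Fungi behavior.

```python
def check_transform_config(cfg):
    """Return a list of problems in a `transform` mapping (empty if none)."""
    problems = []
    if cfg.get("type") != "python":
        problems.append("type must be 'python'")

    source = cfg.get("source")
    if not source:
        # Assumption: a source script or archive is always needed.
        problems.append("source is required")
    elif source.endswith(".zip") and not cfg.get("entry"):
        # Stated in the reference: entry is required for zip sources.
        problems.append("entry is required when source is a zip")

    # Assumption: environment variables are passed as strings.
    env = cfg.get("env", {})
    if any(not isinstance(value, str) for value in env.values()):
        problems.append("env values should be strings")
    return problems


ok_problems = check_transform_config(
    {"type": "python", "source": "transform.py", "env": {"THRESHOLD": "100"}}
)
zip_problems = check_transform_config(
    {"type": "python", "source": "my_transform.zip"}
)
```

Returning a list rather than raising on the first problem lets a caller report every issue in one pass.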
Error Handling
| Scenario | Behavior |
|---|---|
| Script not found | Fail fast |
| Script crash | Auto-restart (max 3) |
| Invalid output | Log + skip batch |
| Python missing | Fail fast |
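The "Auto-restart (max 3)" row can be pictured as a small supervisor loop. This is a rough sketch, not Fungi's implementation: `run_with_restarts` is a hypothetical helper, it treats any non-zero exit code as a crash, and it assumes "max 3" means three restarts after the first attempt, which the table does not specify.

```python
import subprocess
import sys


def run_with_restarts(cmd, max_restarts=3):
    """Run `cmd`; after a non-zero exit, start it again up to `max_restarts` times.

    Returns (last_exit_code, restarts_used).
    """
    restarts = 0
    while True:
        code = subprocess.run(cmd).returncode
        if code == 0 or restarts == max_restarts:
            return code, restarts
        restarts += 1


# A script that always crashes uses up all restarts and still fails.
crash_result = run_with_restarts([sys.executable, "-c", "import sys; sys.exit(1)"])

# A script that exits cleanly is never restarted.
clean_result = run_with_restarts([sys.executable, "-c", "pass"])
```

A real operator would also need to decide what happens to the batch that was in flight when the script crashed; the table does not say, so the sketch leaves that out.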
Prerequisites
pip install pyarrow # Python 3.8+
Rust API
use fungi_execution::python_udf::{PythonUdfConfig, PythonUdfStreamingOperator};

let config = PythonUdfConfig::from_file("transform.py")
    .with_env("THRESHOLD", "100");
// or zip:
let config = PythonUdfConfig::from_zip("my_transform.zip", "main.py");

let mut op = PythonUdfStreamingOperator::new("my-udf", config);
op.open().await?;
let out = op.process_element(batch, watermark).await?;
op.close().await?;
Related
- Guides / Pipelines — embed UDF in a pipeline
- Reference / SDK — full Rust API