Python UDF

Run custom Python code as a pipeline operator.

How It Works

Fungi Operator ──[Arrow IPC stdin]───→ Python subprocess
                                       │ (your script)
Fungi Operator ←─[Arrow IPC stdout]────┘
  • The subprocess stays alive between batches, so stateful UDAFs work
  • Auto-restart on crash (configurable)
  • Batches travel as Arrow RecordBatches in the Arrow IPC stream format, with no row-by-row serialization

Quick Start

Script (transform.py)

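import os
import sys

import pyarrow as pa
import pyarrow.compute as pc

# THRESHOLD comes from the `env` block of the pipeline config.
threshold = float(os.environ.get("THRESHOLD", "100"))

reader = pa.ipc.open_stream(sys.stdin.buffer)
# filter() keeps the input schema, so the writer can be opened up front.
writer = pa.ipc.new_stream(sys.stdout.buffer, reader.schema)
for batch in reader:
    result = batch.filter(pc.greater(batch['amount'], threshold))
    writer.write_batch(result)
    sys.stdout.buffer.flush()  # hand each batch back to Fungi immediately
writer.close()  # write the end-of-stream marker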

Pipeline config

transform:
  type: python
  source: transform.py
  env:
    THRESHOLD: "100"

Multi-File Projects (Zip)

Package as my_transform.zip:

my_transform/
├── __init__.py
├── main.py          # entry point
└── utils.py

Build the archive:

zip -r my_transform.zip my_transform/

Pipeline config

transform:
  type: python
  source: my_transform.zip
  entry: main.py

Fungi extracts the archive to a temporary directory, adds that directory to PYTHONPATH, and runs the entry script.
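The three steps in the previous sentence can be sketched with the standard library. This is an illustration, not Fungi's implementation: run_zip_udf is a hypothetical name. The sketch assumes entry is resolved relative to the root of the extracted archive.

```python
import os
import subprocess
import sys
import tempfile
import zipfile


def run_zip_udf(zip_path, entry, stdin_bytes=b""):
    """Extract a UDF archive, put it on PYTHONPATH, and run its entry script."""
    with tempfile.TemporaryDirectory() as workdir:
        # 1. Extract to a temporary directory (removed when the block exits).
        with zipfile.ZipFile(zip_path) as zf:
            zf.extractall(workdir)
        # 2. Prepend the extraction directory so modules in the archive import.
        env = dict(os.environ)
        env["PYTHONPATH"] = os.pathsep.join(
            p for p in (workdir, env.get("PYTHONPATH")) if p
        )
        # 3. Run the entry script (assumed relative to the archive root).
        proc = subprocess.run(
            [sys.executable, os.path.join(workdir, entry)],
            input=stdin_bytes,
            capture_output=True,
            env=env,
            check=True,
        )
        return proc.stdout
```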

UDAF (Stateful)

Because the subprocess stays alive between batches, a UDAF can keep its state in memory:

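import sys

import pyarrow as pa
import pyarrow.compute as pc


class RunningAverage:
    """Running mean of the 'value' column over every batch seen so far."""

    def __init__(self):
        self.total, self.count = 0, 0

    def accumulate(self, batch):
        col = batch['value']
        self.total += pc.sum(col).as_py() or 0  # sum is None for empty/all-null input
        self.count += pc.count(col).as_py()     # counts non-null values only

    def result(self):
        return self.total / max(self.count, 1)


OUT_SCHEMA = pa.schema([('avg', pa.float64())])

udaf = RunningAverage()
reader = pa.ipc.open_stream(sys.stdin.buffer)
writer = pa.ipc.new_stream(sys.stdout.buffer, OUT_SCHEMA)
for batch in reader:
    udaf.accumulate(batch)
    out = pa.RecordBatch.from_pydict({'avg': [udaf.result()]}, schema=OUT_SCHEMA)
    writer.write_batch(out)
    sys.stdout.buffer.flush()  # emit the updated average right away
writer.close()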

Config Reference

transform:
  type: python
  source: transform.py | my_transform.zip   # a .py script or a .zip archive
  entry: main.py            # required for zip
  python: python3           # Python interpreter to launch
  env:                      # environment variables passed to the script
    KEY: "value"
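The rules in the reference can be written as a small check on the parsed transform: mapping, for example as loaded with PyYAML's yaml.safe_load. This is a hypothetical helper; Fungi's own validation may differ. Two rules are assumptions rather than documented facts: source must end in .py or .zip, and env values must be quoted strings, since environment variables are strings and an unquoted 100 would parse as an integer.

```python
def validate_transform(cfg: dict) -> list:
    """Return a list of problems with a `transform:` mapping (empty if valid)."""
    errors = []
    if cfg.get("type") != "python":
        errors.append("type must be 'python'")
    source = cfg.get("source")
    if not source:
        errors.append("source is required")
    elif source.endswith(".zip") and not cfg.get("entry"):
        errors.append("entry is required when source is a .zip archive")
    elif not source.endswith((".py", ".zip")):
        errors.append("source must be a .py script or a .zip archive")
    # Assumption: env values must already be strings (quoted in YAML).
    for key, value in cfg.get("env", {}).items():
        if not isinstance(value, str):
            errors.append(f"env.{key} must be a quoted string")
    return errors
```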

Error Handling

Scenario          | Behavior
------------------|---------------------
Script not found  | Fail fast
Script crash      | Auto-restart (max 3)
Invalid output    | Log + skip batch
Python missing    | Fail fast
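The crash policy in the table can be pictured as a bounded relaunch loop. This sketch is hypothetical: run_with_restarts and RestartLimitExceeded are not Fungi names. It assumes "max 3" means three restarts after the first launch. Each restart starts a fresh interpreter, so any in-memory UDAF state, such as the RunningAverage totals, begins again from zero.

```python
import subprocess
import sys


class RestartLimitExceeded(RuntimeError):
    """Raised once the script has crashed more times than allowed."""


def run_with_restarts(script_args, max_restarts=3):
    """Run `python <script_args>`, relaunching after each non-zero exit.

    Returns the number of restarts needed before a clean exit (code 0).
    """
    restarts = 0
    while True:
        proc = subprocess.run([sys.executable, *script_args])
        if proc.returncode == 0:
            return restarts
        if restarts >= max_restarts:
            raise RestartLimitExceeded(
                f"exit code {proc.returncode} after {restarts} restarts"
            )
        restarts += 1  # a new process: previous in-memory state is gone
```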

Prerequisites

pip install pyarrow      # Python 3.8+
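A quick preflight check catches the "Python missing" and missing-pyarrow cases before a pipeline starts. It is a sketch only (check_prerequisites is a hypothetical helper). Run it with the same interpreter you set as python: in the config, because that interpreter is the one that needs pyarrow.

```python
import importlib.util
import sys


def check_prerequisites(min_python=(3, 8)):
    """Return human-readable problems that would stop a Python UDF from starting."""
    problems = []
    if sys.version_info < min_python:
        problems.append(
            "Python %d.%d+ required, found %d.%d" % (*min_python, *sys.version_info[:2])
        )
    # find_spec locates the package without importing it.
    if importlib.util.find_spec("pyarrow") is None:
        problems.append("pyarrow is not installed (pip install pyarrow)")
    return problems
```

An empty list means both requirements are met.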

Rust API

use fungi_execution::python_udf::{PythonUdfConfig, PythonUdfStreamingOperator};

// From a single script, with an environment variable:
let config = PythonUdfConfig::from_file("transform.py")
    .with_env("THRESHOLD", "100");

// Or from a zip archive and its entry point:
// let config = PythonUdfConfig::from_zip("my_transform.zip", "main.py");

// These calls are async and fallible: run them inside an async fn that
// returns a Result. `batch` and `watermark` come from the surrounding pipeline.
let mut op = PythonUdfStreamingOperator::new("my-udf", config);
op.open().await?;
let out = op.process_element(batch, watermark).await?;
op.close().await?;