Pipelines
Build data pipelines from a YAML config.
CLI
```bash
fungi pipeline sample --output my-pipeline.yaml     # generate a template
fungi pipeline validate --config my-pipeline.yaml   # check syntax
fungi pipeline run --config my-pipeline.yaml        # execute
fungi pipeline list                                 # show running pipelines
```
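The validate-then-run sequence from these commands can be scripted. Below is a minimal Python sketch, assuming `fungi` is on your `PATH` and that `fungi pipeline validate` exits with a non-zero status when the config is invalid (an assumption; check the CLI reference). `pipeline_commands` and `validate_then_run` are hypothetical helper names, and `runner` is injectable so the logic can be tested without the CLI installed.

```python
from __future__ import annotations

import subprocess
from typing import Callable, Sequence


def pipeline_commands(config_path: str) -> list[list[str]]:
    """Build the argv lists for validate and run (only flags shown in this guide)."""
    return [
        ["fungi", "pipeline", "validate", "--config", config_path],
        ["fungi", "pipeline", "run", "--config", config_path],
    ]


def _subprocess_runner(argv: Sequence[str]) -> int:
    # Inherit stdout/stderr so CLI output stays visible; return the exit code.
    return subprocess.run(list(argv)).returncode


def validate_then_run(
    config_path: str,
    runner: Callable[[Sequence[str]], int] = _subprocess_runner,
) -> int:
    """Validate the config and start the pipeline only if validation succeeded.

    Returns the exit code of the last command that was executed.
    """
    validate_cmd, run_cmd = pipeline_commands(config_path)
    code = runner(validate_cmd)
    if code != 0:
        return code  # assumed: non-zero means an invalid config, so never call `run`
    return runner(run_cmd)
```

Call `validate_then_run("my-pipeline.yaml")` from a deploy script; the returned code can be passed straight to `sys.exit`.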
YAML Reference
```yaml
name: my-pipeline
source:
  connector: kafka
  bootstrap_servers: 127.0.0.1:9092
  topic: input-topic
  group_id: my-group
transforms:
  - transform_type: filter
    filter: "amount > 100"
  - transform_type: map
    map: "SELECT user_id, amount * 1.1 AS adjusted FROM events"
  - transform_type: sql
    sql: "SELECT user_id, SUM(amount) AS total FROM events GROUP BY user_id"
sink:
  connector: kafka
  bootstrap_servers: 127.0.0.1:9092
  topic: output-topic
```
JSON format is also accepted.
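As an illustration of the JSON form, here is the reference config built as a Python dict and serialized with the standard `json` module. This assumes the JSON layout mirrors the YAML keys and nesting one-to-one, which this page does not spell out, so confirm the output with `fungi pipeline validate` before relying on it.

```python
import json

# The YAML reference config as a plain dict.
# Assumption: JSON configs use exactly the same keys and nesting as YAML.
pipeline = {
    "name": "my-pipeline",
    "source": {
        "connector": "kafka",
        "bootstrap_servers": "127.0.0.1:9092",
        "topic": "input-topic",
        "group_id": "my-group",
    },
    "transforms": [
        {"transform_type": "filter", "filter": "amount > 100"},
        {"transform_type": "map",
         "map": "SELECT user_id, amount * 1.1 AS adjusted FROM events"},
        {"transform_type": "sql",
         "sql": "SELECT user_id, SUM(amount) AS total FROM events GROUP BY user_id"},
    ],
    "sink": {
        "connector": "kafka",
        "bootstrap_servers": "127.0.0.1:9092",
        "topic": "output-topic",
    },
}

# Serialize with indentation so the file stays readable and diffable.
config_json = json.dumps(pipeline, indent=2)
print(config_json)
```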
Transform Types
| Type | Purpose | Example |
|---|---|---|
| `filter` | Keep rows matching a predicate; drop the rest | `"amount > 100"` |
| `map` | Per-row SQL expression | `"SELECT id, amount * 1.1 FROM events"` |
| `sql` | Arbitrary SQL, including aggregations | `"SELECT user_id, SUM(amount) FROM events GROUP BY user_id"` |
| `python` | Custom Python script | See Python UDF |
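To make the SQL-based transform types concrete, the sketch below runs each example against a small in-memory table using Python's built-in `sqlite3`. SQLite is only a stand-in: fungi's SQL engine and dialect may differ, the `filter` predicate is assumed to behave like a `WHERE` clause, and the sample rows are made up. Each statement runs independently against the same rows rather than being chained.

```python
import sqlite3

# SQLite stands in for fungi's SQL engine; the sample rows are hypothetical.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (user_id TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?)",
    [("alice", 50.0), ("alice", 150.0), ("bob", 200.0), ("bob", 300.0)],
)

# filter: the predicate acts as a WHERE clause; rows where it is false are dropped.
filtered = conn.execute("SELECT * FROM events WHERE amount > 100").fetchall()

# map: one output row per input row, with computed columns.
mapped = conn.execute(
    "SELECT user_id, amount * 1.1 AS adjusted FROM events"
).fetchall()

# sql: arbitrary SQL; this aggregation collapses rows to one per user.
totals = conn.execute(
    "SELECT user_id, SUM(amount) AS total FROM events "
    "GROUP BY user_id ORDER BY user_id"
).fetchall()

print(len(filtered), len(mapped))  # 3 4
print(totals)                      # [('alice', 200.0), ('bob', 500.0)]
```

The row counts show the key difference: `filter` shrinks the stream, `map` preserves its size, and an aggregating `sql` transform can reduce it to one row per group.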
Example: Order Processing
```yaml
name: order-processing
source:
  connector: kafka
  bootstrap_servers: 127.0.0.1:9092
  topic: orders
  group_id: order-processor
transforms:
  - transform_type: filter
    filter: "status = 'completed'"
  - transform_type: map
    map: "SELECT order_id, customer_id, total * 1.08 AS total_with_tax FROM orders"
sink:
  connector: kafka
  bootstrap_servers: 127.0.0.1:9092
  topic: processed-orders
```
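Assuming transforms apply in list order, the filter runs first and the map only sees completed orders. The plain-Python sketch below mirrors that data flow on hypothetical sample orders; it is not how fungi executes transforms internally, just an illustration of what reaches `processed-orders`.

```python
from decimal import Decimal

# Hypothetical sample messages from the `orders` topic.
orders = [
    {"order_id": 1, "customer_id": "c1", "status": "completed", "total": Decimal("100.00")},
    {"order_id": 2, "customer_id": "c2", "status": "pending", "total": Decimal("50.00")},
    {"order_id": 3, "customer_id": "c1", "status": "completed", "total": Decimal("20.00")},
]

TAX_MULTIPLIER = Decimal("1.08")  # the 8% tax from the map step


def process(rows):
    """Apply the two transforms in config order: filter, then map."""
    for row in rows:
        # filter: "status = 'completed'" (non-matching rows are dropped)
        if row["status"] != "completed":
            continue
        # map: SELECT order_id, customer_id, total * 1.08 AS total_with_tax
        # Only the selected columns survive; `status` and `total` are dropped.
        yield {
            "order_id": row["order_id"],
            "customer_id": row["customer_id"],
            "total_with_tax": row["total"] * TAX_MULTIPLIER,
        }


for out in process(orders):
    print(out)
```

`Decimal` is used here to avoid float rounding on money; whether fungi's SQL engine uses exact decimals or floats for `total * 1.08` is not stated on this page.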
Best Practices
- Use a unique `group_id` per pipeline to avoid offset conflicts.
- Always `validate` before `run`.
- Use `127.0.0.1` (not `localhost`) for Kafka on macOS to force IPv4.
- Monitor pipelines via the dashboard.
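The `group_id` and `127.0.0.1` practices can be checked mechanically before deployment. The sketch below is a hypothetical linter (`lint_configs` is not part of fungi) that takes already-parsed configs as dicts shaped like the YAML examples, for instance loaded with a YAML parser of your choice, and returns human-readable warnings.

```python
from collections import Counter


def lint_configs(configs: list) -> list:
    """Hypothetical pre-flight checks for a set of pipeline config dicts."""
    warnings = []

    # A group_id shared by several pipelines makes them compete for offsets.
    ids = [c.get("source", {}).get("group_id") for c in configs]
    for gid, count in Counter(i for i in ids if i is not None).items():
        if count > 1:
            warnings.append(f"group_id {gid!r} is shared by {count} pipelines")

    # `localhost` may resolve to IPv6 on macOS; prefer the explicit IPv4 address.
    for c in configs:
        for section in ("source", "sink"):
            servers = c.get(section, {}).get("bootstrap_servers", "")
            if "localhost" in servers:
                name = c.get("name", "<unnamed>")
                warnings.append(f"{name}: {section} uses localhost; prefer 127.0.0.1")

    return warnings
```

An empty return value means both checks passed; it says nothing about whether the config is otherwise valid, which is still the job of `fungi pipeline validate`.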
Troubleshooting
| Symptom | Check |
|---|---|
| Pipeline won't start | Run `fungi pipeline validate --config …` |
| No messages | Does the topic exist? Is the consumer group stuck? |
| Transform error | Is the SQL syntax valid? Do the column names match the input? |
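For transform errors, one quick local check is to dry-run the statement against an empty table that has the expected input columns. The hypothetical `check_sql` helper below uses `sqlite3` as a stand-in engine: it catches syntax errors and unknown column names, but not differences between SQLite and fungi's SQL dialect.

```python
from __future__ import annotations

import sqlite3


def check_sql(sql: str, table: str, columns: list[str]) -> str | None:
    """Return None if `sql` prepares and runs against an empty `table`, else the error text."""
    conn = sqlite3.connect(":memory:")
    try:
        cols = ", ".join(f'"{c}"' for c in columns)
        conn.execute(f'CREATE TABLE "{table}" ({cols})')  # untyped columns are fine in SQLite
        conn.execute(sql).fetchall()  # empty table: compiles the query, returns no rows
        return None
    except sqlite3.Error as exc:
        return str(exc)
    finally:
        conn.close()
```

For a `filter` predicate, wrap it in a full statement first, e.g. `check_sql(f"SELECT * FROM events WHERE {predicate}", "events", ["user_id", "amount"])`.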
Related
- Guides / Kafka — Kafka source/sink reference
- Guides / Python UDF — custom Python transforms
- Operations / CLI — full CLI reference