Pipelines

Build data pipelines from a YAML config.

CLI

fungi pipeline sample --output my-pipeline.yaml   # generate template
fungi pipeline validate --config my-pipeline.yaml # check syntax
fungi pipeline run --config my-pipeline.yaml      # execute
fungi pipeline list                               # show running pipelines

YAML Reference

name: my-pipeline

source:
  connector: kafka
  bootstrap_servers: 127.0.0.1:9092
  topic: input-topic
  group_id: my-group

transforms:
  - transform_type: filter
    filter: "amount > 100"
  - transform_type: map
    map: "SELECT user_id, amount * 1.1 AS adjusted FROM events"
  - transform_type: sql
    sql: "SELECT user_id, SUM(amount) AS total FROM events GROUP BY user_id"

sink:
  connector: kafka
  bootstrap_servers: 127.0.0.1:9092
  topic: output-topic

JSON format is also accepted.
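As a rough illustration of the JSON form, the sketch below builds the same configuration as the YAML reference and serializes it. It assumes the JSON variant uses the same key names and nesting as the YAML one; the source states only that JSON is accepted, so treat the exact shape as unverified.

```python
import json

# Mirror of the YAML reference above.
# Assumption: the JSON form uses identical keys and nesting.
pipeline = {
    "name": "my-pipeline",
    "source": {
        "connector": "kafka",
        "bootstrap_servers": "127.0.0.1:9092",
        "topic": "input-topic",
        "group_id": "my-group",
    },
    "transforms": [
        {"transform_type": "filter", "filter": "amount > 100"},
        {
            "transform_type": "map",
            "map": "SELECT user_id, amount * 1.1 AS adjusted FROM events",
        },
        {
            "transform_type": "sql",
            "sql": "SELECT user_id, SUM(amount) AS total FROM events GROUP BY user_id",
        },
    ],
    "sink": {
        "connector": "kafka",
        "bootstrap_servers": "127.0.0.1:9092",
        "topic": "output-topic",
    },
}

# Serialize with indentation so the result is easy to diff against the YAML.
pipeline_json = json.dumps(pipeline, indent=2)
print(pipeline_json)
```

The printed text can be saved to a file and checked with fungi pipeline validate before it is run.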

Transform Types

Type    Purpose                            Example
filter  Keep rows matching a predicate     "amount > 100"
map     Per-row SQL expression             "SELECT id, amount * 1.1 FROM events"
sql     Arbitrary SQL (e.g. aggregations)  "SELECT user_id, SUM(amount) FROM events GROUP BY user_id"
python  Custom Python script               see Python UDF
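To make the difference between the three SQL-based transform types concrete, the sketch below runs equivalent queries against an in-memory SQLite table. This is not the engine fungi uses. It assumes standard SQL semantics, models the filter predicate as a WHERE clause, and uses made-up sample rows in a table named events.

```python
import sqlite3

# In-memory stand-in for the incoming stream (sample data is made up).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (user_id TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?)",
    [("alice", 50.0), ("alice", 200.0), ("bob", 150.0)],
)

# filter: a predicate decides which rows survive (modelled as WHERE).
filtered = conn.execute(
    "SELECT user_id, amount FROM events WHERE amount > 100 ORDER BY user_id"
).fetchall()

# map: one output row per input row, with computed columns.
mapped = conn.execute(
    "SELECT user_id, amount * 1.1 AS adjusted FROM events"
).fetchall()

# sql: arbitrary queries, here an aggregation that collapses rows per user.
totals = conn.execute(
    "SELECT user_id, SUM(amount) AS total FROM events "
    "GROUP BY user_id ORDER BY user_id"
).fetchall()

print(filtered)     # [('alice', 200.0), ('bob', 150.0)]
print(len(mapped))  # 3
print(totals)       # [('alice', 250.0), ('bob', 150.0)]
```

The row counts show the distinction: filter returns a subset of rows, map returns exactly as many rows as it received, and an aggregating sql transform returns one row per group.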

Example: Order Processing

name: order-processing
source:
  connector: kafka
  bootstrap_servers: 127.0.0.1:9092
  topic: orders
  group_id: order-processor
transforms:
  - transform_type: filter
    filter: "status = 'completed'"
  - transform_type: map
    map: "SELECT order_id, customer_id, total * 1.08 AS total_with_tax FROM orders"
sink:
  connector: kafka
  bootstrap_servers: 127.0.0.1:9092
  topic: processed-orders

Best Practices

  • Use a unique group_id per pipeline to avoid offset conflicts
  • Always run fungi pipeline validate before fungi pipeline run
  • Use 127.0.0.1 (not localhost) for Kafka on macOS to force IPv4
  • Monitor via the dashboard
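The validate-before-run practice can be enforced with a small wrapper. The sketch below uses only the two subcommands shown in the CLI section. It assumes that fungi pipeline validate exits with a non-zero status when the config is invalid, which the source does not confirm. The function name validate_then_run and its runner parameter are hypothetical helpers for illustration.

```python
import subprocess


def validate_then_run(config_path, runner=subprocess.run):
    """Run the pipeline only if validation succeeds; return the exit code.

    Assumption: `fungi pipeline validate` signals failure through a
    non-zero exit status. `runner` is injectable so the logic can be
    exercised without the fungi binary installed.
    """
    base = ["fungi", "pipeline"]

    check = runner(base + ["validate", "--config", config_path])
    if check.returncode != 0:
        # Validation failed: do not start the pipeline.
        return check.returncode

    return runner(base + ["run", "--config", config_path]).returncode
```

Calling validate_then_run("my-pipeline.yaml") starts the pipeline only after validation has passed and returns the exit code of the last command that ran.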

Troubleshooting

Symptom          Check
Won't start      fungi pipeline validate --config …
No messages      Topic exists? Consumer group not stuck?
Transform error  SQL syntax? Column names match?
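Before working through the checks above, it can help to confirm that the broker address from the config accepts TCP connections at all. The sketch below is a generic reachability probe using Python's standard library. It is not part of fungi, it does not speak the Kafka protocol, and the function name broker_reachable is a hypothetical helper.

```python
import socket


def broker_reachable(host="127.0.0.1", port=9092, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened.

    This only proves that something is listening on the address. It says
    nothing about whether the topic exists or the consumer group is healthy.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False
```

If broker_reachable() returns False for the bootstrap_servers address, the problem lies with the broker or the network rather than with the pipeline config.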