PostgreSQL Connector

Read from and write to PostgreSQL tables.

Quick Start

name: pg-to-pg
source:
  connector: postgres
  url: postgres://user:pass@localhost:5432/mydb
  table: events
  filter: "created_at > now() - interval '1 hour'"
  batch_size: 500

sink:
  connector: postgres
  url: postgres://user:pass@localhost:5432/mydb
  table: processed_events
  write_mode: upsert

Config Reference

Source

Field       Default     Description
url         (required)  Postgres connection URL
table       (required)  Table name
schema      public      Schema name
columns     *           Columns to select
filter      (none)      WHERE clause
batch_size  1000        Rows per batch
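
To make the source fields concrete, here is a minimal sketch of how they could combine into a single query. It is not taken from the connector's code: the `SourceConfig` struct and the `build_select` function are hypothetical names, and the assumption is that `filter` is appended verbatim after `WHERE`, as the Quick Start value suggests. `batch_size` is left out because it would affect how rows are fetched, not the query text.

```rust
/// Hypothetical mirror of the source fields listed above (not the connector's real type).
struct SourceConfig<'a> {
    table: &'a str,
    schema: &'a str,         // documented default: "public"
    columns: &'a str,        // documented default: "*"
    filter: Option<&'a str>, // documented default: none
}

/// Assemble a SELECT statement from the config.
/// Assumption: the filter text is used as-is after the WHERE keyword.
fn build_select(cfg: &SourceConfig<'_>) -> String {
    let mut sql = format!("SELECT {} FROM {}.{}", cfg.columns, cfg.schema, cfg.table);
    if let Some(filter) = cfg.filter {
        sql.push_str(" WHERE ");
        sql.push_str(filter);
    }
    sql
}
```

With the Quick Start values (`table: events`, default schema and columns, and the one-hour filter), this sketch yields `SELECT * FROM public.events WHERE created_at > now() - interval '1 hour'`.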

Sink

Field       Default     Description
url         (required)  Postgres connection URL
table       (required)  Table name
schema      public      Schema name
write_mode  insert      insert or upsert
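
The difference between the two write modes can be illustrated with the SQL each one might produce. This is a sketch under assumptions, not the connector's implementation: `WriteMode` and `build_write_sql` are hypothetical names, and the `key` parameter is invented for illustration, since the reference above does not say how the conflict target for `upsert` is chosen. The upsert form uses standard PostgreSQL `INSERT ... ON CONFLICT ... DO UPDATE`.

```rust
/// Hypothetical enum for the two documented write_mode values.
#[derive(Debug, Clone, Copy, PartialEq)]
enum WriteMode {
    Insert,
    Upsert,
}

/// Build a parameterized write statement.
/// Assumption: `key` names the column used as the ON CONFLICT target.
fn build_write_sql(
    schema: &str,
    table: &str,
    columns: &[&str],
    mode: WriteMode,
    key: &str,
) -> String {
    // Positional placeholders: $1, $2, ...
    let placeholders: Vec<String> = (1..=columns.len()).map(|i| format!("${}", i)).collect();
    let mut sql = format!(
        "INSERT INTO {}.{} ({}) VALUES ({})",
        schema,
        table,
        columns.join(", "),
        placeholders.join(", ")
    );
    if mode == WriteMode::Upsert {
        // On conflict, overwrite every non-key column with the incoming value.
        let updates: Vec<String> = columns
            .iter()
            .filter(|c| **c != key)
            .map(|c| format!("{} = EXCLUDED.{}", c, c))
            .collect();
        if updates.is_empty() {
            sql.push_str(&format!(" ON CONFLICT ({}) DO NOTHING", key));
        } else {
            sql.push_str(&format!(
                " ON CONFLICT ({}) DO UPDATE SET {}",
                key,
                updates.join(", ")
            ));
        }
    }
    sql
}
```

In this sketch, `insert` produces a plain `INSERT` and fails on a duplicate key, while `upsert` updates the existing row instead.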

Type Mapping

Postgres        Arrow
int2, int4      Int32
int8            Int64
float4, float8  Float64
text, varchar   Utf8
bool            Boolean
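
The mapping above can be written as a small lookup. The following is a std-only sketch: `ArrowType` is a stand-in enum, not the Arrow crate's own type, and `arrow_type_for` is a hypothetical helper. Returning `None` for unlisted types is an assumption, since the table does not say how the connector treats them.

```rust
/// Stand-in for the Arrow types named in the table (hypothetical, std-only).
#[derive(Debug, PartialEq)]
enum ArrowType {
    Int32,
    Int64,
    Float64,
    Utf8,
    Boolean,
}

/// Look up the Arrow type for a Postgres type name, following the table above.
/// Assumption: types not listed in the table yield None.
fn arrow_type_for(pg_type: &str) -> Option<ArrowType> {
    match pg_type {
        "int2" | "int4" => Some(ArrowType::Int32),
        "int8" => Some(ArrowType::Int64),
        "float4" | "float8" => Some(ArrowType::Float64),
        "text" | "varchar" => Some(ArrowType::Utf8),
        "bool" => Some(ArrowType::Boolean),
        _ => None,
    }
}
```

Note that, per the table, the narrower types are widened: `int2` maps to `Int32` and `float4` maps to `Float64`.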

Feature Flag

cargo build --features postgres
cargo run -p fungi-examples --example postgres_pipeline --features postgres

Pipeline Examples

Kafka → SQL → Postgres

name: kafka-to-pg
source:
  connector: kafka
  bootstrap_servers: 127.0.0.1:9092
  topic: events
  group_id: pg-sink
transforms:
  - transform_type: filter
    filter: "amount > 100"
sink:
  connector: postgres
  url: postgres://user:pass@localhost:5432/mydb
  table: high_value_events

Postgres → Transform → Kafka

name: pg-to-kafka
source:
  connector: postgres
  url: postgres://user:pass@localhost:5432/mydb
  table: orders
  filter: "status = 'pending'"
  batch_size: 500
transforms:
  - transform_type: map
    map: "SELECT id, customer_id, total * 1.08 AS total_with_tax FROM orders"
sink:
  connector: kafka
  bootstrap_servers: 127.0.0.1:9092
  topic: processed-orders