Read from and write to PostgreSQL tables.
name: pg-to-pg
source:
  connector: postgres
  url: postgres://user:pass@localhost:5432/mydb
  table: events
  filter: "created_at > now() - interval '1 hour'"  # used as the WHERE clause
  batch_size: 500  # rows per batch (documented default: 1000)
sink:
  connector: postgres
  url: postgres://user:pass@localhost:5432/mydb
  table: processed_events
  write_mode: upsert  # insert (documented default) or upsert
| Field | Default | Description |
|-------|---------|-------------|
| url | (required) | Postgres connection URL |
| table | (required) | Table name |
| schema | public | Schema name |
| columns | * | Columns to select |
| filter | (none) | WHERE clause |
| batch_size | 1000 | Rows per batch |
| Field | Default | Description |
|-------|---------|-------------|
| url | (required) | Postgres connection URL |
| table | (required) | Table name |
| schema | public | Schema name |
| write_mode | insert | insert or upsert |
| Postgres | Arrow |
|----------|-------|
| int2, int4 | Int32 |
| int8 | Int64 |
| float4, float8 | Float64 |
| text, varchar | Utf8 |
| bool | Boolean |
cargo build --features postgres
cargo run -p fungi-examples --example postgres_pipeline --features postgres
name: kafka-to-pg
source:
  connector: kafka
  bootstrap_servers: 127.0.0.1:9092
  topic: events
  group_id: pg-sink
transforms:
  - transform_type: filter
    filter: "amount > 100"  # expression belongs to this transform entry
sink:
  connector: postgres
  url: postgres://user:pass@localhost:5432/mydb
  table: high_value_events  # write_mode not set, so the documented default (insert) applies
name: pg-to-kafka
source:
  connector: postgres
  url: postgres://user:pass@localhost:5432/mydb
  table: orders
  filter: "status = 'pending'"  # used as the WHERE clause
  batch_size: 500  # rows per batch (documented default: 1000)
transforms:
  - transform_type: map
    map: "SELECT id, customer_id, total * 1.08 AS total_with_tax FROM orders"
sink:
  connector: kafka
  bootstrap_servers: 127.0.0.1:9092
  topic: processed-orders