Kafka Integration
Native Kafka source and sink.
Set Up Kafka (Local)
docker run -d --name kafka -p 9092:9092 \
-e KAFKA_NODE_ID=1 \
-e KAFKA_PROCESS_ROLES=broker,controller \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
-e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
apache/kafka:latest
Create the events topic:
docker exec kafka /opt/kafka/bin/kafka-topics.sh \
--create --topic events --bootstrap-server localhost:9092 \
--partitions 1 --replication-factor 1
Pipeline (YAML)
name: kafka-pipeline
source:
  connector: kafka
  bootstrap_servers: 127.0.0.1:9092
  topic: events
  group_id: my-group
sink:
  connector: kafka
  bootstrap_servers: 127.0.0.1:9092
  topic: processed-events
Use 127.0.0.1 (not localhost) on macOS to force IPv4.
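To see why the literal address helps: a hostname such as localhost can resolve to both the IPv6 loopback (::1) and 127.0.0.1, and the client may try the IPv6 address first. The sketch below uses only the Rust standard library to pick the IPv4 result from a resolved address list. The helper names first_ipv4 and resolve_ipv4 are hypothetical and are not part of fungi_connector.

```rust
use std::net::{SocketAddr, ToSocketAddrs};

/// Returns the first IPv4 address in `addrs`, if any.
/// Hypothetical helper for illustration only.
fn first_ipv4(addrs: &[SocketAddr]) -> Option<SocketAddr> {
    addrs.iter().copied().find(|a| a.is_ipv4())
}

/// Resolves a `host:port` string and keeps only the first IPv4 result.
/// Returns `Ok(None)` when the name resolves to IPv6 addresses only.
fn resolve_ipv4(host_port: &str) -> std::io::Result<Option<SocketAddr>> {
    let addrs: Vec<SocketAddr> = host_port.to_socket_addrs()?.collect();
    Ok(first_ipv4(&addrs))
}
```

Calling resolve_ipv4("localhost:9092") on an affected machine shows which IPv4 address, if any, the name maps to. Writing 127.0.0.1 directly in bootstrap_servers skips the lookup altogether.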
Rust API
Source
use fungi_connector::kafka::{KafkaSource, KafkaSourceConfig};

// Run inside an async function that returns a Result (the code uses `.await?`).
let config = KafkaSourceConfig {
    bootstrap_servers: "127.0.0.1:9092".into(),
    topic: "events".into(),
    group_id: "my-group".into(),
    auto_offset_reset: "earliest".into(),
};

// Open the consumer, then poll for the next event.
let mut source = KafkaSource::new(config);
source.open(None).await?;
let event = source.poll().await?;
Sink
use fungi_connector::kafka::{KafkaSink, KafkaSinkConfig};

// Run inside an async function that returns a Result (the code uses `.await?`).
let config = KafkaSinkConfig {
    bootstrap_servers: "127.0.0.1:9092".into(),
    topic: "processed-events".into(),
};

// Open the producer, then write one batch (`record_batch` is built elsewhere).
let mut sink = KafkaSink::new(config);
sink.open().await?;
sink.write_batch(record_batch).await?;
Config Reference
Source
| Field | Default | Description |
|---|---|---|
| bootstrap_servers | (required) | Kafka brokers |
| topic | (required) | Topic to consume |
| group_id | (required) | Consumer group |
| auto_offset_reset | earliest | earliest / latest |
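The auto_offset_reset row can be read as "optional, one of two values, defaulting to earliest". A minimal sketch of that rule follows, using only the standard library. The OffsetReset enum and resolve_offset_reset function are hypothetical illustrations and not types from fungi_connector, whose config takes the value as a string.

```rust
use std::str::FromStr;

/// The two values the table lists for `auto_offset_reset`.
/// Hypothetical type, not part of fungi_connector.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OffsetReset {
    Earliest,
    Latest,
}

impl Default for OffsetReset {
    /// The documented default is `earliest`.
    fn default() -> Self {
        OffsetReset::Earliest
    }
}

impl FromStr for OffsetReset {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "earliest" => Ok(OffsetReset::Earliest),
            "latest" => Ok(OffsetReset::Latest),
            other => Err(format!(
                "auto_offset_reset must be 'earliest' or 'latest', got '{}'",
                other
            )),
        }
    }
}

/// Uses the supplied value when present, otherwise the documented default.
fn resolve_offset_reset(value: Option<&str>) -> Result<OffsetReset, String> {
    match value {
        Some(s) => s.parse(),
        None => Ok(OffsetReset::default()),
    }
}
```

Validating the value up front like this rejects a typo such as "newest" before a consumer is created.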
Sink
| Field | Default | Description |
|---|---|---|
| bootstrap_servers | (required) | Kafka brokers |
| topic | (required) | Topic to produce |
Common Gotchas
| Issue | Fix |
|---|---|
| Connection refused on macOS | Use 127.0.0.1, not localhost |
| Stale offsets | Use a fresh group_id (timestamp-based) |
| Slow startup | Set the librdkafka property broker.address.family=v4 (forces IPv4) |
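For the stale-offsets fix, one way to get a timestamp-based group_id is to append the current Unix time in milliseconds to a fixed prefix. This is a minimal sketch using only the standard library. The function name fresh_group_id is hypothetical and not part of fungi_connector.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Builds a consumer group id of the form `<prefix>-<unix millis>`.
/// Hypothetical helper for illustration only.
fn fresh_group_id(prefix: &str) -> String {
    // Falls back to 0 if the system clock is set before the Unix epoch.
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis())
        .unwrap_or(0);
    format!("{}-{}", prefix, millis)
}
```

The result can be passed as group_id, for example group_id: fresh_group_id("my-group"). A new group has no committed offsets, so auto_offset_reset decides where consumption starts. This suits local testing. A long-running deployment normally keeps a stable group_id so it can resume from its committed offsets.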
Examples
cargo run -p fungi-examples --example kafka_test
cargo run -p fungi-examples --example kafka_sql
cargo run -p fungi-examples --example kafka_pipeline