Kafka Integration

Native Kafka source and sink connectors, configurable from a YAML pipeline or from the Rust API.

Set Up Kafka (Local)

docker run -d --name kafka -p 9092:9092 \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_PROCESS_ROLES=broker,controller \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  apache/kafka:latest

Create topics:

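docker exec kafka /opt/kafka/bin/kafka-topics.sh \
  --create --topic events --bootstrap-server localhost:9092 \
  --partitions 1 --replication-factor 1
docker exec kafka /opt/kafka/bin/kafka-topics.sh \
  --create --topic processed-events --bootstrap-server localhost:9092 \
  --partitions 1 --replication-factor 1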

Pipeline (YAML)

name: kafka-pipeline
source:
  connector: kafka
  bootstrap_servers: 127.0.0.1:9092
  topic: events
  group_id: my-group
sink:
  connector: kafka
  bootstrap_servers: 127.0.0.1:9092
  topic: processed-events

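On macOS, use 127.0.0.1 rather than localhost to force IPv4. localhost can resolve to the IPv6 address ::1 first, which leads to the connection errors listed under Common Gotchas.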
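The IPv4 note above comes down to name resolution: a host name may resolve to both IPv6 and IPv4 addresses, while a literal such as 127.0.0.1 cannot. As a minimal sketch using only the Rust standard library, the helper below resolves a host:port string and keeps only the IPv4 results. The name `ipv4_only` is hypothetical and not part of fungi_connector.

```rust
use std::net::{SocketAddr, ToSocketAddrs};

/// Resolve `host_port` (for example "localhost:9092") and keep only IPv4 results.
/// Hypothetical helper for illustration; not part of fungi_connector.
fn ipv4_only(host_port: &str) -> std::io::Result<Vec<SocketAddr>> {
    Ok(host_port
        .to_socket_addrs()? // literal IPs are parsed directly; host names go through the resolver
        .filter(|addr| addr.is_ipv4())
        .collect())
}
```

Calling `ipv4_only("localhost:9092")` on a machine shows which IPv4 addresses, if any, the name maps to. An empty result would suggest that localhost resolves only to IPv6 there.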

Rust API

Source

use fungi_connector::kafka::{KafkaSource, KafkaSourceConfig};

// Run this inside an async function that returns a Result,
// because the calls below use `.await?`.
let config = KafkaSourceConfig {
    bootstrap_servers: "127.0.0.1:9092".into(),
    topic: "events".into(),
    group_id: "my-group".into(),
    auto_offset_reset: "earliest".into(),
};

// Open the source before polling it for events.
let mut source = KafkaSource::new(config);
source.open(None).await?;
let event = source.poll().await?;

Sink

use fungi_connector::kafka::{KafkaSink, KafkaSinkConfig};

// Run this inside an async function that returns a Result,
// because the calls below use `.await?`.
let config = KafkaSinkConfig {
    bootstrap_servers: "127.0.0.1:9092".into(),
    topic: "processed-events".into(),
};

// Open the sink, then write a batch built elsewhere (`record_batch`).
let mut sink = KafkaSink::new(config);
sink.open().await?;
sink.write_batch(record_batch).await?;

Config Reference

Source

| Field | Default | Description |
|---|---|---|
| bootstrap_servers | (required) | Kafka brokers |
| topic | (required) | Topic to consume |
| group_id | (required) | Consumer group |
| auto_offset_reset | earliest | earliest / latest |
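The source table above lists two values for auto_offset_reset. Because the field is a plain string, a typo such as "Earliest" may only show up at runtime. One option is to check the value before building the config. The sketch below is a hypothetical standalone type, not part of fungi_connector, and it assumes that only the two listed values are accepted.

```rust
/// The two values listed for `auto_offset_reset`.
/// Hypothetical helper type for illustration; not part of fungi_connector.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OffsetReset {
    Earliest,
    Latest,
}

impl OffsetReset {
    /// Parse a config string; anything other than the two listed values is rejected.
    fn parse(value: &str) -> Result<Self, String> {
        match value {
            "earliest" => Ok(Self::Earliest),
            "latest" => Ok(Self::Latest),
            other => Err(format!("unsupported auto_offset_reset: {:?}", other)),
        }
    }

    /// The string form to place in the config field.
    fn as_str(self) -> &'static str {
        match self {
            Self::Earliest => "earliest",
            Self::Latest => "latest",
        }
    }
}
```

With this in place, `OffsetReset::parse(user_input)?.as_str().into()` could feed the auto_offset_reset field, so an invalid value is reported before the source is opened.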

Sink

| Field | Default | Description |
|---|---|---|
| bootstrap_servers | (required) | Kafka brokers |
| topic | (required) | Topic to produce |

Common Gotchas

| Issue | Fix |
|---|---|
| Connection refused on macOS | Use 127.0.0.1, not localhost |
| Stale offsets | Use a fresh group_id (timestamp-based) |
| Slow startup | Add broker.address.family=v4 |
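The stale-offsets fix above suggests a timestamp-based group_id. A minimal sketch of one way to build such an id with the standard library follows. The helper name `fresh_group_id` and the prefix-plus-milliseconds format are assumptions for illustration, not something the connector requires.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Build a consumer group id of the form "<prefix>-<unix millis>".
/// Hypothetical helper for illustration; not part of fungi_connector.
fn fresh_group_id(prefix: &str) -> String {
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis())
        .unwrap_or(0); // falls back to 0 if the clock reads before the Unix epoch
    format!("{}-{}", prefix, millis)
}
```

The result can be passed as `group_id: fresh_group_id("my-group")` in the source config. A new group has no committed offsets, so auto_offset_reset decides where reading starts. This suits local testing more than long-running deployments, where a stable group id is what lets a consumer resume from its committed position.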

Examples

cargo run -p fungi-examples --example kafka_test
cargo run -p fungi-examples --example kafka_sql
cargo run -p fungi-examples --example kafka_pipeline