Notes on implementing a Kafka client in Rust
ZooKeeper + Kafka with docker-compose
version: '3'
services:
  kafka:
    image: confluentinc/cp-kafka:5.3.2
    environment:
      KAFKA_BROKER_ID: 0
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_NUM_PARTITIONS: 3
      CONFLUENT_SUPPORT_METRICS_ENABLE: 0
    ports: ["9092:9092"]
    links: [zookeeper]
    volumes:
      - ./docker-compose-data/kafka:/var/lib/kafka/data
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.2
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports: ["2181:2181"]
    volumes:
      - ./docker-compose-data/zookeeper/data:/var/lib/zookeeper/data
      - ./docker-compose-data/zookeeper/log:/var/lib/zookeeper/log
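Starting the stack with docker-compose up -d should be enough. The sample below uses a topic named test; assuming auto topic creation is left at Kafka's default, it gets created on first use with 3 partitions (KAFKA_NUM_PARTITIONS), but it can also be created explicitly up front:

docker-compose up -d
# optional: create the topic explicitly instead of relying on auto-creation
docker-compose exec kafka kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic test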
Sample code
Cargo.toml
[dependencies]
# waiting for version 0.23 to be officially released
rdkafka = { git = "https://github.com/fede1024/rust-rdkafka.git", branch = "master", features = ["cmake-build"] }
tokio = { version = "0.2.2", features = ["full"] }
futures = "0.3"
main.rs
use futures::*;
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::message::{OwnedHeaders, Headers};
use rdkafka::{ClientContext, TopicPartitionList, Message};
use rdkafka::consumer::{ConsumerContext, Rebalance, StreamConsumer, Consumer, CommitMode};
use rdkafka::error::KafkaResult;
// A context can be used to change the behavior of producers and consumers by adding callbacks
// that will be executed by librdkafka.
// This particular context sets up custom callbacks to log rebalancing events.
struct CustomContext;

impl ClientContext for CustomContext {}

impl ConsumerContext for CustomContext {
    fn pre_rebalance(&self, rebalance: &Rebalance) {
        println!("Pre rebalance {:?}", rebalance);
    }

    fn post_rebalance(&self, rebalance: &Rebalance) {
        println!("Post rebalance {:?}", rebalance);
    }

    fn commit_callback(&self, result: KafkaResult<()>, _offsets: &TopicPartitionList) {
        println!("Committing offsets: {:?}", result);
    }
}
// A type alias with your custom consumer can be created for convenience.
type LoggingConsumer = StreamConsumer<CustomContext>;
async fn consume_and_print(brokers: &str, group_id: &str, topics: &[&str]) {
    let context = CustomContext;

    let consumer: LoggingConsumer = ClientConfig::new()
        .set("group.id", group_id)
        .set("bootstrap.servers", brokers)
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "true")
        //.set("statistics.interval.ms", "30000")
        //.set("auto.offset.reset", "smallest")
        .set_log_level(RDKafkaLogLevel::Debug)
        .create_with_context(context)
        .expect("Consumer creation failed");

    consumer
        .subscribe(&topics.to_vec())
        .expect("Can't subscribe to specified topics");

    // consumer.start() returns a stream. The stream can be used to chain together expensive steps,
    // such as complex computations on a thread pool or asynchronous IO.
    let mut message_stream = consumer.start();

    while let Some(message) = message_stream.next().await {
        match message {
            Err(e) => println!("Kafka error: {}", e),
            Ok(m) => {
                let payload = match m.payload_view::<str>() {
                    None => "",
                    Some(Ok(s)) => s,
                    Some(Err(e)) => {
                        println!("Error while deserializing message payload: {:?}", e);
                        ""
                    }
                };
                println!("key: '{:?}', payload: '{}', topic: {}, partition: {}, offset: {}, timestamp: {:?}",
                         m.key(), payload, m.topic(), m.partition(), m.offset(), m.timestamp());
                if let Some(headers) = m.headers() {
                    for i in 0..headers.count() {
                        let header = headers.get(i).unwrap();
                        println!(" Header {:#?}: {:?}", header.0, header.1);
                    }
                }
                consumer.commit_message(&m, CommitMode::Async).unwrap();
            }
        };
    }
}
async fn produce(brokers: &str, topic_name: &str) {
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("message.timeout.ms", "5000")
        .create()
        .expect("Producer creation error");

    // This loop is non-blocking: all messages will be sent one after the other, without waiting
    // for the results.
    let futures = (0..5)
        .map(|i| {
            // The send operation on the topic returns a future, which will be completed once the
            // result or failure from Kafka is received.
            producer
                .send(
                    FutureRecord::to(topic_name)
                        .payload(&format!("Message {}", i))
                        .key(&format!("Key {}", i))
                        .headers(OwnedHeaders::new().add("header_key", "header_value")),
                    0,
                )
                .map(move |delivery_status| {
                    // This will be executed once the result is received.
                    println!("Delivery status for message {} received", i);
                    delivery_status
                })
        })
        .collect::<Vec<_>>();

    // This loop will wait until all delivery statuses have been received.
    for future in futures {
        println!("Future completed. Result: {:?}", future.await);
    }
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // consume
    let task1 = async {
        let brokers = "localhost:9092";
        let topics = vec!["test"];
        let group_id = "001";
        consume_and_print(brokers, group_id, &topics).await;
    };
    tokio::spawn(task1);

    // produce
    let task2 = async {
        let brokers = "localhost:9092";
        let topic = "test";
        produce(brokers, topic).await;
    };
    tokio::spawn(task2);

    tokio::signal::ctrl_c().await?;
    println!("ctrl-c received!");
    Ok(())
}
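With the compose stack running, cargo run should start both tasks: the producer delivers five messages to test, and the consumer prints each key, payload, topic, partition, offset, timestamp, and headers before committing the offset. Ctrl-C stops the process.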