
Rust + kafka + async

Implementation notes for a Kafka client in Rust.

zookeeper + kafka with docker-compose

version: '3'

services:
  kafka:
    image: confluentinc/cp-kafka:5.3.2
    environment:
      KAFKA_BROKER_ID: 0
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_NUM_PARTITIONS: 3
      CONFLUENT_SUPPORT_METRICS_ENABLE: 0
    ports: ["9092:9092"]
    links: [zookeeper]
    volumes:
      - ./docker-compose-data/kafka:/var/lib/kafka/data
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.2
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports: ["2181:2181"]
    volumes:
      - ./docker-compose-data/zookeeper/data:/var/lib/zookeeper/data
      - ./docker-compose-data/zookeeper/log:/var/lib/zookeeper/log
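
Start the stack with docker-compose up -d. Kafka brokers auto-create topics by default (auto.create.topics.enable defaults to true), so the test topic used in the code below should be created automatically on first use, with three partitions as set by KAFKA_NUM_PARTITIONS.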

Sample code

Cargo.toml

[dependencies]
# waiting for version 0.23 to be officially released
rdkafka = { git = "https://github.com/fede1024/rust-rdkafka.git", branch = "master", features = ["cmake-build"] }
tokio = { version = "0.2.2", features = ["full"] }
futures = "0.3"

main.rs

use futures::*;
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::message::{OwnedHeaders, Headers};
use rdkafka::{ClientContext, TopicPartitionList, Message};
use rdkafka::consumer::{ConsumerContext, Rebalance, StreamConsumer, Consumer, CommitMode};
use rdkafka::error::KafkaResult;


// A context can be used to change the behavior of producers and consumers by adding callbacks
// that will be executed by librdkafka.
// This particular context sets up custom callbacks to log rebalancing events.
struct CustomContext;

impl ClientContext for CustomContext {}

impl ConsumerContext for CustomContext {
    fn pre_rebalance(&self, rebalance: &Rebalance) {
        println!("Pre rebalance {:?}", rebalance);
    }

    fn post_rebalance(&self, rebalance: &Rebalance) {
        println!("Post rebalance {:?}", rebalance);
    }

    fn commit_callback(&self, result: KafkaResult<()>, _offsets: &TopicPartitionList) {
        println!("Committing offsets: {:?}", result);
    }
}

// A type alias with your custom consumer can be created for convenience.
type LoggingConsumer = StreamConsumer<CustomContext>;

async fn consume_and_print(brokers: &str, group_id: &str, topics: &[&str]) {
    let context = CustomContext;

    let consumer: LoggingConsumer = ClientConfig::new()
        .set("group.id", group_id)
        .set("bootstrap.servers", brokers)
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "true")
        //.set("statistics.interval.ms", "30000")
        //.set("auto.offset.reset", "smallest")
        .set_log_level(RDKafkaLogLevel::Debug)
        .create_with_context(context)
        .expect("Consumer creation failed");

    consumer
        .subscribe(topics)
        .expect("Can't subscribe to specified topics");

    // consumer.start() returns a stream. The stream can be used to chain together expensive steps,
    // such as complex computations on a thread pool or asynchronous IO.
    let mut message_stream = consumer.start();

    while let Some(message) = message_stream.next().await {
        match message {
            Err(e) => println!("Kafka error: {}", e),
            Ok(m) => {
                let payload = match m.payload_view::<str>() {
                    None => "",
                    Some(Ok(s)) => s,
                    Some(Err(e)) => {
                        println!("Error while deserializing message payload: {:?}", e);
                        ""
                    }
                };
                println!("key: '{:?}', payload: '{}', topic: {}, partition: {}, offset: {}, timestamp: {:?}",
                      m.key(), payload, m.topic(), m.partition(), m.offset(), m.timestamp());
                if let Some(headers) = m.headers() {
                    for i in 0..headers.count() {
                        let header = headers.get(i).unwrap();
                        println!("  Header {:#?}: {:?}", header.0, header.1);
                    }
                }
                consumer.commit_message(&m, CommitMode::Async).unwrap();
            }
        };
    }
}


async fn produce(brokers: &str, topic_name: &str) {
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("message.timeout.ms", "5000")
        .create()
        .expect("Producer creation error");

    // This is non-blocking: all messages are dispatched one after the other,
    // without waiting for the results.
    let futures = (0..5)
        .map(|i| {
            // The send operation on the topic returns a future that will complete once the
            // result or failure from Kafka is received.
            producer
                .send(
                    FutureRecord::to(topic_name)
                        .payload(&format!("Message {}", i))
                        .key(&format!("Key {}", i))
                        .headers(OwnedHeaders::new().add("header_key", "header_value")),
                    0,
                )
                .map(move |delivery_status| {
                    // This will be executed once the result is received.
                    println!("Delivery status for message {} received", i);
                    delivery_status
                })
        })
        .collect::<Vec<_>>();

    // This loop will wait until all delivery statuses have been received.
    for future in futures {
        println!("Future completed. Result: {:?}", future.await);
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {

    // consume
    let task1 = async {
        let brokers = "localhost:9092";
        let topics = vec!["test"];
        let group_id = "001";
        consume_and_print(brokers, group_id, &topics).await;
    };
    // #[tokio::main] already provides a runtime, so spawn directly on it
    // instead of creating a second Runtime.
    tokio::spawn(task1);

    // produce
    let task2 = async {
        let brokers = "localhost:9092";
        let topic = "test";
        produce(brokers, topic).await;
    };
    tokio::spawn(task2);

    tokio::signal::ctrl_c().await?;
    println!("ctrl-c received!");
    Ok(())
}
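
Running cargo run prints the producer's delivery-status lines and the consumed messages (key, payload, topic, partition, offset, timestamp) until Ctrl-C is pressed. If the process should instead exit once the producer has finished, a minimal sketch (reusing the produce function above) is to await the spawned task's JoinHandle instead of waiting for Ctrl-C:

// Await the producer task; the JoinError converts into Box<dyn Error> via `?`.
let handle = tokio::spawn(async {
    produce("localhost:9092", "test").await;
});
handle.await?;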