Sample Kafka producer & consumer


In this post we build sample producer and consumer APIs that connect to the Kafka broker started earlier.

What they do is simple:

producer:

  • send a sequential message number and an eventTime

consumer:

  • print each consumed message to the console
  • print the time difference between produce and consume

That's the whole flow.

These APIs are started with docker-compose, separately from the Kafka broker.

First, follow that setup to create the topic in advance.
The network and Kafka parameters also follow the same configuration.
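
If you prefer to create the topic from Python instead of the broker's CLI, here is a minimal sketch using kafka-python's admin client. The topic name, partition count, and bootstrap address are assumptions matching the compose settings used below; adjust them to your environment.

# create_topic.py -- hedged sketch: create "my-topic" with kafka-python's admin client
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
try:
    # one partition and replication factor 1 are enough for this single-broker demo
    admin.create_topics([NewTopic(name="my-topic", num_partitions=1, replication_factor=1)])
    print("topic created")
except TopicAlreadyExistsError:
    print("topic already exists")
finally:
    admin.close()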

Code

The file layout looks like this:

$ tree
.
├── api
│   ├── consumer
│   │   ├── Dockerfile
│   │   └── consumer.py
│   ├── docker-compose.yaml
│   └── producer
│       ├── Dockerfile
│       └── producer.py
└── kafka
    └── docker-compose.yaml

5 directories, 6 files

The compose file for the Kafka broker and Kafka UI lives in the kafka directory;
the api directory is newly added for this post.

producer

producer.py
import os
import json
import time
from kafka import KafkaProducer

# environmental variable retrieval
bootstrap_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
topic = os.getenv("TOPIC_NAME", "my-topic")

# KafkaProducer initialization
producer = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    value_serializer=lambda v: json.dumps(v).encode("utf-8")
)

print("🚀 Producer is now running...")

message_id = 0
while True:
    event_time = time.time()
    message = {
        "message_id": message_id,
        "event_time": event_time,
        "content": f"Message {message_id}"
    }

    producer.send(topic, value=message)
    print(f"🚀 Sent: {message}")

    message_id += 1
    time.sleep(2)  # send one message every 2 seconds

Dockerfile
FROM python:3.10
WORKDIR /app
COPY producer.py .
RUN pip install kafka-python
CMD ["python", "producer.py"]

consumer

consumer.py
import os
import json
import time
from kafka import KafkaConsumer
from kafka.errors import KafkaError

# environmental variable retrieval
bootstrap_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
topic = os.getenv("TOPIC_NAME", "my-topic")
group_id = os.getenv("CONSUMER_GROUP_ID", "my-consumer-group")

print(f"🛠️ Connecting KafkaConsumer to topic '{topic}' at '{bootstrap_servers}' (group: '{group_id}')...")

try:
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset='latest',
        group_id=group_id,
        enable_auto_commit=True,
        value_deserializer=lambda v: json.loads(v.decode("utf-8"))
    )
    print("✅ KafkaConsumer connected. Waiting for new messages...\n")
except KafkaError as e:
    print(f"❌ KafkaConsumer error: {type(e).__name__} - {e}")
    exit(1)
except Exception as e:
    print(f"❌ Unexpected error: {type(e).__name__} - {e}")
    exit(1)

# accept messages from the topic
print("📥 Listening for messages...")
for message in consumer:
    try:
        data = message.value
        received_time = time.time()
        latency = received_time - data.get("event_time", received_time)
        print(f"📨 New message: [ID={data['message_id']}] {data['content']} at {data['event_time']:.3f}")
        print(f"⏱️ Latency: {latency:.3f} seconds\n")
    except Exception as e:
        print(f"⚠️ Message processing error: {type(e).__name__} - {e}")

Dockerfile
FROM python:3.10
WORKDIR /app
COPY consumer.py .
RUN pip install kafka-python
CMD ["python", "consumer.py"]

Compose file

docker-compose.yaml
services:
  kafka-consumer:
    build: ./consumer
    container_name: kafka-consumer
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=broker:29092
      - TOPIC_NAME=my-topic
      - CONSUMER_GROUP_ID=my-consumer-group
    networks:
      - kafka-net
    healthcheck:
      test: ["CMD-SHELL", "pgrep -f consumer.py"]
      interval: 5s
      timeout: 3s
      retries: 5

  kafka-producer:
    build: ./producer
    container_name: kafka-producer
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=broker:29092
      - TOPIC_NAME=my-topic
    networks:
      - kafka-net
    depends_on:
      kafka-consumer:
        condition: service_healthy

networks:
  kafka-net:
    external: true

Startup

After the Kafka broker is up:

cd api

docker-compose up -d

Checking the containers

$ docker ps
CONTAINER ID   IMAGE                    COMMAND                  CREATED             STATUS                       PORTS                                         NAMES
4973284a8502   api-kafka-producer       "python producer.py"     2 minutes ago       Up 2 minutes                                                               kafka-producer
891380fe1964   api-kafka-consumer       "python consumer.py"     2 minutes ago       Up 2 minutes (healthy)                                                     kafka-consumer
837b3277597e   apache/kafka:latest      "/__cacert_entrypoin…"   About an hour ago   Up About an hour (healthy)   0.0.0.0:9092->9092/tcp, [::]:9092->9092/tcp   broker
926856f2eb25   provectuslabs/kafka-ui   "/bin/sh -c 'java --…"   About an hour ago   Up About an hour             0.0.0.0:8080->8080/tcp, [::]:8080->8080/tcp   kafka-ui

Producer logs
(I changed the producer to send a message every 2 seconds.)

$ docker logs kafka-producer --tail=5
🚀 Sent: {'message_id': 87, 'event_time': 1752996866.1200306, 'content': 'Message 87'}
🚀 Sent: {'message_id': 88, 'event_time': 1752996868.122785, 'content': 'Message 88'}
🚀 Sent: {'message_id': 89, 'event_time': 1752996870.1256244, 'content': 'Message 89'}
🚀 Sent: {'message_id': 90, 'event_time': 1752996872.128084, 'content': 'Message 90'}
🚀 Sent: {'message_id': 91, 'event_time': 1752996874.2297065, 'content': 'Message 91'}

Consumer logs

$ docker logs kafka-consumer --tail=20
⏱️ Latency: 0.007 seconds

📨 New message: [ID=87] Message 87 at 1752996866.120
⏱️ Latency: 0.005 seconds

📨 New message: [ID=88] Message 88 at 1752996868.123
⏱️ Latency: 0.006 seconds

📨 New message: [ID=89] Message 89 at 1752996870.126
⏱️ Latency: 0.004 seconds

📨 New message: [ID=90] Message 90 at 1752996872.128
⏱️ Latency: 0.005 seconds

📨 New message: [ID=91] Message 91 at 1752996874.230
⏱️ Latency: 0.005 seconds

📨 New message: [ID=92] Message 92 at 1752996876.233
⏱️ Latency: 0.005 seconds

(It looks like message 92 was sent after I checked the producer logs.)

And that's how we confirmed messages being sent and received.
