で起動したKafkaに繋ぐproducerとconsumerのAPIのサンプルを作ります。
やることはシンプルで、
producer:
- messageの通し番号とeventTimeを送る
consumer:
- consumeしたメッセージをコンソールにprint out
- produceとconsumeの時間差をprintout
という流れです。
Kafka brokerとは別にdocker-composeでこれらのAPIを起動します。
まずは
に従ってtopicを前もって作っておいてください。
ネットワークやKafkaのパラメタもこの設定に従います。
コード
ファイル構成はこんな感じで
$ tree
.
├── api
│ ├── consumer
│ │ ├── Dockerfile
│ │ └── consumer.py
│ ├── docker-compose.yaml
│ └── producer
│ ├── Dockerfile
│ └── producer.py
└── kafka
└── docker-compose.yaml
5 directories, 6 files
kafkaディレクトリにKafka brokerとKafka UIのcomposeファイルを入れ、
apiディレクトリを新しく作っています。
producer
producer.py
import os
import json
import time
from kafka import KafkaProducer
# environmental variable retrieval
bootstrap_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
topic = os.getenv("TOPIC_NAME", "my-topic")
# KafkaProducer initialization
producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode("utf-8")
)
print("🚀 Producer is now running...")
message_id = 0
while True:
event_time = time.time()
message = {
"message_id": message_id,
"event_time": event_time,
"content": f"Message {message_id}"
}
producer.send(topic, value=message)
print(f"🚀 Sent: {message}")
message_id += 1
time.sleep(2) # send message every 10 sec.
FROM python:3.10
WORKDIR /app
COPY producer.py .
RUN pip install kafka-python
CMD ["python", "producer.py"]
consumer
consumer.py
import os
import json
import time
from kafka import KafkaConsumer
from kafka.errors import KafkaError
# environmental variable retrieval
bootstrap_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
topic = os.getenv("TOPIC_NAME", "my-topic")
group_id = os.getenv("CONSUMER_GROUP_ID", "my-consumer-group")
print(f"🛠️ Connecting KafkaConsumer to topic '{topic}' at '{bootstrap_servers}' (group: '{group_id}')...")
try:
consumer = KafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
auto_offset_reset='latest',
group_id=group_id,
enable_auto_commit=True,
value_deserializer=lambda v: json.loads(v.decode("utf-8"))
)
print("✅ KafkaConsumer connected. Waiting for new messages...\n")
except KafkaError as e:
print(f"❌ KafkaConsumer error: {type(e).__name__} - {e}")
exit(1)
except Exception as e:
print(f"❌ Unexpected error: {type(e).__name__} - {e}")
exit(1)
# accept messages from the topic
print("📥 Listening for messages...")
for message in consumer:
try:
data = message.value
received_time = time.time()
latency = received_time - data.get("event_time", received_time)
print(f"📨 New message: [ID={data['message_id']}] {data['content']} at {data['event_time']:.3f}")
print(f"⏱️ Latency: {latency:.3f} seconds\n")
except Exception as e:
print(f"⚠️ Message processing error: {type(e).__name__} - {e}")
FROM python:3.10
WORKDIR /app
COPY consumer.py .
RUN pip install kafka-python
CMD ["python", "consumer.py"]
conposeファイル
docker-compose.yaml
services:
kafka-consumer:
build: ./consumer
container_name: kafka-consumer
environment:
- KAFKA_BOOTSTRAP_SERVERS=broker:29092
- TOPIC_NAME=my-topic
- CONSUMER_GROUP_ID=my-consumer-group
networks:
- kafka-net
healthcheck:
test: ["CMD-SHELL", "pgrep -f consumer.py"]
interval: 5s
timeout: 3s
retries: 5
kafka-producer:
build: ./producer
container_name: kafka-producer
environment:
- KAFKA_BOOTSTRAP_SERVERS=broker:29092
- TOPIC_NAME=my-topic
networks:
- kafka-net
depends_on:
kafka-consumer:
condition: service_healthy
networks:
kafka-net:
external: true
起動
Kafka broker起動後に
cd api
docker-compose up -d
コンテナの確認
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS
NAMES
4973284a8502 api-kafka-producer "python producer.py" 2 minutes ago Up 2 minutes
kafka-producer
891380fe1964 api-kafka-consumer "python consumer.py" 2 minutes ago Up 2 minutes (healthy)
kafka-consumer
837b3277597e apache/kafka:latest "/__cacert_entrypoin…" About an hour ago Up About an hour (healthy) 0.0.0.0:9092->9092/tcp, [::]:9092->9092/tcp broker
926856f2eb25 provectuslabs/kafka-ui "/bin/sh -c 'java --…" About an hour ago Up About an hour 0.0.0.0:8080->8080/tcp, [::]:8080->8080/tcp kafka-ui
producerのログ
(produceを2秒ごとに生成するように変更しています。)
$ docker logs kafka-producer --tail=5
🚀 Sent: {'message_id': 87, 'event_time': 1752996866.1200306, 'content': 'Message 87'}
🚀 Sent: {'message_id': 88, 'event_time': 1752996868.122785, 'content': 'Message 88'}
🚀 Sent: {'message_id': 89, 'event_time': 1752996870.1256244, 'content': 'Message 89'}
🚀 Sent: {'message_id': 90, 'event_time': 1752996872.128084, 'content': 'Message 90'}
🚀 Sent: {'message_id': 91, 'event_time': 1752996874.2297065, 'content': 'Message 91'}
consumerのログ
$ docker logs kafka-consumer --tail=20
⏱️ Latency: 0.007 seconds
📨 New message: [ID=87] Message 87 at 1752996866.120
⏱️ Latency: 0.005 seconds
📨 New message: [ID=88] Message 88 at 1752996868.123
⏱️ Latency: 0.006 seconds
📨 New message: [ID=89] Message 89 at 1752996870.126
⏱️ Latency: 0.004 seconds
📨 New message: [ID=90] Message 90 at 1752996872.128
⏱️ Latency: 0.005 seconds
📨 New message: [ID=91] Message 91 at 1752996874.230
⏱️ Latency: 0.005 seconds
📨 New message: [ID=92] Message 92 at 1752996876.233
⏱️ Latency: 0.005 seconds
(producerのチェック後に92番目のメッセージが送られていたみたいです。)
こんな感じで送受信が確認できました。