0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Docker kafka コンテイナー設置

Last updated at Posted at 2022-04-24

docker kafkaでコンテイナーでメッセージングを開発する人のためここに記録を残します。

この投稿の環境は以下のようです。

  • Ubuntu 20.04 LTS
  • Docker version 20.10.12, build e91ed57

docker コンテイナーはzookeeperまで入ってる https://hub.docker.com/r/bitnami/kafka を使用しました。

上のリンクのoverviewの通り直接githubにあるyamlファイルをダウンして実行しました。

$ curl -sSL https://raw.githubusercontent.com/bitnami/bitnami-docker-kafka/master/docker-compose.yml > docker-compose.yml
$ docker-compose up -d

ここで適用されたdocker-compose.ymlは以下のようです。

version: "2"

services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.8
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper_data:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: docker.io/bitnami/kafka:3.1
    ports:
      - "9092:9092"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local

docker-compose up -dを実行するとzookeeperkafka二つのコンテイナーが立ち上がられます。

Pythonでproducerとconsumerを作ってテストをして見ました。

# Consumer.py
from kafka import KafkaConsumer
from json import loads
import time

topic_name = 'topic_test'
consumer = KafkaConsumer(
        topic_name,
        bootstrap_servers=['localhost:9092'],
        value_deserializer = lambda x: loads(x.decode('utf-8')),
        )

start = time.time()
print("[begin] Topic: %sで consumerがメッセージを受け取る。" % (topic_name))

for message in consumer:
    print("Partition: %d, Offset: %d, Value: %s" % (message.partition, message.offset, message.value))

print("[end]掛かる時間 : ", time.time() - start)
# Producer.py
from kafka import KafkaProducer
from json import dumps
import time

topic_name = 'topic_test'

producer = KafkaProducer(
        acks=0,
        compression_type = 'gzip',
        bootstrap_servers=['localhost:9092'],
        value_serializer = lambda x: dumps(x).encode('utf-8')
        )

start = time.time()

print("[begin] producerからメッセージ転送スタート")

for i in range(100):
    data = {'str': 'result'+str(i)}
    print("メッセージ転送中..." + data['str'])
    producer.send(topic_name, value=data)


producer.flush()

print("[end] 掛かる時間:", time.time() - start)

でも実行するとproducer.pyはflush()で動作が止まってるし、consumer.pyではkafka接続で止まってしまいます。

原因はlocahostがコンテイナーとubuntuサーバーの解析が異なってるからでした。https://hub.docker.com/r/bitnami/kafkaのoverview をちゃんと見たら Apache Kafka development setup exampleのセックションがありました。以下は開発用のdocker-compose.ymlです。

version: "3"
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:latest'
    ports:
      - '9092:9092'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

以前で設置したコンテイナーを消してこれで再び実行。

でも、kafkaのコンテイナーが立ち上げられないです。

docker logsで誤りを確認して見ると

[2022-04-22 16:00:52,785] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentBrokerIdException: Configured broker.id 1 doesn't match stored broker.id ...

ブロッカーが問題だったのか。。。一応上のdocker-compose.ymlからKAFKA BROKER ID=1を削除してコンテイナーを立ち上げ直します。

先にpythonコードをもう一度実行すると

$ python3 Producer.py
[begin] producerからメッセージ転送スタート
メッセージ転送中...result0
メッセージ転送中...result1
メッセージ転送中...result2
...
メッセージ転送中...result97
メッセージ転送中...result98
メッセージ転送中...result99
[end] 掛かる時間: 0.03677988052368164
$ python3 Comsumer.py
[begin] Topic: topic_testで consumerがメッセージを受け取る
Partition: 0, Offset: 200, Value: {'str': 'result0'}
Partition: 0, Offset: 201, Value: {'str': 'result1'}
Partition: 0, Offset: 202, Value: {'str': 'result2'}
Partition: 0, Offset: 203, Value: {'str': 'result3'}
Partition: 0, Offset: 204, Value: {'str': 'result4'}
...
Partition: 0, Offset: 296, Value: {'str': 'result96'}
Partition: 0, Offset: 297, Value: {'str': 'result97'}
Partition: 0, Offset: 298, Value: {'str': 'result98'}
Partition: 0, Offset: 299, Value: {'str': 'result99'}

Docker kafkaコンテイナーを通じてproducer, consumerの間にメッセージが正常的にやりとるをするのを確認できました。

参考で最終のdocker-compose.ymlは以下のようです。

version: "2"

services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.8
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper_data:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: docker.io/bitnami/kafka:3.1
    hostname: kafka
    ports:
      - "9092:9092"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local
0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?