LoginSignup
10

More than 5 years have passed since last update.

Kafka in DockerにNode.jsのproducerとconsumerコンテナから接続する

Posted at

前回作成したKafkaクラスタをテストするために、簡単なNode.jsのproducerとconsumer用のコンテナを作成します。追加コンテナもKafkaとZooKeeperと同じdocker-compose.ymlに含めたかったのですが、うまく動かせませんでした。producerとconsumerのコンテナは通常のdocker runコマンドで起動することにします。

kafka-node

Node.jsのKafkaクライアントはいくつかGitHubにあがっています。

今回のKafkaのバージョンは0.8.2.1です。Prozessは0.6のままので、0.8に対応しているkafka-nodを使うことにします。

プロジェクト

最初に作成したファイルのディレクトリ構造です。適当なディレクトリを作成します。

$ cd ~/docker_apps
$ tree
.
├── docker-compose.yml
├── kafka_consumer
│   ├── Dockerfile
│   ├── app.js
│   └── package.json
├── kafka_docker
│   ├── Dockerfile
│   ├── LICENSE
│   ├── README.md
│   ├── broker-list.sh
│   ├── docker-compose-single-broker.yml
│   ├── docker-compose.yml
│   ├── download-kafka.sh
│   ├── start-kafka-shell.sh
│   └── start-kafka.sh
└── kafka_producer
    ├── Dockerfile
    ├── app.js
    └── package.json

kafka_dockerはgit cloneします。

$ cd ~/docker_apps
$ git clone https://github.com/SOHU-Co/kafka-node.git

プログラム

docker-compose.ymlは前回と変わりません。ここにproducerとconsumerのコンテナも追加したいのですが、起動の順番が制御できず動作できませんでした。

docker-compose.yml

~/docker_apps/docker-compose.yml
zookeeper:
  image: wurstmeister/zookeeper
  ports:
    - "2181"
kafka:
  build: ./kafka_docker
  ports:
    - "9092"
  links:
    - zookeeper:zk
  environment:
    KAFKA_ADVERTISED_HOST_NAME: 10.3.0.165
  volumes:
    - /var/run/docker.sock:/var/run/docker.sock

Dockerfileとpackage.json

producerとconsumerのDockerfile、package.jsonは同じです。

~/docker_apps/kafka_producer/Dockerfile
FROM node:0.12-onbuild

package.jsonはnameとdescriptionを変更します。

~/docker_apps/kafka_producer/package.json
{
  "name": "kafka-node-producer-app",
  "description": "kafka-node-producer app",
  "version": "0.0.1",
  "private": true,
  "dependencies": {
    "kafka-node": "0.2.26"
  },
  "scripts": {"start": "node app.js"}
}

kafka_producer/app.js

High Levelのproducerのサンプルはhigh-level-producer.jsにあります。ZooKeeperのホストとポート番号は環境変数より取得します。

~/docker_apps/kafka_producer/app.js
'use strict';
var kafka = require('kafka-node'),
    HighLevelProducer = kafka.HighLevelProducer,
    Client = kafka.Client,
    host = [process.env.ZK_PORT_2181_TCP_ADDR,':',
            process.env.ZK_PORT_2181_TCP_PORT],
    client = new Client(host.join('')),
    producer = new HighLevelProducer(client),
    count = 10, rets = 0;

producer.on('ready', function () {
    setInterval(send, 1000);
});

producer.on('error', function (err) {
    console.log('error', err)
});

function send() {
    var payloads = [
        {topic: 'topic1', messages: ['hello','world']}
    ];
    producer.send(payloads, function (err, data) {
        if (err) console.log(err);
        else console.log('send %d messages', ++rets);
        if (rets === count) process.exit();
    });
}

kafka_consumer

High Levelのconsumerのサンプルもhigh-level-consumer.jsにあります。ZooKeeperの情報やオリジナルからtopic名を固定にするなど少し変更しています。

~/docker_apps/kafka_consumer/app.js
'use strict';
var kafka = require('kafka-node'),
    HighLevelConsumer = kafka.HighLevelConsumer,
    Client = kafka.Client,
    host = [process.env.ZK_PORT_2181_TCP_ADDR,':',
            process.env.ZK_PORT_2181_TCP_PORT],
    client = new Client(host.join('')),
    topics = [ { topic: 'topic1' }],
    options = { autoCommit: true, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024*1024 },
    consumer = new HighLevelConsumer(client, topics, options);

consumer.on('message', function (message) {
    console.log(message);
});

consumer.on('error', function (err) {
    console.log('error', err);
});

Dockerコンテナの起動

Docker ComposeからKafkaとZooKeeperのコンテナを起動します。

$ cd ~/docker_apps
$ docker-compose up

producerのDockerイメージをビルドしてコンテナを起動します。メッセージは10回送信します。--linksフラグを追加してプログラムから環境変数を通してZooKeeperのIPアドレスとポート番号を取得できるようにします。ZooKeeperの名前はDocker Composeが自動的に設定しているのでdocker-compose psコマンドから名前を確認しておきます。

producerのコンテナを起動します。

$ cd ~/docker_apps/kafka_producer
$ docker build -t kafka_producer .
$ docker run --rm --link dockerapps_zookeeper_1:zk kafka_producer
...
send 9 messages
send 10 messages

consumerのDockerイメージをビルドしてコンテナを起動します。producerは毎回メッセージを2つ送信しているのでconsumerには20個のメッセージが届きます。

$ cd ~/docker_apps/kafka_consumer
$ docker build -t kafka_consumer .
$ docker run --rm --link dockerapps_zookeeper_1:zk kafka_consumer
...
{ topic: 'topic1',
  value: 'hello',
  offset: 18,
  partition: 0,
  key: <Buffer > }
{ topic: 'topic1',
  value: 'world',
  offset: 19,
  partition: 0,
  key: <Buffer > }

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10