Testing kafka-node

Introduction

Node.js combined with Kafka is used more and more for inter-service communication in microservices.
So here is a proof-of-concept (PoC) memo for Node.js × Kafka.
It is only a PoC, so everything is kept extremely simple.

Directory structure

kafka/
 ├ docker-compose.yml
 ├ node_modules
 ├ package.json
 ├ test
 │  └ kafka-test.js
 └ src
    ├ consumer
    │  └ app.js
    └ producer
       └ app.js

File                   Description             Command
test/kafka-test.js     Unit test               ava -v -u
src/consumer/app.js    Consumer entry point    node src/consumer/app.js
src/producer/app.js    Producer entry point    node src/producer/app.js

Setup

  • docker-compose.yml
version: "2"

services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    links:
      - zookeeper
  • Create the Kafka topics (a Node-side check is sketched after the test file below)
$ docker-compose exec kafka /opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test
$ docker-compose exec kafka /opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test1
$ docker-compose exec kafka /opt/kafka/bin/kafka-topics.sh --list --zookeeper zookeeper:2181
# Consume messages from the topic
$ docker-compose exec kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test1 --from-beginning
  • package.json
{
  "name": "kafka",
  "version": "1.0.0",
  "description": "kafka-node",
  "main": "index.js",
  "dependencies": {
    "ava": "^1.4.1",
    "chai": "^4.2.0",
    "kafka-node": "^4.1.0"
  },
  "devDependencies": {},
  "scripts": {
    "test": "ava -v -u"
  },
  "author": "",
  "license": "ISC"
}
  • test/kafka-test.js
import test from "ava";
import chai from "chai";
import kafka from "kafka-node";

// Producer test
test.serial.cb("producer test", t => {
    const Producer = kafka.Producer;
    const client = new kafka.KafkaClient({
        kafkaHost: "192.168.33.50:9092"
    });
    const producer = new Producer(client, {
        partitionerType: 1
    });

    producer.on("ready", () => {
        const payloads = [
            {
                topic: "test1",
                messages: JSON.stringify({name: "神崎・H・アリア", age: 16})
            }
            ,{
                topic: "test",
                messages: [
                    JSON.stringify({name: "間宮あかり", age: 15}),
                   JSON.stringify({name: "佐々木志乃", age: 15})
               ]
            }
        ];

        producer.send(payloads, (err, data) => {
            t.end(err);  // fail the test if the send returned an error
        });
    });
});

// Consumer test
test.serial.cb("consumer test", t => {
    const Consumer = kafka.Consumer;
    const client = new kafka.KafkaClient({
        kafkaHost: "192.168.33.50:9092"
    });
    const consumer = new Consumer(
        client,
        [{topic: "test1", partision:0}],
        {
            groupId: "simple-consumer1",
            autoCommit: true,
            fromOffset: true
        }
    );

    consumer.on("message", (message, err) => {
        const json = JSON.stringify(message.value);
            chai.assert.isString(json.name);
            chai.assert.isNumber(json.age);
            if (json.age === 16) {
               chai.assert.propertyVal(json.attr,'name','神崎・H・アリア');
            }
    });
    t.end();
});
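
As a quick sanity check that the topics created earlier are actually visible from Node.js, kafka-node's Admin client can list them. A minimal sketch, assuming the same kafkaHost as in the test above:

"use strict";
const kafka = require("kafka-node");

// List the topics known to the broker; the result should include "test" and "test1".
const client = new kafka.KafkaClient({kafkaHost: "192.168.33.50:9092"});
const admin = new kafka.Admin(client);

admin.listTopics((err, res) => {
    if (err) console.error(err);
    else console.log(JSON.stringify(res, null, 2));
    client.close();
});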

Unit Test

$ npm test

> kafka@1.0.0 test /home/vagrant/kafka
> ava -v -u


  ✔ producer test (291ms)
  ✔ consumer test

  2 tests passed
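
To confirm that the producer test really wrote messages, the latest offsets of the topics can be checked with kafka-node's Offset class. A minimal sketch, again assuming the same kafkaHost:

"use strict";
const kafka = require("kafka-node");

// Fetch the latest offset per partition; a value greater than 0 means messages were written.
const client = new kafka.KafkaClient({kafkaHost: "192.168.33.50:9092"});
const offset = new kafka.Offset(client);

offset.fetchLatestOffsets(["test", "test1"], (err, offsets) => {
    if (err) console.error(err);
    else console.log(offsets);
    client.close();
});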

Kafka Producer

  • src/producer/app.js
"use strict";
var kafka = require("kafka-node");

const Producer = kafka.HighLevelProducer;
const client = new kafka.KafkaClient({
    kafkaHost: "192.168.33.50:9092"
});
const producer = new Producer(client, {
    partitionerType: 1  // 1 = random partitioner (0: default, 2: cyclic, 3: keyed, 4: custom)
});

producer.on("ready", () => {
    const payloads = [
        {
            topic: "mytopic-1",
            messages: JSON.stringify({name: "tanaka takeshi", age: 24, sex: "M"})
        }
        ,{
            topic: "mytopic-2",
            messages: [
                JSON.stringify({name: "suzuki aiko", age: 20, sex: "F"}),
                JSON.stringify({name: "yamashita yuji", age: 28, sex: "M"})
            ]
        }
    ];

    let sends = 0;

    producer.send(payloads, (err, data) => {
        if (err) console.log(err);
        else console.log('send %d messages', ++sends);
        process.exit();
    });
});
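
partitionerType: 1 above selects the random partitioner. When messages with the same key should always land on the same partition, kafka-node also offers a keyed partitioner (partitionerType: 3) together with KeyedMessage. A minimal sketch under that assumption; the key "user-42" is made up for illustration:

"use strict";
const kafka = require("kafka-node");

const Producer = kafka.Producer;
const KeyedMessage = kafka.KeyedMessage;
const client = new kafka.KafkaClient({kafkaHost: "192.168.33.50:9092"});

// partitionerType: 3 = keyed partitioner; the payload's "key" decides the target partition.
const producer = new Producer(client, {partitionerType: 3});

producer.on("ready", () => {
    const payloads = [
        {
            topic: "mytopic-2",
            key: "user-42",
            messages: [new KeyedMessage("user-42", JSON.stringify({name: "suzuki aiko", age: 20, sex: "F"}))]
        }
    ];

    producer.send(payloads, (err, data) => {
        if (err) console.log(err);
        else console.log(data);
        process.exit();
    });
});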

Kafka Consumer

  • src/consumer/app.js
'use strict';
const kafka = require('kafka-node');

const Consumer = kafka.Consumer;
const client = new kafka.KafkaClient({kafkaHost: "192.168.33.50:9092"});
const consumer = new Consumer(
    client,
    [{topic: "mytopic-2", partision:0}],
    {
        groupId: "simple-consumer",
        autoCommit: true,
        fromOffset: true
    }
);

consumer.on("message", (message, err) => {
    if (err) console.log("error : " + err);

    const json = JSON.parse(message.value);
    console.log("JSON:" + JSON.stringify(json));
    console.log("Name:" + json.name);
    console.log("Age:" + json.age);
    console.log("Sex Type:" + json.sex);
});

consumer.on('error', function (err) {
    console.log('error', err);
});
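
The plain Consumer used above is driven from a single client. kafka-node also ships ConsumerGroup, which lets the broker coordinate group membership and offsets. A minimal sketch of the same consumer rewritten with it; the groupId here is made up:

"use strict";
const kafka = require("kafka-node");

// ConsumerGroup handles group coordination and offset management on the broker side.
const options = {
    kafkaHost: "192.168.33.50:9092",
    groupId: "simple-consumer-group",
    fromOffset: "earliest",   // start from the beginning when no committed offset exists
    autoCommit: true
};

const consumerGroup = new kafka.ConsumerGroup(options, ["mytopic-2"]);

consumerGroup.on("message", message => {
    console.log("JSON:" + message.value);
});

consumerGroup.on("error", err => {
    console.log("error", err);
});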

Wrapping up

I have no real use for it yet, so for now this was just a quick try at running Kafka from Node.js.

References

Apache Kafka quickstart (basic usage of Kafka)
kafka-node
