Help us understand the problem. What is going on with this article?

Apache Kafkaを触った

MDC Advent Calendar 2019 の15日目です。

Kafkaとは何か

LinkedInが作ったOSSの分散メッセージングシステム(メッセージングキュー)。
高スループット、高スケーラビリティ。
Java(Scala)で書かれている。

Producer、Broker、Consumerの3つのコンポーネントで構成される。
ProducerからストリーミングされたデータをConsumerへ中継する。対障害のためにデータの永続化もする。送達保証も実現。
  ・ Producer: メッセージを配信する
  ・ Broker: ProducerからConsumerへメッセージの受け渡しをする
  ・ Consumer: メッセージを受け取る

※仔細な仕組みやアーキテクチャ等については以下が参考になります。
 ・Apache Kafkaの概要とアーキテクチャ
 ・Apache Kafkaに入門した

何に使うのか

ユースケースとしては以下のようなものが挙げられます。

  1. システムがサイロ化するのを防ぐためにデータハブとしてアーキテクチャに組み込む(マイクロサービスとかで)
  2. Fluentdなどと連携してログ収集に使う
  3. Webサイトのユーザのページ移動とかを収集してWebアクティビティ分析に使う
  4. IoTデバイスのセンサーの値を集約し、可視化や分析、他のデバイスの制御などに使う
  5. ビッグデータ、機械学習、etc

具体的なところだと以下。
LINEの大規模データパイプライン
Yahooリアルタイム検索
大手ヘルスケアIT企業 Cerner社のKafka活用事例

動かしてみる

簡単なサンプルを作って試してみます。
今回はKafka本体はKafka-dockerを使って環境構築をして、ProducerとConsumerのクライアント側はkafka-pythonを使いました。

Kafka-dockerのインストール・起動

公式にあるとおり、kafka-dockerをダウンロードして、docker-compose.ymlのKAFKA_ADVERTISED_HOST_NAMEにdocker hostのIPアドレスを書いたあと、

docker-compose up -d

すればOKです。

参考: kafka in dockerのチュートリアル

Producerを実装する

ほんとうはTwitter Striming APIみたいなデータをとってきたり、IoTのセンサーの値を取得したかったですが、今回は用意がないので一旦適当に数値を取れるものを、ということでマウスのx座標を取得して1秒おきにKafkaに送付するスクリプトを書きました。

KafkaProducerの引数bootstrap_serverに渡す値は、docker-compose.ymlにも書いたdocker hostのIPアドレスと、kafkaのコンテナに割り当てられたPort番号を指定します。
割り当てられてるPort番号はdocker psで確認できます。
スクリーンショット 2019-12-14 21.42.04.png
上の場合は32783がそれです。
以下が作成したProducer側のソースコードです。
procuer.send(testというTopicに現在のマウスのx座標を投げています。

from kafka import KafkaProducer
import pyautogui
import time

def main():
    producer = KafkaProducer(bootstrap_servers='{Docker HostのIPアドレス}:{Port}')
    while True:
        result = producer.send('test', str(pyautogui.position().x).encode()).get(timeout=60)
        print(result)
        time.sleep(1)

if __name__ == '__main__':
    main()

Consumerを実装する

次にConsumer側の実装です。同様にKafkaのIPアドレスとPortを指定します。
for message in consumer:でKafkaからデータを逐次pullしてきてくれます。

from kafka import KafkaConsumer

def main():
    consumer = KafkaConsumer(
            'test', 
            bootstrap_servers=['{Docker HostのIPアドレス}:{Port}'])

    for message in consumer:
        print("x = " + message.value.decode())

if __name__ == '__main__':
    main()

実行

下の左側でProducer側、右側でConsumerを実行しています。
Producer側からKafkaに送った値(マウスのx座標)をConsumer側でKafkaから取得して表示できているのが確認できます。

test.gif

今後やりたいこと

・ラズパイ、Arduinoとか使ってセンサーの値を組み込む。
・取得したデータをグラフにしたり、解析したりする。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away