16
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

MDCAdvent Calendar 2019

Day 15

Apache Kafkaを触った

Last updated at Posted at 2019-12-14

MDC Advent Calendar 2019 の15日目です。

Kafkaとは何か

LinkedInが作ったOSSの分散メッセージングシステム(メッセージングキュー)。
高スループット、高スケーラビリティ。
Java(Scala)で書かれている。

Producer、Broker、Consumerの3つのコンポーネントで構成される。
ProducerからストリーミングされたデータをConsumerへ中継する。対障害のためにデータの永続化もする。送達保証も実現。
  ・ Producer: メッセージを配信する
  ・ Broker: ProducerからConsumerへメッセージの受け渡しをする
  ・ Consumer: メッセージを受け取る

※仔細な仕組みやアーキテクチャ等については以下が参考になります。
 ・Apache Kafkaの概要とアーキテクチャ
 ・Apache Kafkaに入門した

何に使うのか

ユースケースとしては以下のようなものが挙げられます。

  1. システムがサイロ化するのを防ぐためにデータハブとしてアーキテクチャに組み込む(マイクロサービスとかで)
  2. Fluentdなどと連携してログ収集に使う
  3. Webサイトのユーザのページ移動とかを収集してWebアクティビティ分析に使う
  4. IoTデバイスのセンサーの値を集約し、可視化や分析、他のデバイスの制御などに使う
  5. ビッグデータ、機械学習、etc

具体的なところだと以下。
LINEの大規模データパイプライン
Yahooリアルタイム検索
大手ヘルスケアIT企業 Cerner社のKafka活用事例

動かしてみる

簡単なサンプルを作って試してみます。
今回はKafka本体はKafka-dockerを使って環境構築をして、ProducerとConsumerのクライアント側はkafka-pythonを使いました。

Kafka-dockerのインストール・起動

公式にあるとおり、kafka-dockerをダウンロードして、docker-compose.ymlのKAFKA_ADVERTISED_HOST_NAMEにdocker hostのIPアドレスを書いたあと、

docker-compose up -d

すればOKです。

参考: kafka in dockerのチュートリアル

Producerを実装する

ほんとうはTwitter Striming APIみたいなデータをとってきたり、IoTのセンサーの値を取得したかったですが、今回は用意がないので一旦適当に数値を取れるものを、ということでマウスのx座標を取得して1秒おきにKafkaに送付するスクリプトを書きました。

KafkaProducerの引数bootstrap_serverに渡す値は、docker-compose.ymlにも書いたdocker hostのIPアドレスと、kafkaのコンテナに割り当てられたPort番号を指定します。
割り当てられてるPort番号はdocker psで確認できます。
スクリーンショット 2019-12-14 21.42.04.png
上の場合は32783がそれです。
以下が作成したProducer側のソースコードです。
procuer.send(testというTopicに現在のマウスのx座標を投げています。

from kafka import KafkaProducer
import pyautogui
import time

def main():
    producer = KafkaProducer(bootstrap_servers='{Docker HostのIPアドレス}:{Port}')
    while True:
        result = producer.send('test', str(pyautogui.position().x).encode()).get(timeout=60)
        print(result)
        time.sleep(1)

if __name__ == '__main__':
    main()

Consumerを実装する

次にConsumer側の実装です。同様にKafkaのIPアドレスとPortを指定します。
for message in consumer:でKafkaからデータを逐次pullしてきてくれます。

from kafka import KafkaConsumer

def main():
    consumer = KafkaConsumer(
            'test', 
            bootstrap_servers=['{Docker HostのIPアドレス}:{Port}'])

    for message in consumer:
        print("x = " + message.value.decode())

if __name__ == '__main__':
    main()

実行

下の左側でProducer側、右側でConsumerを実行しています。
Producer側からKafkaに送った値(マウスのx座標)をConsumer側でKafkaから取得して表示できているのが確認できます。

test.gif

今後やりたいこと

・ラズパイ、Arduinoとか使ってセンサーの値を組み込む。
・取得したデータをグラフにしたり、解析したりする。

16
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
16
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?