MDC Advent Calendar 2019 の15日目です。
Kafkaとは何か
LinkedInが作ったOSSの分散メッセージングシステム(メッセージングキュー)。
高スループット、高スケーラビリティ。
Java(Scala)で書かれている。
Producer、Broker、Consumerの3つのコンポーネントで構成される。
ProducerからストリーミングされたデータをConsumerへ中継する。対障害のためにデータの永続化もする。送達保証も実現。
・ Producer: メッセージを配信する
・ Broker: ProducerからConsumerへメッセージの受け渡しをする
・ Consumer: メッセージを受け取る
※仔細な仕組みやアーキテクチャ等については以下が参考になります。
・Apache Kafkaの概要とアーキテクチャ
・Apache Kafkaに入門した
何に使うのか
ユースケースとしては以下のようなものが挙げられます。
- システムがサイロ化するのを防ぐためにデータハブとしてアーキテクチャに組み込む(マイクロサービスとかで)
- Fluentdなどと連携してログ収集に使う
- Webサイトのユーザのページ移動とかを収集してWebアクティビティ分析に使う
- IoTデバイスのセンサーの値を集約し、可視化や分析、他のデバイスの制御などに使う
- ビッグデータ、機械学習、etc
具体的なところだと以下。
・LINEの大規模データパイプライン
・Yahooリアルタイム検索
・大手ヘルスケアIT企業 Cerner社のKafka活用事例
動かしてみる
簡単なサンプルを作って試してみます。
今回はKafka本体はKafka-dockerを使って環境構築をして、ProducerとConsumerのクライアント側はkafka-pythonを使いました。
Kafka-dockerのインストール・起動
公式にあるとおり、kafka-dockerをダウンロードして、docker-compose.ymlのKAFKA_ADVERTISED_HOST_NAME
にdocker hostのIPアドレスを書いたあと、
docker-compose up -d
すればOKです。
Producerを実装する
ほんとうはTwitter Striming APIみたいなデータをとってきたり、IoTのセンサーの値を取得したかったですが、今回は用意がないので一旦適当に数値を取れるものを、ということでマウスのx座標を取得して1秒おきにKafkaに送付するスクリプトを書きました。
KafkaProducerの引数bootstrap_serverに渡す値は、docker-compose.ymlにも書いたdocker hostのIPアドレスと、kafkaのコンテナに割り当てられたPort番号を指定します。
割り当てられてるPort番号はdocker psで確認できます。
上の場合は32783がそれです。
以下が作成したProducer側のソースコードです。
procuer.send(
でtest
というTopicに現在のマウスのx座標を投げています。
from kafka import KafkaProducer
import pyautogui
import time
def main():
producer = KafkaProducer(bootstrap_servers='{Docker HostのIPアドレス}:{Port}')
while True:
result = producer.send('test', str(pyautogui.position().x).encode()).get(timeout=60)
print(result)
time.sleep(1)
if __name__ == '__main__':
main()
Consumerを実装する
次にConsumer側の実装です。同様にKafkaのIPアドレスとPortを指定します。
for message in consumer:
でKafkaからデータを逐次pullしてきてくれます。
from kafka import KafkaConsumer
def main():
consumer = KafkaConsumer(
'test',
bootstrap_servers=['{Docker HostのIPアドレス}:{Port}'])
for message in consumer:
print("x = " + message.value.decode())
if __name__ == '__main__':
main()
実行
下の左側でProducer側、右側でConsumerを実行しています。
Producer側からKafkaに送った値(マウスのx座標)をConsumer側でKafkaから取得して表示できているのが確認できます。
今後やりたいこと
・ラズパイ、Arduinoとか使ってセンサーの値を組み込む。
・取得したデータをグラフにしたり、解析したりする。