Edited at

gRPCでApache Kafkaを使ってみる

More than 1 year has passed since last update.

この記事は 富士通クラウドテクノロジーズ Advent Calendar 2017 の7日目です。

昨日は @makky05 さんの「 FJCTアルバイト ~タスク振り返り感動的ダイジェスト~ 」でした。

とても楽しそうで感動的な業務内容でしたね。ぜひとも「Wifi接続状況で社員がどこにいるか探すサービス」を復活させて欲しいものです。そしてこの1年間で圧倒的な成長があったようで何よりです!


概要

今回は、gRPCでApache Kafkaを使ってみたいと思います。

具体的には、Apache KafkaにgRPCプロキシしてくれる Kafka-Pixy を使用して .proto ファイルからコードを生成し、Apache Kafkaへメッセージの投入、メッセージの取得を行いたいと思います。

gRPC とは、Googleが2015年2月に公開したRPCフレームワークの1つです。

IDLで .proto ファイルを記述し、そのファイルから 様々な言語 のサーバー・クライアント側に必要なソースコードのひな形を生成することができます。また、HTTP/2で通信を行います。

詳しい説明はとてもわかりやすい記事がたくさんありますので、そちらをご参照いただければと思います。

What is gRPC?

gRPCって何?

REST APIの設計で消耗している感じたときのgRPC入門

ProtocolBuffersについて調べてみた

Apache Kafka とは、LinkedInが公開したOSSの分散メッセージングシステムで、大容量のデータを高スループット、低レイテンシで収集・配信することを目的に開発されいるPull型のPub/Subシステムです。

こちらも詳しくは、わかりやすい記事がたくさんありますので、そちらにお任せしたいと思います。

Introduction

Kafka 公式サイトの Introduction を読んだメモ

Apache Kafka ―入門からTrifectaを用いた可視化まで―

分散型メッセージングミドルウェアの詳細比較


Kafka-Pixyについて

Kafka-PixyMailgun Team が開発しているApache Kafka用のgRPCおよびREST APIのプロキシです。

今回はこのKafka-Pixyを使うことで、gRPCでApache Kafkaを使ってみようと思います。

このプロキシのインターフェースが kafkapixy.proto で定義されており、この .proto ファイルからKafka-Pixyを使うclientのコードを生成することができます。

Kafka-Pixyを経由することで、 .proto ファイルから生成したコードを使用することができるので、Kafkaを使う際のclient側の実装コストを抑えることができます。


動作環境

$ cat /etc/redhat-release 

CentOS Linux release 7.4.1708 (Core)

$ java -version
openjdk version "9-ea"
OpenJDK Runtime Environment (build 9-ea+163)
OpenJDK 64-Bit Server VM (build 9-ea+163, mixed mode)

$ python --version
Python 3.6.2


Apache Kafka


  • インストール(基本的には こちら を参考に)

$ wget http://ftp.jaist.ac.jp/pub/apache/kafka/1.0.0/kafka_2.12-1.0.0.tgz 

$ tar -zxf kafka_2.12-1.0.0.tgz


  • とりあえずデフォルトの設定のまま起動

$ cd kafka_2.12-1.0.0

$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
$ bin/kafka-server-start.sh -daemon config/server.properties


  • とりあえずtopicを作成してみる

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic

$ bin/kafka-topics.sh --list --zookeeper localhost:2181


  • メッセージを入れてみる

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic

message hoge


  • メッセージを取得してみる

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning

message hoge


gRPC

今回はPythonで試したいと思います。

Python Quickstart を参考に

$ python -m pip install grpcio

$ python -m pip install grpcio-tools


Kafka-Pixy

$ curl -L https://github.com/mailgun/kafka-pixy/releases/download/v0.14.0/kafka-pixy-v0.14.0-linux-amd64.tar.gz | tar xz

$ cd kafka-pixy-v0.14.0-linux-amd64
$ cp default.yaml config.yaml
#とりあえずデフォルトのまま起動
$ ./kafka-pixy --config config.yaml


.protoファイルからコードを生成

kafka-pixy-v0.14.0-linux-amd64 の中に kafkapixy.proto があるので、それを使ってコードを生成したと思います。

今回はあらかじめsampleディレクトリを作成しておき、そこにコードを生成します。

$ python -m grpc_tools.protoc -I./kafka-pixy-v0.14.0-linux-amd64 --python_out=./sample --grpc_python_out=./sample ./kafka-pixy-v0.14.0-linux-amd64/kafkapixy.proto

すると、 kafkapixy_pb2.pykafkapixy_pb2_grpc.py の2つが生成されます。

なお、 quick-start-python.md にある通り、 kafkapixy_pb2.pykafkapixy_pb2_grpc.pyKafka-Pixy に生成済みのものがあるので、それをコピーして使っても大丈夫です。


生成したコードを使ってKafkaにメッセージ投入・取得

上記で生成した kafkapixy_pb2.pykafkapixy_pb2_grpc.py を使ってApache Kafkaにメッセージ投入・取得をしてみます。

今回はとりあえずメッセージを投入・取得するだけのコードを書きました。


メッセージ投入用コード


sample_producer.py

import grpc

from kafkapixy_pb2 import ProdRq
from kafkapixy_pb2_grpc import KafkaPixyStub
import sys

grpc_channel = grpc.insecure_channel("127.0.0.1:19091")
kafkapixy_client = KafkaPixyStub(grpc_channel)

def produce(kafkapixy_client ,topic, msg):
rq = ProdRq(topic=topic, message=msg)
rs = kafkapixy_client.Produce(rq)
return rs

def main():
topic = sys.argv[1]
msg = bytes(sys.argv[2], encoding="utf-8")
produce(kafkapixy_client, topic, msg)

if __name__ == "__main__":
main()



メッセージ取得用コード


sample_consumer.py

import grpc

from kafkapixy_pb2 import ConsNAckRq
from kafkapixy_pb2_grpc import KafkaPixyStub
import sys

grpc_channel = grpc.insecure_channel("127.0.0.1:19091")
kafkapixy_client = KafkaPixyStub(grpc_channel)

def consume(kafkapixy_client, group, topic):

ack_partition = None
ack_offset = None
rq = ConsNAckRq(topic=topic, group=group)
keep_running = True
while keep_running:
if ack_offset is None:
rq.no_ack = True
rq.ack_partition = 0
rq.ack_offset = 0
else:
rq.no_ack = False
rq.ack_partition = ack_partition
rq.ack_offset = ack_offset

try:
rs = kafkapixy_client.ConsumeNAck(rq)
except grpc.RpcError as err:
if err.code() == grpc.StatusCode.NOT_FOUND:
ack_offset = None
continue
else:
print(err.result)
continue

try:
ack_partition = rs.partition
ack_offset = rs.offset
except:
ack_offset = None

print(rs.message)
ack_partition = rs.partition
ack_offset = rs.offset

def main():
topic = sys.argv[1]
group = sys.argv[2]
consume(kafkapixy_client, group, topic)

if __name__ == "__main__":
main()



実際にメッセージを投入・取得してみる

デフォルトでは、新しくグループが追加されると、その後に投入されたメッセージから取得するようになっているため、今回は先にメッセージ取得用のスクリプトを実行しておいて(グループが存在しなければ作成してくれます)からメッセージを投入します。

(デフォルトではコンシューマーの設定が auto.offset.reset=latest のため)


  • メッセージ取得用のスクリプトを実行しておく

#test-topicからgroup_idにtest-consumer-groupを指定してメッセージを取得

$ python sample_consumer.py test-topic test-consumer-group


  • メッセージを投入してみる

#test-topicにhogehoge1というメッセージを投入

$ python sample_producer.py test-topic hogehoge1


  • 結果

$ python sample_consumer.py test-topic test-consumer-group

b'hogehoge1'

無事投入したメッセージを取得することができました。


おまけ


  • Kafka-PixyはREST APIも対応していて、以下のようにメッセージを取得することも可能です

$ curl "http://localhost:19092/topics/test-topic/messages?group=test-consumer-group"

{
"key": null,
"value": "aGVsbG8=",
"partition": 0,
"offset": 64
}
#keyとvalueはbase64エンコードされた値となります


  • gRPC, REST APIのlistenポートはKafka-Pixyの設定ファイルで以下のように定義されます

# TCP address that gRPC API server should listen on.

grpc_addr: 0.0.0.0:19091

# TCP address that RESTful API server should listen on.
tcp_addr: 0.0.0.0:19092


まとめ

今回はKafka-Pixyを用いて、gRPCでApache Kafkaを使ってみました。

Kafka-Pixyの .proto ファイルからclient側が使う言語に合わせてソースコードを生成できるので、簡単にclient側の実装ができました。

gRPCでは、サーバー側のメリットとしても、 .proto ファイルから生成されたコードを使用でき、 .proto ファイルにAPI仕様を強制的に明文化できるなどがあります。今後はgRPCを使ってサーバー側の実装もやってみたいですね。

Kafkaに関する内容は薄くなってしまいましたが、手っ取り早く使う方法は書けたのではないかと思っております。

(そうでないと感じられた方、その他指摘がある場合はコメント頂けますと幸いです。)

明日は @ntoofu さんの 「IaaS基盤をテストする環境を作る話」 です。とても興味深いですね。