この記事は 富士通クラウドテクノロジーズ Advent Calendar 2017 の7日目です。
昨日は @makky05 さんの「 FJCTアルバイト ~タスク振り返り感動的ダイジェスト~ 」でした。
とても楽しそうで感動的な業務内容でしたね。ぜひとも「Wifi接続状況で社員がどこにいるか探すサービス」を復活させて欲しいものです。そしてこの1年間で圧倒的な成長があったようで何よりです!
概要
今回は、gRPCでApache Kafkaを使ってみたいと思います。
具体的には、Apache KafkaにgRPCプロキシしてくれる Kafka-Pixy を使用して .proto
ファイルからコードを生成し、Apache Kafkaへメッセージの投入、メッセージの取得を行いたいと思います。
gRPC とは、Googleが2015年2月に公開したRPCフレームワークの1つです。
IDLで .proto
ファイルを記述し、そのファイルから 様々な言語 のサーバー・クライアント側に必要なソースコードのひな形を生成することができます。また、HTTP/2で通信を行います。
詳しい説明はとてもわかりやすい記事がたくさんありますので、そちらをご参照いただければと思います。
What is gRPC?
gRPCって何?
REST APIの設計で消耗している感じたときのgRPC入門
ProtocolBuffersについて調べてみた
Apache Kafka とは、LinkedInが公開したOSSの分散メッセージングシステムで、大容量のデータを高スループット、低レイテンシで収集・配信することを目的に開発されいるPull型のPub/Subシステムです。
こちらも詳しくは、わかりやすい記事がたくさんありますので、そちらにお任せしたいと思います。
Introduction
Kafka 公式サイトの Introduction を読んだメモ
Apache Kafka ―入門からTrifectaを用いた可視化まで―
分散型メッセージングミドルウェアの詳細比較
Kafka-Pixyについて
Kafka-Pixy は Mailgun Team が開発しているApache Kafka用のgRPCおよびREST APIのプロキシです。
今回はこのKafka-Pixyを使うことで、gRPCでApache Kafkaを使ってみようと思います。
このプロキシのインターフェースが kafkapixy.proto で定義されており、この .proto
ファイルからKafka-Pixyを使うclientのコードを生成することができます。
Kafka-Pixyを経由することで、 .proto
ファイルから生成したコードを使用することができるので、Kafkaを使う際のclient側の実装コストを抑えることができます。
動作環境
$ cat /etc/redhat-release
CentOS Linux release 7.4.1708 (Core)
$ java -version
openjdk version "9-ea"
OpenJDK Runtime Environment (build 9-ea+163)
OpenJDK 64-Bit Server VM (build 9-ea+163, mixed mode)
$ python --version
Python 3.6.2
Apache Kafka
- インストール(基本的には こちら を参考に)
$ wget http://ftp.jaist.ac.jp/pub/apache/kafka/1.0.0/kafka_2.12-1.0.0.tgz
$ tar -zxf kafka_2.12-1.0.0.tgz
- とりあえずデフォルトの設定のまま起動
$ cd kafka_2.12-1.0.0
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
$ bin/kafka-server-start.sh -daemon config/server.properties
- とりあえずtopicを作成してみる
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
- メッセージを入れてみる
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
message hoge
- メッセージを取得してみる
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
message hoge
gRPC
今回はPythonで試したいと思います。
Python Quickstart を参考に
$ python -m pip install grpcio
$ python -m pip install grpcio-tools
Kafka-Pixy
- howto-install.md を参考に
$ curl -L https://github.com/mailgun/kafka-pixy/releases/download/v0.14.0/kafka-pixy-v0.14.0-linux-amd64.tar.gz | tar xz
$ cd kafka-pixy-v0.14.0-linux-amd64
$ cp default.yaml config.yaml
#とりあえずデフォルトのまま起動
$ ./kafka-pixy --config config.yaml
.protoファイルからコードを生成
kafka-pixy-v0.14.0-linux-amd64
の中に kafkapixy.proto
があるので、それを使ってコードを生成したと思います。
今回はあらかじめsampleディレクトリを作成しておき、そこにコードを生成します。
$ python -m grpc_tools.protoc -I./kafka-pixy-v0.14.0-linux-amd64 --python_out=./sample --grpc_python_out=./sample ./kafka-pixy-v0.14.0-linux-amd64/kafkapixy.proto
すると、 kafkapixy_pb2.py と kafkapixy_pb2_grpc.py の2つが生成されます。
なお、 quick-start-python.md にある通り、 kafkapixy_pb2.py や kafkapixy_pb2_grpc.py は Kafka-Pixy に生成済みのものがあるので、それをコピーして使っても大丈夫です。
生成したコードを使ってKafkaにメッセージ投入・取得
上記で生成した kafkapixy_pb2.py と kafkapixy_pb2_grpc.py を使ってApache Kafkaにメッセージ投入・取得をしてみます。
今回はとりあえずメッセージを投入・取得するだけのコードを書きました。
メッセージ投入用コード
import grpc
from kafkapixy_pb2 import ProdRq
from kafkapixy_pb2_grpc import KafkaPixyStub
import sys
grpc_channel = grpc.insecure_channel("127.0.0.1:19091")
kafkapixy_client = KafkaPixyStub(grpc_channel)
def produce(kafkapixy_client ,topic, msg):
rq = ProdRq(topic=topic, message=msg)
rs = kafkapixy_client.Produce(rq)
return rs
def main():
topic = sys.argv[1]
msg = bytes(sys.argv[2], encoding="utf-8")
produce(kafkapixy_client, topic, msg)
if __name__ == "__main__":
main()
メッセージ取得用コード
import grpc
from kafkapixy_pb2 import ConsNAckRq
from kafkapixy_pb2_grpc import KafkaPixyStub
import sys
grpc_channel = grpc.insecure_channel("127.0.0.1:19091")
kafkapixy_client = KafkaPixyStub(grpc_channel)
def consume(kafkapixy_client, group, topic):
ack_partition = None
ack_offset = None
rq = ConsNAckRq(topic=topic, group=group)
keep_running = True
while keep_running:
if ack_offset is None:
rq.no_ack = True
rq.ack_partition = 0
rq.ack_offset = 0
else:
rq.no_ack = False
rq.ack_partition = ack_partition
rq.ack_offset = ack_offset
try:
rs = kafkapixy_client.ConsumeNAck(rq)
except grpc.RpcError as err:
if err.code() == grpc.StatusCode.NOT_FOUND:
ack_offset = None
continue
else:
print(err.result)
continue
try:
ack_partition = rs.partition
ack_offset = rs.offset
except:
ack_offset = None
print(rs.message)
ack_partition = rs.partition
ack_offset = rs.offset
def main():
topic = sys.argv[1]
group = sys.argv[2]
consume(kafkapixy_client, group, topic)
if __name__ == "__main__":
main()
実際にメッセージを投入・取得してみる
デフォルトでは、新しくグループが追加されると、その後に投入されたメッセージから取得するようになっているため、今回は先にメッセージ取得用のスクリプトを実行しておいて(グループが存在しなければ作成してくれます)からメッセージを投入します。
(デフォルトではコンシューマーの設定が auto.offset.reset=latest
のため)
- メッセージ取得用のスクリプトを実行しておく
#test-topicからgroup_idにtest-consumer-groupを指定してメッセージを取得
$ python sample_consumer.py test-topic test-consumer-group
- メッセージを投入してみる
#test-topicにhogehoge1というメッセージを投入
$ python sample_producer.py test-topic hogehoge1
- 結果
$ python sample_consumer.py test-topic test-consumer-group
b'hogehoge1'
無事投入したメッセージを取得することができました。
おまけ
- Kafka-PixyはREST APIも対応していて、以下のようにメッセージを取得することも可能です
$ curl "http://localhost:19092/topics/test-topic/messages?group=test-consumer-group"
{
"key": null,
"value": "aGVsbG8=",
"partition": 0,
"offset": 64
}
#keyとvalueはbase64エンコードされた値となります
- gRPC, REST APIのlistenポートはKafka-Pixyの設定ファイルで以下のように定義されます
# TCP address that gRPC API server should listen on.
grpc_addr: 0.0.0.0:19091
# TCP address that RESTful API server should listen on.
tcp_addr: 0.0.0.0:19092
まとめ
今回はKafka-Pixyを用いて、gRPCでApache Kafkaを使ってみました。
Kafka-Pixyの .proto
ファイルからclient側が使う言語に合わせてソースコードを生成できるので、簡単にclient側の実装ができました。
gRPCでは、サーバー側のメリットとしても、 .proto
ファイルから生成されたコードを使用でき、 .proto
ファイルにAPI仕様を強制的に明文化できるなどがあります。今後はgRPCを使ってサーバー側の実装もやってみたいですね。
Kafkaに関する内容は薄くなってしまいましたが、手っ取り早く使う方法は書けたのではないかと思っております。
(そうでないと感じられた方、その他指摘がある場合はコメント頂けますと幸いです。)
明日は @ntoofu さんの 「IaaS基盤をテストする環境を作る話」 です。とても興味深いですね。