この記事は 富士通クラウドテクノロジーズ Advent Calendar 2017 の7日目です。

昨日は @makky05 さんの「 FJCTアルバイト ~タスク振り返り感動的ダイジェスト~ 」でした。
とても楽しそうで感動的な業務内容でしたね。ぜひとも「Wifi接続状況で社員がどこにいるか探すサービス」を復活させて欲しいものです。そしてこの1年間で圧倒的な成長があったようで何よりです!

概要

今回は、gRPCでApache Kafkaを使ってみたいと思います。
具体的には、Apache KafkaにgRPCプロキシしてくれる Kafka-Pixy を使用して .proto ファイルからコードを生成し、Apache Kafkaへメッセージの投入、メッセージの取得を行いたいと思います。

gRPC とは、Googleが2015年2月に公開したRPCフレームワークの1つです。
IDLで .proto ファイルを記述し、そのファイルから 様々な言語 のサーバー・クライアント側に必要なソースコードのひな形を生成することができます。また、HTTP/2で通信を行います。
詳しい説明はとてもわかりやすい記事がたくさんありますので、そちらをご参照いただければと思います。
What is gRPC?
gRPCって何?
REST APIの設計で消耗している感じたときのgRPC入門
ProtocolBuffersについて調べてみた

Apache Kafka とは、LinkedInが公開したOSSの分散メッセージングシステムで、大容量のデータを高スループット、低レイテンシで収集・配信することを目的に開発されいるPull型のPub/Subシステムです。
こちらも詳しくは、わかりやすい記事がたくさんありますので、そちらにお任せしたいと思います。
Introduction
Kafka 公式サイトの Introduction を読んだメモ
Apache Kafka ―入門からTrifectaを用いた可視化まで―
分散型メッセージングミドルウェアの詳細比較

Kafka-Pixyについて

Kafka-PixyMailgun Team が開発しているApache Kafka用のgRPCおよびREST APIのプロキシです。
今回はこのKafka-Pixyを使うことで、gRPCでApache Kafkaを使ってみようと思います。
このプロキシのインターフェースが kafkapixy.proto で定義されており、この .proto ファイルからKafka-Pixyを使うclientのコードを生成することができます。
Kafka-Pixyを経由することで、 .proto ファイルから生成したコードを使用することができるので、Kafkaを使う際のclient側の実装コストを抑えることができます。

動作環境

$ cat /etc/redhat-release 
CentOS Linux release 7.4.1708 (Core)

$ java -version
openjdk version "9-ea"
OpenJDK Runtime Environment (build 9-ea+163)
OpenJDK 64-Bit Server VM (build 9-ea+163, mixed mode)

$ python --version
Python 3.6.2

Apache Kafka

  • インストール(基本的には こちら を参考に)
$ wget http://ftp.jaist.ac.jp/pub/apache/kafka/1.0.0/kafka_2.12-1.0.0.tgz 
$ tar -zxf kafka_2.12-1.0.0.tgz
  • とりあえずデフォルトの設定のまま起動
$ cd kafka_2.12-1.0.0
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
$ bin/kafka-server-start.sh -daemon config/server.properties
  • とりあえずtopicを作成してみる
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
  • メッセージを入れてみる
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
message hoge
  • メッセージを取得してみる
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
message hoge

gRPC

今回はPythonで試したいと思います。
Python Quickstart を参考に

$ python -m pip install grpcio
$ python -m pip install grpcio-tools

Kafka-Pixy

$ curl -L https://github.com/mailgun/kafka-pixy/releases/download/v0.14.0/kafka-pixy-v0.14.0-linux-amd64.tar.gz | tar xz
$ cd kafka-pixy-v0.14.0-linux-amd64
$ cp default.yaml config.yaml
#とりあえずデフォルトのまま起動
$ ./kafka-pixy --config config.yaml

.protoファイルからコードを生成

kafka-pixy-v0.14.0-linux-amd64 の中に kafkapixy.proto があるので、それを使ってコードを生成したと思います。
今回はあらかじめsampleディレクトリを作成しておき、そこにコードを生成します。

$ python -m grpc_tools.protoc -I./kafka-pixy-v0.14.0-linux-amd64 --python_out=./sample --grpc_python_out=./sample ./kafka-pixy-v0.14.0-linux-amd64/kafkapixy.proto

すると、 kafkapixy_pb2.pykafkapixy_pb2_grpc.py の2つが生成されます。

なお、 quick-start-python.md にある通り、 kafkapixy_pb2.pykafkapixy_pb2_grpc.pyKafka-Pixy に生成済みのものがあるので、それをコピーして使っても大丈夫です。

生成したコードを使ってKafkaにメッセージ投入・取得

上記で生成した kafkapixy_pb2.pykafkapixy_pb2_grpc.py を使ってApache Kafkaにメッセージ投入・取得をしてみます。
今回はとりあえずメッセージを投入・取得するだけのコードを書きました。

メッセージ投入用コード

sample_producer.py
import grpc
from kafkapixy_pb2 import ProdRq
from kafkapixy_pb2_grpc import KafkaPixyStub
import sys

grpc_channel = grpc.insecure_channel("127.0.0.1:19091")
kafkapixy_client = KafkaPixyStub(grpc_channel)

def produce(kafkapixy_client ,topic, msg):
    rq = ProdRq(topic=topic, message=msg)
    rs = kafkapixy_client.Produce(rq)
    return rs

def main():
    topic = sys.argv[1]
    msg = bytes(sys.argv[2], encoding="utf-8")
    produce(kafkapixy_client, topic, msg)

if __name__ == "__main__":
    main()

メッセージ取得用コード

sample_consumer.py
import grpc
from kafkapixy_pb2 import ConsNAckRq
from kafkapixy_pb2_grpc import KafkaPixyStub
import sys

grpc_channel = grpc.insecure_channel("127.0.0.1:19091")
kafkapixy_client = KafkaPixyStub(grpc_channel)

def consume(kafkapixy_client, group, topic):

    ack_partition = None
    ack_offset = None
    rq = ConsNAckRq(topic=topic, group=group)
    keep_running = True
    while keep_running:
        if ack_offset is None:
            rq.no_ack = True
            rq.ack_partition = 0
            rq.ack_offset = 0
        else:
            rq.no_ack = False
            rq.ack_partition = ack_partition
            rq.ack_offset = ack_offset

        try:
            rs = kafkapixy_client.ConsumeNAck(rq)
        except grpc.RpcError as err:
            if err.code() == grpc.StatusCode.NOT_FOUND:
                ack_offset = None
                continue
            else:
                print(err.result)
                continue

        try:
            ack_partition = rs.partition
            ack_offset = rs.offset
        except:
            ack_offset = None

        print(rs.message)
        ack_partition = rs.partition
        ack_offset = rs.offset

def main():
    topic = sys.argv[1]
    group = sys.argv[2]
    consume(kafkapixy_client, group, topic) 

if __name__ == "__main__":
    main()

実際にメッセージを投入・取得してみる

デフォルトでは、新しくグループが追加されると、その後に投入されたメッセージから取得するようになっているため、今回は先にメッセージ取得用のスクリプトを実行しておいて(グループが存在しなければ作成してくれます)からメッセージを投入します。
(デフォルトではコンシューマーの設定が auto.offset.reset=latest のため)

  • メッセージ取得用のスクリプトを実行しておく
#test-topicからgroup_idにtest-consumer-groupを指定してメッセージを取得
$ python sample_consumer.py test-topic test-consumer-group
  • メッセージを投入してみる
#test-topicにhogehoge1というメッセージを投入
$ python sample_producer.py test-topic hogehoge1
  • 結果
$ python sample_consumer.py test-topic test-consumer-group
b'hogehoge1'

無事投入したメッセージを取得することができました。

おまけ

  • Kafka-PixyはREST APIも対応していて、以下のようにメッセージを取得することも可能です
$ curl "http://localhost:19092/topics/test-topic/messages?group=test-consumer-group"
{
  "key": null,
  "value": "aGVsbG8=",
  "partition": 0,
  "offset": 64
}
#keyとvalueはbase64エンコードされた値となります
  • gRPC, REST APIのlistenポートはKafka-Pixyの設定ファイルで以下のように定義されます
# TCP address that gRPC API server should listen on.
grpc_addr: 0.0.0.0:19091

# TCP address that RESTful API server should listen on.
tcp_addr: 0.0.0.0:19092

まとめ

今回はKafka-Pixyを用いて、gRPCでApache Kafkaを使ってみました。
Kafka-Pixyの .proto ファイルからclient側が使う言語に合わせてソースコードを生成できるので、簡単にclient側の実装ができました。
gRPCでは、サーバー側のメリットとしても、 .proto ファイルから生成されたコードを使用でき、 .proto ファイルにAPI仕様を強制的に明文化できるなどがあります。今後はgRPCを使ってサーバー側の実装もやってみたいですね。
Kafkaに関する内容は薄くなってしまいましたが、手っ取り早く使う方法は書けたのではないかと思っております。
(そうでないと感じられた方、その他指摘がある場合はコメント頂けますと幸いです。)

明日は @ntoofu さんの 「IaaS基盤をテストする環境を作る話」 です。とても興味深いですね。

Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account log in.