18
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

gRPCでApache Kafkaを使ってみる

Last updated at Posted at 2017-12-06

この記事は 富士通クラウドテクノロジーズ Advent Calendar 2017 の7日目です。

昨日は @makky05 さんの「 FJCTアルバイト ~タスク振り返り感動的ダイジェスト~ 」でした。
とても楽しそうで感動的な業務内容でしたね。ぜひとも「Wifi接続状況で社員がどこにいるか探すサービス」を復活させて欲しいものです。そしてこの1年間で圧倒的な成長があったようで何よりです!

概要

今回は、gRPCでApache Kafkaを使ってみたいと思います。
具体的には、Apache KafkaにgRPCプロキシしてくれる Kafka-Pixy を使用して .proto ファイルからコードを生成し、Apache Kafkaへメッセージの投入、メッセージの取得を行いたいと思います。

gRPC とは、Googleが2015年2月に公開したRPCフレームワークの1つです。
IDLで .proto ファイルを記述し、そのファイルから 様々な言語 のサーバー・クライアント側に必要なソースコードのひな形を生成することができます。また、HTTP/2で通信を行います。
詳しい説明はとてもわかりやすい記事がたくさんありますので、そちらをご参照いただければと思います。
What is gRPC?
gRPCって何?
REST APIの設計で消耗している感じたときのgRPC入門
ProtocolBuffersについて調べてみた

Apache Kafka とは、LinkedInが公開したOSSの分散メッセージングシステムで、大容量のデータを高スループット、低レイテンシで収集・配信することを目的に開発されいるPull型のPub/Subシステムです。
こちらも詳しくは、わかりやすい記事がたくさんありますので、そちらにお任せしたいと思います。
Introduction
Kafka 公式サイトの Introduction を読んだメモ
Apache Kafka ―入門からTrifectaを用いた可視化まで―
分散型メッセージングミドルウェアの詳細比較

Kafka-Pixyについて

Kafka-PixyMailgun Team が開発しているApache Kafka用のgRPCおよびREST APIのプロキシです。
今回はこのKafka-Pixyを使うことで、gRPCでApache Kafkaを使ってみようと思います。
このプロキシのインターフェースが kafkapixy.proto で定義されており、この .proto ファイルからKafka-Pixyを使うclientのコードを生成することができます。
Kafka-Pixyを経由することで、 .proto ファイルから生成したコードを使用することができるので、Kafkaを使う際のclient側の実装コストを抑えることができます。

動作環境

$ cat /etc/redhat-release 
CentOS Linux release 7.4.1708 (Core)

$ java -version
openjdk version "9-ea"
OpenJDK Runtime Environment (build 9-ea+163)
OpenJDK 64-Bit Server VM (build 9-ea+163, mixed mode)

$ python --version
Python 3.6.2

Apache Kafka

  • インストール(基本的には こちら を参考に)
$ wget http://ftp.jaist.ac.jp/pub/apache/kafka/1.0.0/kafka_2.12-1.0.0.tgz 
$ tar -zxf kafka_2.12-1.0.0.tgz
  • とりあえずデフォルトの設定のまま起動
$ cd kafka_2.12-1.0.0
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
$ bin/kafka-server-start.sh -daemon config/server.properties
  • とりあえずtopicを作成してみる
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
  • メッセージを入れてみる
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
message hoge
  • メッセージを取得してみる
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
message hoge

gRPC

今回はPythonで試したいと思います。
Python Quickstart を参考に

$ python -m pip install grpcio
$ python -m pip install grpcio-tools

Kafka-Pixy

$ curl -L https://github.com/mailgun/kafka-pixy/releases/download/v0.14.0/kafka-pixy-v0.14.0-linux-amd64.tar.gz | tar xz
$ cd kafka-pixy-v0.14.0-linux-amd64
$ cp default.yaml config.yaml
#とりあえずデフォルトのまま起動
$ ./kafka-pixy --config config.yaml

.protoファイルからコードを生成

kafka-pixy-v0.14.0-linux-amd64 の中に kafkapixy.proto があるので、それを使ってコードを生成したと思います。
今回はあらかじめsampleディレクトリを作成しておき、そこにコードを生成します。

$ python -m grpc_tools.protoc -I./kafka-pixy-v0.14.0-linux-amd64 --python_out=./sample --grpc_python_out=./sample ./kafka-pixy-v0.14.0-linux-amd64/kafkapixy.proto

すると、 kafkapixy_pb2.pykafkapixy_pb2_grpc.py の2つが生成されます。

なお、 quick-start-python.md にある通り、 kafkapixy_pb2.pykafkapixy_pb2_grpc.pyKafka-Pixy に生成済みのものがあるので、それをコピーして使っても大丈夫です。

生成したコードを使ってKafkaにメッセージ投入・取得

上記で生成した kafkapixy_pb2.pykafkapixy_pb2_grpc.py を使ってApache Kafkaにメッセージ投入・取得をしてみます。
今回はとりあえずメッセージを投入・取得するだけのコードを書きました。

メッセージ投入用コード

sample_producer.py
import grpc
from kafkapixy_pb2 import ProdRq
from kafkapixy_pb2_grpc import KafkaPixyStub
import sys

grpc_channel = grpc.insecure_channel("127.0.0.1:19091")
kafkapixy_client = KafkaPixyStub(grpc_channel)

def produce(kafkapixy_client ,topic, msg):
    rq = ProdRq(topic=topic, message=msg)
    rs = kafkapixy_client.Produce(rq)
    return rs

def main():
    topic = sys.argv[1]
    msg = bytes(sys.argv[2], encoding="utf-8")
    produce(kafkapixy_client, topic, msg)

if __name__ == "__main__":
    main()

メッセージ取得用コード

sample_consumer.py
import grpc
from kafkapixy_pb2 import ConsNAckRq
from kafkapixy_pb2_grpc import KafkaPixyStub
import sys

grpc_channel = grpc.insecure_channel("127.0.0.1:19091")
kafkapixy_client = KafkaPixyStub(grpc_channel)

def consume(kafkapixy_client, group, topic):

    ack_partition = None
    ack_offset = None
    rq = ConsNAckRq(topic=topic, group=group)
    keep_running = True
    while keep_running:
        if ack_offset is None:
            rq.no_ack = True
            rq.ack_partition = 0
            rq.ack_offset = 0
        else:
            rq.no_ack = False
            rq.ack_partition = ack_partition
            rq.ack_offset = ack_offset

        try:
            rs = kafkapixy_client.ConsumeNAck(rq)
        except grpc.RpcError as err:
            if err.code() == grpc.StatusCode.NOT_FOUND:
                ack_offset = None
                continue
            else:
                print(err.result)
                continue

        try:
            ack_partition = rs.partition
            ack_offset = rs.offset
        except:
            ack_offset = None

        print(rs.message)
        ack_partition = rs.partition
        ack_offset = rs.offset

def main():
    topic = sys.argv[1]
    group = sys.argv[2]
    consume(kafkapixy_client, group, topic) 

if __name__ == "__main__":
    main()

実際にメッセージを投入・取得してみる

デフォルトでは、新しくグループが追加されると、その後に投入されたメッセージから取得するようになっているため、今回は先にメッセージ取得用のスクリプトを実行しておいて(グループが存在しなければ作成してくれます)からメッセージを投入します。
(デフォルトではコンシューマーの設定が auto.offset.reset=latest のため)

  • メッセージ取得用のスクリプトを実行しておく
#test-topicからgroup_idにtest-consumer-groupを指定してメッセージを取得
$ python sample_consumer.py test-topic test-consumer-group
  • メッセージを投入してみる
#test-topicにhogehoge1というメッセージを投入
$ python sample_producer.py test-topic hogehoge1
  • 結果
$ python sample_consumer.py test-topic test-consumer-group
b'hogehoge1'

無事投入したメッセージを取得することができました。

おまけ

  • Kafka-PixyはREST APIも対応していて、以下のようにメッセージを取得することも可能です
$ curl "http://localhost:19092/topics/test-topic/messages?group=test-consumer-group"
{
  "key": null,
  "value": "aGVsbG8=",
  "partition": 0,
  "offset": 64
}
#keyとvalueはbase64エンコードされた値となります
  • gRPC, REST APIのlistenポートはKafka-Pixyの設定ファイルで以下のように定義されます
# TCP address that gRPC API server should listen on.
grpc_addr: 0.0.0.0:19091

# TCP address that RESTful API server should listen on.
tcp_addr: 0.0.0.0:19092

まとめ

今回はKafka-Pixyを用いて、gRPCでApache Kafkaを使ってみました。
Kafka-Pixyの .proto ファイルからclient側が使う言語に合わせてソースコードを生成できるので、簡単にclient側の実装ができました。
gRPCでは、サーバー側のメリットとしても、 .proto ファイルから生成されたコードを使用でき、 .proto ファイルにAPI仕様を強制的に明文化できるなどがあります。今後はgRPCを使ってサーバー側の実装もやってみたいですね。
Kafkaに関する内容は薄くなってしまいましたが、手っ取り早く使う方法は書けたのではないかと思っております。
(そうでないと感じられた方、その他指摘がある場合はコメント頂けますと幸いです。)

明日は @ntoofu さんの 「IaaS基盤をテストする環境を作る話」 です。とても興味深いですね。

18
11
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
18
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?