Help us understand the problem. What is going on with this article?

kafkaのRubyクライアントphobosで速攻Consumer (※Producerも可)

kafkaを楽に扱いたいなあと、良い感じのRubygemsを探しました。でもよく紹介されているのはたいていRailsとの連携を前提にしているRubyGemsたちです。

目的はただ、topicをサブスクライブしてカジュアルにディスパッチがしたいんだー。というのにはちょっと重たいかなって。

そんななか、見つけたのがphobosです。コード数行からConsumerをつくれてとってもハッピーです。

これなら(俺でも)スクリプト感覚でリアクションかけちゃうじゃないか。ということで紹介します。

テスト用kafka-brokerの準備

このあたりから引用して、とりあえずローカルでkafka-brokerを動かします。

ポートやenvをちょっとアレンジ。

zookeeperと、

run-zookeeper
$ docker run -d --rm \
  -p 2181:2181 \
  --name=zookeeper \
  -e ZOOKEEPER_CLIENT_PORT=2181 \
  -e ZOOKEEPER_TICK_TIME=2000 \
  -e ZOOKEEPER_SYNC_LIMIT=2 \
  confluentinc/cp-zookeeper:5.1.0

brokerです。

run-broker
$ docker run -d --rm \
  -p 9092:9092 \
  --name=kafka \
  --link=zookeeper \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
  -e KAFKA_BROKER_ID=1 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  -e KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false \
  confluentinc/cp-kafka:5.1.0

docker-composeならこんな感じ。

docker-compose.yml
---
version: "3.1"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.1.0
    ports:
      - 2181:2181
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
      - ZOOKEEPER_TICK_TIME=2000
      - ZOOKEEPER_SYNC_LIMIT=2
  kafka:
    image: confluentinc/cp-kafka:5.1.0
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_BROKER_ID=2
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      - KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false

さて、kafka-brokerの準備が整ったところで、phobosの話へ移りましょう。

phobosセットアップ

Rubygemsのphobosをインストール。

$ bundle init
$ echo "gem 'phobos'" >> Gemfile 
$ bundle install --binstubs --path vendor/bundle

ヘルプが出るか確認します。

$ ./bin/phobos 
Commands:
  phobos help [COMMAND]  # Describe available commands or one specific command
  phobos init            # Initialize your project with Phobos
  phobos start           # Starts Phobos
  phobos version         # Outputs the version number. Can be used with: phobos -v or phobos --version

phobos initで初期ファイルを用意しましょう。

$ ./bin/phobos init
      create  config/phobos.yml
      create  phobos_boot.rb

この時点ですでになんか親切ですね。

軽くphobosの動作確認

config/phobos.yml に全体的な設定を記述します。

  • 接続情報などグローバルな設定
  • Producer/Consumerのデフォルト設定
  • Consumerの購読先(複数可)とリアクション

ちょいとinitがこしらえたconfig/phobos.ymlを覗いてみます。

config/phobos.yml(part)
listeners:
  - handler: Phobos::EchoHandler
    topic: test

testというtopicを購読、Phobos::EchoHandlerという組み込みのハンドラが処理する、という定義ですね。

前述の通りkafka-brokerがローカルに立ち上がっていれば変更は不要です、phobos startで起動してみましょう。

$ ./bin/phobos start
______ _           _
| ___ \ |         | |
| |_/ / |__   ___ | |__   ___  ___
|  __/| '_ \ / _ \| '_ \ / _ \/ __|
| |   | | | | (_) | |_) | (_) \__ \
\_|   |_| |_|\___/|_.__/ \___/|___/

phobos_boot.rb - find this file at /Users/sawanoboriyu/develop/src/sandbox/qiita_ruby-phobos/phobos_boot.rb

[2019-02-03T18:11:59:345+0900Z] INFO  -- Phobos : <Hash> {:message=>"Phobos configured", :env=>"N/A"}
[2019-02-03T18:11:59:374+0900Z] INFO  -- Phobos : <Hash> {:message=>"Listener started", :listener_id=>"442f72", :group_id=>"test-1", :topic=>"test", :handler=>"Phobos::EchoHandler"}

最後の行から、testというtopicのConsumerとして稼働したことがわかります。なにか流し込んでみますか。

では、流し込みにkafkacatをつかいましょう。homebrewなりで入れちゃいます。

※ 実際のところ、kafkacatですでにカジュアルなconsumerとして動かせるんですが、それは一旦置いときましょう。

kafkacat-1
$ echo aa | kafkacat -P -b localhost -t test

Phobos::EchoHandlerは受け取ったメッセージをログに出すだけのシンプルなハンドラです。:message=>"aa"をちゃんと受け取ってますね。

$ ./bin/phobos start
______ _           _
| ___ \ |         | |
| |_/ / |__   ___ | |__   ___  ___
|  __/| '_ \ / _ \| '_ \ / _ \/ __|

# -- snip --

[2019-02-03T18:12:14:481+0900Z] INFO  -- Phobos : <Hash> {:message=>"aa", :listener_id=>"442f72", :group_id=>"test-1", :topic=>"test", :handler=>"Phobos::EchoHandler", :key=>nil, :partition=>0, :offset=>0, :retry_count=>0}

ハンドラを作ってmessageを処理する。

phobos initが作ったファイルはもう一つあります、phobos_boot.rbを見てみましょう。

putしかない。

phobos_boot.rb(initial)
# Use this file to load your code
puts <<~ART
  ______ _           _
  | ___ \\ |         | |
  | |_/ / |__   ___ | |__   ___  ___
  |  __/| '_ \\ / _ \\| '_ \\ / _ \\/ __|
  | |   | | | | (_) | |_) | (_) \\__ \\
  \\_|   |_| |_|\\___/|_.__/ \\___/|___/
ART
puts "
phobos_boot.rb - find this file at #{File.expand_path(__FILE__)}

"

これはデフォルトでphobosが参照するエントリーポイントです。なにかするならこれに追記していく格好ですね。

ではPhobos::Handlerの代わりに使うハンドラを記述します、とりあえず#consumeがあればOKです。標準出力にputsするだけのハンドラ、MyHandlerを次のように書いてみました。

phobos_boot.rb(append-1)
class MyHandler
  include Phobos::Handler

  def consume(payload, metadata)
    puts 'your message is ' + payload
  end
end

このハンドラを使うようにconfig/phobos.ymlで設定を変更します。

config/phobos.yml(part)
listeners:
  - handler: MyHandler
    # handler: Phobos::EchoHandler
    topic: test

phobos startして、裏でmessageを流し込んでみます。

$ ./bin/phobos start
______ _           _
| ___ \ |         | |
| |_/ / |__   ___ | |__   ___  ___
|  __/| '_ \ / _ \| '_ \ / _ \/ __|
| |   | | | | (_) | |_) | (_) \__ \
\_|   |_| |_|\___/|_.__/ \___/|___/

phobos_boot.rb - find this file at /Users/sawanoboriyu/develop/src/sandbox/qiita_ruby-phobos/phobos_boot.rb

[2019-02-03T18:45:52:276+0900Z] INFO  -- Phobos : <Hash> {:message=>"Phobos configured", :env=>"N/A"}
[2019-02-03T18:45:52:287+0900Z] INFO  -- Phobos : <Hash> {:message=>"Listener started", :listener_id=>"b59783", :group_id=>"test-1", :topic=>"test", :handler=>"MyHandler"}

# 裏で `echo aa | kafkacat -P -b localhost -t test`

your message is aa

やったぜ。

ちなみにPhobos::Producerをincludeすれば任意のtopicにメッセージを流し込んだりできます。こちらも簡単なのでやっちゃってみてください。

例外どうなんの?

Consumerの処理で例外があると、シンプルなBackoffアルゴリズムでランダム待ちのあと、リトライします。
ですが、ConsumerGroupの仕組み上、どこまで処理したか覚えてるようで、このmessageに対するリトライが繰り返してしまいます。
さっさとログに吐いて終了とするなり、通知したりリトライ用のトピックなどにペイロードを回しちゃうなりをしないと次を処理しないようなのでそのように。

おわりに

これだけ簡単に扱えると、kafkaがぐっと身近になった感じがします。感覚的にはredisくらいまでカジュアル感UPです。
既存のなにかに組み込んでライブラリ的に使うのもOKで、そっちはamqp + bunnyな感じの使い勝手かなと思います。

Phobos自体だけでもスレッドでの並列処理をパラメータ一つで行えたり、複数のlistnerを個別に取り扱えたりと便利な上に、group_idとかkafka側の仕組みがしっかりしているので雑にスケールできるのもよいですね。

Why do not you register as a user and use Qiita more conveniently?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away