Clojureアドベントカレンダーが出ると今年も終わりだという感じがでてきますね!皆様にとって素敵なClojure yearだったことを願っています。
僕はClojure仲間と(dosync radio)というpodcastを始めたり、Clojure/conjに参加してマサカリを投げる練習をしたのがハイライトでした。来年も開催地が同じだったら皆でマサカリ投げに行きましょう。
マサカリ以外だと今年は仕事でKafkaをひたすら使っていたので、自分の理解を整理しようという動機でKafkaについてまとめてみました。
今回の記事に登場するsnippetを書き散らかしたレポジトリはこちらです。
Kafkaとは?
Apache Kafka自体はスケーラブルなメッセージブローカーです。カタログ的なspecについてはこの記事で触れないことにしましたが、後述するKafka Streamsというストリーム処理の抽象化を提供するJavaライブラリ(= Clojureから使える!)の存在が世に存在するその他のメッセージブローカーと比較する際のKiller Featureだと考えています。
ブローカーを動かす
ブローカーを動かさないことには始まりません。
選択肢としては
- apache kafka提供のtgz
- confluent提供のzip/tar
- docker
- Confluent PlatformやAWS MSK, Cloud Karafka等のサービス
等があります。
dockerで動かす場合は同一ホストのdockerコンテナからアクセスする際にadvertized.*の設定のdockerコンテナのnetworkの理解が必要1で難しかったので仕事の開発ではサービス提供のブローカーにインターネット経由でアクセスする方法を最初に採用し、現在ではリモートのkubernetesクラスタデプロイされているbrokerにアクセスして開発をしています。
今回はローカルでClojureを動かすので上記の問題が発生しないためdockerでいきます。
Topicの作成
ここからようやくClojureです。世には既にClojure Kafkaラッパーが多数存在してはいますが、今回は自分の勉強を兼ねているので生Java APIを叩きます。
最初にAdminClientクラスを使ってトピックを作成します。Partitionは同時に動かしたいメッセージを処理するアプリケーションのインスタンスの数によって決めます。例えばpartition数が10であれば最大10インスタンスのアプリケーションで分散処理ができます。今回はbrokerは1インスタンスしかないのでreplication factorは1です(2以上を指定するとエラー)。
;; deps.edn
{:deps {org.apache.kafka/kafka-clients {:mvn/version "2.3.1"}}}
;; code
(import [org.apache.kafka.clients.admin AdminClient NewTopic])
(def admin-clint (AdminClient/create {"bootstrap.servers" "localhost:9092"}))
(let [topic-name "foo"
partitions 8
replication-factor 1]
(-> admin-clint
(.createTopics [(NewTopic. topic-name partitions replication-factor)])
.all
.get))
作成したトピックはlistTopicsメソッドの結果から確認できます。
(.get (.names (.listTopics admin-clint)))
=> #{"foo" "_confluent-metrics" "__confluent.support.metrics"}
ブローカー側でトピックを自動で作成するオプションもありますが、本番で動かす際にはトピックの設定を再現可能にするべきなので手動で作成する方が良いと言えます。
Produce
トピックを作成したらばトピックへの読み書きをやってみましょう。
書き込みはProducerクラスのsendメソッドを使います。下記のsnippetでは"topic"というトピックに"key"というkeyをもった"value"というレコードを作成しています。また、sendはFutureを返すので@
でderefすることができ、送信の結果を同期的に確認できます。
(def producer
(KafkaProducer.
{"bootstrap.servers" "localhost:9092"
"key.serializer" "org.apache.kafka.common.serialization.StringSerializer"
"value.serializer" "org.apache.kafka.common.serialization.StringSerializer"}))
@(.send producer (ProducerRecord. "topic" "key" "value"))
=> #object[org.apache.kafka.clients.producer.RecordMetadata 0x689a8aa2 "foo-3@3"]
;;"foo-3@3" = "<topic-name>-<partition>@<offset>"
ここでは文字列を送信していますが、現実的なアプリケーションではserializerを変更して複雑なClojureオブジェクトをレコードに渡します。需要があれば書きます。
また、recordは同じkeyであれば必ず同じpartitionに届くという性質があり、更に同一partition内ではメッセージが順番に処理されることが保証されるため順序が大事な処理に関しては覚えておくべきポイントになります。
Consume
Consumerのインスタンスを作り、トピックにsubscribeしてpollをすることでレコードを読み出すことができます。
kafkaにはConsumer Groupというコンセプトがあり、group.id毎にトピックをどこまで読んだかというオフセットの情報をグループ毎に独立して管理できるようになっています。
deserializerもserialierと同じ話ですが、現実のアプリケーションでは複雑なClojureオブジェクトをやりとりします。下記のsnippetではレコードのkeyとvalueだけを読んでいますが、他にもレコードがどのtopicのどのpartitionから来たのか等の情報を返すメソッドが提供されています2。
(def consumer
(KafkaConsumer.
{"bootstrap.servers" "localhost:9092"
"key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
"value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
"group.id" "my-group"}))
(.subscribe consumer ["foo"])
(->> (.poll consumer (java.time.Duration/ofMillis 200))
(.iterator)
(iterator-seq)
(map (juxt #(.key %) #(.value %)))
(into []))
=> [["key" "value"] ...]
Kafka Streams
真打です。
ブローカーの視点ではレコードはバイト配列でしかないため、Kafkaのトピックに読み書きをするという使い方だけであればClojure,ひいてはjvm上で行う特筆するべきメリットは最もサポートされているという点以外には特にありません。そこで出てくる目玉商品がKafka Streamsになります。
Kafka Streamsはストリーム処理をアプリケーションのロジックとして記述できるdslを提供するJavaライブラリです。これだけだとふわっとした説明でしかないのですが、リアルタイムでtopicのレコードを別のレコードにmapしたり、分岐したり、topic間でレコードを結合したり、集約操作を行ったりができて幸せという話しです。
また、別の言い方をするとデータを受けとってデータを作るためのフレームワークとも言えます。Clojureと相性良さそうですね?
MinimalなStreamの例
下記はfooトピックに来たレコードをひたすらprnする処理です。基本的にはStreamsBuilderのインスタンスを作り、KStream3クラスにあるメソッドでストリーム処理を記述していきます。
(ns kafka.streams
(:import [org.apache.kafka.streams KafkaStreams StreamsBuilder StreamsConfig]
[org.apache.kafka.common.serialization Serdes]
[org.apache.kafka.streams.kstream KStream ForeachAction]))
(defn print-each-foo-record []
(let [builder (StreamsBuilder.)]
(-> builder
(.stream "foo")
(.foreach (reify
ForeachAction
(apply [this k v]
(prn k v)))))
builder))
できたStreamsBuilderをKafkaStreamsのインスタンスに設定と共に渡してstartメソッドを呼ぶと起動します。applicatoin.idはconsumerのgroup.idと同じようなコンセプトで、トピックをどこまで処理したかをアプリケーション毎に管理できます。
(def stream
(doto (KafkaStreams.
(.build (print-each-foo-record))
(StreamsConfig. {"bootstrap.servers" "localhost:9092"
"application.id" "my-app"
"default.key.serde" (.getClass (Serdes/String))
"default.value.serde" (.getClass (Serdes/String))}))
(.start)))
これで前の項目で作ったproducerを使ってfooトピックに書き込みを行うと勝手に書いた物が出力されるかと思います
@(.send producer (ProducerRecord. "foo"
"Some random key"
"Clojure Rocks!"))
;; "Some random key" "Clojure Rocks!"
複雑なTopology + Look Up
折角なので少し複雑なTopologyを作ってみます。特に意味は無いですが、textトピックに入ってきた文字列を1~n-gramにマップしてgramsトピックに流します。
e.g. "foo" => "f" "o" "o" "fo" "oo" "foo"
1つの入力レコードに対して複数レコードを作るのでflatMapを使用します。
(defn n-grams* [s]
(->> (.length s)
(range)
(map (fn [len]
(->> (partition (inc len) 1 s)
(map str/join))))))
(defn n-gram []
(let [builder (StreamsBuilder.)]
(-> builder
(.stream "text")
(.flatMap (reify KeyValueMapper
(apply [this k v]
(->> (n-grams* v)
(flatten)
(map #(KeyValue. (str (count %)) %))))))
(.to "grams"))
builder))
最後にgramsトピックに入ってきた文字列がnごとに何個あるかを知りたいとします。そういう時のためにストリームに対するgroupBy操作があります。下記はgramsトピックに来た文字列を、長さ(n-gramでkeyを文字列の長さにしているため)でグルーピングして数えています。
(defn ngram-count-table []
(let [builder (StreamsBuilder.)]
(-> builder
(.stream "grams")
(.groupByKey)
(.count (Materialized/as "count-table")))
builder))
上記のようなtopologyがある場合、起動したstreamのインスタンスのストアに対して現在のKeyに対する値をルックアップすることができます。下記は1-gramの数をlook upしています。
(def count-table-stream
(doto (KafkaStreams.
(.build (ngram-count-table))
stream-config
(.start)))
(.get (.store count-table-stream
"count-table"
(QueryableStoreTypes/keyValueStore))
"1")
=> 83
その他
自分のKafkaの基礎部分の理解を深める目的を優先したのでSerializer/Deserializerの話やライブラリのClojureラッパーの話はしませんでしたが、軽く触れると
ClojureでKafkaをやっていく際Serdeの選択肢としては
- fressian
- transit+msgpack,transit+json
- nippy
- json
- edn
- avro
があります。jsonやedn等のテキストデータだとデータの量が多くなってパフォーマンスに影響があるのでバイナリベースの物を使うのが基本かな、というところです。
Clojure Kafka Wrapperで認識している物としてはは
- jackdaw - 2018年のConjで発表された。ストリームAPIまでカバー
- kinsky - 古参。ストリームAPIはサポートしていない。
という感じです。
まとめ
Clojure x Kafkaを仕事で使っている視点から基礎的な事をカバーしてみました。
Clojureを今やっている人にはKafka良いぞ、Kafkaを今やっている人でClojureをやっていない人にはClojureでもできるよ、というメッセージになれば幸いです。