Kafkaでストリーム処理を書けるようになった
2016/06 時点の最新リリース v0.10 から、Kafkaにストリーム処理のアプリケーションを書くためのライブラリが入った。Kafka本体に同梱されているので追加で何かをインストールする必要はない。このライブラリを使うと、
「KafkaのトピックAにデータが入ってきたら、即座ににそれを処理して別のトピックBに格納する」
というアプリケーションを簡単に作ることができる。なお、Kafkaに同梱されているからといって、Kafka本体、すなわちブローカー側に何か特殊な仕掛けが導入されたわけではない。Kafka Streamは単なるKafkaのクライアントアプリである。別の言い方をすると、Samza とか SparkStreaming とかでできることを、Kafka 本体だけでもできるようになったということができる。
Kafka Streams で実現できるストリーミング処理は以下の特徴を持つ。
- at least once な処理を実現する
- ステートフルな処理が可能(Window処理等)
- 複数台・複数スレッドで並列動作する
- 耐障害性を持つ
- 特別な分散協調プロセスが存在しないシンプルな基盤構成
少し勉強してでも身につける価値があるといえよう。
Kafka Streams 概要
Kafka Streams はプログラマがKafkaを使ったアプリケーションを作成するのを手伝うためのライブラリである。そのインターフェースは2つ、すなわち High Level な Kafka Streams DSL と、Low Levelの Processor API が存在する。現時点でドキュメント化されてるのは Kafka Streams DSLなので、プログラマはまずDSLから入るのがよいし、本投稿もDSLに基づいたものである。
Stream のセマンティックス
Kafka Streams における Stream とは、 <Key, Value>
で識別されるレコードが絶え間なく流れてくるものである。 Stream に流されるこのレコードには二種類の意味づけが可能である。具体的には <52番信号機, 青>
というようなレコードは、信号機の今の状態は青である、というように意味づけできる。つまり「レコードは、keyの現在の状態を表している」という風に解釈することができる。 このように解釈されるレコードが流れる Stream を Kafka Streams では changelog stream という。かたや <3246番のユーザ, 100円入金>
というレコードは、ユーザの残高が100円であるということではなく、ユーザの残高を100円増加させるイベントが流れてきたというように意味づけができるだろう。このように、「レコードはkeyに紐づいた何らかの値を変化するものだ」というように意味づけをすることもできる。このようなレコードが流れるstreamを record stream というように名前を付ける。
強調するべきは、Streamを流れるレコードに対してどのように解釈・処理をするのかは、プログラマが決めなくてはならない、ということだ。いや、プログラマに決める自由があるというべきか。いずれにせよ、Streamは単にレコードの集合であり、changelog stream なのか record stream なのかは、(レコードの意味するところをくみ取って、もしくはしたい処理にしたがって)プログラマが決めるのだ。
Kafka Streams では「changelog stream には KStream」、「record stream にはKTable」というクラスを割り当てている。以降それを見ていこう。
KStream/KTable と状態
KStream と KTable どっちが複雑かは明確だろう。答えは KTable。
KTableにおいては、レコードは、key に紐づく何らかの値の 状態 が変化するモノと考える。すなわち、これまでどうであったか?という状態を覚えておく必要がある。すなわち KTable を使ったストリーミング処理は、状態を持つ、すなわち ステートフルなストリーミング処理 である。一方、 KStream ではレコードの値がそのまま、紐づくkeyの値を示しているので、何かを覚えておく必要がない。シンプルである。
KStreamはKStreamを生成する
これは自明だろう。例えば、KStreamには filter
というメソッドが存在する。これはストリームに含まれるレコードのうち、特定の条件にマッチしたレコードだけでできたストリームを生成する。つまり、filter
の戻り値は KStream なのである。
このようにStreamに処理を加え、新たにStreamをつくり、さらに処理を加え、という形でデータを処理していく。これはKafka Streams のプログラミングスタイルがメソッドチェーンで書けることに反映される。例えば、stream.filter().map().map()
というようにかくことができる。
KTableはKTable を生成する
これも同様。 たとえば、KTable にはforeach
というメソッドがあるが、これは各keyに対して何らかのアクションを取るのだが、返り値が KTableになる、すなわちKTableがKTableを生成するのは自明だろう
KStream は KTable を生成するし、逆もしかり
これもある意味自明である。前述の通り、KStreamかKTableかはただのセマンティクスの違いに過ぎず、実体はただのレコードの流れだからだ。例えば、WordCountを考えてみよう。あるwordの頻度を管理するようにKTableを作っていたとして、これを出力する際に KStream にすることは可能だ。具体的には KTable には toStream
というメソッドがあり、生成されたKStreamは、その時点でのwordの頻度が流れることになる。
逆に KStream のメソッドがKTable を生成することもある。あるWindow幅で集計をする KStream のメソッド aggregateByKey
がKTableを生成するのは、納得がいくことと思う。
Kafka Streams アプリケーションの作り方
ここまでがわかればKafka Streams を使ったプログラムはどのような形式をしているか、は想像がつくだろう。
- Kafkaのトピック名を指定して、KStream or KTable とする
- 得られた KStream/KTable に対して
filter
などのメソッドを呼び出し、処理を実施するとともに(返り値として)新たな KStream/KTable を生成する - 2.を繰り返し実行して、目的とする処理を実現する
- KStream/KTable の
to
メソッドを実行して Kafkaのトピックにしまう
次章では具体的な例を見ながら、理解を深めよう。
Kafka Streams Examples を読み解く
cofluent社がいくつかの例を挙げているので、それを読み解いていこう。 https://github.com/confluentinc/examples/tree/master/kafka-streams/src/main/java/io/confluent/examples/streams
例:文字列を大文字にする
MapFunctionLambdaExample.java は最も簡単なKafka Streams の例である。Kafkaのトピック TextLinesTopic
に入ってきたレコードを大文字に変更して、トピック OriginalAndUppercased
に格納するものだ。
まず、入力のトピックは、状態をもたないので KStreamとして解釈すればよい。以下のようにBuilderのインスタンスを作ってから、トピック名を指定してKStream を作成する。ちなみにSerdeはserialize/deserializeの略。
KStreamBuilder builder = new KStreamBuilder();
KStream<byte[], String> textLines = builder.stream(byteArraySerde, stringSerde, "TextLinesTopic");
得られたKStreamに、map
を呼び出して大文字に変換する処理を施した新たなKStreamを生成する。Key, Valueの形にしなくてはいけないので、とりあえずkey を変換前の value というようにしている。
KStream<String, String> originalAndUppercased =
textLines.map((key, value) -> KeyValue.pair(value, value.toUpperCase()));
それをKafkaトピックに格納する。
originalAndUppercased.to(stringSerde, stringSerde, "OriginalAndUppercased");
例:WordCount
次に簡単なのはWordCountLambdaExample.javaだろう。
トピックTextLinesTopic
を入力とするKStreamをつくり、
textLines = builder.stream(stringSerde, stringSerde, "TextLinesTopic")
メソッドチェーンで処理を書く。
textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, word) -> new KeyValue<>(word, word))
.countByKey("Counts")
.toStream()
.to(stringSerde, longSerde, "WordsWithCountsTopic");
上の処理を説明的に書くと、
- 各レコードのValueを空白区切りで分解した、単語ストリームを作り、
- keyにもvalueにも単語を持つようなストリームに変換し、
- keyで集計するようなKTableを作り、
- それをKStreamに変換して
- 出力トピックに格納する
ということをしている。各メソッドの詳細については、javadoc
https://kafka.apache.org/0100/javadoc/index.html
を参考のこと。
例: 直近1時間における分野別ニュース Top100
Avroでシリアライゼーションされたトピックから、直近1hrのページを取得する。まずは、
builder.stream("PageViews")
.filter((dummy, record) -> isArticle(record))
.map((dummy, article) -> new KeyValue<>(article, article))
.countByKey(TimeWindows.of("PageViewCountWindows", 60 * 60 * 1000L), avroSerde)
とする。すなわち、
- ストリームのうち、"ART"フラグを持つ(注:Articleと思われる)レコードのみをfilterして
-
<value, value>
というストリームに変換して - keyでcountする(ただし3600sec=1hrというWindow幅)ことで、KTableをつくる
このKTableに対して、以下のように集約処理をする。
viewCounts.groupBy(★A, windowedStringSerde, avroSerde)
.aggregate(★B, ★C, ★D, new PriorityQueueSerde<>(), "AllArticles")
まず、
- 分野(industryName)でgroupByすることで、KGroupTable<分野名, 統計情報>を生成する
- 分野ごとに用意した順序付きキューに入れるという集約処理aggregateをする
これによって得られた分野別のキューから、上位100件を取り出すことで、分野別のTop100を生成することができる。
最後に
こんなにも簡単にストリーム処理が書けるのは驚きとしか言いようがない。Kafka Streamsは間違えなく使える一品であるという印象を持った。
参考:
http://docs.confluent.io/2.1.0-alpha1/streams/index.html
https://kafka.apache.org/0100/javadoc/index.html