Data Integration as Service: Kafka-Streamsの紹介
アドベントカレンダー4日目のKafka-Connectに続き,今回はApache Kafka 0.10から同梱されていKafka-Streamsを紹介します。Kafka-StreamsはKafkaから入力トピックからデータを受け取り、何らかの処理を行って、Kafkaの出力トピックにデータを書き戻すという、ストリーミングアプリを書くためのライブラリです。Kafka-Connectと違い、ライブラリですので単体では何もできません。
※能書きはいいから、手っ取り早くどう使うのか知りたいという人は"まとめ"->"DSL"の順に読んでください。
特徴
- ミリ秒単位のレイテンシーでイベント毎の処理(マイクロバッチではない)
- DSLと低レベルAPIの2つを提供
- ステートフル・ステートレスの処理
- ストリームJOIN・集約
- 時間に対する窓関数により、一定時間毎の処理が可能
- ロードバランシング
- 耐障害性の確保、障害からの回復
- ダウンタイムなしにアプリのバージョン更新ができる
Kafka-Streams背景
特徴を読むと「Spark Streamingでできることばっかり?Apache Flink,Storm,Samza,...とか既にあるよな?」と思われるでしょう。しかしこれらの重量級のフレームワーク、さらには複雑なHadoopエコシステム一式がなくても書けるアプリも多くあるはずです。ストリームアプリの多くはmap-reduceを必要としているわけではないはず。
その一方Kafkaのconsumer/producer APIを直接叩くアプリは簡単に書けますが、スケーラビリティのための分散処理や、耐障害性のオフセット管理などを考え出すと結構手間です。
そこで、重量級フレームワークなしに、簡単にスケーラブルで信頼性の高いストリーミングアプリが書けるようにするのがKafka-Streams ライブラリ です。
ストリームはテーブルだ!
Kafka-Streamsではストリームとテーブルは二重性(二つの性質を同時に持つ)の概念とされています。RDBMSのテーブルはSQLでINSERT, UPDATE, DELETEという一連の操作を行った結果が格納されているのですから、ある種のストリームとみなせるということです。ストリームとテーブルはそれぞれKStream, KTableでモデル化されています。
※用語がややこしいのがKafka-Streamsの難点なのですが、KStreamもKTableもストリームのタイプです。実際次に出てくるようにKTableはチェンジログ ストリームと呼ばれています。KTableはキー毎に最新の値が格納されたキャッシュみたいなものです。Tableでなくもっと良い名前があったはず。
KStream - レコードストリーム
KStreamにはKafkaから受け取ったレコードがそのまま追加されます。DBの例なら,キーバリューペアをINSERTしたのと似ています。
今クリック数を表示するアプリを書いたとします。Kafkaのメッセージは(user, clicks)とし、アプリはuserをキーとしてsum(clicks)をするものとします。今次のように二つのレコードが逐次入ってきとします。
("alice", 1)
("alice", 3)
もしこのアプリがKStreamを使って実装されているなら、aliceのクリック数は1+3で4になります。
KTable - チェンジログストリーム
KTableはKafkaから受け取ったレコードで、キーを元に値を更新します。DBの例ならPKを使ってUPDATEを行うことに相当します。
上で書いたクリック数を表示するアプリの例なら、最新のレコードの値がKTableに入っているわけですから、クリック数は3ということになります。
集約
すでにsumの例をあげましたが、min, max, sumなどの集約関数でKStream/KTableとも計算が可能です。
タイムスタンプと窓関数
ストリームアプリはイベントが発生した時刻を元に処理を行うことが多いと思います。ただし複数のサーバーからKafkaに送られてくるイベントが発生順に到着する保障はないでしょう。窓関数で期間を設けることで、順序を正しくしてから処理することができます。
タイムスタンプのタイプはイベント時刻以外に、扱いの簡単な取込み時刻、処理時刻もあります。
タイムスタンプのタイプ | 概要 |
---|---|
event time | Kafka Producerが申告したイベントの発生時刻 |
ingestion time | Kafka Brokerがレコードを取り込んだ時刻 |
processing time | ストリーミングアプリが処理した時刻 |
窓関数のタイプは、タンブリング、ホッピング、スライディングの3つがあります。タンブリングは決まった間隔で定期的に繰り返される窓です。
ホッピングは定期的に繰り返される点はタンブリングと同じです。違いはオーバーラップすることです。例えば15秒ごとに30秒の期間のレコードを処理するなどという場合です。Spark Streamingのスライディングウィンドウはこれに相当します。株価の移動平均の計算はこの方法で計算されることが多いです。
スライディングウィンドウは、レコードのタイムスタンプから過去X秒を計算対象とする窓関数です。つまりイベントが発生する度に実行されます。Spark Streamingのスライディングウィンドウとは定義が異なりますので注意!
窓のタイプ | オーバーラップ | ギャップ |
---|---|---|
タンブリング Tumbling | なし | なし |
ホッピング Hopping | あり | 設定次第 |
スライディング Sliding | イベントの発生間隔次第 | イベントの発生間隔次第 |
JOIN
JOINには次の3つがあります
- KStreamとKStream: このJOINは常にウィンドウが適用されたJOINです。無限のストリームでは計算できませんので。新しいレコードが入る度、JOINが実行されます。
- KTableとKTable: これはRDBMSのJOINと似ています。新しいレコードが入る度にJOINが行われますが、結果はマテリアライズされ、常に最新の状態が維持されています。
- KStreamとKTable: このJOINはKStreamにレコードが入る度にKTableをルックアップするために使います。例えばユーザーのプロファイルがKTableにあり、ユーザーのアクティビティがKStreamから得られるとしたら、JOINすることでユーザーの詳細が得られます。
テーブル | (INNER) JOIN | OUTER JOIN | LEFT JOIN |
---|---|---|---|
KStream-to-KStream | ○ | ○ | ○ |
KTable-to-KTable | ○ | ○ | ○ |
KStream-to-KTable | N/A | N/A | ○ |
DSL
文法を全部説明するのは無理なので、主要な命令と、雰囲気を紹介します。Java8のラムダ式と一緒に使うと簡潔に処理を記述することができます。Spark Streaming+Scalaと雰囲気はそっくりです。トランスフォームの操作をするとKStreamがKTableになったり、KTableをKStreamに戻すために.toStream()
を呼んだりするところが独特です。
ソース
何はなくともKafkaに接続しないと始まりません。KStreamBuilder
でKStreamとKTableを作ります。
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
KStreamBuilder builder = new KStreamBuilder();
// PageViewsトピックにサブスクライブしてKStreamオブジェクトを作る
KStream<String, GenericRecord> pageViews = builder.stream("PageViews");
// UserProfilesトピックにサブスクライブしてKTableオブジェクトを作る。
KTable<String, GenericRecord> userProfiles = builder.table("UserProfiles");
トランスフォーム
大文字に変換する例です。Spark Streamingとよく似ています。
KStream<Long, String> uppercased =
nicknameByUserId.mapValues(nickname -> nickname.toUpperCase());
集約の例
ワードカウントの例。これもSpark Streamingに似ています
KStream<String, String> textLines = ...;
KStream<String, Long> wordCounts = textLines
// 空白で区切って、小文字に正規化。
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, word) -> new KeyValue<>(word, word))
// wordの数を数えます。この操作ではストリームのタイプがKStream<String, String>から
// KTable<String, Long> (word -> count)に変わります。テーブルには名前が必要で
// Countsと名前をつけました。
.countByKey("Counts")
// 最後にKTable<String, Long>をKStream<String, Long>に変換
.toStream();
窓関数
タンブリングウィンドウの定義の例。
KStream<String, GenericRecord> viewsByUser = ...;
KTable<Windowed<String>, Long> userCounts =
// 1分ごとに5分の長さの窓関数を適用してカウント
viewsByUser.countByKey(TimeWindows.of("GeoPageViewsWindow", 5 * 60 * 1000L).advanceBy(60 * 1000L));
JOIN
JOINの例です。userClicksStreamには(user, clicks)が入ってきます。userRegionsTableは(user, region)が入っています。userをキーにLEFT JOINする例です。JavaはScalaと違ってタプルをサポートしていないのでRegionWithClicksというクラスを別途定義してオブジェクトを返しています。
KStream<String, Long> userClicksStream = ...;
KTable<String, String> userRegionsTable = ...;
KStream<String, RegionWithClicks> userClicksWithRegion = userClicksStream
.leftJoin(userRegionsTable,
(clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks));
独自プロセッサの作り方と適用
プロセッサを作るにはProcessor
インターフェースを実装したクラスを作り、.process( () -> new MyProcessor())
とするだけです。詳細はマニュアルをごらんください...。
Kafkaへパブリッシュ
そして最後にKafkaに処理結果を書き戻します。userCountByRegionストリームをKafkaのRegionCountsTopicにパブリッシュする例。これは専用DSLなので簡単です。
userCountByRegion.to("RegionCountsTopic");
まとめ
以上駆け足でKafka-Streamsの紹介を行いました。
- Kafkaからレコードを読み込んで、処理を行い、Kafkaの別のトピックにパブリッシュできる
- Kafka専用のDSLなのでSparkと違いKafkaへの接続、障害回復処理などが全て用意されている。
- アプリの書き方はSpark Streamingによく似ているがKafka以外外部のシステムに依存しないので開発もデプロイも楽。
アプリの流れは次の通り。
- ストリームはKStreamBuilderで作る
- ストリームのタイプはKStreamとKTableがある。
- builder.stream("sourceTopic")
- builder.table("sourceTopic")
- KTableはステート(状態)の維持にも使える
- ストリームはJOINできる
- ストリームは集約できる
- 集約には窓を適用することができる。
- トランスフォームの操作によってKStreamがKTableになることがある。
- KTableは.toStream()でKStreamになる
- 独自処理はstream.process()で呼び出せる
- stream.to("sinkTopic")でKafkaにパブリッシュ
出典
この記事の内容はApache Kafkaのマニュアルとサンプルコードを元に僕の理解で書きました。
また次のブログも参考にしています。
https://kafka.apache.org/documentation#streams
http://docs.confluent.io/3.1.1/streams/
https://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/
おまけ
Kafka 0.10.1ではクエリー機能が付いているそうです。Kafka Streamsが持つローカルストアの内容をクエリーで問い合わせることができます。読み出し専用で書き込みはできません。REST APIをアプリに組み込めば外部から問い合わせも可能となります。ここはSpark Streamingから一歩進んだ機能でしょう。僕自身試していないので確かなことは言えませんが、興味を持たれた方は次の記事とGithubのコードを参照してください。「リアルタイムTOP 10 ヒットチャート」のような問い合わせができるようです。
https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
https://github.com/confluentinc/examples/blob/master/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java