Confluent から KSQL のリリースがアナウンスされました。 Kafka 上で SQL によるストリーム処理が可能となるツールです。早速使ってみましょう。
参考
- Introducing KSQL: Open Source Streaming SQL for Apache Kafka
- KSQL from Confluent | Streaming SQL for Apache Kafka™ - YouTube
- ksql/docs/quickstart at 0.1.x · confluentinc/ksql
Apache Kafka とは
Apache Kafka は LinkedIn 製の分散メッセージングシステムです。トピックによるメッセージ管理、コンシューマーグループごとキューイングなどの機能を持ちます。
KSQL とは
KSQL は Kafka やストリーム処理のマネージドプラットフォームサービスを提供している Confluent が作成した Kafka 上で SQL を使うための OSS プロダクトです。
- Web: https://www.confluent.io/product/ksql/
- GitHub: https://github.com/confluentinc/ksql
- Documents: https://github.com/confluentinc/ksql/tree/0.1.x/docs/
- Blog Post: https://www.confluent.io/blog/ksql-open-source-streaming-sql-for-apache-kafka/
アプリケーションのログデータからエラーの発生を検知しする、などに使えるようです。
セットアップ
Docker で立ち上げる方法が手軽です。まずはリポジトリをダウンロードしてます。
$ git clone git@github.com:confluentinc/ksql.git
Cloning into 'ksql'...
remote: Counting objects: 17903, done.
remote: Total 17903 (delta 0), reused 0 (delta 0), pack-reused 17903
Receiving objects: 100% (17903/17903), 4.14 MiB | 560.00 KiB/s, done.
Resolving deltas: 100% (8522/8522), done.
あとは ksql/docs/quickstart
内で docker-compose up
すれば Kafka と Zookeeper 、 KSQL が立ち上がります。
$ cd ksql/docs/quickstart
$ ls
README.md docker-compose.yml ksql-quickstart-schemas.jpg quickstart-docker.md quickstart-non-docker.md
$ docker-compose up -d
Creating network "quickstart_default" with the default driver
Pulling zookeeper (confluentinc/cp-zookeeper:latest)...
latest: Pulling from confluentinc/cp-zookeeper
...
$ docker-compose ps
Name Command State Ports
-------------------------------------------------------------------------------------------------------------------------------------
quickstart_kafka_1 /etc/confluent/docker/run Up 0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp
quickstart_ksql-cli_1 perl -e while(1){ sleep 99 ... Up
quickstart_ksql-datagen-pageviews_1 bash -c echo Waiting for K ... Up
quickstart_ksql-datagen-users_1 bash -c echo Waiting for K ... Up
quickstart_schema-registry_1 /etc/confluent/docker/run Up 0.0.0.0:8081->8081/tcp
quickstart_zookeeper_1 /etc/confluent/docker/run Up 2181/tcp, 2888/tcp, 0.0.0.0:32181->32181/tcp, 3888/tcp
コンテナ内の KSQL CLI に入るには docker-compose exec
を使います。下記のコマンドをそのまま実行するだけ。
$ docker-compose exec ksql-cli ksql-cli local --bootstrap-server kafka:29092
======================================
= _ __ _____ ____ _ =
= | |/ // ____|/ __ \| | =
= | ' /| (___ | | | | | =
= | < \___ \| | | | | =
= | . \ ____) | |__| | |____ =
= |_|\_\_____/ \___\_\______| =
= =
= Streaming SQL Engine for Kafka =
Copyright 2017 Confluent Inc.
CLI v0.1, Server v0.1 located at http://localhost:9098
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
ストリームとテーブルの作成
Quick Start では、下記のスキーマを持つ pageviews
と users
というトピックを例にします。
ksql/ksql-quickstart-schemas.jpg at 0.1.x · confluentinc/ksql
pageviews
と users
からストリーム、およびテーブルを作成するには下記のクエリを実行します。
-- ストリームの作成
CREATE STREAM pageviews_original(
viewtime bigint,
userid varchar,
pageid varchar
) WITH(
kafka_topic = 'pageviews',
value_format = 'DELIMITED'
);
-- テーブルの作成
CREATE TABLE users_original(
registertime bigint,
gender varchar,
regionid varchar,
userid varchar
) WITH(
kafka_topic = 'users',
value_format = 'JSON'
);
KSQL CLI 上で実行してみます。ちゃんと作成されました。 ROWTIME
と ROWKEY
というカラムが自動作成されています。 ROWTIME
には Kafka のメッセージタイムスタンプが、 ROWKEY
にはメッセージキーが入ります。
ksql> CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='DELIMITED');
Message
----------------
Stream created
ksql> DESCRIBE pageviews_original;
Field | Type
----------------------------
ROWTIME | BIGINT
ROWKEY | VARCHAR(STRING)
VIEWTIME | BIGINT
USERID | VARCHAR(STRING)
PAGEID | VARCHAR(STRING)
ksql> CREATE TABLE users_original (registertime bigint, gender varchar, regionid varchar, userid varchar) WITH (kafka_topic='users', value_format='JSON');
Message
---------------
Table created
ksql> DESCRIBE users_original;
Field | Type
--------------------------------
ROWTIME | BIGINT
ROWKEY | VARCHAR(STRING)
REGISTERTIME | BIGINT
GENDER | VARCHAR(STRING)
REGIONID | VARCHAR(STRING)
USERID | VARCHAR(STRING)
ストリームとテーブルの一覧を取得するには、下記のクエリを実行します。
ksql> SHOW STREAMS;
Stream Name | Kafka Topic | Format
-----------------------------------------------------------------
PAGEVIEWS_ORIGINAL | pageviews | DELIMITED
ksql> SHOW TABLES;
Table Name | Kafka Topic | Format | Windowed
--------------------------------------------------------------
USERS_ORIGINAL | users | JSON | false
クエリの書き方
SELECT によるメッセージの取得
SELECT
ではストリームからデータを取得できます。デフォルトではクエリが実行されたあとのメッセージが取得できるようです。 LIMIT
を設定しないとメッセージが表示され続けます。データの取得を終了するときは <ctrl-c>
を実行しましょう。
ksql> SELECT pageid FROM pageviews_original LIMIT 3;
Page_51
Page_37
Page_75
LIMIT reached for the partition.
Query terminated
他のストリームやテーブルを利用したストリームの作成
複数のストリームやテーブルを作って新たにストリームを作成することができます。ここでは性別が 女性 のユーザーのページビュー情報を取得するストリームを作成しています。結果は Kafka の PAGEVIEWS_FEMALE
トピックに流されます。
CREATE STREAM pageviews_female AS
SELECT
users_original.userid AS userid,
pageid,
regionid,
gender
FROM
pageviews_original
LEFT JOIN
users_original
ON pageviews_original.userid = users_original.userid
WHERE
gender = 'FEMALE'
;
ksql> CREATE STREAM pageviews_female AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid WHERE gender = 'FEMALE';
Message
----------------------------
Stream created and running
ksql> DESCRIBE pageviews_female;
Field | Type
----------------------------
ROWTIME | BIGINT
ROWKEY | VARCHAR(STRING)
USERID | VARCHAR(STRING)
PAGEID | VARCHAR(STRING)
REGIONID | VARCHAR(STRING)
GENDER | VARCHAR(STRING)
SELECT
を実行してみます。ちゃんと FEMALE
のデータだけが取得できました!
ksql> SELECT * FROM pageviews_female LIMIT 3;
1503994511734 | User_7 | User_7 | Page_54 | Region_1 | FEMALE
1503994518152 | User_5 | User_5 | Page_94 | Region_3 | FEMALE
1503994519403 | User_5 | User_5 | Page_14 | Region_3 | FEMALE
LIMIT reached for the partition.
Query terminated
LIKE による絞り込み
LIKE 句を使ってさらにデータを絞り込み、トピック名の指定も可能なようですね。
CREATE STREAM pageviews_female_like_89 WITH(
kafka_topic = 'pageviews_enriched_r8_r9',
value_format = 'DELIMITED'
) AS
SELECT
*
FROM
pageviews_female
WHERE
regionid LIKE '%_8'
OR regionid LIKE '%_9'
;
Window 処理
ストリーム処理では重要な Window 処理も実行できます。ここでは地域ごとにページビューを集計 pageviews_female
から取得します。
CREATE TABLE pageviews_regions AS
SELECT
gender,
regionid,
COUNT(*) AS numusers
FROM
pageviews_female WINDOW TUMBLING(
size 30 second
)
GROUP BY
gender,
regionid
HAVING COUNT(*) > 1
;
作成した pageviews_regions
ストリームを取得します。
ksql> SELECT regionid, numusers FROM pageviews_regions LIMIT 5;
Region_8 | 3
Region_8 | 4
Region_6 | 2
Region_4 | 2
Region_6 | 4
LIMIT reached for the partition.
Query terminated
クエリの確認
実行したクエリの一覧は SHOW QUERIES
で取得できます。 SELECT
によるメッセージの取得や、はじめに実行した CREATE STREAM
のようなクエリは出力されません。 Kafka の特定のトピックに結果を出力するクエリのみ表示されます。
SHOW QUERIES;
Query ID | Kafka Topic | Query String
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
1 | PAGEVIEWS_FEMALE | CREATE STREAM pageviews_female AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid WHERE gender = 'FEMALE';
2 | pageviews_enriched_r8_r9 | CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9', value_format='DELIMITED') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';
3 | PAGEVIEWS_REGIONS | CREATE TABLE pageviews_regions AS SELECT gender, regionid , COUNT(*) AS numusers FROM pageviews_female WINDOW TUMBLING (size 30 second) GROUP BY gender, regionid HAVING COUNT(*) > 1;
あとかたづけ
クエリの停止は TERMINATE
を使います。クエリ ID を指定しましょう。
ksql> TERMINATE 2;
KSQL CLI を抜けるには exit
を実行します。
ksql> exit
Exiting KSQL.
コンテナの停止も忘れずに。
$ docker-compose down
Stopping quickstart_ksql-cli_1 ... done
Stopping quickstart_ksql-datagen-users_1 ... done
Stopping quickstart_ksql-datagen-pageviews_1 ... done
Stopping quickstart_schema-registry_1 ... done
Stopping quickstart_kafka_1 ... done
Stopping quickstart_zookeeper_1 ... done
Removing quickstart_ksql-cli_1 ... done
Removing quickstart_ksql-datagen-users_1 ... done
Removing quickstart_ksql-datagen-pageviews_1 ... done
Removing quickstart_schema-registry_1 ... done
Removing quickstart_kafka_1 ... done
Removing quickstart_zookeeper_1 ... done
Removing network quickstart_default
感想
- 複数のトピックにまたがった集計処理が簡単に書けて便利
- 集計結果を特定のトピックにまとめることができる
- ログからアラートに投げるとか、バッチ処理に使うとか、いろいろできそう
- ストリームとテーブルの違いが難しい。
- concepts にストリームとテーブルの説明があるが…うーん?