Confluent から KSQL のリリースがアナウンスされました。 Kafka 上で SQL によるストリーム処理が可能となるツールです。早速使ってみましょう。
参考
- Introducing KSQL: Open Source Streaming SQL for Apache Kafka
 - KSQL from Confluent | Streaming SQL for Apache Kafka™ - YouTube
 - ksql/docs/quickstart at 0.1.x · confluentinc/ksql
 
Apache Kafka とは
Apache Kafka は LinkedIn 製の分散メッセージングシステムです。トピックによるメッセージ管理、コンシューマーグループごとキューイングなどの機能を持ちます。
KSQL とは
KSQL は Kafka やストリーム処理のマネージドプラットフォームサービスを提供している Confluent が作成した Kafka 上で SQL を使うための OSS プロダクトです。
- Web: https://www.confluent.io/product/ksql/
 - GitHub: https://github.com/confluentinc/ksql
 - Documents: https://github.com/confluentinc/ksql/tree/0.1.x/docs/
 - Blog Post: https://www.confluent.io/blog/ksql-open-source-streaming-sql-for-apache-kafka/
 
アプリケーションのログデータからエラーの発生を検知しする、などに使えるようです。
セットアップ
Docker で立ち上げる方法が手軽です。まずはリポジトリをダウンロードしてます。
$ git clone git@github.com:confluentinc/ksql.git
Cloning into 'ksql'...
remote: Counting objects: 17903, done.
remote: Total 17903 (delta 0), reused 0 (delta 0), pack-reused 17903
Receiving objects: 100% (17903/17903), 4.14 MiB | 560.00 KiB/s, done.
Resolving deltas: 100% (8522/8522), done.
あとは ksql/docs/quickstart 内で docker-compose up すれば Kafka と Zookeeper 、 KSQL が立ち上がります。
$ cd ksql/docs/quickstart 
$ ls
README.md                   docker-compose.yml          ksql-quickstart-schemas.jpg quickstart-docker.md        quickstart-non-docker.md
$ docker-compose up -d
Creating network "quickstart_default" with the default driver
Pulling zookeeper (confluentinc/cp-zookeeper:latest)...
latest: Pulling from confluentinc/cp-zookeeper
...
$ docker-compose ps
               Name                              Command               State                           Ports                          
-------------------------------------------------------------------------------------------------------------------------------------
quickstart_kafka_1                    /etc/confluent/docker/run        Up      0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp       
quickstart_ksql-cli_1                 perl -e while(1){ sleep 99 ...   Up                                                             
quickstart_ksql-datagen-pageviews_1   bash -c echo Waiting for K ...   Up                                                             
quickstart_ksql-datagen-users_1       bash -c echo Waiting for K ...   Up                                                             
quickstart_schema-registry_1          /etc/confluent/docker/run        Up      0.0.0.0:8081->8081/tcp                                 
quickstart_zookeeper_1                /etc/confluent/docker/run        Up      2181/tcp, 2888/tcp, 0.0.0.0:32181->32181/tcp, 3888/tcp
コンテナ内の KSQL CLI に入るには docker-compose exec を使います。下記のコマンドをそのまま実行するだけ。
$ docker-compose exec ksql-cli ksql-cli local --bootstrap-server kafka:29092
                       ======================================
                       =      _  __ _____  ____  _          =
                       =     | |/ // ____|/ __ \| |         =
                       =     | ' /| (___ | |  | | |         =
                       =     |  <  \___ \| |  | | |         =
                       =     | . \ ____) | |__| | |____     =
                       =     |_|\_\_____/ \___\_\______|    =
                       =                                    =
                       =   Streaming SQL Engine for Kafka   =
Copyright 2017 Confluent Inc.                         
CLI v0.1, Server v0.1 located at http://localhost:9098
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql> 
ストリームとテーブルの作成
Quick Start では、下記のスキーマを持つ pageviews と users というトピックを例にします。

ksql/ksql-quickstart-schemas.jpg at 0.1.x · confluentinc/ksql
pageviews と users からストリーム、およびテーブルを作成するには下記のクエリを実行します。
-- ストリームの作成 
CREATE STREAM pageviews_original(
  viewtime bigint,
  userid varchar,
  pageid varchar
) WITH(
  kafka_topic = 'pageviews',
  value_format = 'DELIMITED'
);
-- テーブルの作成
CREATE TABLE users_original(
  registertime bigint,
  gender varchar,
  regionid varchar,
  userid varchar
) WITH(
  kafka_topic = 'users',
  value_format = 'JSON'
);
KSQL CLI 上で実行してみます。ちゃんと作成されました。 ROWTIME と ROWKEY というカラムが自動作成されています。 ROWTIME には Kafka のメッセージタイムスタンプが、 ROWKEY にはメッセージキーが入ります。
ksql> CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='DELIMITED');
 Message        
----------------
 Stream created 
ksql> DESCRIBE pageviews_original;
 Field    | Type            
----------------------------
 ROWTIME  | BIGINT          
 ROWKEY   | VARCHAR(STRING) 
 VIEWTIME | BIGINT          
 USERID   | VARCHAR(STRING) 
 PAGEID   | VARCHAR(STRING) 
ksql> CREATE TABLE users_original (registertime bigint, gender varchar, regionid varchar, userid varchar) WITH (kafka_topic='users', value_format='JSON');
 Message       
---------------
 Table created 
ksql> DESCRIBE users_original;
 Field        | Type            
--------------------------------
 ROWTIME      | BIGINT          
 ROWKEY       | VARCHAR(STRING) 
 REGISTERTIME | BIGINT          
 GENDER       | VARCHAR(STRING) 
 REGIONID     | VARCHAR(STRING) 
 USERID       | VARCHAR(STRING)
ストリームとテーブルの一覧を取得するには、下記のクエリを実行します。
ksql> SHOW STREAMS;
 Stream Name              | Kafka Topic              | Format    
-----------------------------------------------------------------
 PAGEVIEWS_ORIGINAL       | pageviews                | DELIMITED 
ksql> SHOW TABLES;
 Table Name        | Kafka Topic       | Format    | Windowed 
--------------------------------------------------------------
 USERS_ORIGINAL    | users             | JSON      | false   
クエリの書き方
SELECT によるメッセージの取得
SELECT ではストリームからデータを取得できます。デフォルトではクエリが実行されたあとのメッセージが取得できるようです。 LIMIT を設定しないとメッセージが表示され続けます。データの取得を終了するときは <ctrl-c> を実行しましょう。
ksql> SELECT pageid FROM pageviews_original LIMIT 3;
Page_51
Page_37
Page_75
LIMIT reached for the partition.
Query terminated
他のストリームやテーブルを利用したストリームの作成
複数のストリームやテーブルを作って新たにストリームを作成することができます。ここでは性別が 女性 のユーザーのページビュー情報を取得するストリームを作成しています。結果は Kafka の PAGEVIEWS_FEMALE トピックに流されます。
CREATE STREAM pageviews_female AS
SELECT
  users_original.userid AS userid,
  pageid,
  regionid,
  gender
FROM
  pageviews_original
  LEFT JOIN
    users_original
  ON  pageviews_original.userid = users_original.userid
WHERE
  gender = 'FEMALE'
;
ksql> CREATE STREAM pageviews_female AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid WHERE gender = 'FEMALE';
 Message                    
----------------------------
 Stream created and running 
ksql> DESCRIBE pageviews_female;
 Field    | Type            
----------------------------
 ROWTIME  | BIGINT          
 ROWKEY   | VARCHAR(STRING) 
 USERID   | VARCHAR(STRING) 
 PAGEID   | VARCHAR(STRING) 
 REGIONID | VARCHAR(STRING) 
 GENDER   | VARCHAR(STRING) 
SELECT を実行してみます。ちゃんと FEMALE のデータだけが取得できました!
ksql> SELECT * FROM pageviews_female LIMIT 3;
1503994511734 | User_7 | User_7 | Page_54 | Region_1 | FEMALE
1503994518152 | User_5 | User_5 | Page_94 | Region_3 | FEMALE
1503994519403 | User_5 | User_5 | Page_14 | Region_3 | FEMALE
LIMIT reached for the partition.
Query terminated
LIKE による絞り込み
LIKE 句を使ってさらにデータを絞り込み、トピック名の指定も可能なようですね。
CREATE STREAM pageviews_female_like_89 WITH(
  kafka_topic = 'pageviews_enriched_r8_r9',
  value_format = 'DELIMITED'
) AS
SELECT
  *
FROM
  pageviews_female
WHERE
  regionid LIKE '%_8'
OR  regionid LIKE '%_9'
;
Window 処理
ストリーム処理では重要な Window 処理も実行できます。ここでは地域ごとにページビューを集計 pageviews_female から取得します。
CREATE TABLE pageviews_regions AS
SELECT
  gender,
  regionid,
  COUNT(*) AS numusers
FROM
  pageviews_female WINDOW TUMBLING(
    size 30 second
  )
GROUP BY
  gender,
  regionid
HAVING COUNT(*) > 1
;
作成した pageviews_regions ストリームを取得します。
ksql> SELECT regionid, numusers FROM pageviews_regions LIMIT 5;
Region_8 | 3
Region_8 | 4
Region_6 | 2
Region_4 | 2
Region_6 | 4
LIMIT reached for the partition.
Query terminated
クエリの確認
実行したクエリの一覧は SHOW QUERIES で取得できます。 SELECT によるメッセージの取得や、はじめに実行した CREATE STREAM のようなクエリは出力されません。 Kafka の特定のトピックに結果を出力するクエリのみ表示されます。
SHOW QUERIES;
 Query ID | Kafka Topic       | Query String                                                                                                                                                                                                                      
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 1        | PAGEVIEWS_FEMALE  | CREATE STREAM pageviews_female AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid WHERE gender = 'FEMALE'; 
 2        | pageviews_enriched_r8_r9 | CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9', value_format='DELIMITED') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';     
 3        | PAGEVIEWS_REGIONS | CREATE TABLE pageviews_regions AS SELECT gender, regionid , COUNT(*) AS numusers FROM pageviews_female WINDOW TUMBLING (size 30 second) GROUP BY gender, regionid HAVING COUNT(*) > 1;     
あとかたづけ
クエリの停止は TERMINATE を使います。クエリ ID を指定しましょう。
ksql> TERMINATE 2;
KSQL CLI を抜けるには exit を実行します。
ksql> exit
Exiting KSQL.
コンテナの停止も忘れずに。
$ docker-compose down
Stopping quickstart_ksql-cli_1 ... done
Stopping quickstart_ksql-datagen-users_1 ... done
Stopping quickstart_ksql-datagen-pageviews_1 ... done
Stopping quickstart_schema-registry_1 ... done
Stopping quickstart_kafka_1 ... done
Stopping quickstart_zookeeper_1 ... done
Removing quickstart_ksql-cli_1 ... done
Removing quickstart_ksql-datagen-users_1 ... done
Removing quickstart_ksql-datagen-pageviews_1 ... done
Removing quickstart_schema-registry_1 ... done
Removing quickstart_kafka_1 ... done
Removing quickstart_zookeeper_1 ... done
Removing network quickstart_default
感想
- 複数のトピックにまたがった集計処理が簡単に書けて便利
- 集計結果を特定のトピックにまとめることができる
 - ログからアラートに投げるとか、バッチ処理に使うとか、いろいろできそう
 
 - ストリームとテーブルの違いが難しい。
- concepts にストリームとテーブルの説明があるが…うーん?
 
 
