25
12

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Apache Kafka 上で SQL が実行できる KSQL を使ってみる

Last updated at Posted at 2017-08-29

Confluent から KSQL のリリースがアナウンスされました。 Kafka 上で SQL によるストリーム処理が可能となるツールです。早速使ってみましょう。

参考

Apache Kafka とは

Apache Kafka は LinkedIn 製の分散メッセージングシステムです。トピックによるメッセージ管理、コンシューマーグループごとキューイングなどの機能を持ちます。

kafka
Apache Kafka

KSQL とは

KSQL は Kafka やストリーム処理のマネージドプラットフォームサービスを提供している Confluent が作成した Kafka 上で SQL を使うための OSS プロダクトです。

アプリケーションのログデータからエラーの発生を検知しする、などに使えるようです。

セットアップ

Docker で立ち上げる方法が手軽です。まずはリポジトリをダウンロードしてます。

$ git clone git@github.com:confluentinc/ksql.git
Cloning into 'ksql'...
remote: Counting objects: 17903, done.
remote: Total 17903 (delta 0), reused 0 (delta 0), pack-reused 17903
Receiving objects: 100% (17903/17903), 4.14 MiB | 560.00 KiB/s, done.
Resolving deltas: 100% (8522/8522), done.

あとは ksql/docs/quickstart 内で docker-compose up すれば Kafka と Zookeeper 、 KSQL が立ち上がります。

$ cd ksql/docs/quickstart 
$ ls
README.md                   docker-compose.yml          ksql-quickstart-schemas.jpg quickstart-docker.md        quickstart-non-docker.md
$ docker-compose up -d
Creating network "quickstart_default" with the default driver
Pulling zookeeper (confluentinc/cp-zookeeper:latest)...
latest: Pulling from confluentinc/cp-zookeeper
...
$ docker-compose ps
               Name                              Command               State                           Ports                          
-------------------------------------------------------------------------------------------------------------------------------------
quickstart_kafka_1                    /etc/confluent/docker/run        Up      0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp       
quickstart_ksql-cli_1                 perl -e while(1){ sleep 99 ...   Up                                                             
quickstart_ksql-datagen-pageviews_1   bash -c echo Waiting for K ...   Up                                                             
quickstart_ksql-datagen-users_1       bash -c echo Waiting for K ...   Up                                                             
quickstart_schema-registry_1          /etc/confluent/docker/run        Up      0.0.0.0:8081->8081/tcp                                 
quickstart_zookeeper_1                /etc/confluent/docker/run        Up      2181/tcp, 2888/tcp, 0.0.0.0:32181->32181/tcp, 3888/tcp

コンテナ内の KSQL CLI に入るには docker-compose exec を使います。下記のコマンドをそのまま実行するだけ。

$ docker-compose exec ksql-cli ksql-cli local --bootstrap-server kafka:29092
                       ======================================
                       =      _  __ _____  ____  _          =
                       =     | |/ // ____|/ __ \| |         =
                       =     | ' /| (___ | |  | | |         =
                       =     |  <  \___ \| |  | | |         =
                       =     | . \ ____) | |__| | |____     =
                       =     |_|\_\_____/ \___\_\______|    =
                       =                                    =
                       =   Streaming SQL Engine for Kafka   =
Copyright 2017 Confluent Inc.                         

CLI v0.1, Server v0.1 located at http://localhost:9098

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql> 

ストリームとテーブルの作成

Quick Start では、下記のスキーマを持つ pageviewsusers というトピックを例にします。


ksql/ksql-quickstart-schemas.jpg at 0.1.x · confluentinc/ksql

pageviewsusers からストリーム、およびテーブルを作成するには下記のクエリを実行します。

ストリームとテーブルの作成クエリ
-- ストリームの作成 
CREATE STREAM pageviews_original(
  viewtime bigint,
  userid varchar,
  pageid varchar
) WITH(
  kafka_topic = 'pageviews',
  value_format = 'DELIMITED'
);

-- テーブルの作成
CREATE TABLE users_original(
  registertime bigint,
  gender varchar,
  regionid varchar,
  userid varchar
) WITH(
  kafka_topic = 'users',
  value_format = 'JSON'
);

KSQL CLI 上で実行してみます。ちゃんと作成されました。 ROWTIMEROWKEY というカラムが自動作成されています。 ROWTIME には Kafka のメッセージタイムスタンプが、 ROWKEY にはメッセージキーが入ります。

ストリームとテーブルの作成
ksql> CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='DELIMITED');

 Message        
----------------
 Stream created 
ksql> DESCRIBE pageviews_original;

 Field    | Type            
----------------------------
 ROWTIME  | BIGINT          
 ROWKEY   | VARCHAR(STRING) 
 VIEWTIME | BIGINT          
 USERID   | VARCHAR(STRING) 
 PAGEID   | VARCHAR(STRING) 

ksql> CREATE TABLE users_original (registertime bigint, gender varchar, regionid varchar, userid varchar) WITH (kafka_topic='users', value_format='JSON');

 Message       
---------------
 Table created 
ksql> DESCRIBE users_original;

 Field        | Type            
--------------------------------
 ROWTIME      | BIGINT          
 ROWKEY       | VARCHAR(STRING) 
 REGISTERTIME | BIGINT          
 GENDER       | VARCHAR(STRING) 
 REGIONID     | VARCHAR(STRING) 
 USERID       | VARCHAR(STRING)

ストリームとテーブルの一覧を取得するには、下記のクエリを実行します。

ストリームとテーブルの一覧
ksql> SHOW STREAMS;

 Stream Name              | Kafka Topic              | Format    
-----------------------------------------------------------------
 PAGEVIEWS_ORIGINAL       | pageviews                | DELIMITED 

ksql> SHOW TABLES;

 Table Name        | Kafka Topic       | Format    | Windowed 
--------------------------------------------------------------
 USERS_ORIGINAL    | users             | JSON      | false   

クエリの書き方

SELECT によるメッセージの取得

SELECT ではストリームからデータを取得できます。デフォルトではクエリが実行されたあとのメッセージが取得できるようです。 LIMIT を設定しないとメッセージが表示され続けます。データの取得を終了するときは <ctrl-c> を実行しましょう。

SELECT文の実行
ksql> SELECT pageid FROM pageviews_original LIMIT 3;
Page_51
Page_37
Page_75
LIMIT reached for the partition.
Query terminated

他のストリームやテーブルを利用したストリームの作成

複数のストリームやテーブルを作って新たにストリームを作成することができます。ここでは性別が 女性 のユーザーのページビュー情報を取得するストリームを作成しています。結果は Kafka の PAGEVIEWS_FEMALE トピックに流されます。

ストリームを作成するクエリ
CREATE STREAM pageviews_female AS
SELECT
  users_original.userid AS userid,
  pageid,
  regionid,
  gender
FROM
  pageviews_original
  LEFT JOIN
    users_original
  ON  pageviews_original.userid = users_original.userid
WHERE
  gender = 'FEMALE'
;
ストリームの作成
ksql> CREATE STREAM pageviews_female AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid WHERE gender = 'FEMALE';

 Message                    
----------------------------
 Stream created and running 
ksql> DESCRIBE pageviews_female;

 Field    | Type            
----------------------------
 ROWTIME  | BIGINT          
 ROWKEY   | VARCHAR(STRING) 
 USERID   | VARCHAR(STRING) 
 PAGEID   | VARCHAR(STRING) 
 REGIONID | VARCHAR(STRING) 
 GENDER   | VARCHAR(STRING) 

SELECT を実行してみます。ちゃんと FEMALE のデータだけが取得できました!

SELECTの実行
ksql> SELECT * FROM pageviews_female LIMIT 3;
1503994511734 | User_7 | User_7 | Page_54 | Region_1 | FEMALE
1503994518152 | User_5 | User_5 | Page_94 | Region_3 | FEMALE
1503994519403 | User_5 | User_5 | Page_14 | Region_3 | FEMALE
LIMIT reached for the partition.
Query terminated

LIKE による絞り込み

LIKE 句を使ってさらにデータを絞り込み、トピック名の指定も可能なようですね。

LIKE句による絞り込みと結果のKafkaへの出力
CREATE STREAM pageviews_female_like_89 WITH(
  kafka_topic = 'pageviews_enriched_r8_r9',
  value_format = 'DELIMITED'
) AS
SELECT
  *
FROM
  pageviews_female
WHERE
  regionid LIKE '%_8'
OR  regionid LIKE '%_9'
;

Window 処理

ストリーム処理では重要な Window 処理も実行できます。ここでは地域ごとにページビューを集計 pageviews_female から取得します。

Window処理
CREATE TABLE pageviews_regions AS
SELECT
  gender,
  regionid,
  COUNT(*) AS numusers
FROM
  pageviews_female WINDOW TUMBLING(
    size 30 second
  )
GROUP BY
  gender,
  regionid
HAVING COUNT(*) > 1
;

作成した pageviews_regions ストリームを取得します。

Window集計の取得
ksql> SELECT regionid, numusers FROM pageviews_regions LIMIT 5;
Region_8 | 3
Region_8 | 4
Region_6 | 2
Region_4 | 2
Region_6 | 4
LIMIT reached for the partition.
Query terminated

クエリの確認

実行したクエリの一覧は SHOW QUERIES で取得できます。 SELECT によるメッセージの取得や、はじめに実行した CREATE STREAM のようなクエリは出力されません。 Kafka の特定のトピックに結果を出力するクエリのみ表示されます。

SHOW_QUERIES
SHOW QUERIES;

 Query ID | Kafka Topic       | Query String                                                                                                                                                                                                                      
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 1        | PAGEVIEWS_FEMALE  | CREATE STREAM pageviews_female AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid WHERE gender = 'FEMALE'; 
 2        | pageviews_enriched_r8_r9 | CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9', value_format='DELIMITED') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';     
 3        | PAGEVIEWS_REGIONS | CREATE TABLE pageviews_regions AS SELECT gender, regionid , COUNT(*) AS numusers FROM pageviews_female WINDOW TUMBLING (size 30 second) GROUP BY gender, regionid HAVING COUNT(*) > 1;     

あとかたづけ

クエリの停止は TERMINATE を使います。クエリ ID を指定しましょう。

ksql> TERMINATE 2;

KSQL CLI を抜けるには exit を実行します。

ksql> exit
Exiting KSQL.

コンテナの停止も忘れずに。

$ docker-compose down
Stopping quickstart_ksql-cli_1 ... done
Stopping quickstart_ksql-datagen-users_1 ... done
Stopping quickstart_ksql-datagen-pageviews_1 ... done
Stopping quickstart_schema-registry_1 ... done
Stopping quickstart_kafka_1 ... done
Stopping quickstart_zookeeper_1 ... done
Removing quickstart_ksql-cli_1 ... done
Removing quickstart_ksql-datagen-users_1 ... done
Removing quickstart_ksql-datagen-pageviews_1 ... done
Removing quickstart_schema-registry_1 ... done
Removing quickstart_kafka_1 ... done
Removing quickstart_zookeeper_1 ... done
Removing network quickstart_default

感想

  • 複数のトピックにまたがった集計処理が簡単に書けて便利
    • 集計結果を特定のトピックにまとめることができる
    • ログからアラートに投げるとか、バッチ処理に使うとか、いろいろできそう
  • ストリームとテーブルの違いが難しい。
    • concepts にストリームとテーブルの説明があるが…うーん?
25
12
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
25
12

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?