ねらい
ksql自習のために下記ガイドから気になる部分を拾って実際に動かしたものをまとめただけの記事でございます。元ネタを見れば全て済むことだが、何か少しでも参考になれば。
元ネタ:How-to guide
How-to guideから拾ったksql表現集
構造化データ
CREATE ...
hoge STRUCT <
hoge_C1 VARCHAR,
hoge_C2 INT
>
INSERT INTO ...
, STRUCT(hoge_C1 :='a', hoge_C2 :=2)
SELECT ...
hoge->hoge_C1,
hoge->hoge_C2,
このチュートリアルにもありましたね。Stream/Table内に構造化データを作成するにはCREATE
時にSTRUCT<>
を使用する。下記例ではAvro Schema形式を利用し、b
列の中にVARCHAR
のc
とINT
のd
を格納している。
CREATE STREAM s2 (
a VARCHAR KEY,
b STRUCT<
c VARCHAR,
d INT
>
) WITH (
kafka_topic = 's2',
partitions = 1,
value_format = 'avro'
);
構造化データへのINSERT
にもSTRUCT()
を使用できる。
INSERT INTO s2 (
a, b
) VALUES (
'k1', STRUCT(c := 'v1', d := 5)
);
INSERT INTO s2 (
a, b
) VALUES (
'k2', STRUCT(c := 'v2', d := 6)
);
INSERT INTO s2 (
a, b
) VALUES (
'k3', STRUCT(c := 'v3', d := 7)
);
構造化データの取り出しには->
を使用する。
SELECT a,
b,
b->c,
b->d
FROM s2
EMIT CHANGES;
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|A |B |C |D |
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|k1 |{C=v1, D=5} |v1 |5 |
|k2 |{C=v2, D=6} |v2 |6 |
|k3 |{C=v3, D=7} |v3 |7 |
Map
CREATE ...
hoge MAP <
VARCHAR
INT
>
INSERT INTO ...
, MAP('hoge_c1' :=1, 'hoge_c2=':=2)
SELECT ...
hoge['hoge_c1'] AS HOGE_C1,
hoge['hoge_c2'] AS HOGE_C2,
Map形式も使用できる。keyとvalueの組み合わせで何でも入れられるが、key/valueのデータ型は常に一定である必要がある。Stream/Table内にMapを作成するにはCREATE
時にMAP<>
を使用する。下記例においてb
列はVARCHAR
とINT
のMapになる。
CREATE STREAM s3 (
a VARCHAR KEY,
b MAP<VARCHAR, INT>
) WITH (
kafka_topic = 's3',
partitions = 1,
value_format = 'avro'
);
MapへのINSERT
にはMAP()
を使用する。
INSERT INTO s3 (
a, b
) VALUES (
'k1', MAP('c' := 2, 'd' := 4)
);
INSERT INTO s3 (
a, b
) VALUES (
'k2', MAP('c' := 4, 'd' := 8)
);
INSERT INTO s3 (
a, b
) VALUES (
'k3', MAP('c' := 8, 'd' := 16)
);
Mapの取り出しには[]
でKeyに入れた値を指定している。
SELECT a,
b,
b['c'] AS C,
b['d'] AS D
FROM s3
EMIT CHANGES;
+---------------------------+---------------------------+---------------------------+---------------------------+
|A |B |C |D |
+---------------------------+---------------------------+---------------------------+---------------------------+
|k1 |{c=2, d=4} |2 |4 |
|k2 |{c=4, d=8} |4 |8 |
|k3 |{c=8, d=16} |8 |16 |
配列
CREATE ...
hoge ARRAY <INT>
INSERT INTO ...
, ARRAY[10,20]
SELECT ...
hoge[1] as hoge_1, hoge[2] as hoge_2
単一のデータ型を横に並べたものを配列として取り扱うことができる。Stream/Table内に配列を作成するにはCREATE
時にARRAY<>
を使用する。下記例ではb
列がINT
型の配列になる。
CREATE STREAM s4 (
a VARCHAR KEY,
b ARRAY<INT>
) WITH (
kafka_topic = 's4',
partitions = 1,
value_format = 'avro'
);
配列へのINSERT
にはARRAY[]
を使用する。
INSERT INTO s4 (
a, b
) VALUES (
'k1', ARRAY[1]
);
INSERT INTO s4 (
a, b
) VALUES (
'k2', ARRAY[2, 3]
);
INSERT INTO s4 (
a, b
) VALUES (
'k3', ARRAY[4, 5, 6]
);
配列の取り出しには[]
を使用する。[]
内のIndex指定は左から右への要素並び順を指定する。Indexに対する負数指定(-1)は逆方向を意味するため、この例では配列中の最後の要素にアクセスすることになる。
SELECT a,
b,
b[1] AS b_1,
b[2] AS b_2,
b[3] AS b_3,
b[-1] AS b_minus_1
FROM s4
EMIT CHANGES;
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|A |B |B_1 |B_2 |B_3 |B_MINUS_1 |
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|k1 |[1] |1 |null |null |1 |
|k2 |[2, 3] |2 |3 |null |3 |
|k3 |[4, 5, 6] |4 |5 |6 |6 |
最新オフセットのデータ取り出し
CREATE ... AS
SELECT ...,
LATEST_BY_OFFSET(hoge) AS hoge,
このチュートリアルにもありましたね。SQL集計関数としてLATEST_BY_OFFSET
を用いることで、最後に挿入されたデータをTableに反映させることができる。
以下のStreamがあるとして:
CREATE STREAM s1 (
k VARCHAR KEY,
v1 INT,
v2 VARCHAR,
v3 BOOLEAN
) WITH (
kafka_topic = 's1',
partitions = 1,
value_format = 'avro'
);
Message Keyとして k1
、k2
には2回、k3
には1回INSERTが行われている状況で:
INSERT INTO s1 (
k, v1, v2, v3
) VALUES (
'k1', 0, 'a', true
);
INSERT INTO s1 (
k, v1, v2, v3
) VALUES (
'k2', 1, 'b', false
);
INSERT INTO s1 (
k, v1, v2, v3
) VALUES (
'k1', 2, 'c', false
);
INSERT INTO s1 (
k, v1, v2, v3
) VALUES (
'k3', 3, 'd', true
);
INSERT INTO s1 (
k, v1, v2, v3
) VALUES (
'k2', 4, 'e', true
);
v1
, v2
, v3
の最新オフセットのデータを取り出すためのTableを以下のように作成できる。(LATEST_BY_OFFSET
使用時にGROUP BY
でMessage Keyを指定しているが、集計関数だからだろう)
CREATE TABLE t1 AS
SELECT k,
LATEST_BY_OFFSET(v1) AS v1,
LATEST_BY_OFFSET(v2) AS v2,
LATEST_BY_OFFSET(v3) AS v3
FROM s1
GROUP BY k
EMIT CHANGES;
pull queryで気持ちよく検索すると、v1, v2, v3それぞれ最後にINSERTされた値(≒Topic中の最新オフセットの情報)が返る。
ksql> SELECT k, v1, v2, v3 FROM t1 WHERE k='k1';
+---------------------------+---------------------------+---------------------------+---------------------------+
|K |V1 |V2 |V3 |
+---------------------------+---------------------------+---------------------------+---------------------------+
|k1 |2 |c |false |
Query terminated
ksql> SELECT k, v1, v2, v3 FROM t1 WHERE k='k2';
+---------------------------+---------------------------+---------------------------+---------------------------+
|K |V1 |V2 |V3 |
+---------------------------+---------------------------+---------------------------+---------------------------+
|k2 |4 |e |true |
Query terminated
ksql> SELECT k, v1, v2, v3 FROM t1 WHERE k='k3';
+---------------------------+---------------------------+---------------------------+---------------------------+
|K |V1 |V2 |V3 |
+---------------------------+---------------------------+---------------------------+---------------------------+
|k3 |3 |d |true |
Query terminated
表/列等の名称への英小文字使用
ksqlは省略時値で表や列の全ての名称を英大文字に変換してしまうのだが、英小文字を使いたければ``で囲みましょう、というだけ。
CREATE STREAM `s2_Case` (
`foo` VARCHAR KEY,
`BAR` INT,
`Baz` VARCHAR,
`grault` STRUCT<
`Corge` VARCHAR,
`garply` INT
>,
qux INT
) WITH (
kafka_topic = 's2_Case',
partitions = 1,
value_format = 'avro'
);
DESCRIBEしてみる。
Name : s2_Case
Field | Type
--------------------------------------------------------
foo | VARCHAR(STRING) (key)
BAR | INTEGER
Baz | VARCHAR(STRING)
grault | STRUCT<Corge VARCHAR(STRING), garply INTEGER>
QUX | INTEGER
--------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
INSERTやSELECTでもStream/Table名や列名を``で囲む必要があるのが面倒くさい。SELECT指定で囲まなかった列名qux
およびqux2
はksql応答時に大文字に変換されている点に注意。
INSERT INTO `s2_Case` (
`foo`, `BAR`, `Baz`, `grault`, qux
) VALUES (
'k1', 1, 'x', STRUCT(`Corge` := 'v1', `garply` := 5), 2
);
SELECT `foo`,
`BAR`,
`Baz`,
`grault`->`Corge`,
`grault`->`garply`,
qux,
QUX AS qux2
FROM `s2`
EMIT CHANGES;
+--------------+--------------+--------------+--------------+--------------+--------------+--------------+
|foo |BAR |Baz |Corge |garply |QUX |QUX2 |
+--------------+--------------+--------------+--------------+--------------+--------------+--------------+
|k1 |1 |x |v1 |5 |2 |2 |
タイムスタンプ列の定義と使用
イベント処理を念頭に置くとKafkaメッセージ中のタイムスタンプの取り扱いはかなり大切になると思われる。例えば以下のようなタイムスタンプ的な情報のts
列を含むStreamがあるとして:
CREATE STREAM s1_time (
k VARCHAR KEY,
ts VARCHAR,
v1 INT,
v2 VARCHAR
) WITH (
kafka_topic = 's1_time',
partitions = 1,
value_format = 'avro'
);
以下のデータを入れたとする:
INSERT INTO s1_time (
k, ts, v1, v2
) VALUES (
'k1', '2020-05-04 01:00:00', 0, 'a'
);
INSERT INTO s1_time (
k, ts, v1, v2
) VALUES (
'k2', '2020-05-04 02:00:00', 1, 'b'
);
Stream内には暗黙的なシステム列としてROWTIME
というものがあり、何も指定しなければKafkaメッセージがTopicに書き込まれた時間が保持されている。これをTIMESTAMPTOSTRING
関数を用いて人間が読める形式にしてみる。
SELECT k,
ROWTIME,
TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') AS rowtime_formatted,
ts,
v1,
v2
FROM s1_time
EMIT CHANGES;
+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
|K |ROWTIME |ROWTIME_FORMATTED |TS |V1 |V2 |
+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
|k1 |1638949784646 |2021-12-08 16:49:44.646 |2020-05-04 01:00:00 |0 |a |
|k2 |1638949802709 |2021-12-08 16:50:02.709 |2020-05-04 02:00:00 |1 |b |
一方、Kafkaメッセージが書き込まれた時間ではなくKafkaメッセージ中の特定のフィールドをタイムスタンプとして取り扱いたいのであれば、CREATE
時のWITH
内に以下のようにtimestamp
およびtimestamp_format
指定を行う。
CREATE STREAM s2_time WITH (
timestamp = 'ts',
timestamp_format = 'yyyy-MM-dd HH:mm:ss'
) AS
SELECT *
FROM s1_time
EMIT CHANGES;
このStreamの中を見ると、ROWTIME
がメッセージ書き込み時間ではなくts
列に格納されていた時間に変わっていることが分かる。これは便利そう。
SELECT k,
ROWTIME,
TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') AS rowtime_formatted,
ts,
v1,
v2
FROM s2_time
EMIT CHANGES;
+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
|K |ROWTIME |ROWTIME_FORMATTED |TS |V1 |V2 |
+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
|k1 |1588521600000 |2020-05-04 01:00:00.000 |2020-05-04 01:00:00 |0 |a |
|k2 |1588525200000 |2020-05-04 02:00:00.000 |2020-05-04 02:00:00 |1 |b |
もちろん、上記例のようにStreamからStreamを再定義するのではなく、最初から以下のようにタイムスタンプに使用する列を指定してStreamを作成してもよい。この場合、INSERT直後にROWTIME
がts
列由来の値になる。
CREATE STREAM s3_time (
k VARCHAR KEY,
ts VARCHAR,
v1 INT
) WITH (
kafka_topic = 's3_time',
partitions = 1,
value_format = 'avro',
timestamp = 'ts',
timestamp_format = 'yyyy-MM-dd HH:mm:ss'
);
なお、timestamp_format
を指定しない場合、タイムスタンプの元になる列はROWTIME
列と同じようにBIGINT
のUnix時間形式で格納されている必要がある点に注意。
変数
DEFINE
で変数を定義してksql中で使用することが出来る。ksql内での変数の呼び出しには${}
を記載する。
DEFINE format = 'AVRO';
DEFINE replicas = '3';
CREATE STREAM str1_variables (
id INT
) WITH (
kafka_topic = 'str1_variables',
value_format = '${format}',
partitions = ${replicas}
);
定義した変数はSHOW VARIABLES
で確認できる。
ksql> SHOW VARIABLES;
Variable Name | Value
-----------------------
format | AVRO
replicas | 3
-----------------------
変数定義の削除にはUNDEFINE
を、エスケープには$$
を使用する。(エスケープの意味はあまりよく分からないが…)
UNDEFINE replicas;
DEFINE format = 'AVRO';
SELECT '$${format}' FROM stream;
変数が指定できるのはテキストやリテラル、列名、Stream/Table名であり、予約語には使用できない。以下に変数を使用したksql発行例を示す。
DEFINE streamName = 'str2_variables'
DEFINE colName1 = 'col1'
DEFINE colName2 = 'col2'
DEFINE format = 'AVRO'
DEFINE replicas = '3'
DEFINE topicName = 'str2_variables'
DEFINE val1 = '1'
DEFINE val2 = 'HOGE'
CREATE STREAM ${streamName} (
${colName1} INT,
${colName2} STRING
) WITH (
kafka_topic = '${topicName}',
value_format = '${format}',
partitions = ${replicas}
);
INSERT INTO ${streamName} (
${colName1},
${colName2}
) VALUES (
${val1},
'${val2}'
);
SELECT * FROM ${streamName}
WHERE ${colName1} = ${val1} and ${colName2} = '${val2}'
EMIT CHANGES;
(注意)なぜかSELECTにおいて列名に変数を使用している部分({colName1}
と{colName2}
)がIllegal argument
で実行エラーになってしまった(confluent 6.2.1)が、あまり深く追及していない・・・
ラムダ式
ksqlDBは構造化データをラムダ式で処理することもできる。演算子には=>
を使用、引数は最大3個。呼び出し関数はTRANSFORM
による変換、 REDUCE
による集約、FILTER
による選択の3つ。
TRANSFORM
Mapを含むStreamを作りTRANSFORM
を適用したStreamに変換する例を示す。変換条件は下記のとおりとする。
-
Map中のKeyを
UCASE
関数で大文字化 -
Map中のValueを+5
CREATE STREAM stream1_lambda (
id INT,
lambda_map MAP<STRING, INTEGER>
) WITH (
kafka_topic = 'stream1_lambda',
partitions = 1,
value_format = 'avro'
);
CREATE STREAM output AS
SELECT id,
TRANSFORM(lambda_map, (k, v) => UCASE(k), (k, v) => v + 5)
FROM stream1_lambda;
実際に値を挿入してTRANSFORM
されたMapを観察してみる(KSQL_COL_0
は内部的に命名されたTRANSFORM列)。指定の通りにMap内の値が変換されていることがわかる。
INSERT INTO stream1_lambda (
id, lambda_map
) VALUES (
3, MAP('hello':= 15, 'goodbye':= -5)
);
SELECT * FROM stream1_lambda EMIT CHAGES;
SELECT id, ksq_col_0 AS final_output
FROM output EMIT CHANGES;
+--------------------------------------------------------+--------------------------------------------------------+
|ID |LAMBDA_MAP |
+--------------------------------------------------------+--------------------------------------------------------+
|3 |{goodbye=-5, hello=15} |
+--------------------------------------------------------+--------------------------------------------------------+
|ID |FINAL_OUTPUT |
+--------------------------------------------------------+--------------------------------------------------------+
|3 |{GOODBYE=0, HELLO=20} |
REDUCE
配列を含むStreamを作りREDUCE
を適用したStreamに変換する例を示す。CEIL
関数で2値の割り算の小数点切り上げによる配列の集約をおこなっているのだが、第2パラメーターのstate
(この例では2
)の意味が分からず理解できていない… むう。
CREATE STREAM stream2_lambda (
id INT,
lambda_arr ARRAY<INTEGER>
) WITH (
kafka_topic = 'stream2_lambda',
partitions = 1,
value_format = 'avro'
);
CREATE STREAM output2 AS
SELECT id,
REDUCE(lambda_arr, 2, (s, x) => CEIL(x/s))
FROM stream2_lambda
EMIT CHANGES;
実際に値を挿入してREDUCE
された配列を観察してみる(KSQL_COL_0
は内部的に命名されたREDUCE列)。集約結果は元ネタと同じ値の5になっているが、その理由が理解できておらず居心地が悪い。
INSERT INTO stream2_lambda (
id, lambda_arr
) VALUES (
1, ARRAY[2, 3, 4, 5]
);
SELECT * FROM stream2_lambda EMIT CHAGES;
SELECT id, ksq_col_0 AS final_output
FROM output2 EMIT CHANGES;
+--------------------------------------------------------+--------------------------------------------------------+
|ID |LAMBDA_ARR |
+--------------------------------------------------------+--------------------------------------------------------+
|1 |[2, 3, 4, 5] |
+--------------------------------------------------------+--------------------------------------------------------+
|ID |FINAL_OUTPUT |
+--------------------------------------------------------+--------------------------------------------------------+
|1 |5 |
FILTER
Mapを含むStreamを作りFILTER
を適用したStreamに変換する例を示す。フィルターは下記のAND条件とする。
-
Map中のKeyに関し
name
という文字列が出現するかをINSTR
関数で調査(>0
なので必然的に出現有無のみの判別になる) -
Map中のValueに関しゼロ以外かを調査
CREATE STREAM stream3_lambda (
id INT,
lambda_map MAP<STRING, INTEGER>
) WITH (
kafka_topic = 'stream3_lambda',
partitions = 1,
value_format = 'avro'
);
CREATE STREAM output3 AS
SELECT id,
FILTER(lambda_map, (k, v) => instr(k, 'name') > 0 AND v != 0)
FROM stream3_lambda
EMIT CHANGES;
実際に値を挿入してFILTER
されたMapを観察してみる(KSQL_COL_0
は内部的に命名されたFILTER列)。指定の通りにMap内の値が選択されていることがわかる。
INSERT INTO stream3_lambda (
id, lambda_map
) VALUES (
1, MAP('first name':= 15, 'middle':= 25, 'last name':= 0, 'alt name':= 33)
);
SELECT * FROM stream3_lambda EMIT CHAGES;
SELECT id, ksq_col_0 AS final_output
FROM output3 EMIT CHANGES;
+--------------------------------------------------------+--------------------------------------------------------+
|ID |LAMBDA_MAP |
+--------------------------------------------------------+--------------------------------------------------------+
|1 |{middle=25, last name=0, first name=15, alt name=33} |
+--------------------------------------------------------+--------------------------------------------------------+
|ID |FINAL_OUTPUT |
+--------------------------------------------------------+--------------------------------------------------------+
|1 |{first name=15, alt name=33} |
まとめ
- Kafkaメッセージのデータ構造をCREATEで指定する際には様々な構造化表現が使える
- Kafkaメッセージ中のタイムスタンプ取り込みは非常に役立ちそう。
- ラムダ式として
TRANSFORM
、REDUCE
、FILTER
の3つが使える。これをうまく使えればわざわざユーザー定義関数をJavaで書かなくても済むかもしれない。