2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

ksqlちょいテク集

Last updated at Posted at 2021-12-15

ねらい

ksql自習のために下記ガイドから気になる部分を拾って実際に動かしたものをまとめただけの記事でございます。元ネタを見れば全て済むことだが、何か少しでも参考になれば。

元ネタ:How-to guide

How-to guideから拾ったksql表現集

構造化データ

STRUCTによる構造化データ
CREATE ...
  hoge STRUCT <
       hoge_C1 VARCHAR,
       hoge_C2 INT
       >

INSERT INTO ...
       , STRUCT(hoge_C1 :='a', hoge_C2 :=2)

SELECT ...
       hoge->hoge_C1,
       hoge->hoge_C2,

このチュートリアルにもありましたね。Stream/Table内に構造化データを作成するにはCREATE時にSTRUCT<>を使用する。下記例ではAvro Schema形式を利用し、b列の中にVARCHARcINTdを格納している。

構造化データの作成
CREATE STREAM s2 (
    a VARCHAR KEY,
    b STRUCT<
        c VARCHAR,
        d INT
    >
) WITH (
    kafka_topic = 's2',
    partitions = 1,
    value_format = 'avro'
);

構造化データへのINSERTにもSTRUCT()を使用できる。

構造化データのINSERT
INSERT INTO s2 (
    a, b
) VALUES (
    'k1', STRUCT(c := 'v1', d := 5)
);

INSERT INTO s2 (
    a, b
) VALUES (
    'k2', STRUCT(c := 'v2', d := 6)
);

INSERT INTO s2 (
    a, b
) VALUES (
    'k3', STRUCT(c := 'v3', d := 7)
);

構造化データの取り出しには->を使用する。

構造化データの取り出し
SELECT a,
       b,
       b->c,
       b->d
FROM s2
EMIT CHANGES;
SELECT結果
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|A                                         |B                                         |C                                         |D                                         |
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|k1                                        |{C=v1, D=5}                               |v1                                        |5                                         |
|k2                                        |{C=v2, D=6}                               |v2                                        |6                                         |
|k3                                        |{C=v3, D=7}                               |v3                                        |7                                         |

Map

MAPによる構造化データ
CREATE ...
  hoge MAP <
       VARCHAR
       INT
       >

INSERT INTO ...
       , MAP('hoge_c1' :=1, 'hoge_c2=':=2)

SELECT ...
       hoge['hoge_c1'] AS HOGE_C1,
       hoge['hoge_c2'] AS HOGE_C2,

Map形式も使用できる。keyとvalueの組み合わせで何でも入れられるが、key/valueのデータ型は常に一定である必要がある。Stream/Table内にMapを作成するにはCREATE時にMAP<>を使用する。下記例においてb列はVARCHARINTのMapになる。

MAPの作成
CREATE STREAM s3 (
    a VARCHAR KEY,
    b MAP<VARCHAR, INT>
) WITH (
    kafka_topic = 's3',
    partitions = 1,
    value_format = 'avro'
);

MapへのINSERTにはMAP()を使用する。

MAPへのINSERT
INSERT INTO s3 (
    a, b
) VALUES (
    'k1', MAP('c' := 2, 'd' := 4)
);

INSERT INTO s3 (
    a, b
) VALUES (
    'k2', MAP('c' := 4, 'd' := 8)
);

INSERT INTO s3 (
    a, b
) VALUES (
    'k3', MAP('c' := 8, 'd' := 16)
);

Mapの取り出しには[]でKeyに入れた値を指定している。

MAPの取り出し
SELECT a,
       b,
       b['c'] AS C,
       b['d'] AS D
FROM s3
EMIT CHANGES;
SELECT結果
+---------------------------+---------------------------+---------------------------+---------------------------+
|A                          |B                          |C                          |D                          |
+---------------------------+---------------------------+---------------------------+---------------------------+
|k1                         |{c=2, d=4}                 |2                          |4                          |
|k2                         |{c=4, d=8}                 |4                          |8                          |
|k3                         |{c=8, d=16}                |8                          |16                         |

配列

ARRAYによる構造化データ
CREATE ...
  hoge ARRAY <INT>

INSERT INTO ...
       , ARRAY[10,20]

SELECT ...
       hoge[1] as hoge_1, hoge[2] as hoge_2

単一のデータ型を横に並べたものを配列として取り扱うことができる。Stream/Table内に配列を作成するにはCREATE時にARRAY<>を使用する。下記例ではb列がINT型の配列になる。

配列の作成
CREATE STREAM s4 (
    a VARCHAR KEY,
    b ARRAY<INT>
) WITH (
    kafka_topic = 's4',
    partitions = 1,
    value_format = 'avro'
);

配列へのINSERTにはARRAY[]を使用する。

配列へのINSERT
INSERT INTO s4 (
    a, b
) VALUES (
    'k1', ARRAY[1]
);

INSERT INTO s4 (
    a, b
) VALUES (
    'k2', ARRAY[2, 3]
);

INSERT INTO s4 (
    a, b
) VALUES (
    'k3', ARRAY[4, 5, 6]
);

配列の取り出しには[]を使用する。[]内のIndex指定は左から右への要素並び順を指定する。Indexに対する負数指定(-1)は逆方向を意味するため、この例では配列中の最後の要素にアクセスすることになる。

配列の取り出し
SELECT a,
       b,
       b[1] AS b_1,
       b[2] AS b_2,
       b[3] AS b_3,
       b[-1] AS b_minus_1
FROM s4
EMIT CHANGES;
SELECT結果
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|A                |B                |B_1              |B_2              |B_3              |B_MINUS_1        |
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|k1               |[1]              |1                |null             |null             |1                |
|k2               |[2, 3]           |2                |3                |null             |3                |
|k3               |[4, 5, 6]        |4                |5                |6                |6                |

最新オフセットのデータ取り出し

LATEST_BY_OFFSET関数の利用
CREATE ... AS
    SELECT ...,
           LATEST_BY_OFFSET(hoge) AS hoge,

このチュートリアルにもありましたね。SQL集計関数としてLATEST_BY_OFFSETを用いることで、最後に挿入されたデータをTableに反映させることができる。

以下のStreamがあるとして:

STREAMの作成
CREATE STREAM s1 (
    k VARCHAR KEY,
    v1 INT,
    v2 VARCHAR,
    v3 BOOLEAN
) WITH (
    kafka_topic = 's1',
    partitions = 1,
    value_format = 'avro'
);

Message Keyとして k1k2には2回、k3には1回INSERTが行われている状況で:

STREAMへのINSERT
INSERT INTO s1 (
    k, v1, v2, v3
) VALUES (
    'k1', 0, 'a', true
);

INSERT INTO s1 (
    k, v1, v2, v3
) VALUES (
    'k2', 1, 'b', false
);

INSERT INTO s1 (
    k, v1, v2, v3
) VALUES (
    'k1', 2, 'c', false
);

INSERT INTO s1 (
    k, v1, v2, v3
) VALUES (
    'k3', 3, 'd', true
);

INSERT INTO s1 (
    k, v1, v2, v3
) VALUES (
    'k2', 4, 'e', true
);

v1, v2, v3の最新オフセットのデータを取り出すためのTableを以下のように作成できる。(LATEST_BY_OFFSET使用時にGROUP BYでMessage Keyを指定しているが、集計関数だからだろう)

LATEST_BY_OFFSETを用いたTABLEの作成
CREATE TABLE t1 AS
    SELECT k,
           LATEST_BY_OFFSET(v1) AS v1,
           LATEST_BY_OFFSET(v2) AS v2,
           LATEST_BY_OFFSET(v3) AS v3
    FROM s1
    GROUP BY k
    EMIT CHANGES;

pull queryで気持ちよく検索すると、v1, v2, v3それぞれ最後にINSERTされた値(≒Topic中の最新オフセットの情報)が返る。

SELECT結果
ksql> SELECT k, v1, v2, v3 FROM t1 WHERE k='k1';
+---------------------------+---------------------------+---------------------------+---------------------------+
|K                          |V1                         |V2                         |V3                         |
+---------------------------+---------------------------+---------------------------+---------------------------+
|k1                         |2                          |c                          |false                      |
Query terminated

ksql> SELECT k, v1, v2, v3 FROM t1 WHERE k='k2';
+---------------------------+---------------------------+---------------------------+---------------------------+
|K                          |V1                         |V2                         |V3                         |
+---------------------------+---------------------------+---------------------------+---------------------------+
|k2                         |4                          |e                          |true                       |
Query terminated

ksql> SELECT k, v1, v2, v3 FROM t1 WHERE k='k3';
+---------------------------+---------------------------+---------------------------+---------------------------+
|K                          |V1                         |V2                         |V3                         |
+---------------------------+---------------------------+---------------------------+---------------------------+
|k3                         |3                          |d                          |true                       |
Query terminated

表/列等の名称への英小文字使用

ksqlは省略時値で表や列の全ての名称を英大文字に変換してしまうのだが、英小文字を使いたければ``で囲みましょう、というだけ。

CREATE時の小文字使用
CREATE STREAM `s2_Case` (
    `foo` VARCHAR KEY,
    `BAR` INT,
    `Baz` VARCHAR,
    `grault` STRUCT<
        `Corge` VARCHAR,
        `garply` INT
    >,
    qux INT
) WITH (
    kafka_topic = 's2_Case',
    partitions = 1,
    value_format = 'avro'
);

DESCRIBEしてみる。

DESCRIBE結果
Name                 : s2_Case
 Field  | Type
--------------------------------------------------------
 foo    | VARCHAR(STRING)  (key)
 BAR    | INTEGER
 Baz    | VARCHAR(STRING)
 grault | STRUCT<Corge VARCHAR(STRING), garply INTEGER>
 QUX    | INTEGER
--------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;

INSERTやSELECTでもStream/Table名や列名を``で囲む必要があるのが面倒くさい。SELECT指定で囲まなかった列名quxおよびqux2はksql応答時に大文字に変換されている点に注意。

INSERTでの小文字指定
INSERT INTO `s2_Case` (
    `foo`, `BAR`, `Baz`, `grault`, qux
) VALUES (
    'k1', 1, 'x', STRUCT(`Corge` := 'v1', `garply` := 5), 2
);
SELECTでの小文字指定
SELECT `foo`,
       `BAR`,
       `Baz`,
       `grault`->`Corge`,
       `grault`->`garply`,
       qux,
       QUX AS qux2
FROM `s2`
EMIT CHANGES;
SELECT結果
+--------------+--------------+--------------+--------------+--------------+--------------+--------------+
|foo           |BAR           |Baz           |Corge         |garply        |QUX           |QUX2          |
+--------------+--------------+--------------+--------------+--------------+--------------+--------------+
|k1            |1             |x             |v1            |5             |2             |2             |

タイムスタンプ列の定義と使用

イベント処理を念頭に置くとKafkaメッセージ中のタイムスタンプの取り扱いはかなり大切になると思われる。例えば以下のようなタイムスタンプ的な情報のts列を含むStreamがあるとして:

STREAM例
CREATE STREAM s1_time (
    k VARCHAR KEY,
    ts VARCHAR,
    v1 INT,
    v2 VARCHAR
) WITH (
    kafka_topic = 's1_time',
    partitions = 1,
    value_format = 'avro'
);

以下のデータを入れたとする:

INSERTによるデータ投入
INSERT INTO s1_time (
    k, ts, v1, v2
) VALUES (
    'k1', '2020-05-04 01:00:00', 0, 'a'
);

INSERT INTO s1_time (
    k, ts, v1, v2
) VALUES (
    'k2', '2020-05-04 02:00:00', 1, 'b'
);

Stream内には暗黙的なシステム列としてROWTIMEというものがあり、何も指定しなければKafkaメッセージがTopicに書き込まれた時間が保持されている。これをTIMESTAMPTOSTRING関数を用いて人間が読める形式にしてみる。

ROWTIME列およびTIMESTAMPTOSTRINGの検索
SELECT k,
       ROWTIME,
       TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') AS rowtime_formatted,
       ts,
       v1,
       v2
FROM s1_time
EMIT CHANGES;
SELECT結果
+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
|K                                    |ROWTIME                              |ROWTIME_FORMATTED                    |TS                                   |V1                                   |V2                                   |
+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
|k1                                   |1638949784646                        |2021-12-08 16:49:44.646              |2020-05-04 01:00:00                  |0                                    |a                                    |
|k2                                   |1638949802709                        |2021-12-08 16:50:02.709              |2020-05-04 02:00:00                  |1                                    |b                                    |

一方、Kafkaメッセージが書き込まれた時間ではなくKafkaメッセージ中の特定のフィールドをタイムスタンプとして取り扱いたいのであれば、CREATE時のWITH内に以下のようにtimestampおよびtimestamp_format指定を行う。

CREATE時のタイムスタンプ列指定
CREATE STREAM s2_time WITH (
    timestamp = 'ts',
    timestamp_format = 'yyyy-MM-dd HH:mm:ss'
)   AS
    SELECT *
    FROM s1_time
    EMIT CHANGES;

このStreamの中を見ると、ROWTIMEがメッセージ書き込み時間ではなくts列に格納されていた時間に変わっていることが分かる。これは便利そう。

ROWTIME列およびTIMESTAMPTOSTRINGの検索
SELECT k,
       ROWTIME,
       TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') AS rowtime_formatted,
       ts,
       v1,
       v2
FROM s2_time
EMIT CHANGES;
SELECT結果
+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
|K                                    |ROWTIME                              |ROWTIME_FORMATTED                    |TS                                   |V1                                   |V2                                   |
+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
|k1                                   |1588521600000                        |2020-05-04 01:00:00.000              |2020-05-04 01:00:00                  |0                                    |a                                    |
|k2                                   |1588525200000                        |2020-05-04 02:00:00.000              |2020-05-04 02:00:00                  |1                                    |b                                    |

もちろん、上記例のようにStreamからStreamを再定義するのではなく、最初から以下のようにタイムスタンプに使用する列を指定してStreamを作成してもよい。この場合、INSERT直後にROWTIMEts列由来の値になる。

CREATE時のタイムスタンプ列指定
CREATE STREAM s3_time (
    k VARCHAR KEY,
    ts VARCHAR,
    v1 INT
) WITH (
    kafka_topic = 's3_time',
    partitions = 1,
    value_format = 'avro',
    timestamp = 'ts',
    timestamp_format = 'yyyy-MM-dd HH:mm:ss'
);

なお、timestamp_formatを指定しない場合、タイムスタンプの元になる列はROWTIME列と同じようにBIGINTのUnix時間形式で格納されている必要がある点に注意。

変数

DEFINEで変数を定義してksql中で使用することが出来る。ksql内での変数の呼び出しには${}を記載する。

DEFINEによる変数定義
DEFINE format = 'AVRO';
DEFINE replicas = '3';

CREATE STREAM str1_variables (
  id INT
) WITH (
  kafka_topic = 'str1_variables',
  value_format = '${format}',
  partitions = ${replicas}
);

定義した変数はSHOW VARIABLESで確認できる。

SHOW_VARIABLES応答
ksql> SHOW VARIABLES;

 Variable Name | Value
-----------------------
 format        | AVRO
 replicas      | 3
-----------------------

変数定義の削除にはUNDEFINEを、エスケープには$$を使用する。(エスケープの意味はあまりよく分からないが…)

変数定義の削除とエスケープ
UNDEFINE replicas;

DEFINE format = 'AVRO';
SELECT '$${format}' FROM stream;

変数が指定できるのはテキストやリテラル、列名、Stream/Table名であり、予約語には使用できない。以下に変数を使用したksql発行例を示す。

変数を使用したksql発行例
DEFINE streamName = 'str2_variables'
DEFINE colName1 = 'col1'
DEFINE colName2 = 'col2'
DEFINE format = 'AVRO'
DEFINE replicas = '3'
DEFINE topicName = 'str2_variables'
DEFINE val1 = '1'
DEFINE val2 = 'HOGE'

CREATE STREAM ${streamName} (
  ${colName1} INT,
  ${colName2} STRING
) WITH (
  kafka_topic = '${topicName}',
  value_format = '${format}',
  partitions = ${replicas}
);

INSERT INTO ${streamName} (
  ${colName1},
  ${colName2}
) VALUES (
  ${val1},
  '${val2}'
);

SELECT * FROM ${streamName}
WHERE ${colName1} = ${val1} and ${colName2} = '${val2}'
EMIT CHANGES; 

(注意)なぜかSELECTにおいて列名に変数を使用している部分({colName1}{colName2})がIllegal argumentで実行エラーになってしまった(confluent 6.2.1)が、あまり深く追及していない・・・

ラムダ式

ksqlDBは構造化データをラムダ式で処理することもできる。演算子には=>を使用、引数は最大3個。呼び出し関数はTRANSFORMによる変換、 REDUCEによる集約、FILTERによる選択の3つ。

TRANSFORM

Mapを含むStreamを作りTRANSFORMを適用したStreamに変換する例を示す。変換条件は下記のとおりとする。

  • Map中のKeyをUCASE関数で大文字化

  • Map中のValueを+5

TRANSFORMの利用
CREATE STREAM stream1_lambda (
  id INT,
  lambda_map MAP<STRING, INTEGER>
) WITH (
  kafka_topic = 'stream1_lambda',
  partitions = 1,
  value_format = 'avro'
);

CREATE STREAM output AS
  SELECT id, 
  TRANSFORM(lambda_map, (k, v) => UCASE(k), (k, v) => v + 5) 
  FROM stream1_lambda;

実際に値を挿入してTRANSFORMされたMapを観察してみる(KSQL_COL_0は内部的に命名されたTRANSFORM列)。指定の通りにMap内の値が変換されていることがわかる。

値の挿入と確認
INSERT INTO stream1_lambda (
  id, lambda_map
) VALUES (
  3, MAP('hello':= 15, 'goodbye':= -5)
);

SELECT * FROM stream1_lambda EMIT CHAGES;

SELECT id, ksq_col_0 AS final_output 
  FROM output EMIT CHANGES;
SELECT結果(元のSTREAM)
+--------------------------------------------------------+--------------------------------------------------------+
|ID                                                      |LAMBDA_MAP                                              |
+--------------------------------------------------------+--------------------------------------------------------+
|3                                                       |{goodbye=-5, hello=15}                                  |
SELECT結果(TRANSFORM後)
+--------------------------------------------------------+--------------------------------------------------------+
|ID                                                      |FINAL_OUTPUT                                            |
+--------------------------------------------------------+--------------------------------------------------------+
|3                                                       |{GOODBYE=0, HELLO=20}                                   |

REDUCE

配列を含むStreamを作りREDUCEを適用したStreamに変換する例を示す。CEIL関数で2値の割り算の小数点切り上げによる配列の集約をおこなっているのだが、第2パラメーターstate(この例では2)の意味が分からず理解できていない… むう。

REDUCEの利用
CREATE STREAM stream2_lambda (
  id INT,
  lambda_arr ARRAY<INTEGER>
) WITH (
  kafka_topic = 'stream2_lambda',
  partitions = 1,
  value_format = 'avro'
);

CREATE STREAM output2 AS
  SELECT id, 
  REDUCE(lambda_arr, 2, (s, x) => CEIL(x/s)) 
  FROM stream2_lambda
  EMIT CHANGES;

実際に値を挿入してREDUCEされた配列を観察してみる(KSQL_COL_0は内部的に命名されたREDUCE列)。集約結果は元ネタと同じ値の5になっているが、その理由が理解できておらず居心地が悪い。

値の挿入と確認
INSERT INTO stream2_lambda (
  id, lambda_arr
) VALUES (
  1, ARRAY[2, 3, 4, 5]
);

SELECT * FROM stream2_lambda EMIT CHAGES;

SELECT id, ksq_col_0 AS final_output 
  FROM output2 EMIT CHANGES;
SELECT結果(元のSTREAM)
+--------------------------------------------------------+--------------------------------------------------------+
|ID                                                      |LAMBDA_ARR                                              |
+--------------------------------------------------------+--------------------------------------------------------+
|1                                                       |[2, 3, 4, 5]                                            |
SELECT結果(REDUCE後)
+--------------------------------------------------------+--------------------------------------------------------+
|ID                                                      |FINAL_OUTPUT                                            |
+--------------------------------------------------------+--------------------------------------------------------+
|1                                                       |5                                                       |

FILTER

Mapを含むStreamを作りFILTERを適用したStreamに変換する例を示す。フィルターは下記のAND条件とする。

  • Map中のKeyに関しnameという文字列が出現するかをINSTR関数で調査(>0なので必然的に出現有無のみの判別になる)

  • Map中のValueに関しゼロ以外かを調査

TRANSFORMの利用
CREATE STREAM stream3_lambda (
  id INT,
  lambda_map MAP<STRING, INTEGER>
) WITH (
  kafka_topic = 'stream3_lambda',
  partitions = 1,
  value_format = 'avro'
);

CREATE STREAM output3 AS
  SELECT id, 
  FILTER(lambda_map, (k, v) => instr(k, 'name') > 0 AND v != 0) 
  FROM stream3_lambda
  EMIT CHANGES;

実際に値を挿入してFILTERされたMapを観察してみる(KSQL_COL_0は内部的に命名されたFILTER列)。指定の通りにMap内の値が選択されていることがわかる。

値の挿入と確認
INSERT INTO stream3_lambda (
  id, lambda_map
) VALUES (
  1, MAP('first name':= 15, 'middle':= 25, 'last name':= 0, 'alt name':= 33)
);

SELECT * FROM stream3_lambda EMIT CHAGES;

SELECT id, ksq_col_0 AS final_output 
  FROM output3 EMIT CHANGES;
SELECT結果(元のSTREAM)
+--------------------------------------------------------+--------------------------------------------------------+
|ID                                                      |LAMBDA_MAP                                              |
+--------------------------------------------------------+--------------------------------------------------------+
|1                                                       |{middle=25, last name=0, first name=15, alt name=33}    |
SELECT結果(FILTER後)
+--------------------------------------------------------+--------------------------------------------------------+
|ID                                                      |FINAL_OUTPUT                                            |
+--------------------------------------------------------+--------------------------------------------------------+
|1                                                       |{first name=15, alt name=33}                            |

まとめ

  • Kafkaメッセージのデータ構造をCREATEで指定する際には様々な構造化表現が使える
  • Kafkaメッセージ中のタイムスタンプ取り込みは非常に役立ちそう。
  • ラムダ式としてTRANSFORMREDUCEFILTERの3つが使える。これをうまく使えればわざわざユーザー定義関数をJavaで書かなくても済むかもしれない。
2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?