Kinesis AnalyticsのSQLクエリ

More than 1 year has passed since last update.

Kinesis AnalyticsをSQLクエリについて、AWS側がテンプレートを用意してくれてるのですが、微妙に間違っていたりするのでまとめました。また、ハマりどころも多かったのでまとめています。


Kinesis Analyticsを使用する上でのポイント

・Stream名とデータ名の名前は大文字小文字で区別される

  → Stream名もしくはデータ名を「"」で括ると小文字になり、括らないと大文字と認識される

  → (個人的には)Create Stream文内のStream名とデータ名は大文字がよい

・Kinesis Streams demoは良く停止する

  → 一旦削除して、新たに作成したほうがよい

・Reference Dataを追加すると、[Could not find Stream]というエラーが発生する

・SQLコードを[Save and Running]させると実行まで時間が掛かるときがある

  - Random_CUT関数は30分ほど時間を要する場合がある

・テンプレートが微妙に間違っているときがある


所感

・構築は楽

・KPUはオートスケーリングだが、キャップ機能はないのでお金に注意

・結構、バグがある。コードのupdatingからRUNNINGまで時間が掛かるときがある

・ドキュメントが少ない

・テンプレートが間違っているので注意。高度な分析の関数はまじでわからん・・・

紹介するクエリは下記の通りです。


基本的なクエリ

・CREATE PUMP & STREAM

・SELECT

・WHERE

・MULTI STREAMS

・GROUP BY

・WINDOW

 1.TUMBLING WINDOW

 2.SLIDING WINDOW

・INNER JOIN


高度な分析

・RANDOM_CUT_FOREST

・DISTINCT

・GROUP RANK ※できていない

・TOP-K ※できていない


基本的な構文


CREATE PUMP & STREAM, SELECT

・[OR REPLACE]はSTREAM名が既に存在すれば置き換えるという意味

・AnalyticsはPUMPにデータを投入後に、STREAMにデータを格納する

・関数名を列名に使用できない(avg、min,maxなど)

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (

TICKER_STNBOL VARCHAR(4),
SECTOR VARCHAR(16),
PRICE REAL,
CHANGE DECIMAL);

CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol, sector, change, price
FROM "SOURCE_SQL_STREAM_001";

SELECT.png


WHERE

・基本的に標準SQLと同じ

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (

TICKER_SYMBOL VARCHAR(4),
SECTOR VARCHAR(16),
PRICE REAL,
CHANGE DECIMAL);

CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol, sector, change, price
FROM "SOURCE_SQL_STREAM_001"
WHERE SECTOR LIKE '%ALTH%'
;

WHERE.png


MULTI STREAMS

・PUMP名は一緒にしてはならない

・MULTI STREAMSを定義した場合、AWSコンソールのreal-time analyticsタブ内の「In-application streams」に複数STREAMが表れる

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (

TICKER_SYMBOL VARCHAR(4),
SECTOR VARCHAR(16),
PRICE REAL,
CHANGE DECIMAL);

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM2 (
TICKER_SYMBOL VARCHAR(4),
SECTOR VARCHAR(16),
PRICE REAL,
CHANGE DECIMAL);

CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol, sector, change, price
FROM SOURCE_SQL_STREAM_001
WHERE SECTOR LIKE '%ALTH%'
;

CREATE OR REPLACE PUMP STREAM_PUMP2 AS INSERT INTO DESTINATION_SQL_STREAM2
SELECT STREAM ticker_symbol, sector, change, price
FROM SOURCE_SQL_STREAM_001
WHERE PRICE > 0.0
;

MULTISTREAM.png


GROUP BY(TUMBLING WINDOW)

・構文は通常SQLと同様だが、TIME WINDOWを設定する必要がある(TUMBLING WINDOWと言う)

・ROWTIMEはKinesis Analyticsに予め用意されている時刻で収集した時刻が格納されている

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (

TICKER_SYMBOL VARCHAR(4),
TICKER_SYMBOL_COUNT INTEGER,
TICKER_SYMBOL_AVG REAL
);

CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM
ticker_symbol,
COUNT(*) AS ticker_symbol_count,
AVG(price) AS ticker_symbol_avg
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol,
FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);

GROUPBY.png


SLIDING WINDOW

・SLIDING WINDOWを使用する場合、GROUP BY句は使用せず、集約関数(avg,min,count等)の直後にWINDOWを定義する

・WINDOW幅の設定はTIMEとROWの2種類が存在する(微妙に書き方が違うので注意)

CREATE STREAM DESTINATION_SQL_STREAM (

TICKER_SYMBOL VARCHAR(4),
TICKER_SYMBOL_AVG_T REAL,
TICKER_SYMBOL_COUNT_T INTEGER,
TICKER_SYMBOL_AVG_R REAL,
TICKER_SYMBOL_COUNT_R INTEGER);

CREATE PUMP STREAM_PUMP as insert into DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol,
avg(price) over TSW as ticker_symbol_avg1,
count(*) over TSW as cnt1,
avg(price) over RSW as ticker_symbol_avg2,
count(*) over RSW as cnt2
FROM SOURCE_SQL_STREAM_001
WINDOW TSW as (partition by ticker_symbol Range interval '10' second preceding),
RSW as (partition by ticker_symbol ROWS 2 preceding);

GROUP BY SLID.png


INNER JOIN

・異なるSTREAM同士のJoin処理は可能

・Reference Dataを追加する動作しない(2017/3月時点)

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM3 (

TICKER_SYMBOL VARCHAR(4),
TICKER_SYMBOL_COUNT INTEGER,
TICKER_SYMBOL_AVG REAL);

CREATE OR REPLACE PUMP STREAM_PUMP3 AS INSERT INTO DESTINATION_SQL_STREAM3
SELECT
STREAM A.ticker_symbol,
A.ticker_symbol_count,
B.ticker_symbol_avg
FROM DESTINATION_SQL_STREAM as A
INNER JOIN DESTINATION_SQL_STREAM2 as B
ON A.ticker_symbol = B.ticker_symbol

INNER JOIN.png


INNER JOINで使用しているDESTINATION_SQL_STREAMとDESTINATION_SQL_STREAM2の作成

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (

TICKER_SYMBOL VARCHAR(4),
TICKER_SYMBOL_COUNT INTEGER
);

CREATE OR REPLACE PUMP STREAM_PUMP1 AS INSERT INTO DESTINATION_SQL_STREAM
SELECT
STREAM ticker_symbol,
count(*) AS ticker_symbol_count
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol,
FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM2 (
TICKER_SYMBOL VARCHAR(4),
TICKER_SYMBOL_AVG REAL
);

CREATE OR REPLACE PUMP STREAM_PUMP2 AS INSERT INTO DESTINATION_SQL_STREAM2
SELECT
STREAM ticker_symbol,
avg(price) AS ticker_symbol_avg
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol,
FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);


Reference Dataの追加

・AWS CLIでのみ追加が可能

・current-application-version-idはdescribe-applicationで確認する

aws kinesisanalytics add-application-reference-data-source \

--region us-east-1 \
--application-name [your-kinesis-analytics-name] \
--current-application-version-id [?] \
--reference-data-source '{"TableName":"companyname","S3ReferenceDataSource":{"BucketARN":"arn:aws:s3:::[バケット名]","FileKey":"[ファイル名]","ReferenceRoleARN":"arn:aws:iam::[アカウントID]:role/service-role/[ロール名]"},
"ReferenceSchema":{ "RecordFormat":{"RecordFormatType":[CSV or json], "MappingParameters":{"CSVMappingParameters":{"RecordRowDelimiter":"\n","RecordColumnDelimiter":","} }},
"RecordEncoding":"UTF-8","RecordColumns":[{"Name":"type","SqlType":"varchar(5)"},{ "Name":"company","SqlType":"varchar(10)"}]}}'


アプリケーションの確認

・aws configureで事前に設定しておくこと

aws kinesisanalytics describe-application --application-name [your-kinesis-analytics-name]


Reference Dataの削除

・version-idとreference-idはdescribe-applicationで確認する

aws kinesisanalytics delete-application-reference-data-source --application-name [your-kinesis-analytics-name] --current-application-version-id [?] --reference-id [?]


高度な分析

・高度な分析ではCursol関数を使用。(標準SQLと結構違うので最初はだいぶ戸惑う)

・書き方は「SELECT STREAM * FROM(TABLE(xxxFUNCTION(COURSOR([in-application-stream],option)));」


RANDOM_CUT_FOREST(異常検知)

・記載する情報は「in-application-stream」、「numberOfTrees(default:100)」、 「subSampleSize(default:256)」、「timeDecay(default:100,000)」、 「shingleSize (default:1)」の5つ

・詳細はマニュアルを・・・(正直あまり理解していない)

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (

TICKER_SYMBOL VARCHAR(4),
SECTOR VARCHAR(10),
PRICE DOUBLE,
CHANGE DOUBLE,
ANOMALY_SCORE DOUBLE);

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM2 (
TICKER_SYMBOL VARCHAR(4),
SECTOR VARCHAR(10),
PRICE DOUBLE,
CHANGE DOUBLE,
ANOMALY_SCORE DOUBLE);

CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol,sector,price,change,ANOMALY_SCORE FROM
TABLE(RANDOM_CUT_FOREST(
CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")
)
) ;

CREATE OR REPLACE PUMP STREAM_PUMP2 AS INSERT INTO DESTINATION_SQL_STREAM2
SELECT
STREAM * FROM DESTINATION_SQL_STREAM
ORDER BY FLOOR(DESTINATION_SQL_STREAM.ROWTIME TO SECOND),
ANOMALY_SCORE DESC;

RANDOMCUT.png


DISTINCT COUNT

・COUNT_DISTINCT_ITEMS_TUMBLING関数に記載する情報は「in-application-stream」、「列名」、「TUMBLING WINDOW時間」の3つ。列名は1つのみで複数列の指定はできない

・出力される情報はDistinct Countした情報のみ

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (NUMBER_OF_DISTINCT_ITEMS BIGINT);

CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM NUMBER_OF_DISTINCT_ITEMS FROM TABLE(COUNT_DISTINCT_ITEMS_TUMBLING(
CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
'TICKER_SYMBOL', -- name of column in single quotes
10 -- tumbling window size in seconds
)
);

Distinct.png


GROUP RANK

・すみません、いまのところエラーが出て不明。(ドキュメント読んだのだが・・・)

・エラー内容は「SQL error message: From line 2, column 55 to line 9, column 3: No match found for function signature GROUP_RANK(, , , , , , )」

・使用したクエリは下記。

・情報があれば共有してほしいです

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (

TICKER_SYMBOL VARCHAR(4),
PRICE real,
RANK_NUM integer
);

CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol,price,rank_num FROM TABLE(GROUP_RANK(
CURSOR(SELECT STREAM ticker_symbol,price FROM "SOURCE_SQL_STREAM_001"),
'PRICE', -- rankByColumnName
'rank_num', -- rankOutColumnName
'desc', -- sortOrder
'asc', --outputOrder
10, --maxIdle
5 --outputMax
)
);


TOP-K

・ドキュメントを読んで、下記の通りで記載したが、SELECT STREAM後のticker_symbolがないというエラーが発生

・ただ、テンプレートにあるクエリは回っていて、TOP-K関数からでてくる値はITEM,ITEM_COUNTという謎の2列のようです。(詳細についてドキュメントがされず、色々と試したけど何も出力されてないので)

・情報があれば共有してほしいです

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM(

TICKER_SYMBOL VARCHAR(4),
PRICE REAL)
;

CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol, price FROM TABLE(TOP_K_ITEMS_TUMBLING(
CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
'ticker_symbol', -- name of column in single quotes
10, -- number of top items
10 -- tumbling window size in seconds
)
);

いまのところ、やってみたクエリを整理して紹介しました。