Kinesis AnalyticsをSQLクエリについて、AWS側がテンプレートを用意してくれてるのですが、微妙に間違っていたりするのでまとめました。また、ハマりどころも多かったのでまとめています。
Kinesis Analyticsを使用する上でのポイント
・Stream名とデータ名の名前は大文字小文字で区別される
→ Stream名もしくはデータ名を「"」で括ると小文字になり、括らないと大文字と認識される
→ (個人的には)Create Stream文内のStream名とデータ名は大文字がよい
・Kinesis Streams demoは良く停止する
→ 一旦削除して、新たに作成したほうがよい
・Reference Dataを追加すると、[Could not find Stream]というエラーが発生する
・SQLコードを[Save and Running]させると実行まで時間が掛かるときがある
- Random_CUT関数は30分ほど時間を要する場合がある
・テンプレートが微妙に間違っているときがある
所感
・構築は楽
・KPUはオートスケーリングだが、キャップ機能はないのでお金に注意
・結構、バグがある。コードのupdatingからRUNNINGまで時間が掛かるときがある
・ドキュメントが少ない
・テンプレートが間違っているので注意。高度な分析の関数はまじでわからん・・・
紹介するクエリは下記の通りです。
基本的なクエリ
・CREATE PUMP & STREAM
・SELECT
・WHERE
・MULTI STREAMS
・GROUP BY
・WINDOW
1.TUMBLING WINDOW
2.SLIDING WINDOW
・INNER JOIN
高度な分析
・RANDOM_CUT_FOREST
・DISTINCT
・GROUP RANK ※できていない
・TOP-K ※できていない
基本的な構文
CREATE PUMP & STREAM, SELECT
・[OR REPLACE]はSTREAM名が既に存在すれば置き換えるという意味
・AnalyticsはPUMPにデータを投入後に、STREAMにデータを格納する
・関数名を列名に使用できない(avg、min,maxなど)
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
TICKER_STNBOL VARCHAR(4),
SECTOR VARCHAR(16),
PRICE REAL,
CHANGE DECIMAL);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol, sector, change, price
FROM "SOURCE_SQL_STREAM_001";

WHERE
・基本的に標準SQLと同じ
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
TICKER_SYMBOL VARCHAR(4),
SECTOR VARCHAR(16),
PRICE REAL,
CHANGE DECIMAL);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol, sector, change, price
FROM "SOURCE_SQL_STREAM_001"
WHERE SECTOR LIKE '%ALTH%'
;

MULTI STREAMS
・PUMP名は一緒にしてはならない
・MULTI STREAMSを定義した場合、AWSコンソールのreal-time analyticsタブ内の「In-application streams」に複数STREAMが表れる
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
TICKER_SYMBOL VARCHAR(4),
SECTOR VARCHAR(16),
PRICE REAL,
CHANGE DECIMAL);
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM2 (
TICKER_SYMBOL VARCHAR(4),
SECTOR VARCHAR(16),
PRICE REAL,
CHANGE DECIMAL);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol, sector, change, price
FROM SOURCE_SQL_STREAM_001
WHERE SECTOR LIKE '%ALTH%'
;
CREATE OR REPLACE PUMP STREAM_PUMP2 AS INSERT INTO DESTINATION_SQL_STREAM2
SELECT STREAM ticker_symbol, sector, change, price
FROM SOURCE_SQL_STREAM_001
WHERE PRICE > 0.0
;

GROUP BY(TUMBLING WINDOW)
・構文は通常SQLと同様だが、TIME WINDOWを設定する必要がある(TUMBLING WINDOWと言う)
・ROWTIMEはKinesis Analyticsに予め用意されている時刻で収集した時刻が格納されている
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
TICKER_SYMBOL VARCHAR(4),
TICKER_SYMBOL_COUNT INTEGER,
TICKER_SYMBOL_AVG REAL
);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM
ticker_symbol,
COUNT(*) AS ticker_symbol_count,
AVG(price) AS ticker_symbol_avg
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol,
FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);

SLIDING WINDOW
・SLIDING WINDOWを使用する場合、GROUP BY句は使用せず、集約関数(avg,min,count等)の直後にWINDOWを定義する
・WINDOW幅の設定はTIMEとROWの2種類が存在する(微妙に書き方が違うので注意)
CREATE STREAM DESTINATION_SQL_STREAM (
TICKER_SYMBOL VARCHAR(4),
TICKER_SYMBOL_AVG_T REAL,
TICKER_SYMBOL_COUNT_T INTEGER,
TICKER_SYMBOL_AVG_R REAL,
TICKER_SYMBOL_COUNT_R INTEGER);
CREATE PUMP STREAM_PUMP as insert into DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol,
avg(price) over TSW as ticker_symbol_avg1,
count(*) over TSW as cnt1,
avg(price) over RSW as ticker_symbol_avg2,
count(*) over RSW as cnt2
FROM SOURCE_SQL_STREAM_001
WINDOW TSW as (partition by ticker_symbol Range interval '10' second preceding),
RSW as (partition by ticker_symbol ROWS 2 preceding);

INNER JOIN
・異なるSTREAM同士のJoin処理は可能
・Reference Dataを追加する動作しない(2017/3月時点)
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM3 (
TICKER_SYMBOL VARCHAR(4),
TICKER_SYMBOL_COUNT INTEGER,
TICKER_SYMBOL_AVG REAL);
CREATE OR REPLACE PUMP STREAM_PUMP3 AS INSERT INTO DESTINATION_SQL_STREAM3
SELECT
STREAM A.ticker_symbol,
A.ticker_symbol_count,
B.ticker_symbol_avg
FROM DESTINATION_SQL_STREAM as A
INNER JOIN DESTINATION_SQL_STREAM2 as B
ON A.ticker_symbol = B.ticker_symbol

INNER JOINで使用しているDESTINATION_SQL_STREAMとDESTINATION_SQL_STREAM2の作成
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
TICKER_SYMBOL VARCHAR(4),
TICKER_SYMBOL_COUNT INTEGER
);
CREATE OR REPLACE PUMP STREAM_PUMP1 AS INSERT INTO DESTINATION_SQL_STREAM
SELECT
STREAM ticker_symbol,
count(*) AS ticker_symbol_count
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol,
FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM2 (
TICKER_SYMBOL VARCHAR(4),
TICKER_SYMBOL_AVG REAL
);
CREATE OR REPLACE PUMP STREAM_PUMP2 AS INSERT INTO DESTINATION_SQL_STREAM2
SELECT
STREAM ticker_symbol,
avg(price) AS ticker_symbol_avg
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol,
FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);
Reference Dataの追加
・AWS CLIでのみ追加が可能
・current-application-version-idはdescribe-applicationで確認する
aws kinesisanalytics add-application-reference-data-source \
--region us-east-1 \
--application-name [your-kinesis-analytics-name] \
--current-application-version-id [?] \
--reference-data-source '{"TableName":"companyname","S3ReferenceDataSource":{"BucketARN":"arn:aws:s3:::[バケット名]","FileKey":"[ファイル名]","ReferenceRoleARN":"arn:aws:iam::[アカウントID]:role/service-role/[ロール名]"},
"ReferenceSchema":{ "RecordFormat":{"RecordFormatType":[CSV or json], "MappingParameters":{"CSVMappingParameters":{"RecordRowDelimiter":"\n","RecordColumnDelimiter":","} }},
"RecordEncoding":"UTF-8","RecordColumns":[{"Name":"type","SqlType":"varchar(5)"},{ "Name":"company","SqlType":"varchar(10)"}]}}'
アプリケーションの確認
・aws configureで事前に設定しておくこと
aws kinesisanalytics describe-application --application-name [your-kinesis-analytics-name]
Reference Dataの削除
・version-idとreference-idはdescribe-applicationで確認する
aws kinesisanalytics delete-application-reference-data-source --application-name [your-kinesis-analytics-name] --current-application-version-id [?] --reference-id [?]
高度な分析
・高度な分析ではCursol関数を使用。(標準SQLと結構違うので最初はだいぶ戸惑う)
・書き方は「SELECT STREAM * FROM(TABLE(xxxFUNCTION(COURSOR([in-application-stream],option)));」
RANDOM_CUT_FOREST(異常検知)
・記載する情報は「in-application-stream」、「numberOfTrees(default:100)」、 「subSampleSize(default:256)」、「timeDecay(default:100,000)」、 「shingleSize (default:1)」の5つ
・詳細はマニュアルを・・・(正直あまり理解していない)
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
TICKER_SYMBOL VARCHAR(4),
SECTOR VARCHAR(10),
PRICE DOUBLE,
CHANGE DOUBLE,
ANOMALY_SCORE DOUBLE);
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM2 (
TICKER_SYMBOL VARCHAR(4),
SECTOR VARCHAR(10),
PRICE DOUBLE,
CHANGE DOUBLE,
ANOMALY_SCORE DOUBLE);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol,sector,price,change,ANOMALY_SCORE FROM
TABLE(RANDOM_CUT_FOREST(
CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")
)
) ;
CREATE OR REPLACE PUMP STREAM_PUMP2 AS INSERT INTO DESTINATION_SQL_STREAM2
SELECT
STREAM * FROM DESTINATION_SQL_STREAM
ORDER BY FLOOR(DESTINATION_SQL_STREAM.ROWTIME TO SECOND),
ANOMALY_SCORE DESC;

DISTINCT COUNT
・COUNT_DISTINCT_ITEMS_TUMBLING関数に記載する情報は「in-application-stream」、「列名」、「TUMBLING WINDOW時間」の3つ。列名は1つのみで複数列の指定はできない
・出力される情報はDistinct Countした情報のみ
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (NUMBER_OF_DISTINCT_ITEMS BIGINT);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM NUMBER_OF_DISTINCT_ITEMS FROM TABLE(COUNT_DISTINCT_ITEMS_TUMBLING(
CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
'TICKER_SYMBOL', -- name of column in single quotes
10 -- tumbling window size in seconds
)
);

GROUP RANK
・すみません、いまのところエラーが出て不明。(ドキュメント読んだのだが・・・)
・エラー内容は「SQL error message: From line 2, column 55 to line 9, column 3: No match found for function signature GROUP_RANK(, , , , , , )」
・使用したクエリは下記。
・情報があれば共有してほしいです
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
TICKER_SYMBOL VARCHAR(4),
PRICE real,
RANK_NUM integer
);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol,price,rank_num FROM TABLE(GROUP_RANK(
CURSOR(SELECT STREAM ticker_symbol,price FROM "SOURCE_SQL_STREAM_001"),
'PRICE', -- rankByColumnName
'rank_num', -- rankOutColumnName
'desc', -- sortOrder
'asc', --outputOrder
10, --maxIdle
5 --outputMax
)
);
TOP-K
・ドキュメントを読んで、下記の通りで記載したが、SELECT STREAM後のticker_symbolがないというエラーが発生
・ただ、テンプレートにあるクエリは回っていて、TOP-K関数からでてくる値はITEM,ITEM_COUNTという謎の2列のようです。(詳細についてドキュメントがされず、色々と試したけど何も出力されてないので)
・情報があれば共有してほしいです
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM(
TICKER_SYMBOL VARCHAR(4),
PRICE REAL)
;
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol, price FROM TABLE(TOP_K_ITEMS_TUMBLING(
CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
'ticker_symbol', -- name of column in single quotes
10, -- number of top items
10 -- tumbling window size in seconds
)
);
いまのところ、やってみたクエリを整理して紹介しました。