12
Help us understand the problem. What are the problem?

More than 5 years have passed since last update.

posted at

updated at

Kinesis AnalyticsのSQLクエリ

Kinesis AnalyticsをSQLクエリについて、AWS側がテンプレートを用意してくれてるのですが、微妙に間違っていたりするのでまとめました。また、ハマりどころも多かったのでまとめています。

Kinesis Analyticsを使用する上でのポイント

・Stream名とデータ名の名前は大文字小文字で区別される
  → Stream名もしくはデータ名を「"」で括ると小文字になり、括らないと大文字と認識される
  → (個人的には)Create Stream文内のStream名とデータ名は大文字がよい
・Kinesis Streams demoは良く停止する
  → 一旦削除して、新たに作成したほうがよい
・Reference Dataを追加すると、[Could not find Stream]というエラーが発生する
・SQLコードを[Save and Running]させると実行まで時間が掛かるときがある
  - Random_CUT関数は30分ほど時間を要する場合がある
・テンプレートが微妙に間違っているときがある

所感

・構築は楽
・KPUはオートスケーリングだが、キャップ機能はないのでお金に注意
・結構、バグがある。コードのupdatingからRUNNINGまで時間が掛かるときがある
・ドキュメントが少ない
・テンプレートが間違っているので注意。高度な分析の関数はまじでわからん・・・

紹介するクエリは下記の通りです。

基本的なクエリ

・CREATE PUMP & STREAM
・SELECT
・WHERE
・MULTI STREAMS
・GROUP BY
・WINDOW
 1.TUMBLING WINDOW
 2.SLIDING WINDOW
・INNER JOIN

高度な分析

・RANDOM_CUT_FOREST
・DISTINCT
・GROUP RANK ※できていない
・TOP-K ※できていない

基本的な構文

CREATE PUMP & STREAM, SELECT

・[OR REPLACE]はSTREAM名が既に存在すれば置き換えるという意味
・AnalyticsはPUMPにデータを投入後に、STREAMにデータを格納する
・関数名を列名に使用できない(avg、min,maxなど)

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
   TICKER_STNBOL VARCHAR(4),
   SECTOR        VARCHAR(16),
   PRICE         REAL,
   CHANGE        DECIMAL);

CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
   SELECT STREAM ticker_symbol, sector, change, price
   FROM "SOURCE_SQL_STREAM_001";

SELECT.png

WHERE

・基本的に標準SQLと同じ

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR        VARCHAR(16),
   PRICE         REAL,
   CHANGE        DECIMAL);

CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
   SELECT STREAM ticker_symbol, sector, change, price
   FROM "SOURCE_SQL_STREAM_001"
   WHERE SECTOR LIKE '%ALTH%' 
;

WHERE.png

MULTI STREAMS

・PUMP名は一緒にしてはならない
・MULTI STREAMSを定義した場合、AWSコンソールのreal-time analyticsタブ内の「In-application streams」に複数STREAMが表れる

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR        VARCHAR(16),
   PRICE         REAL,
   CHANGE        DECIMAL);

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM2 (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR        VARCHAR(16),
   PRICE         REAL,
   CHANGE        DECIMAL);

CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
   SELECT STREAM ticker_symbol, sector, change, price
   FROM SOURCE_SQL_STREAM_001
   WHERE SECTOR LIKE '%ALTH%' 
;


CREATE OR REPLACE PUMP STREAM_PUMP2 AS INSERT INTO DESTINATION_SQL_STREAM2
   SELECT STREAM ticker_symbol, sector, change, price
   FROM SOURCE_SQL_STREAM_001
   WHERE PRICE > 0.0
;

MULTISTREAM.png

GROUP BY(TUMBLING WINDOW)

・構文は通常SQLと同様だが、TIME WINDOWを設定する必要がある(TUMBLING WINDOWと言う)
・ROWTIMEはKinesis Analyticsに予め用意されている時刻で収集した時刻が格納されている

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
  TICKER_SYMBOL VARCHAR(4), 
  TICKER_SYMBOL_COUNT INTEGER,
  TICKER_SYMBOL_AVG REAL
);

CREATE OR REPLACE  PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM 
   ticker_symbol, 
   COUNT(*) AS ticker_symbol_count,
   AVG(price) AS ticker_symbol_avg
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, 
         FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);

GROUPBY.png

SLIDING WINDOW

・SLIDING WINDOWを使用する場合、GROUP BY句は使用せず、集約関数(avg,min,count等)の直後にWINDOWを定義する
・WINDOW幅の設定はTIMEとROWの2種類が存在する(微妙に書き方が違うので注意)

CREATE STREAM DESTINATION_SQL_STREAM (
    TICKER_SYMBOL VARCHAR(4), 
    TICKER_SYMBOL_AVG_T REAL,
    TICKER_SYMBOL_COUNT_T INTEGER,
    TICKER_SYMBOL_AVG_R REAL,
    TICKER_SYMBOL_COUNT_R INTEGER);

CREATE PUMP STREAM_PUMP as insert into DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol,
              avg(price) over TSW as ticker_symbol_avg1,
              count(*)  over TSW as cnt1,
              avg(price) over RSW as ticker_symbol_avg2,
              count(*)  over RSW as cnt2
FROM SOURCE_SQL_STREAM_001
WINDOW TSW as (partition by ticker_symbol Range interval '10' second preceding),
RSW as (partition by ticker_symbol  ROWS 2 preceding);

GROUP BY SLID.png

INNER JOIN

・異なるSTREAM同士のJoin処理は可能
・Reference Dataを追加する動作しない(2017/3月時点)

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM3 (
  TICKER_SYMBOL VARCHAR(4),
  TICKER_SYMBOL_COUNT INTEGER, 
  TICKER_SYMBOL_AVG REAL);

CREATE OR REPLACE PUMP STREAM_PUMP3 AS INSERT INTO DESTINATION_SQL_STREAM3
SELECT 
   STREAM A.ticker_symbol,  
          A.ticker_symbol_count,
          B.ticker_symbol_avg
FROM DESTINATION_SQL_STREAM as A
INNER JOIN DESTINATION_SQL_STREAM2 as B
ON A.ticker_symbol = B.ticker_symbol

INNER JOIN.png

INNER JOINで使用しているDESTINATION_SQL_STREAMとDESTINATION_SQL_STREAM2の作成

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
  TICKER_SYMBOL VARCHAR(4), 
  TICKER_SYMBOL_COUNT INTEGER
);

CREATE OR REPLACE  PUMP STREAM_PUMP1 AS INSERT INTO DESTINATION_SQL_STREAM
SELECT 
  STREAM ticker_symbol, 
         count(*) AS ticker_symbol_count
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, 
         FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM2 (
  TICKER_SYMBOL VARCHAR(4), 
  TICKER_SYMBOL_AVG REAL
);

CREATE OR REPLACE  PUMP STREAM_PUMP2 AS INSERT INTO DESTINATION_SQL_STREAM2
SELECT 
  STREAM ticker_symbol, 
         avg(price) AS ticker_symbol_avg
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, 
         FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);

Reference Dataの追加

・AWS CLIでのみ追加が可能
・current-application-version-idはdescribe-applicationで確認する

aws kinesisanalytics add-application-reference-data-source \
--region us-east-1   \
--application-name [your-kinesis-analytics-name] \
--current-application-version-id [?] \
--reference-data-source '{"TableName":"companyname","S3ReferenceDataSource":{"BucketARN":"arn:aws:s3:::[バケット名]","FileKey":"[ファイル名]","ReferenceRoleARN":"arn:aws:iam::[アカウントID]:role/service-role/[ロール名]"},
"ReferenceSchema":{ "RecordFormat":{"RecordFormatType":[CSV or json], "MappingParameters":{"CSVMappingParameters":{"RecordRowDelimiter":"\n","RecordColumnDelimiter":","} }},
"RecordEncoding":"UTF-8","RecordColumns":[{"Name":"type","SqlType":"varchar(5)"},{ "Name":"company","SqlType":"varchar(10)"}]}}'

アプリケーションの確認

・aws configureで事前に設定しておくこと

aws kinesisanalytics describe-application --application-name [your-kinesis-analytics-name]

Reference Dataの削除

・version-idとreference-idはdescribe-applicationで確認する

aws kinesisanalytics delete-application-reference-data-source --application-name [your-kinesis-analytics-name] --current-application-version-id [?] --reference-id [?]

高度な分析

・高度な分析ではCursol関数を使用。(標準SQLと結構違うので最初はだいぶ戸惑う)
・書き方は「SELECT STREAM * FROM(TABLE(xxxFUNCTION(COURSOR([in-application-stream],option)));」

RANDOM_CUT_FOREST(異常検知)

・記載する情報は「in-application-stream」、「numberOfTrees(default:100)」、 「subSampleSize(default:256)」、「timeDecay(default:100,000)」、 「shingleSize (default:1)」の5つ
・詳細はマニュアルを・・・(正直あまり理解していない)

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
   TICKER_SYMBOL    VARCHAR(4),
   SECTOR VARCHAR(10),
   PRICE   DOUBLE,
   CHANGE DOUBLE,
   ANOMALY_SCORE  DOUBLE);

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM2 (
   TICKER_SYMBOL    VARCHAR(4),
   SECTOR VARCHAR(10),
   PRICE   DOUBLE,
   CHANGE DOUBLE,
   ANOMALY_SCORE  DOUBLE);

CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM  ticker_symbol,sector,price,change,ANOMALY_SCORE FROM
      TABLE(RANDOM_CUT_FOREST(
        CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")
      )
    ) ;

CREATE OR REPLACE PUMP STREAM_PUMP2 AS INSERT INTO DESTINATION_SQL_STREAM2
SELECT 
   STREAM * FROM DESTINATION_SQL_STREAM
ORDER BY FLOOR(DESTINATION_SQL_STREAM.ROWTIME TO SECOND), 
         ANOMALY_SCORE DESC;

RANDOMCUT.png

DISTINCT COUNT

・COUNT_DISTINCT_ITEMS_TUMBLING関数に記載する情報は「in-application-stream」、「列名」、「TUMBLING WINDOW時間」の3つ。列名は1つのみで複数列の指定はできない
・出力される情報はDistinct Countした情報のみ

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (NUMBER_OF_DISTINCT_ITEMS BIGINT);
CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM NUMBER_OF_DISTINCT_ITEMS FROM TABLE(COUNT_DISTINCT_ITEMS_TUMBLING(
  CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
  'TICKER_SYMBOL', -- name of column in single quotes
  10 -- tumbling window size in seconds
  )
);

Distinct.png

GROUP RANK

・すみません、いまのところエラーが出て不明。(ドキュメント読んだのだが・・・)
・エラー内容は「SQL error message: From line 2, column 55 to line 9, column 3: No match found for function signature GROUP_RANK(, , , , , , )」
・使用したクエリは下記。
・情報があれば共有してほしいです

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
    TICKER_SYMBOL VARCHAR(4),
    PRICE real,
    RANK_NUM integer
);

CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol,price,rank_num FROM TABLE(GROUP_RANK(
  CURSOR(SELECT STREAM ticker_symbol,price FROM "SOURCE_SQL_STREAM_001"),
  'PRICE', -- rankByColumnName
  'rank_num', -- rankOutColumnName
  'desc', -- sortOrder
  'asc', --outputOrder
  10, --maxIdle
  5 --outputMax
  )
);

TOP-K

・ドキュメントを読んで、下記の通りで記載したが、SELECT STREAM後のticker_symbolがないというエラーが発生
・ただ、テンプレートにあるクエリは回っていて、TOP-K関数からでてくる値はITEM,ITEM_COUNTという謎の2列のようです。(詳細についてドキュメントがされず、色々と試したけど何も出力されてないので)
・情報があれば共有してほしいです

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM(
    TICKER_SYMBOL VARCHAR(4), 
    PRICE REAL)
;

CREATE OR REPLACE PUMP STREAM_PUMP AS INSERT INTO DESTINATION_SQL_STREAM
SELECT STREAM ticker_symbol, price FROM TABLE(TOP_K_ITEMS_TUMBLING(
  CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
  'ticker_symbol', -- name of column in single quotes
  10, -- number of top items
  10 -- tumbling window size in seconds
  )
);

いまのところ、やってみたクエリを整理して紹介しました。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Sign upLogin
12
Help us understand the problem. What are the problem?