概要
- Kinesis Data Streamのストリームデータを、ニアリアルタイムで直接Redshiftに取り込む機能
- Redshift内ではMaterialized Viewとして取り込まれる
- 今回はRedshift Serverlessを利用して試す
- シナリオは以下公式docのチュートリアル
参考)
KinesisストリームデータをRedshiftに投入する従来の方法として、Kinesis Data Firehoseの利用がある。ただしこの方式は、内部的にはS3を中継してCOPYでマイクロバッチ的に取り込む方式であり少なくとも数分単位以上のラグがあり、かつ2023年4月時点ではパブリックアプセス可能なProvisioned型Redshiftクラスターが対象(閉域内のRedshift Serverlessは未対応)など制約事項もある。
全体の流れ
- Redshift Serverlessをセットアップ
- ストリームデータ取り込みのためのRedshfit用IAM ROLEの設定
- Kinesis Data Streamのセットアップ
- Kinesis Data Generatorで検証用ダミーデータを作成
- 実際にRedshift Serverlessでリアルタイムにクエリ
いざ試す
[1] Redshift Serverlessをセットアップ
- 今回は記載省略
- 注意事項としては、NAT-GW または I-GW を使用してインターネット経由で Kinesis Data Streams エンドポイントに到達するルートがある、またはKinesis インターフェイス VPC エンドポイントがあること。
[2] ストリームデータ取り込みのためのRedshfit用IAM ROLEの設定
- RedshiftのIAM Roleに以下ポリシーをアタッチ
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "ReadStream",
"Effect": "Allow",
"Action": [
"kinesis:DescribeStreamSummary",
"kinesis:GetShardIterator",
"kinesis:GetRecords",
"kinesis:DescribeStream"
],
"Resource": "arn:aws:kinesis:*:0123456789:stream/*"
},
{
"Sid": "ListStream",
"Effect": "Allow",
"Action": [
"kinesis:ListStreams",
"kinesis:ListShards"
],
"Resource": "*"
}
]
}
[3] Kinesis Data Streamのセットアップ
- Amazon Kinesis Data Streams コンソールから、データストリームを作成
- データストリーム名は任意(例: redshiftinjection)
-
[Capacity mode]
(キャパシティーモード) で、[On-demand]
(オンデマンド) を選択して作るだけ。
[4] Kinesis Data Generatorで検証用ダミーデータを作成
- 以下のCloudFormation Templateで簡単にKinesis Data Generator(KDG)環境を構築できる
- 詳細手順は下記参照
- CFでKDGセットアップができたら、設定したIDとpwdでログインして、リージョンとストリーム名を前述でセットアップしたものを指定(例:redshiftinjection)。さらに、Templateに下記JSONを貼り付けて「Send data」実行すると、指定したデータストリームにダミーデータが出力される
{
"_id" : "{{random.uuid}}",
"clusterID": "{{random.number(
{ "min":1,
"max":50
}
)}}",
"connectionTime": "{{date.now("YYYY-MM-DD HH:mm:ss")}}",
"kWhDelivered": "{{commerce.price}}",
"stationID": "{{random.number(
{ "min":1,
"max":467
}
)}}",
"spaceID": "{{random.word}}-{{random.number(
{ "min":1,
"max":20
}
)}}",
"timezone": "America/Los_Angeles",
"userID": "{{random.number(
{ "min":1000,
"max":500000
}
)}}"
}
[5] 実際にRedshift Serverlessでリアルタイムにクエリ
1) 外部スキーマ作成
データを Kinesis から Redshift オブジェクトにマッピングするために、外部スキーマを作成
CREATE EXTERNAL SCHEMA [任意のスキーマ名] FROM KINESIS
IAM_ROLE 'REDSHIFTのIAM ROLEのarn';
2) Materialized View作成
- ストリームデータを利用するためのマテリアライズドビューを作成
- Kinesisのデータは下記フォーマットで入力され、
kinesis_data
が実際のストリームレコードで、それ以外の情報は必要に応じデバッグ用で利用する
項目 | 型 | 説明 |
---|---|---|
approximate_arrival_timestamp | timestamp without time zone | レコードが Kinesis Streams に挿入された、おおよその時間 |
partition_key | varchar(256) | レコードをシャードに割り当てるために Kinesis によって使用されるキー |
shard_id | char(20) | レコードを取得したストリーム内のシャードの一意識別子 |
sequence_number | varchar(128) | Kinesis シャードのレコードの一意識別子 |
refresh_time | timestamp without time zone | 更新の開始時間 |
kinesis_data | varbyte | Kinesis ストリームからのレコード |
パターン1)payloadをSUPER型でJSONフォーマットのまま
CREATE MATERIALIZED VIEW ev_station_data AS
SELECT approximate_arrival_timestamp,
partition_key,
shard_id,
sequence_number,
json_parse(kinesis_data) as payload
FROM kds."redshiftinjection" WHERE can_json_parse(kinesis_data);
内容はこんな感じ
※ 右端のpayloadがJSONフォーマットのまま
パターン2)payloadをRedshiftで定義されたスキーマに変換
CREATE MATERIALIZED VIEW ev_station_data_extract DISTKEY(6) sortkey(1) AUTO REFRESH YES AS
SELECT refresh_time,
approximate_arrival_timestamp,
partition_key,
shard_id,
sequence_number,
json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'_id',true)::character(36) as ID,
json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'clusterID',true)::varchar(30) as clusterID,
json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'connectionTime',true)::varchar(20) as connectionTime,
json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'kWhDelivered',true)::DECIMAL(10,2) as kWhDelivered,
json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'stationID',true)::DECIMAL(10,2) as stationID,
json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'spaceID',true)::varchar(100) as spaceID,
json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'),'timezone',true)::varchar(30)as timezone,
json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'userID',true)::varchar(30) as userID
FROM kds."redshiftinjection"
WHERE LENGTH(kinesis_data) < 65355;
3) ビューを更新
ストリームからデータを読み取り、マテリアライズドビューにロードされる
## パターン1のテーブル
REFRESH MATERIALIZED VIEW ev_station_data;
## パターン2のテーブル
REFRESH MATERIALIZED VIEW ev_station_data_extract;
ビューの更新を自動化するなら下記でON/OFF可能
ALTER MATERIALIZED VIEW ev_station_data AUTO REFRESH YES
備考)
- 自動更新を有効化したところ、10秒以下程度のラグで最新のストリームデータに追従していた模様。公式には「数百MB/秒の取り込みを秒単位のレイテンシーで実現」
- 自動更新タイミングを任意に調整したい場合は「スケジュールされたクエリ」機能、またはEventBridgeやGlue Python Shell Jobなど任意のサービスでスケジュール実行が考えられる。
- 不要になったMVは
DROP MATERIALIZED VIEW [VIEW名]
で削除。2023年4月時点ではMVに対してDELETE
やTRUNCATE
はできない。 - MVに新規で取り込めるデータは、Kinesis Data Streamsのデータ保持期間(デフォルトで24H)が対象。なお挙動を確認したところ、すでに取り込み済みのデータについてはMVを
REFRESH
しても残り、新規データが追加される。 - MVを元に
UNLOAD
でS3出力したり、CTAS
で実テーブルに複製することなどは自由にできる。
4)マテリアライズドビュー一覧確認
- マテリアライズドビューの一覧は以下
SVV_MV_INFO
で確認できる- Redshift Serverlessはプロビジョニング型より参照可能なシステムテーブル・ビューが限定的な点は注意が必要
- Kinesisストリーミング取り込み用だけではなく、すべてのMVの一覧
④ SELECT実行
結果は上述
select * from ev_station_data;
select * from ev_station_data_extract;
考慮事項
- 推奨は各ストリームのデータを 1 つのマテリアライズドビューに配置。ただし、ストリームを取り込んでデータを複数のマテリアライズドビューに配置することは可能。
- Kinesis Streams で作成されたMVでは、JOIN は未サポート。ストリームでMVを作成した後、別のMVを作成して、ストリーミングのMVと他のMV、テーブル、またはビューとJOINなら可能。
- データのサイズが最大サイズを超えているためにレコードを Amazon Redshift に取り込めない場合は、そのレコードはスキップされ、各エラーレコードのセグメントが
SYS_STREAM_SCAN_ERRORS
システムテーブルに書き込まれる - その他下記参照
参考情報