背景・目的
2022年2月に、こちらの記事でKinesis Data Streamに蓄積されたストリーミングデータをRedshiftで接続して参照することが発表されました。
今回は、この機能を実際に試してみたいと思います。
以下の前提となっている。
- 本機能は2022年8月23日現在でパブリックプレビューとなります。 そのため、GAまでに変更される可能性があります。
- クラスターのバージョンが1.0.35480 以降であること。
まとめ
- 現在パブリックプレビュー。
- Redshiftのマテリアライズドビューの仕組みを利用して、ストリームデータのある時点の断面をRedshiftで参照できる。
- KDF→KDF→S3→COPY(もしくはSpectrumで直接参照)としていたが、構成がシンプルになる。
- マテビューを介するので、リアルタイムに近いデータを参照するためには定期的にリフレッシュが必要になる。ただし秒単位で最新単位でのデータを参照するには、ECSやEC2などでデーモンなどを動かして、sleppをはさみつつ定期的に外部からリフレッシュのクエリを実行するなど工夫が必要と思われる。
- Redshiftスケジューラー(EventBridge)は、cronやrateは最小が「分(minute)」なので秒単位は設定ができない。
- そもそも秒単位のストリームデータをDWHで見ることは、あまり多くはなさそう。
概要
こちら の記事を参考に確認します。
以前は、いくつかの手順により、Amazon Kinesis Streams などのストリーミングサービスから Amazon Redshift へのデータのロードを行っていました。例えば、ストリームを Amazon Kinesis Data Firehose に接続した上で、Kinesis Data Firehose がさまざまなサイズのバッファー間隔でサイズのバッチを変更しながら、Amazon S3 内でデータをステージングするのを待つ必要がありました。この後、Amazon S3 から Redshift のテーブルにデータをロードするための COPY コマンドが、Kinesis Data Firehose によりトリガーされていました。
本機能が登場する前は、Kinesis Data Firehose(以降、KDFといいます。)を使用してS3にデータをフィードし、Spectrumで参照する。もしくはCOPYコマンドでRedshiftクラスタのテーブルにロードして参照していました。
Amazon S3 に事前ステージングを含めなくても、ストリーミングの取り込みにより、Kinesis Data Streams から Amazon Redshift マテリアライズドビューへの、低レイテンシーかつ高速なストリームデータの取り込みが提供されます。これにより速度が向上し、ストレージコストが削減されます。Amazon Redshift でのマテリアライズドビューの作成 で説明されているように SQL ステートメントを使用して Redshift クラスターを設定することで、ストリーミング取り込みを有効にし、マテリアライズドビューを作成できます。その後、マテリアライズドビューの更新を使用して、毎秒数百メガバイトのデータを、Amazon Kinesis Data Streams から Amazon Redshift に取り込むことができます。これにより、外部データにすばやくアクセスできるので、更新が高速化されます。
Kinesis Data Streaming(以降、KDSといいます。)から、Redshiftのマテリアライズドビュー(以降、マテビューといいます。)への取り込みを行い実現します。
これにより、S3のストレージコストが削減されるとのこと。
マテビューを更新することで、毎秒数百MBのデータをKDSからRedshiftに取り込むことができる。
データフロー
RedshiftはKDFと同様にストリームコンシューマという位置づけ。
マテビューは、ストリームから読み込まれるランディングゾーンとであり、データが到着時に処理される。
JSONデータをSQLでRedshiftのデータ列にマッピングできる。
マテビューの更新時に、Redshiftのコンピューティングノードは各データシャードをコンピュティングスライスに割り当てる。
ストリーミング取り込みのユースケース
以下のようなユースケースに当てはまります。
- 継続的に生成 (ストリーミング) され、その生成から短期間 (低レイテンシー) の内に処理する必要があるデータの処理。
- 実際には、以下のようなものが考えられる。
- IoTデバイス
- システムテレメトリー
- Webサイトのクリックストリーム
制約事項
現在、ストリームで作成されたマテリアライズドビューでは、JOIN の使用はサポートされていません。ストリームを使用してマテリアライズドビューを作成した後、別のマテリアライズドビューを作成して、ストリーミングのマテリアライズドビューと他のマテリアライズドビュー、テーブル、またはビューとの結合のために使用できます。
ストリームのマテビューに対してSQLでは、ジョインができない。
マテリアライズドビュー更新時の、ストリームからのデータの消費は、1 時間以内に完了する必要があります。更新処理が 1 時間を超過すると、認証情報の有効期限が切れるため、その更新は失敗します。
1H以内にリフレッシュが必要。既存のストリームに対して、新規にマテビューを作成する際など初回のリフレッシュに時間がかかるので、挙動を確認しておきたい。
以下によって、リフレッシュ時間を短縮できると思われる。確認してみたい。
- KDSの保持期間を減らし、全体のデータ量を減らす。(3日を1日など)
- シャード数を増やしつつ、Redshiftのスライス数を増やす。
Redshift ストリームの取り込みでは、Kinesis プロデューサーライブラリ (KPL の重要なコンセプト) によって集計されたレコードの解析をサポートしていません. 集計されたレコードは取り込まれますが、バイナリプロトコルのバッファデータとして格納されます。(詳細については「Protocol buffers」(プロトコルバッファ) を参照してください。) Kinesis へのデータのプッシュ方法によっては、この機能の無効化が必要となる場合があります。例えば、Kinesis Data Analytics を使用している場合は、Kinesis シンクテーブルを作成する際に 'sink.producer.aggregation-enabled' = 'false' を追加することで、無効化ができます。詳細については、「Amazon Kinesis Data Streams SQL Connector」(Amazon Kinesis Data Streams の SQL コネクター) を参照してください。
- プロトコルバッファでプッシュされたデータはデコードが必要になるが、Redshiftでは実装されていないので読み取れないためと思われる。
マテリアライズドビューの作成後の最初の更新は、ストリームの TRIM_HORIZON から開始されます。
マテリアライズドビューは、増分的な保守が可能である必要があります。Kinesis のデフォルトでは、24 時間前のストリームの履歴は保持されないため、これによる再計算は完全ではありません。より長いデータ保持を Kinesis で設定することは可能ですが、これにより技術的なメンテナンス量が増え、コストが高くなる可能性があります。詳細については、「REFRESH MATERIALIZED VIEW」を参照してください。
クラスターが一時停止している場合、そのクラスターが再開されるまで、マテリアライズドビューを更新することはできません。ストリームからのレコードが一時停止期間中に期限切れになった場合、そのストリームは使用できなくなります。
- 作成後、リフレッシュを放置したことにより期限ぎれになる場合も処理できなくなるのか?(おそらく、最後のレコードのポインタがなくなるためと思われる。)その場合、最初から読み取ってはくれないのかもしれない。クラスタを停止することはあり得るので、挙動について確認しておく。
サポートされるデータ形式は、VARBYTEからの変換が可能なデータ形式に限られます。詳細については、「VARBYTE 型」を参照してください。
-
VARBYTE型を読むとキャスト可能なデータイプは、以下のデータタイプとなる。このタイプ以外(Doubleなど)は、マテビューのカラムにマッピングができないと思われる。回避策としては、JSONデータとして取り込み、ELTなどでJSONを読み解きながらSELECT&INSERTでロードすることが考えられる。これも試してみる。
- CHAR
- VARCHAR
- SMALLINT
- INTEGER
- BIGINT
現在、VARBYTEに対しては解凍処理がサポートされていないため、圧縮データを含む Kinesis レコードを Amazon Redshift 内で クエリすることはできません。クエリするためには、ストリームにプッシュする前にデータを解凍する必要があります。
おそらくこの仕様により、プロトコルバッファが使えないと言われていると思われる。
実践
IAMポリシーの修正
KDSからストリームデータを読み込むため、以下の権限をIAMポリシーに付与します。
〜省略〜
{
"Action": [
"kinesis:DescribeStreamSummary",
"kinesis:GetShardIterator",
"kinesis:GetRecords",
"kinesis:DescribeStream"
],
"Resource": [
"arn:aws:kinesis:*:{アカウントID}:stream/*"
],
"Effect": "Allow"
},
{
"Action": [
"kinesis:ListStreams",
"kinesis:ListShards"
],
"Resource": "*",
"Effect": "Allow"
}
〜省略〜
データの登録
スクリプトを実行し、KDSストリームに対してデータを登録し続けます。
スクリプト
import datetime
import json
import random
import boto3
STREAM_NAME = "{KDSストリーム名}"
def get_data():
return {
'EVENT_TIME': datetime.datetime.now().isoformat(),
'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
'PRICE': round(random.random() * 100, 2)
}
def generate(stream_name, kinesis_client):
while True:
data = get_data()
print(data)
kinesis_client.put_record(
StreamName=stream_name,
Data=json.dumps(data),
PartitionKey="partitionkey")
if __name__ == '__main__':
generate(STREAM_NAME, boto3.client('kinesis',region_name='ap-northeast-1'))
実行
$ python3 stock.py
{'EVENT_TIME': '2022-08-24T23:52:54.592998', 'TICKER': 'AAPL', 'PRICE': 43.11}
{'EVENT_TIME': '2022-08-24T23:52:54.630761', 'TICKER': 'TBV', 'PRICE': 66.47}
{'EVENT_TIME': '2022-08-24T23:52:54.640961', 'TICKER': 'MSFT', 'PRICE': 4.38}
{'EVENT_TIME': '2022-08-24T23:52:54.651158', 'TICKER': 'INTC', 'PRICE': 63.14}
{'EVENT_TIME': '2022-08-24T23:52:54.662936', 'TICKER': 'TBV', 'PRICE': 6.87}
{'EVENT_TIME': '2022-08-24T23:52:54.671815', 'TICKER': 'AAPL', 'PRICE': 25.43}
{'EVENT_TIME': '2022-08-24T23:52:54.681416', 'TICKER': 'AAPL', 'PRICE': 32.74}
{'EVENT_TIME': '2022-08-24T23:52:54.692083', 'TICKER': 'AAPL', 'PRICE': 7.24}
{'EVENT_TIME': '2022-08-24T23:52:54.704872', 'TICKER': 'MSFT', 'PRICE': 9.33}
{'EVENT_TIME': '2022-08-24T23:52:54.713166', 'TICKER': 'INTC', 'PRICE': 66.98}
{'EVENT_TIME': '2022-08-24T23:52:54.724080', 'TICKER': 'AMZN', 'PRICE': 86.32}
{'EVENT_TIME': '2022-08-24T23:52:54.738239', 'TICKER': 'AAPL', 'PRICE': 3.79}
{'EVENT_TIME': '2022-08-24T23:52:54.748847', 'TICKER': 'MSFT', 'PRICE': 98.54}
{'EVENT_TIME': '2022-08-24T23:52:54.759416', 'TICKER': 'MSFT', 'PRICE': 17.76}
{'EVENT_TIME': '2022-08-24T23:52:54.770748', 'TICKER': 'AMZN', 'PRICE': 73.49}
・・・
スキーマの準備
外部スキーマの作成
クエリエディタから、Redshiftに接続し外部スキーマを作成します。
カタログの指定は、KINESISとします。
CREATE EXTERNAL SCHEMA ext_kinesis_schema
FROM KINESIS
iam_role 'arn:aws:iam::{AWSアカウント}:role/{IAMロール名}'
実際には、AWSアカウント、IAMロール名、別途埋めて実行しています。
マテビューの作成〜確認まで
サンプルに従い、試してみます。
マテビューの作成
JSONソースをRedshift形式に変換せずにRedshiftに格納しています。
CREATE MATERIALIZED VIEW {スキーマ名}.{マテビュー名} AS
SELECT ApproximateArrivalTimestamp,
JSON_PARSE(from_varbyte(Data, 'utf-8')) as Data
FROM ext_kinesis_schema."{KDSストリーム名}"
WHERE is_utf8(Data) AND is_valid_json(from_varbyte(Data, 'utf-8'));
実際にはKDSストリーム名、スキーマ名、マテビュー名には、作成したKDSストリームを指定しています。
マテビューのリフレッシュ
初回は多いため、801,315件(後述)のデータで4分ほどかかった。
REFRESH MATERIALIZED VIEW {スキーマ名}.{マテビュー名};
2回目は、約4万件(840,923-801,315=39,608件)で15秒かかった。
データの参照
件数の確認
SELECT * FROM {スキーマ名}.{マテビュー名};
データの確認
SELECT * FROM {スキーマ名}.{マテビュー名};
考察
比較的簡単にセットアップ〜データの参照まで可能でした。この手軽さでストリームデータが取り扱えるのは嬉しいと感じました。
また、いくつかの制約事項があるので、別途挙動を確認してみたいと思います。
参考