1
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Redshiftでリアルタイム分析:Kinesisストリーミング取り込みを試す

Last updated at Posted at 2023-04-28

概要

image.png

  • Kinesis Data Streamのストリームデータを、ニアリアルタイムで直接Redshiftに取り込む機能
  • Redshift内ではMaterialized Viewとして取り込まれる
  • 今回はRedshift Serverlessを利用して試す
  • シナリオは以下公式docのチュートリアル

参考)
KinesisストリームデータをRedshiftに投入する従来の方法として、Kinesis Data Firehoseの利用がある。ただしこの方式は、内部的にはS3を中継してCOPYでマイクロバッチ的に取り込む方式であり少なくとも数分単位以上のラグがあり、かつ2023年4月時点ではパブリックアプセス可能なProvisioned型Redshiftクラスターが対象(閉域内のRedshift Serverlessは未対応)など制約事項もある。

全体の流れ

  • Redshift Serverlessをセットアップ
  • ストリームデータ取り込みのためのRedshfit用IAM ROLEの設定
  • Kinesis Data Streamのセットアップ
  • Kinesis Data Generatorで検証用ダミーデータを作成
  • 実際にRedshift Serverlessでリアルタイムにクエリ

いざ試す

[1] Redshift Serverlessをセットアップ

  • 今回は記載省略
  • 注意事項としては、NAT-GW または I-GW を使用してインターネット経由で Kinesis Data Streams エンドポイントに到達するルートがある、またはKinesis インターフェイス VPC エンドポイントがあること。

[2] ストリームデータ取り込みのためのRedshfit用IAM ROLEの設定

  • RedshiftのIAM Roleに以下ポリシーをアタッチ
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ReadStream",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamSummary",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream"
            ],
            "Resource": "arn:aws:kinesis:*:0123456789:stream/*"
        },
        {
            "Sid": "ListStream",
            "Effect": "Allow",
            "Action": [
                "kinesis:ListStreams",
                "kinesis:ListShards"
            ],
            "Resource": "*"
        }
    ]
}

[3] Kinesis Data Streamのセットアップ

  • Amazon Kinesis Data Streams コンソールから、データストリームを作成
  • データストリーム名は任意(例: redshiftinjection)
  • [Capacity mode] (キャパシティーモード) で、[On-demand] (オンデマンド) を選択して作るだけ。

image.png

[4] Kinesis Data Generatorで検証用ダミーデータを作成

image.png

{
    
   "_id" : "{{random.uuid}}",
   "clusterID": "{{random.number(
        {   "min":1,
            "max":50
        }
    )}}", 
    "connectionTime": "{{date.now("YYYY-MM-DD HH:mm:ss")}}",
    "kWhDelivered": "{{commerce.price}}",
    "stationID": "{{random.number(
        {   "min":1,
            "max":467
        }
    )}}",
      "spaceID": "{{random.word}}-{{random.number(
        {   "min":1,
            "max":20
        }
    )}}",
 
   "timezone": "America/Los_Angeles",
   "userID": "{{random.number(
        {   "min":1000,
            "max":500000
        }
    )}}"
}

[5] 実際にRedshift Serverlessでリアルタイムにクエリ

1) 外部スキーマ作成

データを Kinesis から Redshift オブジェクトにマッピングするために、外部スキーマを作成

CREATE EXTERNAL SCHEMA [任意のスキーマ名] FROM KINESIS
IAM_ROLE 'REDSHIFTのIAM ROLEのarn';

2) Materialized View作成

  • ストリームデータを利用するためのマテリアライズドビューを作成
  • Kinesisのデータは下記フォーマットで入力され、kinesis_dataが実際のストリームレコードで、それ以外の情報は必要に応じデバッグ用で利用する
項目 説明
approximate_arrival_timestamp timestamp without time zone レコードが Kinesis Streams に挿入された、おおよその時間
partition_key varchar(256) レコードをシャードに割り当てるために Kinesis によって使用されるキー
shard_id char(20) レコードを取得したストリーム内のシャードの一意識別子
sequence_number varchar(128) Kinesis シャードのレコードの一意識別子
refresh_time timestamp without time zone 更新の開始時間
kinesis_data varbyte Kinesis ストリームからのレコード

パターン1)payloadをSUPER型でJSONフォーマットのまま

CREATE MATERIALIZED VIEW ev_station_data AS
    SELECT approximate_arrival_timestamp,
    partition_key,
    shard_id,
    sequence_number,
    json_parse(kinesis_data) as payload
    FROM kds."redshiftinjection" WHERE can_json_parse(kinesis_data);

内容はこんな感じ
image.png
※ 右端のpayloadがJSONフォーマットのまま

パターン2)payloadをRedshiftで定義されたスキーマに変換

CREATE MATERIALIZED VIEW ev_station_data_extract DISTKEY(6) sortkey(1) AUTO REFRESH YES AS
    SELECT refresh_time,
    approximate_arrival_timestamp,
    partition_key,
    shard_id,
    sequence_number,
    json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'_id',true)::character(36) as ID,
    json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'clusterID',true)::varchar(30) as clusterID,
    json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'connectionTime',true)::varchar(20) as connectionTime,
    json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'kWhDelivered',true)::DECIMAL(10,2) as kWhDelivered,
    json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'stationID',true)::DECIMAL(10,2) as stationID,
    json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'spaceID',true)::varchar(100) as spaceID,
    json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'),'timezone',true)::varchar(30)as timezone,
    json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'userID',true)::varchar(30) as userID
    FROM kds."redshiftinjection"
    WHERE LENGTH(kinesis_data) < 65355;

内容はこんな感じ
image.png

3) ビューを更新

ストリームからデータを読み取り、マテリアライズドビューにロードされる

## パターン1のテーブル
REFRESH MATERIALIZED VIEW ev_station_data;

## パターン2のテーブル
REFRESH MATERIALIZED VIEW ev_station_data_extract;

ビューの更新を自動化するなら下記でON/OFF可能

ALTER MATERIALIZED VIEW ev_station_data AUTO REFRESH YES

備考)

  • 自動更新を有効化したところ、10秒以下程度のラグで最新のストリームデータに追従していた模様。公式には「数百MB/秒の取り込みを秒単位のレイテンシーで実現」
  • 自動更新タイミングを任意に調整したい場合は「スケジュールされたクエリ」機能、またはEventBridgeやGlue Python Shell Jobなど任意のサービスでスケジュール実行が考えられる。
  • 不要になったMVはDROP MATERIALIZED VIEW [VIEW名]で削除。2023年4月時点ではMVに対してDELETETRUNCATEはできない。
  • MVに新規で取り込めるデータは、Kinesis Data Streamsのデータ保持期間(デフォルトで24H)が対象。なお挙動を確認したところ、すでに取り込み済みのデータについてはMVをREFRESHしても残り、新規データが追加される。
  • MVを元にUNLOADでS3出力したり、CTASで実テーブルに複製することなどは自由にできる。

4)マテリアライズドビュー一覧確認

  • マテリアライズドビューの一覧は以下SVV_MV_INFOで確認できる
    • Redshift Serverlessはプロビジョニング型より参照可能なシステムテーブル・ビューが限定的な点は注意が必要
    • Kinesisストリーミング取り込み用だけではなく、すべてのMVの一覧

④ SELECT実行

結果は上述

select * from ev_station_data;

select * from ev_station_data_extract;

考慮事項

  • 推奨は各ストリームのデータを 1 つのマテリアライズドビューに配置。ただし、ストリームを取り込んでデータを複数のマテリアライズドビューに配置することは可能。
  • Kinesis Streams で作成されたMVでは、JOIN は未サポート。ストリームでMVを作成した後、別のMVを作成して、ストリーミングのMVと他のMV、テーブル、またはビューとJOINなら可能。
  • データのサイズが最大サイズを超えているためにレコードを Amazon Redshift に取り込めない場合は、そのレコードはスキップされ、各エラーレコードのセグメントが SYS_STREAM_SCAN_ERRORS システムテーブルに書き込まれる
  • その他下記参照

参考情報

1
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?