3
1

More than 1 year has passed since last update.

Redshift Streaming Ingestion の使い方まとめ

Last updated at Posted at 2022-12-07

これは AWS Analytics Advent Calendar 2022 の 8 日目の記事です。

1.はじめに

AWS re:Invent 2022 で Redshift の Streaming Ingestion 機能が GA 発表されました。Streaming Ingestion とは Amazon Kinesis Data Streams と Amazon Managed Streaming for Apache Kafka (MSK) からストリーミングデータを 直接 Redshift に リアルタイム で投入することができる機能です。
この機能が出るまで Redshift 上でリアルタイムにデータを入れるには、Amazon Kinesis Data Firehose を使って一旦データを S3 に保存し、そのデータを逐次的に Redshift の COPY 文を使って投入する必要がありました。逐次的に COPY 文を叩く仕組み、すでにロードした S3 上のファイルを除外する仕組みなど、様々な仕組みを実装・運用する手間がありました。
この機能によって、そういった実装・運用の手間から開放され、しかも従来の方法よりもよりリアルタイムに近い鮮度でデータを投入することができます。

この記事では、Amazon Kinesis Data Streams を利用した際の Streaming Ingestion の基本的な使い方と使う上での注意点をプロビジョニングされた Redshift を対象にまとめます。

2. 使い方

大まかな使い方は以下の通りです。

  1. Redshift のクラスターに関連付けされた IAM ロールに Amazon Kinesis Data Streams の stream へのアクセス権を付与する
  2. Redshift 上で Stream のデータを読み取るための Materialized View を作成する
  3. 定期的に Materialized View を更新して Stream からデータを取り込む

1. アクセス権の付与

Redshift から Amazon Kinesis Data Streams の stream にアクセスするために、Redshift のクラスターに関連付けされた IAM ロールに Kinesis へのアクセス権限を付与した IAM ポリシーをアタッチします。IAM ポリシーは以下の通りです。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ReadStream",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamSummary",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream"
            ],
            "Resource": "arn:aws:kinesis:*:0123456789:stream/*" ←ここに読み込み元の stream の ARN を記載する
        },
        {
            "Sid": "ListStream",
            "Effect": "Allow",
            "Action": [
                "kinesis:ListStreams",
                "kinesis:ListShards"
            ],
            "Resource": "*"
        }
    ]
}

2. Materialized View の作成

注意するポイントとして Materialized View を作成する前に、Redshift のクラスターと Amazon Kinesis Data Streams がネットワーク的に導通できるかどうかを確認する必要があります。つまり、インターネットを経由しても良いのであれば、クラスターが属しているVPCのサブネットグループがインターネットゲートウェイや NAT ゲートウェイを利用してインターネットにアクセスできるか確認する必要があります。また、AWS内のネットワークに閉じたい場合は、 Kinesis interface VPC Endpoint を利用する必要があります。ここでは interface VPC Endpoint を作る解説を挟みます。

Interface VPC Endpoint の作成

[VPCダッシュボード]→左側のハンバーガーメニュー(三)をクリック→[エンドポイント]→[エンドポイントを作成]で VPC Endpoint を作成できます。
Screen Shot 2022-12-06 at 18.25.55 copy.jpg

ポイントは以下の通りです。

  • Amazon Kinesis Data Streams にアクセスするための Endpoint なのでサービスとして kinesis-streams を選択
  • VPC は Redshift のクラスターが配置されている場所を選択
  • サブネットはクラスターが配置されているサブネットグループに含まれるサブネットを選択

Materialized View の作成

Redshift クエリエディタ (クエリエディタv2) から外部 SCHEMA を作ります。

CREATE EXTERNAL SCHEMA {外部 SCHEMA 名}
FROM KINESIS
IAM_ROLE 'iam-role-arn'; # デフォルト IAM ロールが設定されている場合は IAM_ROLE default でも可

その後、Materialized View を作成します。

CREATE MATERIALIZED VIEW {VIEW 名} AS
SELECT
... # ここでデータのマッピングを行う
FROM {外部 SCHEMA 名}.{stream 名};

{stream 名}には以下のメタデータが入っているので、そのメタデータを利用してテーブルのカラムにマッピングします。

メタデータ名 説明
approximate_arrival_timestamp TIMESTAMP 各レコードが Kinesis stream に投入されたおおよその時間
partition_key VARCHAR(256) Kinesis stream に投入する時に指定した partition_key
shard_id CHAR(20) Kinesis stream 内の shard の id
sequence_number VARCHAR(128) shard 内で一意の値で、時間とともに増加
kinesis_data VARBYTE(1024000) Kinesis stream に投入された実データ
refresh_time TIMESTAMP Materialized View が REFRESH を開始した時間 (= Redshift に取り込みを開始した時間)

注意するポイントとして kinesis_data はバイナリで保存されているため、カラムにマッピングする際は from_varbyte 関数を利用してバイナリから文字列に変換します。加えて、stream に投入されたデータは json や csv 形式の物が多いため、json_extract_path_text 関数や split_part 関数を利用してカラムに分解します。例えば以下のような式になります。

json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'), '{フィールド名}')::int as {カラム名}

3. Materialized View を更新してデータを取り込む

以下のクエリを実行することで、Materialized View を更新してデータを取り込みます。

REFRESH MATERIALIZED VIEW {VIEW 名};

このクエリは、前回の更新時点から現在の更新時点で増分されたデータのみを取り込みます。すでに取り込まれたデータは更新されません。また、前回の更新時点が無い初期の更新時は、stream に保存されているすべてのデータを取り込みます。そのため、View を DROP してしまった場合、stream に現時点で保存されているデータのみが取り込まれます。stream の保持期間を過ぎたものは再度取り込むことはできません。

定期的な Materialized View の更新

REFRESH で更新しない限りデータが取り込まれないため、リアルタイムで取り込むためには高頻度且つ定期的に更新を行う必要があります。プロビジョニングされた Redshift クラスターでは クエリのスケジュール設定 が利用できるので、こちらを利用することで、REFRESH 文を定期的に実行することができます。但し、このスケジューラーでは最小単位が分で、実行時に計算リソースを利用します。そのため、更新頻度はリアルタイム要件と計算リソースの許容度を加味して決める必要があります。

12/08追記

Materialized View の作成時に Materialized View の自動更新機能を使うことで、能動的に REFRESH をしなくても自動更新が可能です。その場合は以下の通りに Materialized View を作成するときに AUTO REFRESH YES を追加します。

CREATE MATERIALIZED VIEW {VIEW 名} AUTO REFRESH YES AS
SELECT
... # ここでデータのマッピングを行う
FROM {外部 SCHEMA 名}.{stream 名};

こちらの機能は手動での REFRESH と併用可能です。

注意するポイントとしてこの自動更新はAWS側のロジックで行われるため、更新を自身のロジックでコントロールできない点です。私が実際に使ってみた限りだと、データが stream に投入されてもすぐに更新が入るわけではなかったです(数分では更新されない模様)。高頻度で更新が必要な場合は、手動で REFRESH が必要になりそうです。

まとめ

プロビジョニングされた Redshift のクラスターでの Streaming Ingestion の基本的な使い方を説明しました。なお、Redshift Serverless でも Streaming Ingestion を利用することは可能ですが、更新時に RPU を消費する(ハズ)なのと、クエリのスケジュール設定が使えないので、Materialized View の自動更新機能を使わない場合は独自で Materialized View を更新する手段を用意する必要があります。

参考文献

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1