1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Redshift Streaming Injectionのデータ取得に失敗したときのメモ

Last updated at Posted at 2022-10-16

背景・目的

RedshiftのStreaming Injectionを試していたときに、データが取得できない場合があったので検証したときのメモを残しておきます。

2022年10月15日現在、Redshift Streaming Injectionはパブリックプレビューです。
本件は、将来的に解消される可能性があります。

オチとしては、こちらのドキュメントにかかれているとおり、KPLで集計されたレコードの解析をサポートしていないことが原因でした。(以下に、該当箇所を載せています。)

Redshift ストリームの取り込みでは、Kinesis プロデューサーライブラリ (KPL の重要なコンセプト) によって集計されたレコードの解析をサポートしていません. 集計されたレコードは取り込まれますが、バイナリプロトコルのバッファデータとして格納されます。(詳細については「Protocol buffers」(プロトコルバッファ) を参照してください。) Kinesis へのデータのプッシュ方法によっては、この機能の無効化が必要となる場合があります。例えば、Kinesis Data Analytics を使用している場合は、Kinesis シンクテーブルを作成する際に 'sink.producer.aggregation-enabled' = 'false' を追加することで、無効化ができます。詳細については、「Amazon Kinesis Data Streams SQL Connector」(Amazon Kinesis Data Streams の SQL コネクター) を参照してください。

まとめ

  • 2022/10/15現在、Streaming Injectionはパブリックプレビューです。
  • 集計を有効化したときには、エラーは発生せずデータが取得できません。一見、遅延しているように見えます。

概要

Redshift Streaming Injectionとは?

「Redshift Streaming Ingestion for Kinesis Data Streams(Public Preview)を試してみた」に、仕様等をまとめました。詳細はそちらのドキュメント、または、こちらのドキュメントをご確認ください。

実践

こちらで作成したKDAでデータを使用してAggregationのfalse/trueを入れ替えて事象を確認します。

事前準備

IAMポリシーの作成とアタッチ

こちらを参考にIAMロールとポリシーを作成します。

  1. 以下の内容でIAMポリシーを作成します。
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamSummary",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream"
            ],
            "Resource": "arn:aws:kinesis:ap-northeast-1:${AWSアカウントID}:stream/ExampleOutputStream"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "kinesis:ListStreams",
                "kinesis:ListShards"
            ],
            "Resource": "*"
        }
    ]
}

2.Redshiftクラスタに設定しているIAMロールにアタッチします。

Mterialized viewを作成

  1. KDS用の外部スキーマを作成する。
  CREATE EXTERNAL SCHEMA kinesis
    FROM KINESIS
IAM_ROLE 'arn:aws:iam:ap-northeast-1:${AWSアカウントID}:role/RedshiftStreamInjectionRole'

2.Materialized View用のスキーマを作成する。

CREATE SCHEMA stream_test
  1. Materialized viewを作成する。
CREATE MATERIALIZED VIEW stream_test.m_stock
       DISTSTYLE KEY
       DISTKEY(shardid)
       SORTKEY(sequencenumber)
AS
SELECT 
       approximatearrivaltimestamp
       ,partitionkey
       ,shardid
       ,sequencenumber
       ,FROM_VARBYTE(data, 'utf-8')::SUPER AS payload
  FROM kinesis."ExampleOutputStream"
 WHERE is_utf8(data) AND is_valid_json(from_varbyte(data, 'utf-8'));

4.データが取れるかテストする。
Refresh materialized viewでデータが取得できるか確認する。

REFRESH MATERIALIZED VIEW stream_test."m_stock"

5.とれました。

SELECT COUNT(1) FROM stream_test."m_stock"

===
count
80124

~~~~~~~


SELECT * FROM stream_test."m_stock" LIMIT 5

=== 
approximatearrivaltimestamp,partitionkey,shardid,sequencenumber,payload
2022-10-15 14:47:28.232,0,shardId-000000000003,49634247142778356192916115443600758999805560093377822770,"""{\""EVENT_TIME\"": \""2022-10-15T14:47:27.934564\"", \""TICKER\"": \""AMZN\"", \""PRICE\"": 67.51}"""
2022-10-15 14:47:28.234,0,shardId-000000000003,49634247142778356192916115443601967925625174722552528946,"""{\""EVENT_TIME\"": \""2022-10-15T14:47:28.009548\"", \""TICKER\"": \""AMZN\"", \""PRICE\"": 78.44}"""
2022-10-15 14:47:28.234,0,shardId-000000000003,49634247142778356192916115443603176851444789351727235122,"""{\""EVENT_TIME\"": \""2022-10-15T14:47:28.019476\"", \""TICKER\"": \""INTC\"", \""PRICE\"": 66.43}"""
2022-10-15 14:47:28.234,0,shardId-000000000003,49634247142778356192916115443604385777264403980901941298,"""{\""EVENT_TIME\"": \""2022-10-15T14:47:28.029698\"", \""TICKER\"": \""TBV\"", \""PRICE\"": 52.91}"""
2022-10-15 14:47:28.234,0,shardId-000000000003,49634247142778356192916115443605594703084018610076647474,"""{\""EVENT_TIME\"": \""2022-10-15T14:47:28.039276\"", \""TICKER\"": \""MSFT\"", \""PRICE\"": 58.93}"""

Aggregationの設定を事前確認

FlinkのアプリケーションでAggregationの設定を確認します。

仕様を確認

こちらのドキュメントには、以下のように記載されていました。
AggregationEnabledで制御するようです。

// Disable Aggregation if it's not supported by a consumer
// producerConfig.put("AggregationEnabled", "false");

FlinkのJavaコードを確認

  • コンシューマのmainメソッドで「createSourceFromStaticConfig」でSinkの設定を追加している。
  • mainメソッドは、以下の通り。
    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        System.out.println("main start.");

        /* if you would like to use runtime configuration properties, uncomment the lines below
         * DataStream<String> input = createSourceFromApplicationProperties(env);
         */
        DataStream<String> input = createSourceFromStaticConfig(env);

        /* if you would like to use runtime configuration properties, uncomment the lines below
         * input.addSink(createSinkFromApplicationProperties())
         */
        input.addSink(createSinkFromStaticConfig());

        env.execute("Flink Streaming Java API Skeleton");
    }

  • createSourceFromStaticConfigメソッドで、AggregationEnabledのデフォルトで「false」を設定している。
  • createSourceFromStaticConfigメソッドは以下の通り。
    private static FlinkKinesisProducer<String> createSinkFromStaticConfig() {
        Properties outputProperties = new Properties();
        outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        outputProperties.setProperty("AggregationEnabled", "false");

        FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties);
        sink.setDefaultStream(outputStreamName);
        sink.setDefaultPartition("0");
        return sink;
    }

KDAの設定値を確認

こちらで作成したKinesisDataApplicationTestのランタイムプロパティで設定していました。

  • AggregationEnabledをFalseに設定
    image.png

AggregationEnabledをFalseに設定

falseの状態でデータを取得します。

問題なくデータが取得できました。

実行前のデータを確認

RedhiftのマテビューとS3は同じ状態。

S3を確認

10/15 11:59(UTCで10/14 14:59)で、Redshiftのマテビューと同じタイムスタンプ。
image.png

Redshiftのマテビュー

実行前は、80,124件、最新のタイムスタンプは、「2022-10-15 14:59:38.836」でした。

REFRESH MATERIALIZED VIEW stream_test."m_stock"
SELECT MAX(approximatearrivaltimestamp) AS approximatearrivaltimestamp ,MAX(sequencenumber) AS sequencenumber , COUNT(1) AS CNT FROM stream_test."m_stock" 

===

approximatearrivaltimestamp,sequencenumber,cnt
2022-10-15 14:59:38.836,49634247142778356192916115540463522444788543623578779698,80124

実行

  1. AggregationEnabledをFalseに設定
    image.png

  2. KDAを実行
    image.png

  3. ログ生成(stock.py)を停止

$ ps -ef | grep stock.py 
ec2-user 15228  6511 17 Oct15 pts/2    01:57:54 python3 stock.py
ec2-user 19364  6511  0 02:08 pts/2    00:00:00 grep --color=auto stock.py
$ kill -15 15228
[1]+  Terminated              python3 stock.py > log
$ 

実行後のデータを確認

S3を確認

おおよそ、2022/10/16 11:06(JST)頃まで確認
image.png

/* SQL クエリを記述するためのリファレンスポイントを作成するには、次の SQL クエリを実行して入力データの最初の 5 つのレコードを表示できます。 SELECT * FROM s3object s LIMIT 5 */
SELECT * FROM s3object s LIMIT 5

===

{
  "EVENT_TIME": "2022-10-16T02:06:10.930143",
  "TICKER": "AMZN",
  "PRICE": 0.36
}
{
  "EVENT_TIME": "2022-10-16T02:07:33.902039",
  "TICKER": "AAPL",
  "PRICE": 40.84
}
{
  "EVENT_TIME": "2022-10-16T02:06:45.229515",
  "TICKER": "INTC",
  "PRICE": 17.85
}
{
  "EVENT_TIME": "2022-10-16T02:07:43.729067",
  "TICKER": "INTC",
  "PRICE": 8.38
}
{
  "EVENT_TIME": "2022-10-16T02:07:26.961327",
  "TICKER": "INTC",
  "PRICE": 91.21
}

Redshiftのマテビューを確認

実行後は、413,207件、最新のタイムスタンプは、「2022-10-16 02:12:36.803」。

REFRESH MATERIALIZED VIEW stream_test."m_stock"
SELECT MAX(approximatearrivaltimestamp) AS approximatearrivaltimestamp ,MAX(sequencenumber) AS sequencenumber , COUNT(1) AS CNT FROM stream_test."m_stock" 

===

approximatearrivaltimestamp,sequencenumber,cnt
2022-10-16 02:12:36.803,49634265348415413172162399223022302411889445730605072450,413207

AggregationEnabledをFalseに設定

trueの状態でデータを取得します。
::: note info
Aggregationするとデータは、更新されない。
:::

コードとプロパティを修正します。

  1. stock.pyを修正します。(以下のメソッドで設定しているAggregationEnabledをfalseからtrueに変更します。)
    private static FlinkKinesisProducer<String> createSinkFromStaticConfig() {
        Properties outputProperties = new Properties();
        outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        outputProperties.setProperty("AggregationEnabled", "true");

        FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties);
        sink.setDefaultStream(outputStreamName);
        sink.setDefaultPartition("0");
        return sink;
    }

  1. ビルド&Artifact用のバケットにJarファイルをアップロードします。

実行前のデータを確認

Redshift

実行前は、3,049,946件、最新のタイムスタンプは、「2022-10-16 08:29:47.796」。

REFRESH MATERIALIZED VIEW stream_test."m_stock"
SELECT MAX(approximatearrivaltimestamp) AS approximatearrivaltimestamp ,MAX(sequencenumber) AS sequencenumber , COUNT(1) AS CNT FROM stream_test."m_stock" 

===
approximatearrivaltimestamp,sequencenumber,cnt
2022-10-16 08:29:47.796	49634265755426313790552428346534646053406161846577135858	3049946

S3

最新のデータは、10/15 17:37 (10/15 8:37 UTC)。
image.png

実行

  1. AggregationEnabledをTrueに設定
    image.png

  2. 実行
    image.png
    image.png

  3. ログを生成(stock.py)

$ python3 stock.py > log7 &
[1] 16404
$ 

4.30分程度実行してログ出力を停止する。

$ ps -ef |grep 16404
ec2-user 16404  6511 15 08:50 pts/2    00:04:50 python3 stock.py
ec2-user 28596  6511  0 09:21 pts/2    00:00:00 grep --color=auto 16404
$ 

$ kill -15 16404

実行後のデータを確認

出力の件数を確認(EC2のstock.py)

約16.3万件(162,958件)増加。

$ ls -l log7
-rw-rw-r-- 1 ec2-user ec2-user 12808719 Oct 16 09:22 log7
$ wc -l log7
162958 log7
$ 

S3を確認

1.ファイル数を確認。対象は32ファイル(10ファイル+22ファイル)
image.png
image.png

2.レコード件数を確認。1ファイル約5300件程度(5,228件)
image.png

3.レコード件数 おおよそ169,600件(5,300件*32ファイル)と見積もる。EC2の件数とほぼ一致。

Redshiftのマテビューを確認

  • 実行後は、3,049,946件、最新のタイムスタンプは、「2022-10-16 08:29:47.796」。
  • 更新件数は、 0件(実行後:3,049,946 - 実行前:3,049,946)。
  • 3049944
    image.png
REFRESH MATERIALIZED VIEW stream_test."m_stock"
SELECT MAX(approximatearrivaltimestamp) AS approximatearrivaltimestamp ,MAX(sequencenumber) AS sequencenumber , COUNT(1) AS CNT FROM stream_test."m_stock" 

approximatearrivaltimestamp,sequencenumber,cnt
2022-10-16 08:29:47.796	49634265755426313790552428346534646053406161846577135858	3049946

考察

  • AggregationEnabledを有効化すると、Redshift Streaming Injectionではコンシュームできません。
  • 無効化した場合は、取得できますが、高いスループットのシステムでは、若干きついかもしれません。
  • GA後に、コンシュームできるようになることを期待します。
  • はじめに、ドキュメントの制約を読んでおくこと。(特にプレビューの場合)

参考

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?