背景・目的
RedshiftのStreaming Injectionを試していたときに、データが取得できない場合があったので検証したときのメモを残しておきます。
2022年10月15日現在、Redshift Streaming Injectionはパブリックプレビューです。
本件は、将来的に解消される可能性があります。
オチとしては、こちらのドキュメントにかかれているとおり、KPLで集計されたレコードの解析をサポートしていないことが原因でした。(以下に、該当箇所を載せています。)
Redshift ストリームの取り込みでは、Kinesis プロデューサーライブラリ (KPL の重要なコンセプト) によって集計されたレコードの解析をサポートしていません. 集計されたレコードは取り込まれますが、バイナリプロトコルのバッファデータとして格納されます。(詳細については「Protocol buffers」(プロトコルバッファ) を参照してください。) Kinesis へのデータのプッシュ方法によっては、この機能の無効化が必要となる場合があります。例えば、Kinesis Data Analytics を使用している場合は、Kinesis シンクテーブルを作成する際に 'sink.producer.aggregation-enabled' = 'false' を追加することで、無効化ができます。詳細については、「Amazon Kinesis Data Streams SQL Connector」(Amazon Kinesis Data Streams の SQL コネクター) を参照してください。
まとめ
- 2022/10/15現在、Streaming Injectionはパブリックプレビューです。
- 集計を有効化したときには、エラーは発生せずデータが取得できません。一見、遅延しているように見えます。
概要
Redshift Streaming Injectionとは?
「Redshift Streaming Ingestion for Kinesis Data Streams(Public Preview)を試してみた」に、仕様等をまとめました。詳細はそちらのドキュメント、または、こちらのドキュメントをご確認ください。
実践
こちらで作成したKDAでデータを使用してAggregationのfalse/trueを入れ替えて事象を確認します。
事前準備
IAMポリシーの作成とアタッチ
こちらを参考にIAMロールとポリシーを作成します。
- 以下の内容でIAMポリシーを作成します。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"kinesis:DescribeStreamSummary",
"kinesis:GetShardIterator",
"kinesis:GetRecords",
"kinesis:DescribeStream"
],
"Resource": "arn:aws:kinesis:ap-northeast-1:${AWSアカウントID}:stream/ExampleOutputStream"
},
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": [
"kinesis:ListStreams",
"kinesis:ListShards"
],
"Resource": "*"
}
]
}
2.Redshiftクラスタに設定しているIAMロールにアタッチします。
Mterialized viewを作成
- KDS用の外部スキーマを作成する。
CREATE EXTERNAL SCHEMA kinesis
FROM KINESIS
IAM_ROLE 'arn:aws:iam:ap-northeast-1:${AWSアカウントID}:role/RedshiftStreamInjectionRole'
2.Materialized View用のスキーマを作成する。
CREATE SCHEMA stream_test
- Materialized viewを作成する。
CREATE MATERIALIZED VIEW stream_test.m_stock
DISTSTYLE KEY
DISTKEY(shardid)
SORTKEY(sequencenumber)
AS
SELECT
approximatearrivaltimestamp
,partitionkey
,shardid
,sequencenumber
,FROM_VARBYTE(data, 'utf-8')::SUPER AS payload
FROM kinesis."ExampleOutputStream"
WHERE is_utf8(data) AND is_valid_json(from_varbyte(data, 'utf-8'));
4.データが取れるかテストする。
Refresh materialized viewでデータが取得できるか確認する。
REFRESH MATERIALIZED VIEW stream_test."m_stock"
5.とれました。
SELECT COUNT(1) FROM stream_test."m_stock"
===
count
80124
~~~~~~~
SELECT * FROM stream_test."m_stock" LIMIT 5
===
approximatearrivaltimestamp,partitionkey,shardid,sequencenumber,payload
2022-10-15 14:47:28.232,0,shardId-000000000003,49634247142778356192916115443600758999805560093377822770,"""{\""EVENT_TIME\"": \""2022-10-15T14:47:27.934564\"", \""TICKER\"": \""AMZN\"", \""PRICE\"": 67.51}"""
2022-10-15 14:47:28.234,0,shardId-000000000003,49634247142778356192916115443601967925625174722552528946,"""{\""EVENT_TIME\"": \""2022-10-15T14:47:28.009548\"", \""TICKER\"": \""AMZN\"", \""PRICE\"": 78.44}"""
2022-10-15 14:47:28.234,0,shardId-000000000003,49634247142778356192916115443603176851444789351727235122,"""{\""EVENT_TIME\"": \""2022-10-15T14:47:28.019476\"", \""TICKER\"": \""INTC\"", \""PRICE\"": 66.43}"""
2022-10-15 14:47:28.234,0,shardId-000000000003,49634247142778356192916115443604385777264403980901941298,"""{\""EVENT_TIME\"": \""2022-10-15T14:47:28.029698\"", \""TICKER\"": \""TBV\"", \""PRICE\"": 52.91}"""
2022-10-15 14:47:28.234,0,shardId-000000000003,49634247142778356192916115443605594703084018610076647474,"""{\""EVENT_TIME\"": \""2022-10-15T14:47:28.039276\"", \""TICKER\"": \""MSFT\"", \""PRICE\"": 58.93}"""
Aggregationの設定を事前確認
FlinkのアプリケーションでAggregationの設定を確認します。
仕様を確認
こちらのドキュメントには、以下のように記載されていました。
AggregationEnabledで制御するようです。
// Disable Aggregation if it's not supported by a consumer
// producerConfig.put("AggregationEnabled", "false");
FlinkのJavaコードを確認
- コンシューマのmainメソッドで「createSourceFromStaticConfig」でSinkの設定を追加している。
- mainメソッドは、以下の通り。
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
System.out.println("main start.");
/* if you would like to use runtime configuration properties, uncomment the lines below
* DataStream<String> input = createSourceFromApplicationProperties(env);
*/
DataStream<String> input = createSourceFromStaticConfig(env);
/* if you would like to use runtime configuration properties, uncomment the lines below
* input.addSink(createSinkFromApplicationProperties())
*/
input.addSink(createSinkFromStaticConfig());
env.execute("Flink Streaming Java API Skeleton");
}
- createSourceFromStaticConfigメソッドで、AggregationEnabledのデフォルトで「false」を設定している。
- createSourceFromStaticConfigメソッドは以下の通り。
private static FlinkKinesisProducer<String> createSinkFromStaticConfig() {
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
outputProperties.setProperty("AggregationEnabled", "false");
FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties);
sink.setDefaultStream(outputStreamName);
sink.setDefaultPartition("0");
return sink;
}
KDAの設定値を確認
こちらで作成したKinesisDataApplicationTestのランタイムプロパティで設定していました。
AggregationEnabledをFalseに設定
falseの状態でデータを取得します。
問題なくデータが取得できました。
実行前のデータを確認
RedhiftのマテビューとS3は同じ状態。
S3を確認
10/15 11:59(UTCで10/14 14:59)で、Redshiftのマテビューと同じタイムスタンプ。
Redshiftのマテビュー
実行前は、80,124件、最新のタイムスタンプは、「2022-10-15 14:59:38.836」でした。
REFRESH MATERIALIZED VIEW stream_test."m_stock"
SELECT MAX(approximatearrivaltimestamp) AS approximatearrivaltimestamp ,MAX(sequencenumber) AS sequencenumber , COUNT(1) AS CNT FROM stream_test."m_stock"
===
approximatearrivaltimestamp,sequencenumber,cnt
2022-10-15 14:59:38.836,49634247142778356192916115540463522444788543623578779698,80124
実行
$ ps -ef | grep stock.py
ec2-user 15228 6511 17 Oct15 pts/2 01:57:54 python3 stock.py
ec2-user 19364 6511 0 02:08 pts/2 00:00:00 grep --color=auto stock.py
$ kill -15 15228
[1]+ Terminated python3 stock.py > log
$
実行後のデータを確認
S3を確認
おおよそ、2022/10/16 11:06(JST)頃まで確認
/* SQL クエリを記述するためのリファレンスポイントを作成するには、次の SQL クエリを実行して入力データの最初の 5 つのレコードを表示できます。 SELECT * FROM s3object s LIMIT 5 */
SELECT * FROM s3object s LIMIT 5
===
{
"EVENT_TIME": "2022-10-16T02:06:10.930143",
"TICKER": "AMZN",
"PRICE": 0.36
}
{
"EVENT_TIME": "2022-10-16T02:07:33.902039",
"TICKER": "AAPL",
"PRICE": 40.84
}
{
"EVENT_TIME": "2022-10-16T02:06:45.229515",
"TICKER": "INTC",
"PRICE": 17.85
}
{
"EVENT_TIME": "2022-10-16T02:07:43.729067",
"TICKER": "INTC",
"PRICE": 8.38
}
{
"EVENT_TIME": "2022-10-16T02:07:26.961327",
"TICKER": "INTC",
"PRICE": 91.21
}
Redshiftのマテビューを確認
実行後は、413,207件、最新のタイムスタンプは、「2022-10-16 02:12:36.803」。
REFRESH MATERIALIZED VIEW stream_test."m_stock"
SELECT MAX(approximatearrivaltimestamp) AS approximatearrivaltimestamp ,MAX(sequencenumber) AS sequencenumber , COUNT(1) AS CNT FROM stream_test."m_stock"
===
approximatearrivaltimestamp,sequencenumber,cnt
2022-10-16 02:12:36.803,49634265348415413172162399223022302411889445730605072450,413207
AggregationEnabledをFalseに設定
trueの状態でデータを取得します。
::: note info
Aggregationするとデータは、更新されない。
:::
コードとプロパティを修正します。
- stock.pyを修正します。(以下のメソッドで設定しているAggregationEnabledをfalseからtrueに変更します。)
private static FlinkKinesisProducer<String> createSinkFromStaticConfig() {
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
outputProperties.setProperty("AggregationEnabled", "true");
FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties);
sink.setDefaultStream(outputStreamName);
sink.setDefaultPartition("0");
return sink;
}
- ビルド&Artifact用のバケットにJarファイルをアップロードします。
実行前のデータを確認
Redshift
実行前は、3,049,946件、最新のタイムスタンプは、「2022-10-16 08:29:47.796」。
REFRESH MATERIALIZED VIEW stream_test."m_stock"
SELECT MAX(approximatearrivaltimestamp) AS approximatearrivaltimestamp ,MAX(sequencenumber) AS sequencenumber , COUNT(1) AS CNT FROM stream_test."m_stock"
===
approximatearrivaltimestamp,sequencenumber,cnt
2022-10-16 08:29:47.796 49634265755426313790552428346534646053406161846577135858 3049946
S3
最新のデータは、10/15 17:37 (10/15 8:37 UTC)。
実行
$ python3 stock.py > log7 &
[1] 16404
$
4.30分程度実行してログ出力を停止する。
$ ps -ef |grep 16404
ec2-user 16404 6511 15 08:50 pts/2 00:04:50 python3 stock.py
ec2-user 28596 6511 0 09:21 pts/2 00:00:00 grep --color=auto 16404
$
$ kill -15 16404
実行後のデータを確認
出力の件数を確認(EC2のstock.py)
約16.3万件(162,958件)増加。
$ ls -l log7
-rw-rw-r-- 1 ec2-user ec2-user 12808719 Oct 16 09:22 log7
$ wc -l log7
162958 log7
$
S3を確認
1.ファイル数を確認。対象は32ファイル(10ファイル+22ファイル)
2.レコード件数を確認。1ファイル約5300件程度(5,228件)
3.レコード件数 おおよそ169,600件(5,300件*32ファイル)と見積もる。EC2の件数とほぼ一致。
Redshiftのマテビューを確認
- 実行後は、3,049,946件、最新のタイムスタンプは、「2022-10-16 08:29:47.796」。
- 更新件数は、 0件(実行後:3,049,946 - 実行前:3,049,946)。
- 3049944
REFRESH MATERIALIZED VIEW stream_test."m_stock"
SELECT MAX(approximatearrivaltimestamp) AS approximatearrivaltimestamp ,MAX(sequencenumber) AS sequencenumber , COUNT(1) AS CNT FROM stream_test."m_stock"
approximatearrivaltimestamp,sequencenumber,cnt
2022-10-16 08:29:47.796 49634265755426313790552428346534646053406161846577135858 3049946
考察
- AggregationEnabledを有効化すると、Redshift Streaming Injectionではコンシュームできません。
- 無効化した場合は、取得できますが、高いスループットのシステムでは、若干きついかもしれません。
- GA後に、コンシュームできるようになることを期待します。
- はじめに、ドキュメントの制約を読んでおくこと。(特にプレビューの場合)
参考