今回の要件
要件
DynamoDBに大量に投入(INSERT/UPDATE)されるデータをRedshiftにある程度リアルタイム(30分程度)で同期を行いたい。
DynamoDBのテーブル
- 商品テーブル(item)
- PK: item_id
- Attribute(属性): 300
- 想定レコード数:数十億レコード
- 更新頻度:瞬間風速10万レコード/秒。うち数万件程度のレコードは、INSERT/UPDATE後に、5秒以内にUPDATEが行われ得る。通常時は1,000レコード/秒程度。
- 初回はINSERTが行われて、2回目以降は属性値が更新されていく
- INSERT/UPDATE比率は5:5
Redshift
- DynamoDBのitemテーブルの属性値と同一のカラムをもったitemテーブルを用意
- DynamoDBでは行えない複雑な抽出や、集計を行う目的
アーキテクチャー
AWS GlueStreaming Jobを用いた同期
DynamoDB → Kinesis Data Stream → AWS Glue Streaming → Redshift
DynamoDBからKinesis Data Stream
DynamoDBは設定のみでKinesis Data Streamに繋ぐことができます。
これにより、変更データがキャプチャされて、Kinesis Data Streamに送られます。
Kinesis Data StreamからAWS Glue Streaming
こちらのブログを大いに参照しています。
Stream data from relational databases to Amazon Redshift with upserts using AWS Glue streaming jobs
上記のブログは以下をAWS Glue Streamingで実践しているものです。
https://docs.aws.amazon.com/redshift/latest/dg/t_updating-inserting-using-staging-tables-.html
簡単に述べますと以下のステップを通じて、RedshiftへのUPSERTを実現しています。
- Redshiftにターゲットテーブルと同じカラム構成の一時テーブルを作成する
- 一時テーブルに今回UPSERTしたいデータを投入する
- 一時テーブルに存在しているデータをターゲットテーブルから削除する
- 一時テーブルのデータを全てターゲットテーブルに投入する
- 一時テーブル削除
こちらのロジックを自分で実装するかAWS Glue Streamingを用いて実装するかということになり、今回はAWS Glue Streamingを利用することになりました。
なお自分で実装する場合は、
DynamoDB → Kinesis Data Stream → Kinesis Data Firehose → ALB → ECS → Redshift
を検討していました。Firehoseの出力先にHTTP Endpointを指定できるので、ALB -> ECSはECSがスケールしやすいと考えています。あとはECSの中で自分で上記のロジックを組むという算段です。
今回はコストと、保守性・耐障害性などを考えてマネージドでいこうと決断しました。AWS Glue Streamingはディザスタリカバリ対策も後々取れそうというのも大きな理由です(Streamの途中から復旧できる)。
では、AWS Glue Studioを使って、ジョブを作っていきます。
Pythonで記述します。記述内容は上記で紹介したこのブログを参考(コピー)しています。
長いですが、イメージしやすくなるため貼り付けておきます。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col
from pyspark.sql import DataFrame, Row
import datetime
from awsglue import DynamicFrame
import uuid
params = [
'JOB_NAME',
'TempDir',
'src_kinesis_data_stream_arn',
'dst_redshift_database_name',
'dst_redshift_schema_name',
'redshift_connection_name'
]
args = getResolvedOptions(sys.argv, params)
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
src_kinesis_data_stream_arn = args["src_kinesis_data_stream_arn"]
dst_redshift_database_name = args["dst_redshift_database_name"]
dst_redshift_schema_name = args["dst_redshift_schema_name"]
redshift_connection_name = args["redshift_connection_name"]
# Script generated for node Kinesis Stream
dataframe_kinesis_stream_node = glueContext.create_data_frame.from_options(
connection_type="kinesis",
connection_options={
"typeOfData": "kinesis",
"streamARN": src_kinesis_data_stream_arn,
"classification": "json",
"startingPosition": "latest",
"inferSchema": "true",
},
transformation_ctx="dataframe_kinesis_stream_node",
)
def processBatch(data_frame, batchId):
if data_frame.count() > 0:
dynamic_frame = DynamicFrame.fromDF(
data_frame, glueContext, "from_data_frame"
)
data_frame = dynamic_frame.toDF()
data_frame = data_frame.filter((col('eventName') == 'INSERT') | (col('eventName') == 'MODIFY')).select(col('dynamodb.NewImage.*'))
dynamodb_data_dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")
redshift_data_dynamic_frame = ApplyMapping.apply(
frame=dynamodb_data_dynamic_frame,
mappings=[
("item_id.S", "string", "item_id", "int"),
("item_name.S", "string", "item_name", "string"),
("start_sale_at.S", "string", "start_sale_at", "timestamp"),
("price.S", "string", "price", "int"),
("updated_at.S", "string", "updated_at", "timestamp"),
],
transformation_ctx="redshift_data_dynamic_frame",
)
data_frame = redshift_data_dynamic_frame.toDF()
data_frame = data_frame.orderBy(col("item_id").asc(), col("updated_at").desc())
data_frame = data_frame.dropDuplicates(["item_id"])# 1フレーム内での重複を削除
redshift_data_dynamic_frame_no_duplicate = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")
redshift_data_dynamic_frame_no_duplicate.printSchema()
redshift_data_dynamic_frame_no_duplicate.show(100)
# Script generated for node Data Catalog table
randomUuidStr = str(uuid.uuid4()).replace("-", "")
stage_table_name = f"{dst_redshift_schema_name}.item_stage_{randomUuidStr}"
target_table_name = f"{dst_redshift_schema_name}.item"
pre_query = f"drop table if exists {stage_table_name}; create table {stage_table_name} as select * from {target_table_name} where 1=2;"# 一時テーブル作成
post_query = f"begin;delete from {target_table_name} using {stage_table_name} where {stage_table_name}.item_id = {target_table_name}.item_id; insert into {target_table_name} select * from {stage_table_name} ; drop table if exists {stage_table_name }; end;"# データを一時テーブルから移し替える
redshift_node = glueContext.write_dynamic_frame.from_jdbc_conf(
frame=redshift_data_dynamic_frame_no_duplicate,
catalog_connection=redshift_connection_name,
connection_options={
"database": dst_redshift_database_name,
"dbtable": stage_table_name,
"preactions": pre_query,
"postactions": post_query,
},
redshift_tmp_dir=args['TempDir'],
transformation_ctx="redshift_node",
)
glueContext.forEachBatch(
frame=dataframe_kinesis_stream_node,
batch_function=processBatch,
options={
"windowSize": "30 seconds",
"checkpointLocation": args["TempDir"] + "/" + args["JOB_NAME"] + "/checkpoint/",
},
)
job.commit()
ここでポイントなのは1件づつINSERTするのではなくて、SQLの1Statementで一気に投入しているところです。
以下の設定条件で、10,000件程度を15秒で投入することができました。Worker typeとworker数を増やすことにより、もっとパフォーマンスを上げることが可能です。
マテリアライズドビューを作成する
Redshiftに重複データが生成される問題
「Redshiftにはプライマリーキーの制約がない」という大変大きな特徴があります。つまり重複したプライマリーキーのレコードが生成されうるのです。
https://docs.aws.amazon.com/ja_jp/redshift/latest/dg/t_Defining_constraints.html
今回のアーキテクチャーだと、Glueの部分で複数のワーカーが同じプライマリーキーのデータを捌いた場合には、重複データが生成される可能性が存在しています。これだと参照側で大変な不具合が発生しそうです。
マテリアライズドビューで重複データを隠す
RedshiftはRDBと同様にマテリアライズドビューの機能を持っています。これを使って、最新の更新日時(updated_at)をもったデータのみ参照させるということを実現します。
CREATE MATERIALIZED VIEW item_no_duplicate
AUTO REFRESH YES
AS
SELECT
item.*
FROM
item
JOIN
(SELECT
item.item_id, MAX(item.updated_at) AS latest
FROM
item
GROUP BY item.item_id) item_latest
ON item.updated_at = item_latest.latest
AND item.item_id = item.item_id
参照側はitemテーブルではなくて、item_no_duplicateテーブルを参照します。
重複データの定期削除
マテリアライズドビューを使って重複データを隠すことで参照側の問題は解消されます。
あとは、データ容量節約のために重複データをLambdaの定期実行で削除します。
このマテリアライズド・ビューの定義だと、itemテーブルにカラム追加したら追従してくれそうですが、現在はそうだはなく、カラム追加のためにはマテリアライズドビューの再生成が必要です。メンテナンスタイムが発生するので、ここは現在難点です。
module.exports.handler = async (event, context) => {
console.log('schedule clean redshift table event: ' + JSON.stringify(event));
let deleteDuplicateStatement = `BEGIN TRANSACTION;
CREATE TEMPORARY TABLE item_temp (LIKE item);
ALTER TABLE item_temp ADD COLUMN rank_no int;
INSERT INTO item_temp (WITH duplicate_items AS (SELECT *, ROW_NUMBER() OVER (PARTITION BY item_id ORDER BY item_id ASC, updated_at DESC) AS rank_no FROM item WHERE item_id IN (WITH duplicate_ids AS (SELECT item_id, count(*) AS count FROM item GROUP BY item_id HAVING count > 1) SELECT item_id FROM duplicate_ids)) SELECT * FROM duplicate_items WHERE rank_no = 1);
ALTER TABLE item_temp DROP COLUMN rank_no;
DELETE FROM item WHERE item_id IN (SELECT item_id FROM item_temp);
INSERT INTO item (SELECT * FROM item_temp);
DROP TABLE item_temp;
COMMIT;`;
console.log('delete duplicate data statement: ', deleteDuplicateStatement);
await redshift.executeStatement(deleteDuplicateStatement);
console.log(`clean duplicate data for table item in redshift successfully.`);
};
当初は
DynamoDB -> DynamoDB DataStream -> Kinesis Data Stream -> Kinesis Data Firehose(+Lambda Transform) -> Redshift
でいけるだろうと踏んでいたのですが、重複データが発生する問題にあたりました。またこのケースだとRedshiftをパブリックサブネットに配置する必要もあり、セキュリティ的に難しかったです。
他の方法も検討(外部ETL業者含む)してみたのですが、スループット問題が出たりして今回の要件を満たせる案が見つかりませんでした。
AWSのSAの方にもかなり相談しまして、ヒントを色々もらい、最終的にこの形に着地しました。
最後に、他の可能性として
Data Pipelineを利用した方法や、現在まだプレビューですが、AWS Glue Elastic Viewsというのも検討の余地があると思います。
もしくはNewSQL(TiDBなど)に手を出せば、DynamoDBとRedshiftを統合できたのかもしれません。
以上