以下の記事の設定が完了していることを前提にしているので、事前にお読みください。
以下の記事ではCDCデータをGCSに格納しました。
しかし、このままですと扱いづらいので、GCSに格納されたデータをBigQueryにもロードしてみます。
GCSに格納されたCDCをBigQueryにロードするためにはDataflowを使います。
そのためのテンプレートが用意されたいるので、これを使います。
GCSのPubSub通知を有効化する
GCSバケットにファイルが配置されたことをDataflowに通知するための設定を行います。
# GCSのPubSub通知のためのPubSub Topicを作成
gsutil notification create -t test-datastream -f json gs://test-datastream-20211219
# 上で作成したTopicからDataflowが読み出しを行うためのSubscriptionを作成
gcloud pubsub subscriptions create test-datastream --topic=test-datastream
BigQueryにデータセットを作成する
ロードされたデータを格納するためのデータセットをBigQueryに作成します。
# CDCデータそのものを格納するためのデータセット
bq mk --dataset test_datastream_log
# 定期的にCDCデータを反映した結果を格納するためのデータセット
bq mk --dataset test_datastream_final
Dataflow JOBの作成
最後にDataflow JOBを作成します。
gcloud beta dataflow flex-template run test-datastream \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/Cloud_Datastream_to_BigQuery \
--region us-central1 \
--parameters inputFilePattern=gs://test-datastream-20211219,\
gcsPubSubSubscription=projects/<プロジェクトID>/subscriptions/test-datastream,\
inputFileFormat=json,\
outputStagingDatasetTemplate=test_datastream_log,\
outputDatasetTemplate=test_datastream_final,\
deadLetterQueueDirectory=gs://test-datastream-20211219/dlq
この後しばらく待つとDataflow JOBが動きだし、test_datastream_logとtest_datastream_finalにテーブルが作成されます。
BigQueryのテーブルにクエリを実行する
しばらく待つと、以下の2つのテーブルが生成されることが分かります。
上のテーブルにはCloud SQLから読みだしたCDCデータそのものが格納されています。
そのため、過去の特定の時点での情報を参照することに活用できます。
例えば、商品テーブルで考えれば過去時点での価格や在庫などが時系列データとして活用できます。
下のテーブルはそのCDCデータを定期的にBigQueryに対して反映し、Cloud SQLと同等になるように再構成されたテーブルです。
異種間データベースレプリケーションのようなものだと考えれば分かりやすいです。
Dataflowが mergeFrequencyMinutes
間隔でBigQueryのMERGE文を実行して再構成を行っています。
この値はデフォルトでは5分ですが、変更することも可能です。
費用とデータ鮮度のトレードオフになるので、実際に運用する際には適切な値にチューニングする必要がありそうです。
test_datastream_log.users_log
test_datastream_final.users