ゼロバンク・デザインファクトリー株式会社(ZDF)で、データ基盤の構築・運用を担当しているキョウです。
背景
2022年5月28日にCloud Spanner Change StreamsがGA化してから、ヘビーなSpannerユーザーの私たちにとしては、とてもとても黙っているわけにはいきません。が、時の流れが早い...世の中には、サービスイメージをつかむための記事とCloud Skills Boostのラボがすでに存在しております。そのため、この記事は少し深いところ(Spannerトランザクション処理)を探ってみたいと思います。これからChange Streamsを検証してみたい方に少しでも役立てばいいなと思います。
前書き
今回の検証には、Applicationチームがよく使うSpring Bootを使いますが、Githubにあるサンプルソースを修正して利用しています。記事の所々リンクをつけるようにしますので、適宜にご参照いただければと思います。
準備
Spannerテーブルの作成について、後述のソースコードで実現したため、まず、準備としてはSpannerインスタンス、Databaseと格納先のBigQuery Datasetを用意します。
また、以下のDDL構文で全てのテーブルを監視するChange Streamsを作成します。
CREATE CHANGE STREAM all_change_stream
FOR ALL;
最後に、下記のようなコマンドでDataflow Template(Spanner Change Streams to BigQuery) Jobを起動します。
gcloud dataflow flex-template run {job_name} \
--project {project_name} \
--template-file-gcs-location gs://dataflow-templates-asia-northeast1/latest/flex/Spanner_Change_Streams_to_BigQuery \
--region {spanner_instance_region} \
--parameters "spannerInstanceId={spanner_instance_id},spannerDatabase={spanner_database_id},spannerMetadataInstanceId={spanner_metadata_instance_id},spannerMetadataDatabase={spanner_meta_database_id},spannerChangeStreamName=all_change_stream,bigQueryDataset={bigquery_dataset_id}"
トランザクション処理(Update -> Delete)
spring initializrからアプリケーションのひな型を作成してから、Spring Framework on Google Cloudドキュメントに記載された「Sample application using higher-level Spanner Repository capabilities」を参照して、build.gradle
に依存関係、application.properties
にSpanner接続情報を追記します。
そして、サンプルソースコードのテーブルとデータ作成部分のソースコードを残し、以下のトランザクション処理を追加します。
this.tradeRepository.performReadWriteTransaction(transactionTradeRepository -> {
Iterable<Trade> all = transactionTradeRepository.findAll();
LOGGER.info("update Action: BUY to RETURN");
StreamSupport.stream(all.spliterator(), false)
.filter(record -> record.getAction().equals("BUY"))
.forEach(record -> {
record.setAction("RETURN");
transactionTradeRepository.save(record);
LOGGER.info(record.toString());
});
LOGGER.info("delete TradeId: 1 and TraderId: demo_trader2");
StreamSupport.stream(all.spliterator(), false)
.filter(record -> record.getTradeId().equals("1") && record.getTraderId().equals("demo_trader2"))
.forEach(transactionTradeRepository::delete);
return null;
});
そうすると、tradeテーブルは以下(太字:更新、取り消し線:削除)のように更新されます。
trade_id | action | price | trader_id | ... |
---|---|---|---|---|
1 | BUY→RETURN | 100.0 | demo_trader1 | ... |
2 | BUY→RETURN | 105.0 | demo_trader1 | ... |
3 | BUY→RETURN | 100.0 | demo_trader1 | ... |
... | ||||
2 | BUY→RETURN | 103.0 | demo_trader2 | ... |
3 | SELL | 100.0 | demo_trader2 | ... |
1 | SELL | 98.0 | demo_trader3 | ... |
2 | SELL | 110.0 | demo_trader3 | ... |
BigQueryに格納されたデータ
Dataflow Jobが正常に作動すれば、BigQueryのテーブル(trade_changelog)に以下のようなレコード(※関係ない列は省略しています)が作成されます。
同じトランザクションであるため、すべてのレコードのcommit_timestamp
とserver_transaction_id
は同じ値になっています。公式Docの解釈もご参照ください。
trade_id | trader_id | action | price | mod_type | commit_timestamp | server_transaction_id | record_sequence | is_last_record_in_transaction_in_partition | number_of_records_in_transaction | number_of_partitions_in_transaction |
---|---|---|---|---|---|---|---|---|---|---|
1 | demo_trader1 | RETURN | 100 | UPDATE | 2023-01-20 06:23:44.968031 UTC | MTI5MzA1Mjk4NjEzMTExMTMzMTg= | 1 | TRUE | 2 | 1 |
2 | demo_trader1 | RETURN | 105 | UPDATE | 2023-01-20 06:23:44.968031 UTC | MTI5MzA1Mjk4NjEzMTExMTMzMTg= | 1 | TRUE | 2 | 1 |
3 | demo_trader1 | RETURN | 100 | UPDATE | 2023-01-20 06:23:44.968031 UTC | MTI5MzA1Mjk4NjEzMTExMTMzMTg= | 1 | TRUE | 2 | 1 |
1 | demo_trader2 | DELETE | 2023-01-20 06:23:44.968031 UTC | MTI5MzA1Mjk4NjEzMTExMTMzMTg= | 0 | FALSE | 2 | 1 | ||
2 | demo_trader2 | RETURN | 103 | UPDATE | 2023-01-20 06:23:44.968031 UTC | MTI5MzA1Mjk4NjEzMTExMTMzMTg= | 1 | TRUE | 2 | 1 |
trade_id=1 and trader_id=demo_trader2
の更新について、BUY → RETURNのUPDATE
とDELETE
の二つ変更レコードがBigQueryテーブルに連携されると想定しましたが、DELETE
のみ(太字の一行)がBigQueryに記録されています。
ただ、同じトランザクションでは、同じレコードに対して、更新してから削除するようなことはそもそも避けるべきだと思います。削除予定のレコードは先に消してから、他のレコードを更新するといいですね。どうしてもやむを得ない場合は、UPDATE
した変更レコードはBigQueryに記録されないので、ご注意ください。
UPDATEが記録されない原因
UPDATE
が記録されない原因について、もう少し探りたいと思います。公式Docにも記載されていますが、TVF functionのREAD_{Change Streams名}
を使うことで、上述のトランザクションのChange Recordを確認することができます。
まず、以下のクエリでpartition tokenを取得します(指定したstart_timestamp
はトラザクション処理開始前の時間です)。
SELECT ChangeRecord FROM READ_all_change_stream(
start_timestamp => "2023-02-20 18:50:00+0900",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 1000
);
返ってきたpartition tokenを使って、上記クエリのpartition_token
をその値を指定し、partition tokenの件数分を実行すれば、トランザクションのChange Recordは確認できます。結果ですが、trade_id = "1" and trader_id = "demo_trader2"
は下記のChange Record(DELETE
)しか確認できませんから、Spanner Change StreamsにはUPDATE
のChange Recordが存在していないことが判明しました。
ただ、UPDATE
がSpring Bootのトランザクション処理で消えたのか、それともSpannerの内部処理で消えたのか、続けて調べる必要があります。分かり次第に、この記事を更新したいと思います。
[
[
{
"trade_id": "1",
"trader_id": "demo_trader2"
},
{},
{
"action": "BUY",
"curve": [
99.0,
101.0
],
"price": 100.0,
"shares": 70.0,
"symbol": "STOCK2"
}
]
]
"DELETE"
蛇足
本記事はSpanner Change Streamsの使い方について、触れませんでした。Spanner Change Streamsを試したい方には、冒頭のCloud Skills Boostのラボをおすすめします。
また、90日間無償でSpannerを試せるので、GCPアカウントを所有している方もぜひSpannerインスタンスを作ってから試してみてください。※Dataflowなどの料金はかかります。
参考資料
- Google Cloud Blog: Change streams for Cloud Spanner: now generally available
- Medium: Change Streams in Cloud Spanner | Replication to BigQuery
- Google Cloud Skills Boost: Reconciling Account Data with Cloud Spanner Change Streams
- Google Cloud Doc: Google 提供の Dataflow ストリーミング テンプレート
- Google Cloud Doc: ストリーム パーティション、レコード、クエリを変更する
- Spring Doc: Spring Framework on Google Cloud
- Github: Spring Framework on Google Cloud Spanner Starter Example using Spring Data repositories