3
0

More than 1 year has passed since last update.

Spanner Change Streams to BigQuery: Spannerトランザクション処理はどう連携されるか?

Posted at

ゼロバンク・デザインファクトリー株式会社(ZDF)で、データ基盤の構築・運用を担当しているキョウです。

背景

2022年5月28日にCloud Spanner Change StreamsがGA化してから、ヘビーなSpannerユーザーの私たちにとしては、とてもとても黙っているわけにはいきません。が、時の流れが早い...世の中には、サービスイメージをつかむための記事とCloud Skills Boostのラボがすでに存在しております。そのため、この記事は少し深いところ(Spannerトランザクション処理)を探ってみたいと思います。これからChange Streamsを検証してみたい方に少しでも役立てばいいなと思います。

前書き

今回の検証には、Applicationチームがよく使うSpring Bootを使いますが、Githubにあるサンプルソースを修正して利用しています。記事の所々リンクをつけるようにしますので、適宜にご参照いただければと思います。

準備

Spannerテーブルの作成について、後述のソースコードで実現したため、まず、準備としてはSpannerインスタンス、Databaseと格納先のBigQuery Datasetを用意します。
また、以下のDDL構文で全てのテーブルを監視するChange Streamsを作成します。

CREATE CHANGE STREAM all_change_stream
  FOR ALL;

最後に、下記のようなコマンドでDataflow Template(Spanner Change Streams to BigQuery) Jobを起動します。

gcloud dataflow flex-template run {job_name} \
  --project {project_name} \
  --template-file-gcs-location gs://dataflow-templates-asia-northeast1/latest/flex/Spanner_Change_Streams_to_BigQuery \
  --region {spanner_instance_region} \
  --parameters "spannerInstanceId={spanner_instance_id},spannerDatabase={spanner_database_id},spannerMetadataInstanceId={spanner_metadata_instance_id},spannerMetadataDatabase={spanner_meta_database_id},spannerChangeStreamName=all_change_stream,bigQueryDataset={bigquery_dataset_id}"

トランザクション処理(Update -> Delete)

spring initializrからアプリケーションのひな型を作成してから、Spring Framework on Google Cloudドキュメントに記載された「Sample application using higher-level Spanner Repository capabilities」を参照して、build.gradleに依存関係、application.propertiesにSpanner接続情報を追記します。

そして、サンプルソースコードのテーブルとデータ作成部分のソースコードを残し、以下のトランザクション処理を追加します。

this.tradeRepository.performReadWriteTransaction(transactionTradeRepository -> {
  Iterable<Trade> all = transactionTradeRepository.findAll();

  LOGGER.info("update Action: BUY to RETURN");
  StreamSupport.stream(all.spliterator(), false)
      .filter(record -> record.getAction().equals("BUY"))
      .forEach(record -> {
        record.setAction("RETURN");
        transactionTradeRepository.save(record);
        LOGGER.info(record.toString());
      });

  LOGGER.info("delete TradeId: 1 and TraderId: demo_trader2");
  StreamSupport.stream(all.spliterator(), false)
      .filter(record -> record.getTradeId().equals("1") && record.getTraderId().equals("demo_trader2"))
      .forEach(transactionTradeRepository::delete);

  return null;
});

そうすると、tradeテーブルは以下(太字:更新、取り消し線:削除)のように更新されます。

trade_id action price trader_id ...
1 BUY→RETURN 100.0 demo_trader1 ...
2 BUY→RETURN 105.0 demo_trader1 ...
3 BUY→RETURN 100.0 demo_trader1 ...
1 BUY→RETURN 100.0 demo_trader2 ...
2 BUY→RETURN 103.0 demo_trader2 ...
3 SELL 100.0 demo_trader2 ...
1 SELL 98.0 demo_trader3 ...
2 SELL 110.0 demo_trader3 ...

BigQueryに格納されたデータ

Dataflow Jobが正常に作動すれば、BigQueryのテーブル(trade_changelog)に以下のようなレコード(※関係ない列は省略しています)が作成されます。
同じトランザクションであるため、すべてのレコードのcommit_timestampserver_transaction_idは同じ値になっています。公式Docの解釈もご参照ください。

trade_id trader_id action price mod_type commit_timestamp server_transaction_id record_sequence is_last_record_in_transaction_in_partition number_of_records_in_transaction number_of_partitions_in_transaction
1 demo_trader1 RETURN 100 UPDATE 2023-01-20 06:23:44.968031 UTC MTI5MzA1Mjk4NjEzMTExMTMzMTg= 1 TRUE 2 1
2 demo_trader1 RETURN 105 UPDATE 2023-01-20 06:23:44.968031 UTC MTI5MzA1Mjk4NjEzMTExMTMzMTg= 1 TRUE 2 1
3 demo_trader1 RETURN 100 UPDATE 2023-01-20 06:23:44.968031 UTC MTI5MzA1Mjk4NjEzMTExMTMzMTg= 1 TRUE 2 1
1 demo_trader2 DELETE 2023-01-20 06:23:44.968031 UTC MTI5MzA1Mjk4NjEzMTExMTMzMTg= 0 FALSE 2 1
2 demo_trader2 RETURN 103 UPDATE 2023-01-20 06:23:44.968031 UTC MTI5MzA1Mjk4NjEzMTExMTMzMTg= 1 TRUE 2 1

trade_id=1 and trader_id=demo_trader2の更新について、BUY → RETURNのUPDATEDELETEの二つ変更レコードがBigQueryテーブルに連携されると想定しましたが、DELETEのみ(太字の一行)がBigQueryに記録されています。
ただ、同じトランザクションでは、同じレコードに対して、更新してから削除するようなことはそもそも避けるべきだと思います。削除予定のレコードは先に消してから、他のレコードを更新するといいですね。どうしてもやむを得ない場合は、UPDATEした変更レコードはBigQueryに記録されないので、ご注意ください。

UPDATEが記録されない原因

UPDATEが記録されない原因について、もう少し探りたいと思います。公式Docにも記載されていますが、TVF functionのREAD_{Change Streams名}を使うことで、上述のトランザクションのChange Recordを確認することができます。

まず、以下のクエリでpartition tokenを取得します(指定したstart_timestampはトラザクション処理開始前の時間です)。

SELECT ChangeRecord FROM READ_all_change_stream(
  start_timestamp => "2023-02-20 18:50:00+0900",
  end_timestamp => NULL,
  partition_token => NULL,
  heartbeat_milliseconds => 1000
);

返ってきたpartition tokenを使って、上記クエリのpartition_tokenをその値を指定し、partition tokenの件数分を実行すれば、トランザクションのChange Recordは確認できます。結果ですが、trade_id = "1" and trader_id = "demo_trader2"は下記のChange Record(DELETE)しか確認できませんから、Spanner Change StreamsにはUPDATEのChange Recordが存在していないことが判明しました。
ただ、UPDATEがSpring Bootのトランザクション処理で消えたのか、それともSpannerの内部処理で消えたのか、続けて調べる必要があります。分かり次第に、この記事を更新したいと思います。

[
  [
    {
      "trade_id": "1",
      "trader_id": "demo_trader2"
    },
    {},
    {
      "action": "BUY",
      "curve": [
        99.0,
        101.0
      ],
      "price": 100.0,
      "shares": 70.0,
      "symbol": "STOCK2"
    }
  ]
]
"DELETE"

蛇足

本記事はSpanner Change Streamsの使い方について、触れませんでした。Spanner Change Streamsを試したい方には、冒頭のCloud Skills Boostのラボをおすすめします。
また、90日間無償でSpannerを試せるので、GCPアカウントを所有している方もぜひSpannerインスタンスを作ってから試してみてください。※Dataflowなどの料金はかかります。

参考資料

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0