LoginSignup
8
5

More than 1 year has passed since last update.

Datastreamを使ってCloud SQLのCDCデータをGCSに格納する

Last updated at Posted at 2021-12-19

GCPの新サービスとしてDatastreamが今月にGAになりました。
データベースのCDC(Change Data Capture)データをサーバレスでGCPに転送することができるサービスです。
現時点ではデータソースとしてはMySQLとOracleのみ、データディスティネーションとしてはGCSのみがサポートされています。
でが、将来的にはより多くのソース・ディスティネーションをサポートする予定もあるように見えます。

この記事ではCloud SQL上に構築したMySQLからDatastreamを使ってCDCデータを取得してGCSに保存する手順を紹介します。

Cloud SQLインスタンスの作成

最初にCloud SQLインスタンスを作成します。
データベースはMySQLのバージョン5.7を指定しています。
Datastreamが対応しているMySQLのバージョンは5.6, 5.7, 8.0なので、他のバージョンでもOKです。

なお、リージョン・CPU・メモリーは単なる検証目的なので、適当に設定しています。

gcloud sql instances create test-datastream-20211219 \
  --root-password=<ルートパスワード> \
  --database-version=MYSQL_5_7 \
  --region=us-central1 \
  --cpu=1 \
  --memory=3840MiB

検証用のテーブルの作成

作成したインスタンスにログインし、検証用のテーブルを作成します。

gcloud sql connect test-datastream-20211219 --user=root
create database test_datastream;

use test_datastream;

create table users (
  id INT AUTO_INCREMENT,
  name TEXT,
  email TEXT,
  PRIMARY KEY (id)
);

Datastream用のユーザーの作成

DatastreamがMySQLにアクセスするためのユーザーを作成します。

CREATE USER 'datastream'@'%' IDENTIFIED BY '<パスワード>';
GRANT REPLICATION SLAVE, SELECT, RELOAD, REPLICATION CLIENT, LOCK TABLES, EXECUTE ON *.* TO 'datastream'@'%';
FLUSH PRIVILEGES;

binlogの有効化

Datastreamはbinlogを使ってCDCデータを読み出すので、有効化をします。
Cloud SQLでbinlogを有効化するためにはPoint-in-time recoveryの有効化をすれば良いです。

# Automated backupsが有効でないとPoint-in-time recoveryを有効化できないので、先に有効にしておく
gcloud sql instances patch test-datastream-20211219 --backup-start-time=00:00

# Point-in-time recoveryの有効化
gcloud sql instances patch test-datastream-20211219 \
                           --enable-bin-log \
                           --retained-transaction-log-days=7

# binaryLogEnabledがtrueになっていることを確認
gcloud sql instances describe test-datastream-20211219

Authorized Networkの追加

DatastreamのGlobal IPアドレスをCloud SQLのAuthorized Networkに追加して、Datastreamから接続できるようにします。

gcloud sql instances patch test-datastream-20211219 \
 --authorized-networks=34.72.28.29/32,34.67.234.134/32,34.67.6.157/32,34.72.239.218/32,34.71.242.81/32

SSL/TLS証明書の取得

Cloud SQLとDatastreamとの間の通信はパブリックネットワークを通して行われるので、SSL証明書を取得します。

# クライアント証明書の作成
gcloud sql ssl client-certs create datastream client-key.pem --instance=test-datastream-20211219

# クライアント証明書の公開鍵の取得
gcloud sql ssl client-certs describe datastream --instance=test-datastream-20211219 --format="value(cert)" > client-cert.pem

# サーバー証明書の取得
gcloud sql instances describe test-datastream-20211219 --format="value(serverCaCert.cert)" > server-ca.pem

Cloud SQL用のConnection Profileの作成

ここからDatastreamの設定に入っていきます。
DatastreamのコンソールのConnection profilesタブからConnection profileの作成を行います。

スクリーンショット 2021-12-19 12.33.14.png

最初にCloud SQLへの接続情報を入力します。
この時に指定するユーザー名とパスワードは上の手順でCloud SQLに作成したdatastreamユーザーのものにします。

スクリーンショット_2021-12-19_12_29_50.png

次にSSL/TLS証明書の設定を行います。
Encryption typeはServer-clientを設定し、それぞれ以下のファイルをアップロードして、証明書をDatastreamに設定します。

スクリーンショット 2021-12-19 12.31.37.png

最後にConnectivity methodにIP allowlistingを設定します。
この時に表示されているIPアドレスが、Cloud SQLのAuthorized networksに設定したIPアドレスと一致していることを確認します。

スクリーンショット 2021-12-19 12.31.54.png

ここまででCloud SQLへの接続に必要な設定は全て完了したので、接続テストを行い正常にPassすることを確認します。

スクリーンショット 2021-12-19 12.32.29.png

GCS用のConnection Profileの作成

GCS用のConnection Profileの作成は簡単です。

CDCデータが転送されるバケットを作成します。

gsutil mb gs://test-datastream-20211219

そしてそのバケットを指定したConnection Profileを作成します。

スクリーンショット 2021-12-19 13.06.23.png

Streamの作成

ここまでの手順でソースとデスティネーション両方のConnection Profileを作成しました。
Streamを作成することで、それらを2つをつなげてCloud SQLのCDCデータをGCSにデータ連携してみます。

Streamの作成のためにDatastreamのStreamsタブを開き、Streamの新規作成画面を開きます。

スクリーンショット 2021-12-19 13.09.41.png

Streamの名前やIDなどを入力し、SourceとDestinationをそれぞれMySQLとCloud Storageにします。

スクリーンショット 2021-12-19 13.11.01.png

MySQLのConnection Profileとして、先ほど作成したものを選択します。

スクリーンショット_2021-12-19_13_11_24.png

転送対象のテーブルを選択します。
今回はusersテーブルと、それ以降にtest_datastreamデータベースで作成される全てのテーブル(Future tables)を設定します。

スクリーンショット 2021-12-19 13.12.28.png

デスティネーション側のConnection Profileには先ほど作成したGCSのものを設定します。

スクリーンショット 2021-12-19 13.15.04.png

CDCデータを配置するパスや、ファイルフォーマットを設定します。
今回はテストなので、配置するパスはトップレベル、フォーマットはJSONにしました。

スクリーンショット 2021-12-19 13.15.58.png

最後に全ての設定の確認画面が表示されます。
Validateを行うと、end-to-endで正常に動作するかどうかをチェックできます。

スクリーンショット_2021-12-19_13_16_26.png

テーブルを更新してみる

Datastreamの設定は全て完了したので、実際にusersテーブルを更新してみます。

まずは、INSERTを3行やってみます。

INSERT INTO users (name, email) VALUES('taro', 'taro@example.com');
INSERT INTO users (name, email) VALUES('jiro', 'jiro@example.com');
INSERT INTO users (name, email) VALUES('saburo', 'saburo@example.com');

その後、GCSバケットを確認したところ、CDCデータが出力されていることが分かります。

$ gsutil ls "gs://test-datastream-20211219/**/*.jsonl.gz"
gs://test-datastream-20211219/test_datastream_users/2021/12/19/04/28/a43436ac21318abf7476d9c246ec199e08556067_mysql-cdc-binlog_-320765636_0_0.jsonl.gz
gs://test-datastream-20211219/test_datastream_users/2021/12/19/04/29/a43436ac21318abf7476d9c246ec199e08556067_mysql-cdc-binlog_-320765636_3_0.jsonl.gz

ファイルをダウンロードし、表示してみます。

gsutil cp "gs://test-datastream-20211219/**/*.jsonl.gz" .

# JSONLをJSONに変換して表示
gzcat *.jsonl.gz | jq --slurp '.' -

INSERTした行の情報が正常に保存されていることが分かります。

[
  {
    "uuid": "52cbe1e1-d089-430a-9137-413800000000",
    "read_timestamp": "2021-12-19T04:28:55.306",
    "source_timestamp": "2021-12-19T04:28:54.000",
    "object": "test_datastream_users",
    "read_method": "mysql-cdc-binlog",
    "stream_name": "projects/XXXXXXXXXXXX/locations/us-central1/streams/test-datastream",
    "schema_key": "a43436ac21318abf7476d9c246ec199e08556067",
    "sort_keys": [
      1639888134000,
      "mysql-bin.000001",
      374634
    ],
    "source_metadata": {
      "table": "users",
      "database": "test_datastream",
      "primary_keys": [
        "id"
      ],
      "log_file": "mysql-bin.000001",
      "log_position": 374634,
      "change_type": "INSERT",
      "is_deleted": false
    },
    "payload": {
      "id": 1,
      "name": "taro",
      "email": "taro@example.com"
    }
  },
  {
    "uuid": "15754f0b-8e01-42e0-a81e-319e00000000",
    "read_timestamp": "2021-12-19T04:30:04.455",
    "source_timestamp": "2021-12-19T04:29:18.000",
    "object": "test_datastream_users",
    "read_method": "mysql-cdc-binlog",
    "stream_name": "projects/XXXXXXXXXXXX/locations/us-central1/streams/test-datastream",
    "schema_key": "a43436ac21318abf7476d9c246ec199e08556067",
    "sort_keys": [
      1639888158000,
      "mysql-bin.000001",
      376365
    ],
    "source_metadata": {
      "table": "users",
      "database": "test_datastream",
      "primary_keys": [
        "id"
      ],
      "log_file": "mysql-bin.000001",
      "log_position": 376365,
      "change_type": "INSERT",
      "is_deleted": false
    },
    "payload": {
      "id": 2,
      "name": "jiro",
      "email": "jiro@example.com"
    }
  },
  {
    "uuid": "15754f0b-8e01-42e0-a81e-319e00000001",
    "read_timestamp": "2021-12-19T04:30:04.455",
    "source_timestamp": "2021-12-19T04:29:26.000",
    "object": "test_datastream_users",
    "read_method": "mysql-cdc-binlog",
    "stream_name": "projects/XXXXXXXXXXXX/locations/us-central1/streams/test-datastream",
    "schema_key": "a43436ac21318abf7476d9c246ec199e08556067",
    "sort_keys": [
      1639888166000,
      "mysql-bin.000001",
      377245
    ],
    "source_metadata": {
      "table": "users",
      "database": "test_datastream",
      "primary_keys": [
        "id"
      ],
      "log_file": "mysql-bin.000001",
      "log_position": 377245,
      "change_type": "INSERT",
      "is_deleted": false
    },
    "payload": {
      "id": 3,
      "name": "saburo",
      "email": "saburo@example.com"
    }
  }
]

UPDATEをした場合はこのようになります。
change_typeにSQL文の種類が入るようです。

UPDATE users SET name = 'TARO' WHERE id = 1;
[
  {
    "uuid": "5f317b3a-94b0-438f-a944-fc5f00000000",
    "read_timestamp": "2021-12-19T04:56:52.301",
    "source_timestamp": "2021-12-19T04:55:44.000",
    "object": "test_datastream_users",
    "read_method": "mysql-cdc-binlog",
    "stream_name": "projects/XXXXXXXXXXXX/locations/us-central1/streams/test-datastream",
    "schema_key": "a43436ac21318abf7476d9c246ec199e08556067",
    "sort_keys": [
      1639889744000,
      "mysql-bin.000001",
      467356
    ],
    "source_metadata": {
      "table": "users",
      "database": "test_datastream",
      "primary_keys": [
        "id"
      ],
      "log_file": "mysql-bin.000001",
      "log_position": 467356,
      "change_type": "UPDATE-INSERT",
      "is_deleted": false
    },
    "payload": {
      "id": 1,
      "name": "TARO",
      "email": "taro@example.com"
    }
  }
]

DELETEの場合はこのようになります。

DELETE FROM users WHERE id = 2;
[
  {
    "uuid": "7e518792-e133-42c6-bf4b-080d00000000",
    "read_timestamp": "2021-12-19T05:01:05.389",
    "source_timestamp": "2021-12-19T05:01:03.000",
    "object": "test_datastream_users",
    "read_method": "mysql-cdc-binlog",
    "stream_name": "projects/XXXXXXXXXXXX/locations/us-central1/streams/test-datastream",
    "schema_key": "a43436ac21318abf7476d9c246ec199e08556067",
    "sort_keys": [
      1639890063000,
      "mysql-bin.000001",
      485617
    ],
    "source_metadata": {
      "table": "users",
      "database": "test_datastream",
      "primary_keys": [
        "id"
      ],
      "log_file": "mysql-bin.000001",
      "log_position": 485617,
      "change_type": "DELETE",
      "is_deleted": true
    },
    "payload": {
      "id": 2,
      "name": "jiro",
      "email": "jiro@example.com"
    }
  }
]

参考:

8
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
5