GCPの新サービスとしてDatastreamが今月にGAになりました。
データベースのCDC(Change Data Capture)データをサーバレスでGCPに転送することができるサービスです。
現時点ではデータソースとしてはMySQLとOracleのみ、データディスティネーションとしてはGCSのみがサポートされています。
でが、将来的にはより多くのソース・ディスティネーションをサポートする予定もあるように見えます。
この記事ではCloud SQL上に構築したMySQLからDatastreamを使ってCDCデータを取得してGCSに保存する手順を紹介します。
Cloud SQLインスタンスの作成
最初にCloud SQLインスタンスを作成します。
データベースはMySQLのバージョン5.7を指定しています。
Datastreamが対応しているMySQLのバージョンは5.6, 5.7, 8.0なので、他のバージョンでもOKです。
なお、リージョン・CPU・メモリーは単なる検証目的なので、適当に設定しています。
gcloud sql instances create test-datastream-20211219 \
--root-password=<ルートパスワード> \
--database-version=MYSQL_5_7 \
--region=us-central1 \
--cpu=1 \
--memory=3840MiB
検証用のテーブルの作成
作成したインスタンスにログインし、検証用のテーブルを作成します。
gcloud sql connect test-datastream-20211219 --user=root
create database test_datastream;
use test_datastream;
create table users (
id INT AUTO_INCREMENT,
name TEXT,
email TEXT,
PRIMARY KEY (id)
);
Datastream用のユーザーの作成
DatastreamがMySQLにアクセスするためのユーザーを作成します。
CREATE USER 'datastream'@'%' IDENTIFIED BY '<パスワード>';
GRANT REPLICATION SLAVE, SELECT, RELOAD, REPLICATION CLIENT, LOCK TABLES, EXECUTE ON *.* TO 'datastream'@'%';
FLUSH PRIVILEGES;
binlogの有効化
Datastreamはbinlogを使ってCDCデータを読み出すので、有効化をします。
Cloud SQLでbinlogを有効化するためにはPoint-in-time recoveryの有効化をすれば良いです。
# Automated backupsが有効でないとPoint-in-time recoveryを有効化できないので、先に有効にしておく
gcloud sql instances patch test-datastream-20211219 --backup-start-time=00:00
# Point-in-time recoveryの有効化
gcloud sql instances patch test-datastream-20211219 \
--enable-bin-log \
--retained-transaction-log-days=7
# binaryLogEnabledがtrueになっていることを確認
gcloud sql instances describe test-datastream-20211219
Authorized Networkの追加
DatastreamのGlobal IPアドレスをCloud SQLのAuthorized Networkに追加して、Datastreamから接続できるようにします。
gcloud sql instances patch test-datastream-20211219 \
--authorized-networks=34.72.28.29/32,34.67.234.134/32,34.67.6.157/32,34.72.239.218/32,34.71.242.81/32
SSL/TLS証明書の取得
Cloud SQLとDatastreamとの間の通信はパブリックネットワークを通して行われるので、SSL証明書を取得します。
# クライアント証明書の作成
gcloud sql ssl client-certs create datastream client-key.pem --instance=test-datastream-20211219
# クライアント証明書の公開鍵の取得
gcloud sql ssl client-certs describe datastream --instance=test-datastream-20211219 --format="value(cert)" > client-cert.pem
# サーバー証明書の取得
gcloud sql instances describe test-datastream-20211219 --format="value(serverCaCert.cert)" > server-ca.pem
Cloud SQL用のConnection Profileの作成
ここからDatastreamの設定に入っていきます。
DatastreamのコンソールのConnection profilesタブからConnection profileの作成を行います。
最初にCloud SQLへの接続情報を入力します。
この時に指定するユーザー名とパスワードは上の手順でCloud SQLに作成したdatastreamユーザーのものにします。
次にSSL/TLS証明書の設定を行います。
Encryption typeはServer-clientを設定し、それぞれ以下のファイルをアップロードして、証明書をDatastreamに設定します。
最後にConnectivity methodにIP allowlistingを設定します。
この時に表示されているIPアドレスが、Cloud SQLのAuthorized networksに設定したIPアドレスと一致していることを確認します。
ここまででCloud SQLへの接続に必要な設定は全て完了したので、接続テストを行い正常にPassすることを確認します。
GCS用のConnection Profileの作成
GCS用のConnection Profileの作成は簡単です。
CDCデータが転送されるバケットを作成します。
gsutil mb gs://test-datastream-20211219
そしてそのバケットを指定したConnection Profileを作成します。
Streamの作成
ここまでの手順でソースとデスティネーション両方のConnection Profileを作成しました。
Streamを作成することで、それらを2つをつなげてCloud SQLのCDCデータをGCSにデータ連携してみます。
Streamの作成のためにDatastreamのStreamsタブを開き、Streamの新規作成画面を開きます。
Streamの名前やIDなどを入力し、SourceとDestinationをそれぞれMySQLとCloud Storageにします。
MySQLのConnection Profileとして、先ほど作成したものを選択します。
転送対象のテーブルを選択します。
今回はusersテーブルと、それ以降にtest_datastreamデータベースで作成される全てのテーブル(Future tables)を設定します。
デスティネーション側のConnection Profileには先ほど作成したGCSのものを設定します。
CDCデータを配置するパスや、ファイルフォーマットを設定します。
今回はテストなので、配置するパスはトップレベル、フォーマットはJSONにしました。
最後に全ての設定の確認画面が表示されます。
Validateを行うと、end-to-endで正常に動作するかどうかをチェックできます。
テーブルを更新してみる
Datastreamの設定は全て完了したので、実際にusersテーブルを更新してみます。
まずは、INSERTを3行やってみます。
INSERT INTO users (name, email) VALUES('taro', 'taro@example.com');
INSERT INTO users (name, email) VALUES('jiro', 'jiro@example.com');
INSERT INTO users (name, email) VALUES('saburo', 'saburo@example.com');
その後、GCSバケットを確認したところ、CDCデータが出力されていることが分かります。
$ gsutil ls "gs://test-datastream-20211219/**/*.jsonl.gz"
gs://test-datastream-20211219/test_datastream_users/2021/12/19/04/28/a43436ac21318abf7476d9c246ec199e08556067_mysql-cdc-binlog_-320765636_0_0.jsonl.gz
gs://test-datastream-20211219/test_datastream_users/2021/12/19/04/29/a43436ac21318abf7476d9c246ec199e08556067_mysql-cdc-binlog_-320765636_3_0.jsonl.gz
ファイルをダウンロードし、表示してみます。
gsutil cp "gs://test-datastream-20211219/**/*.jsonl.gz" .
# JSONLをJSONに変換して表示
gzcat *.jsonl.gz | jq --slurp '.' -
INSERTした行の情報が正常に保存されていることが分かります。
[
{
"uuid": "52cbe1e1-d089-430a-9137-413800000000",
"read_timestamp": "2021-12-19T04:28:55.306",
"source_timestamp": "2021-12-19T04:28:54.000",
"object": "test_datastream_users",
"read_method": "mysql-cdc-binlog",
"stream_name": "projects/XXXXXXXXXXXX/locations/us-central1/streams/test-datastream",
"schema_key": "a43436ac21318abf7476d9c246ec199e08556067",
"sort_keys": [
1639888134000,
"mysql-bin.000001",
374634
],
"source_metadata": {
"table": "users",
"database": "test_datastream",
"primary_keys": [
"id"
],
"log_file": "mysql-bin.000001",
"log_position": 374634,
"change_type": "INSERT",
"is_deleted": false
},
"payload": {
"id": 1,
"name": "taro",
"email": "taro@example.com"
}
},
{
"uuid": "15754f0b-8e01-42e0-a81e-319e00000000",
"read_timestamp": "2021-12-19T04:30:04.455",
"source_timestamp": "2021-12-19T04:29:18.000",
"object": "test_datastream_users",
"read_method": "mysql-cdc-binlog",
"stream_name": "projects/XXXXXXXXXXXX/locations/us-central1/streams/test-datastream",
"schema_key": "a43436ac21318abf7476d9c246ec199e08556067",
"sort_keys": [
1639888158000,
"mysql-bin.000001",
376365
],
"source_metadata": {
"table": "users",
"database": "test_datastream",
"primary_keys": [
"id"
],
"log_file": "mysql-bin.000001",
"log_position": 376365,
"change_type": "INSERT",
"is_deleted": false
},
"payload": {
"id": 2,
"name": "jiro",
"email": "jiro@example.com"
}
},
{
"uuid": "15754f0b-8e01-42e0-a81e-319e00000001",
"read_timestamp": "2021-12-19T04:30:04.455",
"source_timestamp": "2021-12-19T04:29:26.000",
"object": "test_datastream_users",
"read_method": "mysql-cdc-binlog",
"stream_name": "projects/XXXXXXXXXXXX/locations/us-central1/streams/test-datastream",
"schema_key": "a43436ac21318abf7476d9c246ec199e08556067",
"sort_keys": [
1639888166000,
"mysql-bin.000001",
377245
],
"source_metadata": {
"table": "users",
"database": "test_datastream",
"primary_keys": [
"id"
],
"log_file": "mysql-bin.000001",
"log_position": 377245,
"change_type": "INSERT",
"is_deleted": false
},
"payload": {
"id": 3,
"name": "saburo",
"email": "saburo@example.com"
}
}
]
UPDATEをした場合はこのようになります。
change_typeにSQL文の種類が入るようです。
UPDATE users SET name = 'TARO' WHERE id = 1;
[
{
"uuid": "5f317b3a-94b0-438f-a944-fc5f00000000",
"read_timestamp": "2021-12-19T04:56:52.301",
"source_timestamp": "2021-12-19T04:55:44.000",
"object": "test_datastream_users",
"read_method": "mysql-cdc-binlog",
"stream_name": "projects/XXXXXXXXXXXX/locations/us-central1/streams/test-datastream",
"schema_key": "a43436ac21318abf7476d9c246ec199e08556067",
"sort_keys": [
1639889744000,
"mysql-bin.000001",
467356
],
"source_metadata": {
"table": "users",
"database": "test_datastream",
"primary_keys": [
"id"
],
"log_file": "mysql-bin.000001",
"log_position": 467356,
"change_type": "UPDATE-INSERT",
"is_deleted": false
},
"payload": {
"id": 1,
"name": "TARO",
"email": "taro@example.com"
}
}
]
DELETEの場合はこのようになります。
DELETE FROM users WHERE id = 2;
[
{
"uuid": "7e518792-e133-42c6-bf4b-080d00000000",
"read_timestamp": "2021-12-19T05:01:05.389",
"source_timestamp": "2021-12-19T05:01:03.000",
"object": "test_datastream_users",
"read_method": "mysql-cdc-binlog",
"stream_name": "projects/XXXXXXXXXXXX/locations/us-central1/streams/test-datastream",
"schema_key": "a43436ac21318abf7476d9c246ec199e08556067",
"sort_keys": [
1639890063000,
"mysql-bin.000001",
485617
],
"source_metadata": {
"table": "users",
"database": "test_datastream",
"primary_keys": [
"id"
],
"log_file": "mysql-bin.000001",
"log_position": 485617,
"change_type": "DELETE",
"is_deleted": true
},
"payload": {
"id": 2,
"name": "jiro",
"email": "jiro@example.com"
}
}
]
参考: