はじめに
会社の開発で、なにかしらマネージドなChange Data Captureを使いたい理由があり、 Datastream の存在を教えてもらったので、その検証を行った結果のまとめ。
参考にしたのは以下。(とても参考になりました。ありがとうございます)
この記事でやりたかったこと
画面から操作する場合はドキュメント等も多いのだが、それが面倒なので、全部CLI化したいと考えた。
が、そのための参考資料がなかったので記事化してみたという感じ。
事前準備
Datastream自体の作業に入る前に色々やることがあるので準備。
なお、前提として
- Cloud SQLにMySQLインスタンスは作成済みである
- binlogも有効になっている
- Cloud Storage に必要なバケットは作成済みである
という状態を想定している。
Datastream APIの有効化
ProjectにおいてAPIを有効にする。
ここだけはGUIでやった。
MySQLへのdatastreamユーザの追加
MySQL上にDataStreamの接続用ユーザーを作成する。
CREATE USER 'datastream'@'%' IDENTIFIED BY '[Password]';
GRANT REPLICATION SLAVE, SELECT, RELOAD, REPLICATION CLIENT, LOCK TABLES, EXECUTE ON *.* TO 'datastream'@'%';
FLUSH PRIVILEGES;
MySQLへのサンプルテーブル作成
Datastremが変更データをキャプチャする対象となるデータベース・テーブルを作成しておく。(あくまで検証目的)
create database test_datastream;
use test_datastream;
create table users (
id INT AUTO_INCREMENT,
name TEXT,
email TEXT,
PRIMARY KEY (id)
);
MySQL インスタンスのGlobal IPの確認
DatastreamにおけるMySQL接続プロファイルは、MySQLのパブリックIP経由で情報を取得する。このIPを事前に把握しておく(Datastreamから見るとDestinationのIPアドレスに相当する)
gcloud sql instances describe [cloud-sql instance name]
(略)
ipAddresses:
- ipAddress: XXX.XXX.XXX.XXX <-- これを記録しておく
type: PRIMARY
- ipAddress: YYY.YYY.YYY.YYY
type: PRIVATE
MySQL接続用クライアント証明書の作成
DatastreamがMySQLに接続する際に暗号化モードが選択可能だが、「暗号化なし」「サーバのみ」だと接続ができなかった。そこでクライアント証明書を作成しておく。これは後続のDatastreamにおける接続プロファイル作成時に使用するので手元にDLしておく。
クライアント証明書の作成
gcloud sql ssl client-certs create datastream client-key.pem --instance=[cloud-sql instance name]
Created [https://sqladmin.googleapis.com/sql/v1beta4/projects/[project-id]/instances/[cloud-sql instance name]/sslCerts/ba2bc40df7f9620ddcca983d4cfc0d6b82c90107].
NAME SHA1_FINGERPRINT EXPIRATION
datastream ba2bc40df7f9620ddcca983d4cfc0d6b82c90107 2032-10-04T01:13:56.978Z
クライアント証明書の公開鍵の取得
gcloud sql ssl client-certs describe datastream --instance=[cloud-sql instance name] --format="value(cert)" > client-cert.pem
サーバー証明書の取得
gcloud sql instances describe [cloud-sql instance name] --format="value(serverCaCert.cert)" > server-ca.pem
MySQL接続プロファイル作成時に必要となるStatic IPの取得
DatastreamにおけるMySQL接続プロファイルは、(今回の検証では)MySQLのパブリックIP経由で情報を取得する。その際、MySQL側に対する接続は、DatastreamのGlobal IPから行われることになる。これがSource IPに相当する。
このIP群から疎通したい場合、それらをMySQLの承認済みネットワークとして登録しておく必要があるため、まずはこの情報について事前に取得しておく。
gcloud beta datastream locations fetch-static-ips asia-northeast1
staticIps:
- 34.146.175.7
- 34.146.177.122
- 35.194.107.163
- 35.189.147.253
- 34.84.33.5
MySQLへの承認済みネットワークの追加
上記で記録してあるIPアドレスをCloud SQLのMySQLインスタンスに対し、承認済みネットワークとして追加しておく。
gcloud sql instances patch [cloud-sql instance name] --authorized-networks=34.146.175.7/32,34.146.177.122/32,35.194.107.163/32,35.189.147.253/32,34.84.33.5/32
When adding a new IP address to authorized networks, make sure to also include any IP addresses that have already been authorized. Otherwise, they will be
overwritten and de-authorized.
Do you want to continue (Y/n)? Y
The following message will be used for the patch API method.
{"name": "[cloud-sql instance name]", "project": "[project-id]", "settings": {"ipConfiguration": {"authorizedNetworks": [{"value": "34.146.175.7/32"}, {"value": "34.146.177.122/32"}, {"value": "35.194.107.163/32"}, {"value": "35.189.147.253/32"}, {"value": "34.84.33.5/32"}]}}}
Patching Cloud SQL instance...done.
Updated [https://sqladmin.googleapis.com/sql/v1beta4/projects/[project-id]/instances/[cloud-sql instance name]].
登録結果を確認しておく。
gcloud sql instances describe [cloud-sql instance name]
(中略)
ipConfiguration:
authorizedNetworks:
- kind: sql#aclEntry
name: ''
value: 34.146.175.7/32
- kind: sql#aclEntry
name: ''
value: 35.194.107.163/32
- kind: sql#aclEntry
name: ''
value: 34.146.177.122/32
- kind: sql#aclEntry
name: ''
value: 34.84.33.5/32
- kind: sql#aclEntry
name: ''
value: 35.189.147.253/32
Datastreamにおける接続プロファイルの作成
接続プロファイルとは、Datastreamからみた接続対象となる。今回は公式の DataStream UI の使用 をベースに、接続元をCloud SQL - MySQL(チュートリアルではOracle)、接続先をCloud Storageとする。
MySQL用接続プロファイルの作成
事前作業にて作成した各種証明書・鍵、およびMySQLのパスワード、Global IPを使用して以下を実行する。
ここで鍵や証明書の内容をどう渡せば良いのかがわからず苦労した。
gcloud datastream connection-profiles create \
mysql-connection-profile \
--location=asia-northeast1 \
--type=mysql \
--mysql-password='[mysql password]' \
--mysql-username=datastream \
--display-name=mysql-profile \
--mysql-hostname=[mysql global ip] \
--mysql-port=3306 \
--static-ip-connectivity \
--ca-certificate="$(cat ./server-ca.pem)" \
--client-certificate="$(cat ./client-cert.pem)" \
--client-key="$(cat ./client-key.pem)"
done: true
metadata:
'@type': type.googleapis.com/google.cloud.datastream.v1.OperationMetadata
apiVersion: v1
createTime: '2022-10-07T02:21:57.503096444Z'
endTime: '2022-10-07T02:21:57.543691267Z'
requestedCancellation: false
target: projects/[project-id]/locations/asia-northeast1/connectionProfiles/mysql-connection-profile
verb: create
name: projects/[project-id]/locations/asia-northeast1/operations/operation-1665109317432-5ea687960b49c-38fde302-5803c015
response:
'@type': type.googleapis.com/google.cloud.datastream.v1.ConnectionProfile
createTime: '2022-10-07T02:21:57.500837148Z'
displayName: mysql-profile
mysqlProfile:
hostname: 34.146.71.66
port: 3306
sslConfig:
caCertificateSet: true
clientCertificateSet: true
clientKeySet: true
username: datastream
name: projects/[project-id]/locations/asia-northeast1/connectionProfiles/mysql-connection-profile
staticServiceIpConnectivity: {}
updateTime: '2022-10-07T02:21:57.500837148Z'
Cloud Storage接続用プロファイルの作成
CDCが取得した差分をCloud Storage側に格納するための接続プロファイルを作成する。
gcloud datastream connection-profiles create \
cloud-storage-connection-profile \
--location=asia-northeast1 \
--type=google-cloud-storage \
--bucket-name=datastream-change-data-capture \
--root-path=/ \
--display-name=cloud-storage-profile \
--no-connectivity
done: true
metadata:
'@type': type.googleapis.com/google.cloud.datastream.v1.OperationMetadata
apiVersion: v1
createTime: '2022-10-07T02:26:44.363635659Z'
endTime: '2022-10-07T02:26:44.403121395Z'
requestedCancellation: false
target: projects/[project-id]/locations/asia-northeast1/connectionProfiles/cloud-storage-connection-profile
verb: create
name: projects/[project-id]/locations/asia-northeast1/operations/operation-1665109604350-5ea688a7abb41-85f94383-def717e0
response:
'@type': type.googleapis.com/google.cloud.datastream.v1.ConnectionProfile
createTime: '2022-10-07T02:26:44.360313144Z'
displayName: cloud-storage-profile
gcsProfile:
bucket: datastream-change-data-capture
rootPath: /
name: projects/[project-id]/locations/asia-northeast1/connectionProfiles/cloud-storage-connection-profile
staticServiceIpConnectivity: {}
updateTime: '2022-10-07T02:26:44.360313144Z'
Datastreamにおけるストリームの作成・起動
ストリームの作成
前節で作成した接続プロファイルを使用して、ストリームを作成する。
gcloud datastream streams create my-stream \
--location=asia-northeast1 \
--display-name=my-stream \
--source=projects/[project-id]/locations/asia-northeast1/connectionProfiles/mysql-connection-profile \
--mysql-source-config=./mysql-source-config.json \
--destination=projects/[project-id]/locations/asia-northeast1/connectionProfiles/cloud-storage-connection-profile \
--gcs-destination-config=./gcs-destination-config.json \
--backfill-all
done: false
metadata:
'@type': type.googleapis.com/google.cloud.datastream.v1.OperationMetadata
apiVersion: v1
createTime: '2022-10-07T08:23:09.720471516Z'
requestedCancellation: false
target: projects/[project-id]/locations/asia-northeast1/streams/my-stream
verb: create
name: projects/[project-id]/locations/asia-northeast1/operations/operation-1665130989417-5ea6d8520f449-5a368552-09d90bc5
なお、ここで done: false となっていても、作成自体には成功していればOK。作成されているかどうかは以下のコマンドで確認できる。
gcloud beta datastream streams list --location=asia-northeast1
NAME STATE SOURCE DESTINATION CREATE_TIME UPDATE_TIME
my-stream NOT_STARTED projects/[project-number]/locations/asia-northeast1/connectionProfiles/mysql-connection-profile projects/[project-number]/locations/asia-northeast1/connectionProfiles/cloud-storage-connection-profile 2022-10-07T08:47:34.670339445Z 2022-10-07T08:47:36.223868730Z
もしくはoperationオブジェクトから判断することも出来る。
gcloud beta datastream operations list --location=asia-northeast1
この結果からoperationを特定して、
gcloud beta datastream operations describe [operation_id] --location=asia-northeast1
ストリーム作成時の引数に使用したJSONファイルは以下の通り。
{
"include_objects": {
"mysql_databases": [
{
"database": "test_datastream"
}
]
}
}
{
"path": "/datastream",
"file_rotation_mb":5,
"file_rotation_interval":"15s",
"json_file_format": {}
}
ストリームの起動
作成されたストリームは自動で起動しない模様。state=PAUSEDの状態になると思うので、そこに対してstateのupdateをかけることで起動状態に持っていく。
gcloud beta datastream streams update my-stream --state=RUNNING --location=asia-northeast1
done: false
metadata:
'@type': type.googleapis.com/google.cloud.datastream.v1.OperationMetadata
apiVersion: v1
createTime: '2022-10-07T09:09:37.867489052Z'
requestedCancellation: false
target: projects/[project-id]/locations/asia-northeast1/streams/my-stream
verb: update
name: projects/[project-id]/locations/asia-northeast1/operations/operation-1665133777838-5ea6e2b54e2df-e98c1c03-41f53a93
以下のように、STATEが RUNNING になれば起動成功。
gcloud beta datastream streams list --location=asia-northeast1
Use `gcloud datastream streams list` instead.
NAME STATE SOURCE DESTINATION CREATE_TIME UPDATE_TIME
my-stream RUNNING projects/[project-number]/locations/asia-northeast1/connectionProfiles/mysql-connection-profile projects/[project-number]/locations/asia-northeast1/connectionProfiles/cloud-storage-connection-profile 2022-10-07T09:07:34.385858463Z 2022-10-07T09:11:06.997983272Z
Databaseの更新およびCDCによるキャプチャ結果の確認
CDCの対象となっているTableを更新する。
INSERT INTO users (name, email) VALUES('taro', 'taro@example.com');
INSERT INTO users (name, email) VALUES('jiro', 'jiro@example.com');
INSERT INTO users (name, email) VALUES('saburo', 'saburo@example.com');
結果がバケットに保存されるので内容を確認する。
gsutil ls -l "gs://datastream-change-data-capture/**/*.jsonl"
1809 2022-10-07T08:35:39Z gs://datastream-change-data-capture/datastream/test_datastream_users/2022/10/07/08/35/9fef2a35f2c263f80aabd5f6217321da78138ed9_mysql-backfill-fulldump_404198369_5_0.jsonl
1803 2022-10-07T09:12:06Z gs://datastream-change-data-capture/datastream/test_datastream_users/2022/10/07/09/11/9fef2a35f2c263f80aabd5f6217321da78138ed9_mysql-backfill-fulldump_-2112679048_6_0.jsonl
1920 2022-10-07T09:16:45Z gs://datastream-change-data-capture/datastream/test_datastream_users/2022/10/07/09/15/9fef2a35f2c263f80aabd5f6217321da78138ed9_mysql-cdc-binlog_-2112679048_6_3.jsonl
3607 2022-10-18T01:43:56Z gs://datastream-change-data-capture/datastream/test_datastream_users/2022/10/18/01/43/9fef2a35f2c263f80aabd5f6217321da78138ed9_mysql-backfill-fulldump_783378592_4_0.jsonl
1932 2022-10-18T04:45:48Z gs://datastream-change-data-capture/datastream/test_datastream_users/2022/10/18/04/44/9fef2a35f2c263f80aabd5f6217321da78138ed9_mysql-cdc-binlog_783378592_1_0.jsonl
5777 2022-10-18T04:58:19Z gs://datastream-change-data-capture/datastream/test_datastream_users/2022/10/18/04/57/9fef2a35f2c263f80aabd5f6217321da78138ed9_mysql-cdc-binlog_783378592_4_6.jsonl
1911 2022-10-18T07:40:00Z gs://datastream-change-data-capture/datastream/test_datastream_users/2022/10/18/07/28/9fef2a35f2c263f80aabd5f6217321da78138ed9_mysql-cdc-binlog_783378592_3_0.jsonl
中身を見ると変更差分が格納されていることがわかる。
例えば gsutil ls の最新の結果をダウンロードして開いてみる。
gsutil cp gs://datastream-change-data-capture/datastream/test_datastream_users/2022/10/18/07/28/9fef2a35f2c263f80aabd5f6217321da78138ed9_mysql-cdc-binlog_783378592_3_0.jsonl
Copying gs://datastream-change-data-capture/datastream/test_datastream_users/2022/10/18/07/28/9fef2a35f2c263f80aabd5f6217321da78138ed9_mysql-cdc-binlog_783378592_3_0.jsonl...
/ [1 files][ 1.8 KiB/ 1.8 KiB]
Operation completed over 1 objects/1.8 KiB.
cat 9fef2a35f2c263f80aabd5f6217321da78138ed9_mysql-cdc-binlog_783378592_3_0.jsonl | jq --slurp '.' -
[
{
"uuid": "759c4298-abe5-477f-8e0d-832900000001",
"read_timestamp": "2022-10-18T07:39:33.775",
"source_timestamp": "2022-10-18T07:28:23.000",
"object": "test_datastream_users",
"read_method": "mysql-cdc-binlog",
"stream_name": "projects/520158312831/locations/asia-northeast1/streams/my-stream",
"schema_key": "9fef2a35f2c263f80aabd5f6217321da78138ed9",
"sort_keys": [
1666078103000,
"mysql-bin.000032",
200521
],
"source_metadata": {
"table": "users",
"database": "test_datastream",
"primary_keys": [
"id"
],
"log_file": "mysql-bin.000032",
"log_position": 200521,
"change_type": "INSERT",
"is_deleted": false
},
"payload": {
"id": 11,
"name": "jiro",
"email": "jiro@example.com"
}
},
{
"uuid": "759c4298-abe5-477f-8e0d-832900000000",
"read_timestamp": "2022-10-18T07:39:33.775",
"source_timestamp": "2022-10-18T07:28:23.000",
"object": "test_datastream_users",
"read_method": "mysql-cdc-binlog",
"stream_name": "projects/520158312831/locations/asia-northeast1/streams/my-stream",
"schema_key": "9fef2a35f2c263f80aabd5f6217321da78138ed9",
"sort_keys": [
1666078103000,
"mysql-bin.000032",
200190
],
"source_metadata": {
"table": "users",
"database": "test_datastream",
"primary_keys": [
"id"
],
"log_file": "mysql-bin.000032",
"log_position": 200190,
"change_type": "INSERT",
"is_deleted": false
},
"payload": {
"id": 10,
"name": "taro",
"email": "taro@example.com"
}
},
{
"uuid": "759c4298-abe5-477f-8e0d-832900000010",
"read_timestamp": "2022-10-18T07:39:33.775",
"source_timestamp": "2022-10-18T07:28:24.000",
"object": "test_datastream_users",
"read_method": "mysql-cdc-binlog",
"stream_name": "projects/520158312831/locations/asia-northeast1/streams/my-stream",
"schema_key": "9fef2a35f2c263f80aabd5f6217321da78138ed9",
"sort_keys": [
1666078104000,
"mysql-bin.000032",
200856
],
"source_metadata": {
"table": "users",
"database": "test_datastream",
"primary_keys": [
"id"
],
"log_file": "mysql-bin.000032",
"log_position": 200856,
"change_type": "INSERT",
"is_deleted": false
},
"payload": {
"id": 12,
"name": "saburo",
"email": "saburo@example.com"
}
}
]
最後に
この記事を書いている間にgcloudのupdateがかかったのだけど、update後は gcloud beta datastream
がDEPRECATED になっていた。
普通に gcloud datastream
で良さそう。
また、今回はMySQLのGlobal IP経由での情報取得を行っているが、Private側から接続する方法も調査したいと考えているので、まとまったら記事にしたい。