LoginSignup
0
0

More than 1 year has passed since last update.

DatastreamをつかったChange Data Capture環境構築 (MySQL -> Datastream -> Cloud Storage)

Last updated at Posted at 2022-10-07

はじめに

会社の開発で、なにかしらマネージドなChange Data Captureを使いたい理由があり、 Datastream の存在を教えてもらったので、その検証を行った結果のまとめ。

参考にしたのは以下。(とても参考になりました。ありがとうございます)

この記事でやりたかったこと

画面から操作する場合はドキュメント等も多いのだが、それが面倒なので、全部CLI化したいと考えた。
が、そのための参考資料がなかったので記事化してみたという感じ。

事前準備

Datastream自体の作業に入る前に色々やることがあるので準備。

なお、前提として

  • Cloud SQLにMySQLインスタンスは作成済みである
    • binlogも有効になっている
  • Cloud Storage に必要なバケットは作成済みである

という状態を想定している。

Datastream APIの有効化

ProjectにおいてAPIを有効にする。
ここだけはGUIでやった。

image.png

MySQLへのdatastreamユーザの追加

MySQL上にDataStreamの接続用ユーザーを作成する。

CREATE USER 'datastream'@'%' IDENTIFIED BY '[Password]';
GRANT REPLICATION SLAVE, SELECT, RELOAD, REPLICATION CLIENT, LOCK TABLES, EXECUTE ON *.* TO 'datastream'@'%';
FLUSH PRIVILEGES;

MySQLへのサンプルテーブル作成

Datastremが変更データをキャプチャする対象となるデータベース・テーブルを作成しておく。(あくまで検証目的)

create database test_datastream;

use test_datastream;

create table users (
  id INT AUTO_INCREMENT,
  name TEXT,
  email TEXT,
  PRIMARY KEY (id)
);

MySQL インスタンスのGlobal IPの確認

DatastreamにおけるMySQL接続プロファイルは、MySQLのパブリックIP経由で情報を取得する。このIPを事前に把握しておく(Datastreamから見るとDestinationのIPアドレスに相当する)

gcloud sql instances describe [cloud-sql instance name]

(略)
ipAddresses:
- ipAddress: XXX.XXX.XXX.XXX <-- これを記録しておく
  type: PRIMARY
- ipAddress: YYY.YYY.YYY.YYY
  type: PRIVATE

MySQL接続用クライアント証明書の作成

DatastreamがMySQLに接続する際に暗号化モードが選択可能だが、「暗号化なし」「サーバのみ」だと接続ができなかった。そこでクライアント証明書を作成しておく。これは後続のDatastreamにおける接続プロファイル作成時に使用するので手元にDLしておく。

クライアント証明書の作成

gcloud sql ssl client-certs create datastream client-key.pem --instance=[cloud-sql instance name]

Created [https://sqladmin.googleapis.com/sql/v1beta4/projects/[project-id]/instances/[cloud-sql instance name]/sslCerts/ba2bc40df7f9620ddcca983d4cfc0d6b82c90107].
NAME        SHA1_FINGERPRINT                          EXPIRATION
datastream  ba2bc40df7f9620ddcca983d4cfc0d6b82c90107  2032-10-04T01:13:56.978Z

クライアント証明書の公開鍵の取得

gcloud sql ssl client-certs describe datastream --instance=[cloud-sql instance name] --format="value(cert)" > client-cert.pem

サーバー証明書の取得

gcloud sql instances describe [cloud-sql instance name] --format="value(serverCaCert.cert)" > server-ca.pem

MySQL接続プロファイル作成時に必要となるStatic IPの取得

DatastreamにおけるMySQL接続プロファイルは、(今回の検証では)MySQLのパブリックIP経由で情報を取得する。その際、MySQL側に対する接続は、DatastreamのGlobal IPから行われることになる。これがSource IPに相当する。

このIP群から疎通したい場合、それらをMySQLの承認済みネットワークとして登録しておく必要があるため、まずはこの情報について事前に取得しておく。

gcloud beta datastream locations fetch-static-ips asia-northeast1
staticIps:
- 34.146.175.7
- 34.146.177.122
- 35.194.107.163
- 35.189.147.253
- 34.84.33.5

MySQLへの承認済みネットワークの追加

上記で記録してあるIPアドレスをCloud SQLのMySQLインスタンスに対し、承認済みネットワークとして追加しておく。

gcloud sql instances patch [cloud-sql instance name] --authorized-networks=34.146.175.7/32,34.146.177.122/32,35.194.107.163/32,35.189.147.253/32,34.84.33.5/32
When adding a new IP address to authorized networks, make sure to also include any IP addresses that have already been authorized. Otherwise, they will be
 overwritten and de-authorized.

Do you want to continue (Y/n)?  Y

The following message will be used for the patch API method.
{"name": "[cloud-sql instance name]", "project": "[project-id]", "settings": {"ipConfiguration": {"authorizedNetworks": [{"value": "34.146.175.7/32"}, {"value": "34.146.177.122/32"}, {"value": "35.194.107.163/32"}, {"value": "35.189.147.253/32"}, {"value": "34.84.33.5/32"}]}}}
Patching Cloud SQL instance...done.
Updated [https://sqladmin.googleapis.com/sql/v1beta4/projects/[project-id]/instances/[cloud-sql instance name]].

登録結果を確認しておく。

gcloud sql instances describe [cloud-sql instance name]

(中略)
  ipConfiguration:
    authorizedNetworks:
    - kind: sql#aclEntry
      name: ''
      value: 34.146.175.7/32
    - kind: sql#aclEntry
      name: ''
      value: 35.194.107.163/32
    - kind: sql#aclEntry
      name: ''
      value: 34.146.177.122/32
    - kind: sql#aclEntry
      name: ''
      value: 34.84.33.5/32
    - kind: sql#aclEntry
      name: ''
      value: 35.189.147.253/32

Datastreamにおける接続プロファイルの作成

接続プロファイルとは、Datastreamからみた接続対象となる。今回は公式の DataStream UI の使用 をベースに、接続元をCloud SQL - MySQL(チュートリアルではOracle)、接続先をCloud Storageとする。

MySQL用接続プロファイルの作成

事前作業にて作成した各種証明書・鍵、およびMySQLのパスワード、Global IPを使用して以下を実行する。
ここで鍵や証明書の内容をどう渡せば良いのかがわからず苦労した。

gcloud datastream connection-profiles create \
          mysql-connection-profile \
          --location=asia-northeast1 \
          --type=mysql \
          --mysql-password='[mysql password]' \
          --mysql-username=datastream \
          --display-name=mysql-profile \
          --mysql-hostname=[mysql global ip] \
          --mysql-port=3306 \
          --static-ip-connectivity \
          --ca-certificate="$(cat ./server-ca.pem)" \
          --client-certificate="$(cat ./client-cert.pem)" \
          --client-key="$(cat ./client-key.pem)"
          
done: true
metadata:
  '@type': type.googleapis.com/google.cloud.datastream.v1.OperationMetadata
  apiVersion: v1
  createTime: '2022-10-07T02:21:57.503096444Z'
  endTime: '2022-10-07T02:21:57.543691267Z'
  requestedCancellation: false
  target: projects/[project-id]/locations/asia-northeast1/connectionProfiles/mysql-connection-profile
  verb: create
name: projects/[project-id]/locations/asia-northeast1/operations/operation-1665109317432-5ea687960b49c-38fde302-5803c015
response:
  '@type': type.googleapis.com/google.cloud.datastream.v1.ConnectionProfile
  createTime: '2022-10-07T02:21:57.500837148Z'
  displayName: mysql-profile
  mysqlProfile:
    hostname: 34.146.71.66
    port: 3306
    sslConfig:
      caCertificateSet: true
      clientCertificateSet: true
      clientKeySet: true
    username: datastream
  name: projects/[project-id]/locations/asia-northeast1/connectionProfiles/mysql-connection-profile
  staticServiceIpConnectivity: {}
  updateTime: '2022-10-07T02:21:57.500837148Z'

Cloud Storage接続用プロファイルの作成

CDCが取得した差分をCloud Storage側に格納するための接続プロファイルを作成する。

gcloud datastream connection-profiles create \
          cloud-storage-connection-profile \
          --location=asia-northeast1 \
          --type=google-cloud-storage \
          --bucket-name=datastream-change-data-capture \
          --root-path=/ \
          --display-name=cloud-storage-profile \
          --no-connectivity

done: true
metadata:
  '@type': type.googleapis.com/google.cloud.datastream.v1.OperationMetadata
  apiVersion: v1
  createTime: '2022-10-07T02:26:44.363635659Z'
  endTime: '2022-10-07T02:26:44.403121395Z'
  requestedCancellation: false
  target: projects/[project-id]/locations/asia-northeast1/connectionProfiles/cloud-storage-connection-profile
  verb: create
name: projects/[project-id]/locations/asia-northeast1/operations/operation-1665109604350-5ea688a7abb41-85f94383-def717e0
response:
  '@type': type.googleapis.com/google.cloud.datastream.v1.ConnectionProfile
  createTime: '2022-10-07T02:26:44.360313144Z'
  displayName: cloud-storage-profile
  gcsProfile:
    bucket: datastream-change-data-capture
    rootPath: /
  name: projects/[project-id]/locations/asia-northeast1/connectionProfiles/cloud-storage-connection-profile
  staticServiceIpConnectivity: {}
  updateTime: '2022-10-07T02:26:44.360313144Z'

Datastreamにおけるストリームの作成・起動

ストリームの作成

前節で作成した接続プロファイルを使用して、ストリームを作成する。

gcloud datastream streams create my-stream \
    --location=asia-northeast1 \
    --display-name=my-stream \
    --source=projects/[project-id]/locations/asia-northeast1/connectionProfiles/mysql-connection-profile \
    --mysql-source-config=./mysql-source-config.json \
    --destination=projects/[project-id]/locations/asia-northeast1/connectionProfiles/cloud-storage-connection-profile \
    --gcs-destination-config=./gcs-destination-config.json \
    --backfill-all
    
done: false
metadata:
  '@type': type.googleapis.com/google.cloud.datastream.v1.OperationMetadata
  apiVersion: v1
  createTime: '2022-10-07T08:23:09.720471516Z'
  requestedCancellation: false
  target: projects/[project-id]/locations/asia-northeast1/streams/my-stream
  verb: create
name: projects/[project-id]/locations/asia-northeast1/operations/operation-1665130989417-5ea6d8520f449-5a368552-09d90bc5

なお、ここで done: false となっていても、作成自体には成功していればOK。作成されているかどうかは以下のコマンドで確認できる。

gcloud beta datastream streams list --location=asia-northeast1

NAME       STATE        SOURCE                                                                                       DESTINATION                                                                                          CREATE_TIME                     UPDATE_TIME
my-stream  NOT_STARTED  projects/[project-number]/locations/asia-northeast1/connectionProfiles/mysql-connection-profile  projects/[project-number]/locations/asia-northeast1/connectionProfiles/cloud-storage-connection-profile  2022-10-07T08:47:34.670339445Z  2022-10-07T08:47:36.223868730Z

もしくはoperationオブジェクトから判断することも出来る。

gcloud beta datastream operations list --location=asia-northeast1

この結果からoperationを特定して、

gcloud beta datastream operations describe [operation_id] --location=asia-northeast1

ストリーム作成時の引数に使用したJSONファイルは以下の通り。

mysql-source-config.json
{
  "include_objects": {
    "mysql_databases": [
      {
        "database": "test_datastream"
      }
    ]
  }
}
gcs-destination-config.json
{
  "path": "/datastream",
  "file_rotation_mb":5,
  "file_rotation_interval":"15s",
  "json_file_format": {}
}

ストリームの起動

作成されたストリームは自動で起動しない模様。state=PAUSEDの状態になると思うので、そこに対してstateのupdateをかけることで起動状態に持っていく。

gcloud beta datastream streams update my-stream --state=RUNNING --location=asia-northeast1

done: false
metadata:
  '@type': type.googleapis.com/google.cloud.datastream.v1.OperationMetadata
  apiVersion: v1
  createTime: '2022-10-07T09:09:37.867489052Z'
  requestedCancellation: false
  target: projects/[project-id]/locations/asia-northeast1/streams/my-stream
  verb: update
name: projects/[project-id]/locations/asia-northeast1/operations/operation-1665133777838-5ea6e2b54e2df-e98c1c03-41f53a93

以下のように、STATEが RUNNING になれば起動成功。

gcloud beta datastream streams list --location=asia-northeast1
Use `gcloud datastream streams list` instead.

NAME       STATE    SOURCE                                                                                       DESTINATION                                                                                          CREATE_TIME                     UPDATE_TIME
my-stream  RUNNING  projects/[project-number]/locations/asia-northeast1/connectionProfiles/mysql-connection-profile  projects/[project-number]/locations/asia-northeast1/connectionProfiles/cloud-storage-connection-profile  2022-10-07T09:07:34.385858463Z  2022-10-07T09:11:06.997983272Z

Databaseの更新およびCDCによるキャプチャ結果の確認

CDCの対象となっているTableを更新する。

INSERT INTO users (name, email) VALUES('taro', 'taro@example.com');
INSERT INTO users (name, email) VALUES('jiro', 'jiro@example.com');
INSERT INTO users (name, email) VALUES('saburo', 'saburo@example.com');

結果がバケットに保存されるので内容を確認する。

gsutil ls -l "gs://datastream-change-data-capture/**/*.jsonl"
      1809  2022-10-07T08:35:39Z  gs://datastream-change-data-capture/datastream/test_datastream_users/2022/10/07/08/35/9fef2a35f2c263f80aabd5f6217321da78138ed9_mysql-backfill-fulldump_404198369_5_0.jsonl
      1803  2022-10-07T09:12:06Z  gs://datastream-change-data-capture/datastream/test_datastream_users/2022/10/07/09/11/9fef2a35f2c263f80aabd5f6217321da78138ed9_mysql-backfill-fulldump_-2112679048_6_0.jsonl
      1920  2022-10-07T09:16:45Z  gs://datastream-change-data-capture/datastream/test_datastream_users/2022/10/07/09/15/9fef2a35f2c263f80aabd5f6217321da78138ed9_mysql-cdc-binlog_-2112679048_6_3.jsonl
      3607  2022-10-18T01:43:56Z  gs://datastream-change-data-capture/datastream/test_datastream_users/2022/10/18/01/43/9fef2a35f2c263f80aabd5f6217321da78138ed9_mysql-backfill-fulldump_783378592_4_0.jsonl
      1932  2022-10-18T04:45:48Z  gs://datastream-change-data-capture/datastream/test_datastream_users/2022/10/18/04/44/9fef2a35f2c263f80aabd5f6217321da78138ed9_mysql-cdc-binlog_783378592_1_0.jsonl
      5777  2022-10-18T04:58:19Z  gs://datastream-change-data-capture/datastream/test_datastream_users/2022/10/18/04/57/9fef2a35f2c263f80aabd5f6217321da78138ed9_mysql-cdc-binlog_783378592_4_6.jsonl
      1911  2022-10-18T07:40:00Z  gs://datastream-change-data-capture/datastream/test_datastream_users/2022/10/18/07/28/9fef2a35f2c263f80aabd5f6217321da78138ed9_mysql-cdc-binlog_783378592_3_0.jsonl

中身を見ると変更差分が格納されていることがわかる。

例えば gsutil ls の最新の結果をダウンロードして開いてみる。

gsutil cp gs://datastream-change-data-capture/datastream/test_datastream_users/2022/10/18/07/28/9fef2a35f2c263f80aabd5f6217321da78138ed9_mysql-cdc-binlog_783378592_3_0.jsonl
Copying gs://datastream-change-data-capture/datastream/test_datastream_users/2022/10/18/07/28/9fef2a35f2c263f80aabd5f6217321da78138ed9_mysql-cdc-binlog_783378592_3_0.jsonl...
/ [1 files][  1.8 KiB/  1.8 KiB]
Operation completed over 1 objects/1.8 KiB.

cat 9fef2a35f2c263f80aabd5f6217321da78138ed9_mysql-cdc-binlog_783378592_3_0.jsonl | jq --slurp '.' -

[
  {
    "uuid": "759c4298-abe5-477f-8e0d-832900000001",
    "read_timestamp": "2022-10-18T07:39:33.775",
    "source_timestamp": "2022-10-18T07:28:23.000",
    "object": "test_datastream_users",
    "read_method": "mysql-cdc-binlog",
    "stream_name": "projects/520158312831/locations/asia-northeast1/streams/my-stream",
    "schema_key": "9fef2a35f2c263f80aabd5f6217321da78138ed9",
    "sort_keys": [
      1666078103000,
      "mysql-bin.000032",
      200521
    ],
    "source_metadata": {
      "table": "users",
      "database": "test_datastream",
      "primary_keys": [
        "id"
      ],
      "log_file": "mysql-bin.000032",
      "log_position": 200521,
      "change_type": "INSERT",
      "is_deleted": false
    },
    "payload": {
      "id": 11,
      "name": "jiro",
      "email": "jiro@example.com"
    }
  },
  {
    "uuid": "759c4298-abe5-477f-8e0d-832900000000",
    "read_timestamp": "2022-10-18T07:39:33.775",
    "source_timestamp": "2022-10-18T07:28:23.000",
    "object": "test_datastream_users",
    "read_method": "mysql-cdc-binlog",
    "stream_name": "projects/520158312831/locations/asia-northeast1/streams/my-stream",
    "schema_key": "9fef2a35f2c263f80aabd5f6217321da78138ed9",
    "sort_keys": [
      1666078103000,
      "mysql-bin.000032",
      200190
    ],
    "source_metadata": {
      "table": "users",
      "database": "test_datastream",
      "primary_keys": [
        "id"
      ],
      "log_file": "mysql-bin.000032",
      "log_position": 200190,
      "change_type": "INSERT",
      "is_deleted": false
    },
    "payload": {
      "id": 10,
      "name": "taro",
      "email": "taro@example.com"
    }
  },
  {
    "uuid": "759c4298-abe5-477f-8e0d-832900000010",
    "read_timestamp": "2022-10-18T07:39:33.775",
    "source_timestamp": "2022-10-18T07:28:24.000",
    "object": "test_datastream_users",
    "read_method": "mysql-cdc-binlog",
    "stream_name": "projects/520158312831/locations/asia-northeast1/streams/my-stream",
    "schema_key": "9fef2a35f2c263f80aabd5f6217321da78138ed9",
    "sort_keys": [
      1666078104000,
      "mysql-bin.000032",
      200856
    ],
    "source_metadata": {
      "table": "users",
      "database": "test_datastream",
      "primary_keys": [
        "id"
      ],
      "log_file": "mysql-bin.000032",
      "log_position": 200856,
      "change_type": "INSERT",
      "is_deleted": false
    },
    "payload": {
      "id": 12,
      "name": "saburo",
      "email": "saburo@example.com"
    }
  }
]

最後に

この記事を書いている間にgcloudのupdateがかかったのだけど、update後は gcloud beta datastream がDEPRECATED になっていた。

普通に gcloud datastream で良さそう。

また、今回はMySQLのGlobal IP経由での情報取得を行っているが、Private側から接続する方法も調査したいと考えているので、まとまったら記事にしたい。

0
0
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0