はじめに ―筆者の課題感―
筆者が目下開発に取り組んでいるシステムは、マイクロサービス的に分割された複数のサービスが連携して動作する構成をとっています。各サービスは独自のデータベースを持っていて、サービス間でデータの整合性を保つために、あるサービスでデータが更新された際に他のサービスがその変更内容に応じた自身のデータベースの更新を行う必要性が出てきました。
このような課題に対して、下で説明するChange Data Capture (CDC) や、より広い概念としてデータストリーミングに関する技術を用いる必要性を感じました。一般にデータストリーミングツールとしてApache KafkaやクラウドサービスであるAmazon KinesisやGoogle Cloud Pub/Sub、CDCに関してはDebeziumなどが知られていますが、クラウドでの使用が前提であったり、そうでなくとも、筆者の要件に対してはオーバースペックでシステム構成が重厚になりすぎると感じました。
このような背景から、より軽量にCDCを実現する手段を探したところ、Go製のデータストリーミングツールであるConduitに辿り着きました。本記事では、Conduitを使用してCDCを実現する方法について紹介します。
Change Data Capture (CDC) について
Change Data Capture (CDC) とは、データベースに発生したデータの変更(挿入、更新、削除)をリアルタイムまたはニアリアルタイムで検知し、その変更情報を他のシステムに伝播する技術です。従来のバッチ処理によるデータ同期と異なり、CDCはデータの変更が発生した瞬間に、その変更内容を他のシステムに通知することができます。
CDCの主な手法としては、データベースのトランザクションログ (Write-Ahead Log) を監視する方法、タイムスタンプベースの変更検知、トリガーベースの変更キャプチャなどがあります。特にログベースのCDCは、データベースへの追加負荷が少なく、すべての変更を確実にキャプチャできるため、本格的なCDCソリューションで広く採用されています。
CDCを活用することで、マイクロサービス間のデータ整合性確保、データウェアハウスへのリアルタイムデータ連携、障害時のデータ復旧、イベントドリブンアーキテクチャの実現など、様々なユースケースに対応できます。特に分散システムにおいて、各サービスが独自のデータストアを持つ場合、CDCは重要な役割を果たします。
Conduit について
Conduitは、Go言語で実装されたオープンソースのデータストリーミングツールです。Meroxaという会社が開発・提供しています。
2025/12現在、最新のバージョンはv0.14.0で、安定バージョンとしてはリリースされていないことには注意が必要です。公式ドキュメントはある程度整備されていますが、この後の説明でいくつか指摘されるような機能の制約やバグのようなものも存在します。筆者が試していない部分でも何らかの問題が存在する可能性があることには注意が必要です。
実践
インストール
公式のInstalling and runningに従い、環境に合わせてインストールします。
Windowsの場合、
- GitHubのReleasesからバイナリをダウンロードする
- 必要に応じてパスを通す
-
conduit --versionで動作確認出力例v0.14.0 windows/amd64
により、すぐに実行できる環境が整います。
Connector
Conduitではconnectorを使用することで、どのソースからデータを読み取り、どのデスティネーションへとデータを書き込むかを設定します。
connectorには上記インストールをすることで即利用可能となるBuilt-in connectorと、Conduit本体とは独立して動作するStandalone connectorがあります。
それぞれ以下のようなものがあります。(ここに記載したものが全てではありません。)
| Built-in | Standalone |
|---|---|
| file kafka postgres s3 log |
http mysql mongo google-drive snowflake |
PostgreSQL から PostgreSQL へのCDC
Built-in connectorであるPostgreSQL connectorをソースとデスティネーションの両方に使用することで、PostgreSQL間でCDCを実現できます。
PostgreSQLの設定
論理レプリケーションの有効化
ConduitでPostgreSQLのCDCを実現するには、PostgreSQLの論理レプリケーション機能を使用する必要があります。具体的な設定として、Write-Ahead Logging (WAL) レベルを変更します。
作業としては、postgresql.confファイルを編集し、以下の設定を追加または変更します。
wal_level = logical # WALレベルを論理レプリケーション対応に変更
設定変更後は、PostgreSQLサーバーの再起動が必要です。以下はWindows環境での例です。
Restart-Service postgresql-x64-16 # "16" はPostgreSQLのバージョンに応じて変更
テーブルの作成
CDCの対象となるテーブルを作成します。以下は例として users テーブルを作成するSQL文です。
CREATE TABLE users (
id INT PRIMARY KEY,
name TEXT,
email TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
これを、ソース・デスティネーション両方のデータベースに作成しておきます。
テーブル定義はあくまでテスト用で何でもよいのですが、筆者が試した限り現状(2025/12現在)では、主キーが単一の整数型以外(複合キー・文字列型など)の場合に正常に動作しない事象が発生しました。
Conduitの設定
設定ファイルはyamlで記載します。フォルダ pipelines を作成し、その下に pipelines.yaml というファイル名で以下の内容を保存します。
.
└── pipelines
└── pipelines.yaml
version: 2.2
pipelines:
- id: test-pipeline # パイプラインの一意なID
status: running
description: PostgreSQLからPostgreSQLへのCDCパイプライン
connectors:
- id: pg-source # ソースコネクタの一意なID
type: source
plugin: postgres
settings:
url: postgres://(user):(password)@(host):(port)/(データベース名)?sslmode=disable
tables: users # 監視対象のテーブル名
cdcMode: logrepl
logrepl.withAvroSchema: false
logrepl.publicationName: testpub
logrepl.slotName: testslot
- id: pg-target # デスティネーションコネクタの一意なID
type: destination
plugin: postgres
settings:
url: postgres://(user):(password)@(host):(port)/(データベース名)?sslmode=disable
起動・動作確認
conduit run
により起動できます。起動後、ソースのPostgreSQLの対象のテーブルに対して、INSERT、UPDATE、DELETEを行うと、すぐにデスティネーションのテーブルに同様の変更が反映されることが確認できます。
http connector をデスティネーションとして追加する
ソースやデスティネーションは複数設定可能です。先ほどの設定に加えて、standalone connectorであるhttp connectorをデスティネーションとして追加します。
- id: pg-target # デスティネーションコネクタの一意なID
type: destination
plugin: postgres
settings:
url: postgres://(user):(password)@(host):(port)/(データベース名)?sslmode=disable
+ - id: http
+ type: destination
+ plugin: http
+ settings:
+ url: http://localhost:8080/users/{{.Key.id}}
+ validateConnection: false
http connectorはstandaloneなconnectorなので、Conduit本体とは別に用意する必要があります。
公式ドキュメントのhttpより、環境に合ったバイナリをダウンロードして、下記のようなフォルダ構成にします。
.
├── pipelines
│ └── pipelines.yaml
└── connectors
└── conduit-connector-http.exe
送り先に相当するテスト用のサーバーを立てます。何でもよいのですが、ここではGoの標準ライブラリを使用した簡単なHTTPサーバーを例示します。あくまでサンプルなので、Node.jsのExpressやPythonのFlaskなど同等のことができれば何でもよいです。
package main
import (
"fmt"
"io/ioutil"
"log"
"net/http"
)
func handler(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
return
}
body, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, "Error reading request body", http.StatusInternalServerError)
return
}
fmt.Println(string(body))
// 何らかの処理を行う
}
func main() {
http.HandleFunc("/", handler)
fmt.Println("Starting server on :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
go run main.go により、サーバーを起動します。
動作確認
先ほどPostgreSQLからPostgreSQLへCDCでの起動・動作確認と同様に、conduit run により、起動します。
上記のサンプルサーバーの場合、
- 挿入・更新により、
ID: 100 Body: {"email":"test@example.com","id":100,"name":"名前"} - 削除により、
ID: 100 Body: {}
のように標準出力に表示されることが確認できます。
筆者の検証では、現状少なくともtimestamp型のカラムがある場合に、http connectorへの送信に失敗する問題がありました。
Dead-letter queue
Dead-letter queue (DLQ) の機能を使うことにより、処理に失敗したレコードを別途保存しておくことができます。
Conduitでは、DLQとしてデスティネーションコネクタを指定します。以下は、DLQとしてbuilt-in connectorであるfile connectorを使用する場合の設定ファイルの記述例です。
- id: pg-target # デスティネーションコネクタの一意なID
type: destination
plugin: postgres
settings:
url: postgres://(user):(password)@(host):(port)/(データベース名)?sslmode=disable
+ dead-letter-queue:
+ plugin: builtin:file
+ settings:
+ path: ./dlq.out # ファイル名を指定
+ window-size: 0 # 即時書き込み
window-size などのパラメータの設定方法の詳細は公式ドキュメントのDead-letter queueをご覧ください。
設定後再起動し、デスティネーション側での処理に失敗するような操作を行うと、dlq.out ファイルに失敗したレコードが保存されることを確認できます。
{"position":"cG9zdGdyZXMtdG8tcG9zdGdyZXMtcGlwZWxpbmU6cGdfc3JjL3sidHlwZSI6MSwic25hcHNob3RzIjp7InVzZXJzIjp7Imxhc3RfcmVhZCI6MzAsInNuYXBzaG90X2VuZCI6MzB9fX0=","operation":"create","metadata":{"conduit.dlq.nack.error":"error getting data from URL: Post \"http://localhost:8089/30\": dial tcp [::1]:8089: connectex: No connection could be made because the target machine actively refused it.","conduit.dlq.nack.node.id":"test-pipeline:http-acker","opencdc.createdAt":"1765498423290171300"},"key":null,"payload":{"before":null,"after":{"key":{"id":30},"metadata":{"conduit.source.connector.id":"test-pipeline:pg_src","opencdc.collection":"users","opencdc.readAt":"1765498423280746300"},"operation":"snapshot","payload":{"after":{"email":"email@test.com","id":30,"name":"user name"},"before":null},"position":"eyJ0eXBlIjoxLCJzbmFwc2hvdHMiOnsidXNlcnMiOnsibGFzdF9yZWFkIjozMCwic25hcHNob3RfZW5kIjozMH19fQ=="}}}
まとめ
Conduitを用いることで、比較的簡単にCDCを実現できました。Conduitはまだ開発途上のツールであり、機能的な制約やバグが存在することに注意が必要ですが、Conduitに限らず、今後何らかの形でこのようなツールが発展したらなあと思っています。