著者: 伊藤 雅博, 株式会社日立製作所
はじめに
前回の投稿で解説したKafka Connectの動作を実機検証してみます。
記事一覧:
- Kafka Connectの概要とユースケース
- Kafka Connectのアーキテクチャとチューニングポイント
- Kafka Connectを分散モードで動かしてみた(本稿)
検証シナリオ
本稿ではKafka ConnectのJDBC Connectorを題材にして、Kafka Connectの動作検証を実施します。
検証内容:
- JDBC Source/Sink Connectorを活用したPostgreSQLテーブル間のデータ転送
- JDBC Sink Connectorのデッドレターキューの動作確認
- Worker追加によるスケールアウトの動作確認
- Worker停止時におけるフォールトトレランスの動作確認
検証環境
本検証ではJDBC Connectorを使用して、PostgreSQLのテーブル間でデータをコピーします。今回は動作確認が目的のため、転送元テーブルと転送先テーブルは同一のPostgreSQLサーバのデータベースを使用しました。
転送元と転送先テーブルの対応:
-
table01
テーブルのレコードをdst_table01
にコピー -
table02
テーブルのレコードをdst_table02
にコピー
本検証のシステム構成を以下の図に示します。
Kafkaコネクタを分散モードで実行するため、3台のサーバでKafka Connectクラスタを構築します。使用するOSとソフトウェアを以下に示します。
項目 | バージョンなど |
---|---|
OS | Rocky Linux 8.10 |
Apache Kafka | v3.8.1, Tarball版 |
Java | OpenJDK 17.0.11 |
コネクタ | Confluent Kafka JDBC Connector v10.8.0 |
事前準備
Kafka BrokerクラスタとPostgreSQLサーバを事前に用意しておきます。
Kafka Brokerクラスタの用意
Kafka Brokerクラスタを用意しておきます。今回は3台構成のBrokerクラスタで、サーバのホスト名は以下であると想定します。
ホスト名 |
---|
broker01 |
broker02 |
broker03 |
なお、本検証は1台構成のBrokerクラスタでも実施は可能です。
PostgreSQLサーバの用意
PostgreSQLサーバを用意しておきます。
ホスト名は以下であると想定します。
ホスト名 |
---|
pg01 |
PostgreSQLサーバを構築したら、以下のデータベースとユーザを作成してください。
項目 | 値 |
---|---|
データベース | kcdb |
ユーザ | kcuser |
パスワード | kcpw |
また、Kafka Connectクラスタの全サーバからの接続を許可してください。
Kafka Connectの環境構築
本検証では3台のサーバでKafka Connectクラスタを構築します。サーバのホスト名は以下であると想定します。
ホスト名 |
---|
node01 |
node02 |
node03 |
Kafka Connectを実行する全サーバで以下を実施します。
KafkaとJavaのインストール
Kafka Connectの実行ファイルはKafka本体の媒体に含まれているため、これを配置します。また、Kafkaの実行に必要なJDKをインストールします。
なお、Kafka ConnectをBrokerサーバ上で実行する場合は、これらの作業は不要です。
# Kafkaのパッケージをダウンロード
curl -O https://downloads.apache.org/kafka/3.8.1/kafka_2.13-3.8.1.tgz
# パッケージを解凍して配置し、リンクを作成
sudo tar -xzf kafka_2.13-3.8.1.tgz -C /opt
sudo ln -s /opt/kafka_2.13-3.8.1 /opt/kafka
# Open JDK 17をインストール
sudo dnf install -y java-17-openjdk-devel
Kafka Connectのファイル配置
Kafka ConnectのWorker構成ファイルと、コネクタプラグインのファイルを配置します。
Worker構成ファイルは、コネクタ群を実行するWorkerの設定ファイルです。プラグインを配置したディレクトリのパスplugin.path
、WorkerのグループIDgroup.id
などを記載します。
コネクタプラグインについては、プラグインを構成するJARファイル群と依存関係にあるファイルを、プラグイン単位のディレクトリに配置します。
Worker構成ファイルの作成
以下のworker構成ファイルを用意します。listeners
の設定値はサーバごとに書き換えます。
worker.properties
:
####################
# 必須の設定
####################
# 初期接続先のKafka Brokerのリスト
bootstrap.servers=broker01:9092,broker02:9092,broker03:9092
# Connectクラスタを示す一意のID(Consumer group ID)
group.id=mycluster
####################
# Converterの設定
####################
# Converterの指定: コネクタとTopic間のデータフォーマット変換処理を行うクラス
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converterがメッセージにスキーマ情報を含めるか否か
key.converter.schemas.enable=true
value.converter.schemas.enable=true
####################
# 管理用Topicの設定
####################
# Offset保存用Topicの設定
offset.storage.topic=mycluster-offsets
offset.storage.replication.factor=3
# コネクタ設定の保存用Topicの設定: Partition数は1で固定
config.storage.topic=mycluster-configs
config.storage.replication.factor=3
# コネクタ実行状態の保存用Topicの設定
status.storage.topic=mycluster-status
status.storage.replication.factor=3
####################
# Workerの設定
####################
# REST APIサーバが待ち受けるURIのリスト。Nodeごとにホスト名を書き換えること。
listeners=HTTP://node01:8083
#listeners=HTTP://node02:8083
#listeners=HTTP://node03:8083
# プラグイン (Connector、Converter、Transform) のファイル/ディレクトリパスのリスト
plugin.path=/opt/connectors
プラグインファイルの配置
Kafka Connectプラグイン用のディレクトリを作成して、プラグインファイルを配置します。JDBC Connectorプラグインは、下記サイトから「Self-Hosted」のファイルをダウンロードしてください。
# プラグインディレクトリの作成
mkdir -p /opt/connectors
# JDBC Connectorプラグインを配置
unzip confluentinc-kafka-connect-jdbc-10.8.0.zip -d /opt/connectors/
# 確認
ls -al /opt/connectors
テーブルとTopicの用意
PostgreSQLテーブルの作成
PostgreSQL上に入力元と出力先のテーブルを作成します。
出力先のテーブルはJDBC Sink Connectorによる自動作成も可能ですが、その場合は効率の悪いデータ型(integer
, text
など)が使用されます。そのため、テーブルは事前に作成しておくことを推奨します。
また、今回はデッドレターキュー (Dead Letter Queue: DLQ) をテストするために、入力元テーブルtable01
に対応する出力先テーブルdst_table01
のitem
列の型を変えています。
table01
のitem
列はvarchar(20)
ですが、dst_table01
のitem
列はvarchar(10)
のため、table01
に11文字以上の値を挿入すると、JDBC Sink Connectorの転送時にエラーが発生します。このエラーが発生したレコードはデッドレターキューに隔離されます。
# PostgreSQLのインタラクティブターミナルを起動してデータベースにログイン
sudo -u kcuser psql -d kcdb
-- 入力元テーブル`table01`を作成: `item`列は`varchar(20)`
CREATE TABLE table01 (
id serial NOT NULL,
user_id varchar(10),
item varchar(20),
PRIMARY KEY (id)
);
-- 入力元テーブル`table02`を作成
CREATE TABLE table02 (
id serial NOT NULL,
user_id varchar(10),
point integer,
PRIMARY KEY (id)
);
-- 出力先テーブル`dst_table01`を作成: `item`列は`varchar(10)`
CREATE TABLE dst_table01 (
id integer NOT NULL,
user_id varchar(10),
item varchar(10),
PRIMARY KEY (id)
);
-- 出力先テーブル`dst_table02`を作成
CREATE TABLE dst_table02 (
id integer NOT NULL,
user_id varchar(10),
point integer,
PRIMARY KEY (id)
);
-- テーブル一覧を確認
\dt
-- リレーション一覧
-- スキーマ | 名前 | 型 | 所有者
-- ----------+-------------+----------+--------
-- public | dst_table01 | テーブル | kcuser
-- public | dst_table02 | テーブル | kcuser
-- public | table01 | テーブル | kcuser
-- public | table02 | テーブル | kcuser
-- (4 行)
-- インタラクティブターミナルからログアウト
\q
Kafka Topicの作成
Kafkaクラスタ上に、各テーブル用の転送用Topicを作成しておきます。
Sourceコネクタの初回書き込み時、またはSinkコネクタ起動時にTopicを自動作成することも可能ですが、Topic設定をデフォルトから変更したい場合は、事前に作成しておく必要があります。
今回は各TopicのPartition数を2個に設定します。そのため、入力元テーブルから出力先テーブルへの書き込み順序は、メッセージのキー単位でのみ維持されます。
今回はキーにuser_id
を使用するため、同じuser_id
をもつメッセージの転送順序は維持されますが、別々のuser_id
間では順序が維持されません。書き込み順序を完全に維持したい場合は、Partition数を1にする必要があります。
# Topic `dst_table01` を作成
/opt/kafka/bin/kafka-topics.sh \
--bootstrap-server broker01:9092,broker02:9092,broker03:9092 \
--create \
--topic dst_table01 \
--partitions 2
# Topic `dst_table02` を作成
/opt/kafka/bin/kafka-topics.sh \
--bootstrap-server broker01:9092,broker02:9092,broker03:9092 \
--create \
--topic dst_table02 \
--partitions 2
# Topicの一覧を確認
/opt/kafka/bin/kafka-topics.sh \
--bootstrap-server broker01:9092,broker02:9092,broker03:9092 \
--list
## ・・・
## dst_table01
## dst_table02
Kafka Connectの起動
Workerの起動
まずはnode01
とnode02
の2台で下記コマンドを実行して、Workerインスタンス2個起動します。残りのnode03
は後ほどスケールアウトの検証時に起動します。
# Workerインスタンスを起動
/opt/kafka/bin/connect-distributed.sh worker.properties
同一のグループIDgroup.id
=mycluster
を持つWorker間で、自動的にWorkerグループ(クラスタ)が構成されます。
Workerクラスタを起動すると、下記3種類のコネクタ管理用Topicが作成されます。
-
mycluster-configs
: コネクタの設定を保存 -
mycluster-status
: コネクタの実行状態を保存 -
mycluster-offsets
: Partitioned StreamのOffsetを保存(Sourceコネクタのみ利用)
# Topicの一覧を表示: 管理用Topicが作成されている
/opt/kafka/bin/kafka-topics.sh \
--bootstrap-server broker01:9092,broker02:9092,broker03:9092 \
--list
## ・・・
## mycluster-configs
## mycluster-offsets
## mycluster-status
コネクタ一覧の確認
いずれかのWorkerのREST APIにアクセスして、コネクタプラグインJdbcSourceConnector
とJdbcSinkConnector
が読み込まれていることを確認します。
# 利用可能なコネクタプラグインの一覧を確認
curl -X GET http://node01:8083/connector-plugins
レスポンス(整形済):
[
{
"class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"type": "sink",
"version": "10.8.0"
},
{
"class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"type": "source",
"version": "10.8.0"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type": "source",
"version": "3.8.1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type": "source",
"version": "3.8.1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type": "source",
"version": "3.8.1"
}
]
コネクタの起動
Workerクラスタ上でJDBC Source ConnectorとJDBC Sink Connectorを起動します。
JDBC Source Connectorの起動
コネクタ設定を記載したJSONペイロードを作成します。
PostgreSQLの接続情報、出力先Topicなどを指定します。また、transforms
で各テーブルのuser_id
をKafkaメッセージのキーに設定します。そのため、メッセージの転送順序はuser_id
単位で維持されます。
設定の詳細は以下の公式ドキュメントをご参照ください。
jdbc_source_postgresql.json
:
{
"name": "jdbc_source_postgresql",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://pg01:5432/kcdb",
"connection.user": "kcuser",
"connection.password": "kcpw",
"topic.prefix": "dst_",
"poll.interval.ms" : "5000",
"table.poll.interval.ms" : "60000",
"table.whitelist" : "table01,table02",
"mode":"incrementing",
"incrementing.column.name": "id",
"tasks.max": "5",
"transforms":"createKey,extractInt",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"user_id",
"transforms.extractInt.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field":"user_id"
}
}
REST APIにコネクタ作成リクエストを送信して、コネクタを起動します。
# コネクタを作成
curl -X POST "http://node01:8083/connectors" \
-H 'Content-Type: application/json' \
-d @jdbc_source_postgresql.json
# 作成済みコネクタの一覧を確認
curl -X GET http://node01:8083/connectors
## ["jdbc_source_postgresql"]
起動したコネクタの状態を確認します。
# コネクタの状態を確認
curl -X GET http://node01:8083/connectors/jdbc_source_postgresql/status
レスポンス(整形済み):
{
"name": "jdbc_source_postgresql",
"connector": {
"state": "RUNNING",
"worker_id": "node01:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "node01:8083"
},
{
"id": 1,
"state": "RUNNING",
"worker_id": "node02:8083"
}
],
"type": "source"
}
JDBC Source Connectorは取得対象のテーブル数と同じ2個のTaskを起動しました。
JDBC Sink Connectorの起動
コネクタ設定を記載したJSONペイロードを作成します。ここでデッドレターキューも有効化します。
設定の詳細は以下の公式ドキュメントをご参照ください。
jdbc_sink_postgresql.json
:
{
"name": "jdbc_sink_postgresql",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://pg01:5432/kcdb",
"connection.user": "kcuser",
"connection.password": "kcpw",
"topics": "dst_table01,dst_table02",
"tasks.max": "5",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq-jdbc-sink-postgresql",
"errors.deadletterqueue.context.headers.enable": true
}
}
REST APIにコネクタ作成リクエストを送信して、コネクタを起動します。
# コネクタを作成
curl -X POST "http://node01:8083/connectors" \
-H 'Content-Type: application/json' \
-d @jdbc_sink_postgresql.json
# 作成済みコネクタの一覧を確認
curl -X GET http://node01:8083/connectors
## ["jdbc_source_postgresql","jdbc_sink_postgresql"]
起動したコネクタの状態を確認します。
# コネクタの状態を確認
curl -X GET http://node01:8083/connectors/jdbc_sink_postgresql/status
レスポンス(整形済み):
{
"name": "jdbc_sink_postgresql",
"connector": {
"state": "RUNNING",
"worker_id": "node02:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "node01:8083"
},
{
"id": 1,
"state": "RUNNING",
"worker_id": "node02:8083"
},
{
"id": 2,
"state": "RUNNING",
"worker_id": "node01:8083"
},
{
"id": 3,
"state": "RUNNING",
"worker_id": "node02:8083"
},
{
"id": 4,
"state": "RUNNING",
"worker_id": "node01:8083"
}
],
"type": "sink"
}
JDBC Source Connectorはtasks.max
と同じ5個のTaskを起動しました。
ただし入力元Topicの最大Partition数である2個より多いTaskは、実際には使用されません。これはConsumer Groupの仕組みを利用しているためです。余剰Taskは最大Partition数が増えた時に備えて待機するだけです。
JDBCコネクタの動作検証
起動した2つのコネクタの動作確認を行います。
PostgreSQLテーブルにレコードを格納すると、JDBC Source ConnectorがPostgreSQLテーブルのレコードを読み出してTopicに格納します。その後、JDBC Sink ConnectorがTopicからメッセージを読み出して、PostgreSQLの別のテーブルに格納します。
PostgreSQLにレコードを格納
PostgreSQLの入力元テーブルにレコードを挿入して、同じレコードが出力先テーブルに格納されることを確認します。
# PostgreSQLのインタラクティブターミナルを起動してデータベースにログイン
sudo -u kcuser psql -d kcdb
-- 入力元テーブルにレコードを格納
INSERT INTO table01 (user_id, item)
VALUES ('user01', 'apple'),('user01', 'orange'),('user02', 'banana'),('user02', 'melon');
INSERT INTO table02 (user_id, point)
VALUES ('user01', 3),('user01', 5),('user02', 6),('user02', 1);
-- 入力元テーブル`table01`のレコードを確認
SELECT * FROM table01;
-- id | user_id | item
-- ----+---------+--------
-- 1 | user01 | apple
-- 2 | user01 | orange
-- 3 | user02 | banana
-- 4 | user02 | melon
-- (4 行)
-- 入力元テーブル`table02`のレコードを確認
SELECT * FROM table02;
-- id | user_id | point
-- ----+---------+-------
-- 1 | user01 | 3
-- 2 | user01 | 5
-- 3 | user02 | 6
-- 4 | user02 | 1
-- (4 行)
-- 出力先テーブル`dst_table01`のレコードを確認
SELECT * FROM dst_table01;
-- id | user_id | item
-- ----+---------+--------
-- 1 | user01 | apple
-- 2 | user01 | orange
-- 3 | user02 | banana
-- 4 | user02 | melon
-- (4 行)
-- 出力先テーブル`dst_table02`のレコードを確認:入力元テーブル`table01`と順序が入れ替わっている
SELECT * FROM dst_table02;
-- id | user_id | point
-- ----+---------+-------
-- 3 | user02 | 6
-- 4 | user02 | 1
-- 1 | user01 | 3
-- 2 | user01 | 5
-- (4 行)
-- インタラクティブターミナルからログアウト
\q
今回は各TopicのPartition数が2個なので、上記の出力先テーブルのレコード順序は入れ替わる場合があります。
Topicに転送されたメッセージを確認
JDBC Source ConnectorがPostgreSQLテーブルからKafka Topicに転送したメッセージを確認します。
確認内容:
- Topic
dst_table01
とdst_table02
に、それぞれ4件のメッセージが格納されていること - 同じキー(
user01
または``user02`)のメッセージは、同じPartitionに格納されていること
# コンソールConsumerでTopic `dst_table01` のメッセージを確認
/opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server broker01:9092,broker02:9092,broker03:9092 \
--topic dst_table01 \
--property print.partition=true \
--property print.offset=true \
--property print.key=true \
--property print.value=true \
--from-beginning
取得結果:
Partition:0 Offset:0 {"schema":{"type":"string","optional":true},"payload":"user01"} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"string","optional":true,"field":"item"}],"optional":false,"name":"table01"},"payload":{"id":1,"user_id":"user01","item":"apple"}}
Partition:0 Offset:1 {"schema":{"type":"string","optional":true},"payload":"user01"} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"string","optional":true,"field":"item"}],"optional":false,"name":"table01"},"payload":{"id":2,"user_id":"user01","item":"orange"}}
Partition:1 Offset:0 {"schema":{"type":"string","optional":true},"payload":"user02"} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"string","optional":true,"field":"item"}],"optional":false,"name":"table01"},"payload":{"id":3,"user_id":"user02","item":"banana"}}
Partition:1 Offset:1 {"schema":{"type":"string","optional":true},"payload":"user02"} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"string","optional":true,"field":"item"}],"optional":false,"name":"table01"},"payload":{"id":4,"user_id":"user02","item":"melon"}}
4件のメッセージが取得できました。またuser01
のメッセージは全てPartition:0
、user02
のメッセージは全てPartition:1
に格納されていることを確認できました。
# コンソールConsumerでTopic `dst_table02` のメッセージを確認
/opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server broker01:9092,broker02:9092,broker03:9092 \
--topic dst_table02 \
--property print.partition=true \
--property print.offset=true \
--property print.key=true \
--property print.value=true \
--from-beginning
取得結果:
Partition:0 Offset:0 {"schema":{"type":"string","optional":true},"payload":"user01"} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"int32","optional":true,"field":"point"}],"optional":false,"name":"table02"},"payload":{"id":1,"user_id":"user01","point":3}}
Partition:0 Offset:1 {"schema":{"type":"string","optional":true},"payload":"user01"} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"int32","optional":true,"field":"point"}],"optional":false,"name":"table02"},"payload":{"id":2,"user_id":"user01","point":5}}
Partition:1 Offset:0 {"schema":{"type":"string","optional":true},"payload":"user02"} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"int32","optional":true,"field":"point"}],"optional":false,"name":"table02"},"payload":{"id":3,"user_id":"user02","point":6}}
Partition:1 Offset:1 {"schema":{"type":"string","optional":true},"payload":"user02"} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"int32","optional":true,"field":"point"}],"optional":false,"name":"table02"},"payload":{"id":4,"user_id":"user02","point":1}}
同様に4件のメッセージが取得できました。またuser01
のメッセージは全てPartition:0
、user02
のメッセージは全てPartition:1
に格納されていることを確認できました。
デッドレターキュー(DLQ)の動作検証
Sinkコネクタが何らかの理由でメッセージを処理できなかった場合、そのメッセージをデッドレターキュー (Dead Letter Queue: DLQ) 用のTopicに隔離して処理をスキップします。
今回はJDBC SinkコネクタでDLQを有効化する下記設定を追加済みです。
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq-jdbc-sink-postgresql",
"errors.deadletterqueue.context.headers.enable": true
PostgreSQLテーブルの作成で説明した通り、今回はDLQをテストするため入力元テーブルtable01
に対応する出力先テーブルdst_table01
のitem
列の型を変えています。
table01
のitem
列はvarchar(20)
ですが、dst_table01
のitem
列はvarchar(10)
のため、table01
に11-20文字の値を挿入すると、JDBC Sink Connectorの転送時にエラーが発生します。このエラーが発生したレコードがデッドレターキューに隔離されます。
dst_table01
テーブルにitem
が16文字のレコード挿入して、DLQに記録される内容を確認してみます。
# PostgreSQLのインタラクティブターミナルを起動してデータベースにログイン
sudo -u kcuser psql -d kcdb
-- 入力元テーブルに不正なレコード(`item`が16文字)を挿入
INSERT INTO table01 (user_id, item) VALUES ('user01', 'super-apple-0000');
-- 入力元テーブルには挿入された
SELECT * FROM table01;
id | user_id | item
----+---------+------------------
1 | user01 | apple
2 | user01 | orange
3 | user02 | banana
4 | user02 | melon
5 | user01 | super-apple-0000
(5 行)
-- 出力先テーブルには転送されなかった
SELECT * FROM dst_table01;
id | user_id | item
----+---------+--------
1 | user01 | apple
2 | user01 | orange
3 | user02 | banana
4 | user02 | melon
(4 行)
-- インタラクティブターミナルからログアウト
\q
DLQに記録された内容を確認します。
# コンソールConsumerを起動してDLQのメッセージを読み出し
/opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server broker01:9092,broker02:9092,broker03:9092 \
--topic dlq-jdbc-sink-postgresql \
--property print.headers=true \
--property print.key=true \
--property print.value=true \
--from-beginning
DLQに以下のメッセージが格納されていました。メッセージのヘッダーにエラー内容ERROR: 値は型character varying(10)としては長すぎます
が記録されています。
__connect.errors.topic:dst_table01,__connect.errors.partition:0,__connect.errors.offset:2,__connect.errors.connector.name:jdbc_sink_postgresql,__connect.errors.task.id:0,__connect.errors.stage:TASK_PUT,__connect.errors.class.name:org.apache.kafka.connect.sink.SinkTask,__connect.errors.exception.class.name:java.sql.SQLException,__connect.errors.exception.message:Exception chain:
java.sql.BatchUpdateException: バッチ 0 INSERT INTO "kcdb"."public"."dst_table01" ("id","user_id","item") VALUES (('5'::int4),('user01'),('super-apple-0000')) はアボートしました: ERROR: 値は型character varying(10)としては長すぎます このバッチの他のエラーは getNextException を呼び出すことで確認できます。
org.postgresql.util.PSQLException: ERROR: 値は型character varying(10)としては長すぎます
org.postgresql.util.PSQLException: ERROR: 値は型character varying(10)としては長すぎます
,__connect.errors.exception.stacktrace:java.sql.SQLException: Exception chain:
java.sql.BatchUpdateException: バッチ 0 INSERT INTO "kcdb"."public"."dst_table01" ("id","user_id","item") VALUES (('5'::int4),('user01'),('super-apple-0000')) はアボートしました: ERROR: 値は型character varying(10)としては長すぎます このバッチの他のエラーは getNextException を呼び出すことで確認できます。
org.postgresql.util.PSQLException: ERROR: 値は型character varying(10)としては長すぎます
org.postgresql.util.PSQLException: ERROR: 値は型character varying(10)としては長すぎます
at io.confluent.connect.jdbc.sink.JdbcSinkTask.getAllMessagesException(JdbcSinkTask.java:167)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.unrollAndRetry(JdbcSinkTask.java:152)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:122)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:605)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:344)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:246)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:215)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:225)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:280)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
{"schema":{"type":"string","optional":true},"payload":"user01"} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"string","optional":true,"field":"item"}],"optional":false,"name":"table01"},"payload":{"id":5,"user_id":"user01","item":"super-apple-0000"}}
スケールアウトの動作検証
Workerインスタンスを追加して以下の動作を確認します。
- サーバ
node03
上で新しいWorkerインスタンスを起動する - 起動中のコネクタのConnectorとTaskの一部が、自動的に
node03
上のWorkerに移動する
新しいサーバ上でWorkerインスタンスを起動すると、Workerクラスタに自動追加されます。
# Workerインスタンスを起動
/opt/kafka/bin/connect-distributed.sh worker.properties
しばらく待ってからコネクタの状態を確認します。
# JDBC Sourceコネクタの状態を確認
curl -X GET http://node01:8083/connectors/jdbc_source_postgresql/status
レスポンス(整形済み):
{
"name": "jdbc_source_postgresql",
"connector": {
"state": "RUNNING",
"worker_id": "node01:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "node03:8083"
},
{
"id": 1,
"state": "RUNNING",
"worker_id": "node02:8083"
}
],
"type": "source"
}
tasks.id: 0
がworker_id: node03:8083
に移動しました。
# JDBC Sinkコネクタの状態を確認
curl -X GET http://node01:8083/connectors/jdbc_sink_postgresql/status
レスポンス(整形済み):
{
"name": "jdbc_sink_postgresql",
"connector": {
"state": "RUNNING",
"worker_id": "node02:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "node03:8083"
},
{
"id": 1,
"state": "RUNNING",
"worker_id": "node02:8083"
},
{
"id": 2,
"state": "RUNNING",
"worker_id": "node01:8083"
},
{
"id": 3,
"state": "RUNNING",
"worker_id": "node02:8083"
},
{
"id": 4,
"state": "RUNNING",
"worker_id": "node01:8083"
}
],
"type": "sink"
}
tasks.id: 0
がworker_id: node03:8083
に移動しました。
フォールトトレランスの動作検証
Workerインスタンスを停止して以下の動作を確認します。
- サーバ
node02
上のWorkerインスタンスを停止する -
node02
上の各コネクタのConnectorとTaskが、自動的に別のWorkerに移動する
なお、クラスタからWorkerが離脱した際のリバランスは、実行前に最大scheduled.rebalance.max.delay.ms
だけ待機する場合があります。この待機時間はデフォルト設定では300000
ミリ秒 (5分)です。リバランスの実行タイミングは様々な要因に影響されるため、待機する場合もあれば、即座にリバランスが実施される場合もあります。
詳しくは前回の投稿にある増分協調リバランス (Incremental Cooperative Rebalancing)の説明をご参照ください。
node02
のWorkerインスタンスを停止します。
# Workerインスタンスをkill(SIGKILLで強制終了)
jps -l | grep org.apache.kafka.connect.cli.ConnectDistributed | awk '{print $1}' | xargs kill -9
JDBC Sourceコネクタの状態を確認します。今回はリバランスの実行前に、約5分間の待機が発生しました。
Workerインスタンスの停止直後にコネクタの状態を確認するとConnector/Taskがnode02
に配置されたままでしたが、5分後に再度確認するとConnector/Taskがnode02
以外に移動していることが確認できました。
# JDBC Sourceコネクタの状態を確認
curl -X GET http://node01:8083/connectors/jdbc_source_postgresql/status
レスポンス(整形済み):
{
"name": "jdbc_source_postgresql",
"connector": {
"state": "RUNNING",
"worker_id": "node01:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "node03:8083"
},
{
"id": 1,
"state": "RUNNING",
"worker_id": "node01:8083"
}
],
"type": "source"
}
tasks.id: 1
がworker_id: node01:8083
に移動しました。
# JDBC Sinkコネクタの状態を確認
curl -X GET http://node01:8083/connectors/jdbc_sink_postgresql/status
レスポンス(整形済み):
{
"name": "jdbc_sink_postgresql",
"connector": {
"state": "RUNNING",
"worker_id": "node03:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "node03:8083"
},
{
"id": 1,
"state": "RUNNING",
"worker_id": "node01:8083"
},
{
"id": 2,
"state": "RUNNING",
"worker_id": "node01:8083"
},
{
"id": 3,
"state": "RUNNING",
"worker_id": "node03:8083"
},
{
"id": 4,
"state": "RUNNING",
"worker_id": "node01:8083"
}
],
"type": "sink"
}
connector
、tasks.id: 3
がworker_id: node03:8083
に移動し、tasks.id: 1
がworker_id: node01:8083
に移動しました。
おわりに
本稿ではKafka Connectを分散モードで動かして、JDBC Connectorを活用したPostgreSQLテーブル間のデータコピーの動作を確認しました。またコネクタのスケールアウトとフォールトトレランスの動作を確認しました。