0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Kafka Connectを分散モードで動かしてみた

Last updated at Posted at 2025-07-17

著者: 伊藤 雅博, 株式会社日立製作所

はじめに

前回の投稿で解説したKafka Connectの動作を実機検証してみます。

記事一覧:

  1. Kafka Connectの概要とユースケース
  2. Kafka Connectのアーキテクチャとチューニングポイント
  3. Kafka Connectを分散モードで動かしてみた(本稿)

検証シナリオ

本稿ではKafka ConnectのJDBC Connectorを題材にして、Kafka Connectの動作検証を実施します。

検証内容:

  • JDBC Source/Sink Connectorを活用したPostgreSQLテーブル間のデータ転送
  • JDBC Sink Connectorのデッドレターキューの動作確認
  • Worker追加によるスケールアウトの動作確認
  • Worker停止時におけるフォールトトレランスの動作確認

検証環境

本検証ではJDBC Connectorを使用して、PostgreSQLのテーブル間でデータをコピーします。今回は動作確認が目的のため、転送元テーブルと転送先テーブルは同一のPostgreSQLサーバのデータベースを使用しました。

転送元と転送先テーブルの対応:

  • table01テーブルのレコードをdst_table01にコピー
  • table02テーブルのレコードをdst_table02にコピー

本検証のシステム構成を以下の図に示します。

Kafkaコネクタを分散モードで実行するため、3台のサーバでKafka Connectクラスタを構築します。使用するOSとソフトウェアを以下に示します。

項目 バージョンなど
OS Rocky Linux 8.10
Apache Kafka v3.8.1, Tarball版
Java OpenJDK 17.0.11
コネクタ Confluent Kafka JDBC Connector v10.8.0

事前準備

Kafka BrokerクラスタとPostgreSQLサーバを事前に用意しておきます。

Kafka Brokerクラスタの用意

Kafka Brokerクラスタを用意しておきます。今回は3台構成のBrokerクラスタで、サーバのホスト名は以下であると想定します。

ホスト名
broker01
broker02
broker03

なお、本検証は1台構成のBrokerクラスタでも実施は可能です。

PostgreSQLサーバの用意

PostgreSQLサーバを用意しておきます。
ホスト名は以下であると想定します。

ホスト名
pg01

PostgreSQLサーバを構築したら、以下のデータベースとユーザを作成してください。

項目
データベース kcdb
ユーザ kcuser
パスワード kcpw

また、Kafka Connectクラスタの全サーバからの接続を許可してください。

Kafka Connectの環境構築

本検証では3台のサーバでKafka Connectクラスタを構築します。サーバのホスト名は以下であると想定します。

ホスト名
node01
node02
node03

Kafka Connectを実行する全サーバで以下を実施します。

KafkaとJavaのインストール

Kafka Connectの実行ファイルはKafka本体の媒体に含まれているため、これを配置します。また、Kafkaの実行に必要なJDKをインストールします。

なお、Kafka ConnectをBrokerサーバ上で実行する場合は、これらの作業は不要です。

# Kafkaのパッケージをダウンロード
curl -O https://downloads.apache.org/kafka/3.8.1/kafka_2.13-3.8.1.tgz

# パッケージを解凍して配置し、リンクを作成
sudo tar -xzf kafka_2.13-3.8.1.tgz -C /opt
sudo ln -s /opt/kafka_2.13-3.8.1 /opt/kafka

# Open JDK 17をインストール
sudo dnf install -y java-17-openjdk-devel

Kafka Connectのファイル配置

Kafka ConnectのWorker構成ファイルと、コネクタプラグインのファイルを配置します。

Worker構成ファイルは、コネクタ群を実行するWorkerの設定ファイルです。プラグインを配置したディレクトリのパスplugin.path、WorkerのグループIDgroup.idなどを記載します。

コネクタプラグインについては、プラグインを構成するJARファイル群と依存関係にあるファイルを、プラグイン単位のディレクトリに配置します。

Worker構成ファイルの作成

以下のworker構成ファイルを用意します。listenersの設定値はサーバごとに書き換えます。

worker.properties:

####################
# 必須の設定
####################

# 初期接続先のKafka Brokerのリスト
bootstrap.servers=broker01:9092,broker02:9092,broker03:9092

# Connectクラスタを示す一意のID(Consumer group ID)
group.id=mycluster

####################
# Converterの設定
####################

# Converterの指定: コネクタとTopic間のデータフォーマット変換処理を行うクラス
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Converterがメッセージにスキーマ情報を含めるか否か
key.converter.schemas.enable=true
value.converter.schemas.enable=true

####################
# 管理用Topicの設定
####################

# Offset保存用Topicの設定
offset.storage.topic=mycluster-offsets
offset.storage.replication.factor=3

# コネクタ設定の保存用Topicの設定: Partition数は1で固定
config.storage.topic=mycluster-configs
config.storage.replication.factor=3

# コネクタ実行状態の保存用Topicの設定
status.storage.topic=mycluster-status
status.storage.replication.factor=3

####################
# Workerの設定
####################

# REST APIサーバが待ち受けるURIのリスト。Nodeごとにホスト名を書き換えること。
listeners=HTTP://node01:8083
#listeners=HTTP://node02:8083
#listeners=HTTP://node03:8083

# プラグイン (Connector、Converter、Transform) のファイル/ディレクトリパスのリスト
plugin.path=/opt/connectors

プラグインファイルの配置

Kafka Connectプラグイン用のディレクトリを作成して、プラグインファイルを配置します。JDBC Connectorプラグインは、下記サイトから「Self-Hosted」のファイルをダウンロードしてください。

# プラグインディレクトリの作成
mkdir -p /opt/connectors

# JDBC Connectorプラグインを配置
unzip confluentinc-kafka-connect-jdbc-10.8.0.zip -d /opt/connectors/

# 確認
ls -al /opt/connectors

テーブルとTopicの用意

PostgreSQLテーブルの作成

PostgreSQL上に入力元と出力先のテーブルを作成します。

出力先のテーブルはJDBC Sink Connectorによる自動作成も可能ですが、その場合は効率の悪いデータ型(integer, textなど)が使用されます。そのため、テーブルは事前に作成しておくことを推奨します。

また、今回はデッドレターキュー (Dead Letter Queue: DLQ) をテストするために、入力元テーブルtable01に対応する出力先テーブルdst_table01item列の型を変えています。

table01item列はvarchar(20)ですが、dst_table01item列はvarchar(10)のため、table01に11文字以上の値を挿入すると、JDBC Sink Connectorの転送時にエラーが発生します。このエラーが発生したレコードはデッドレターキューに隔離されます。

# PostgreSQLのインタラクティブターミナルを起動してデータベースにログイン
sudo -u kcuser psql -d kcdb
-- 入力元テーブル`table01`を作成: `item`列は`varchar(20)`
CREATE TABLE table01 (
  id serial NOT NULL,
  user_id varchar(10),
  item varchar(20),
  PRIMARY KEY (id)
);

-- 入力元テーブル`table02`を作成
CREATE TABLE table02 (
  id serial NOT NULL,
  user_id varchar(10),
  point integer,
  PRIMARY KEY (id)
);

-- 出力先テーブル`dst_table01`を作成: `item`列は`varchar(10)`
CREATE TABLE dst_table01 (
  id integer NOT NULL,
  user_id varchar(10),
  item varchar(10),
  PRIMARY KEY (id)
);

-- 出力先テーブル`dst_table02`を作成
CREATE TABLE dst_table02 (
  id integer NOT NULL,
  user_id varchar(10),
  point integer,
  PRIMARY KEY (id)
);

-- テーブル一覧を確認
\dt
--               リレーション一覧
--  スキーマ |    名前     |    型    | 所有者 
-- ----------+-------------+----------+--------
--  public   | dst_table01 | テーブル | kcuser
--  public   | dst_table02 | テーブル | kcuser
--  public   | table01     | テーブル | kcuser
--  public   | table02     | テーブル | kcuser
-- (4 行)

-- インタラクティブターミナルからログアウト
\q

Kafka Topicの作成

Kafkaクラスタ上に、各テーブル用の転送用Topicを作成しておきます。

Sourceコネクタの初回書き込み時、またはSinkコネクタ起動時にTopicを自動作成することも可能ですが、Topic設定をデフォルトから変更したい場合は、事前に作成しておく必要があります。

今回は各TopicのPartition数を2個に設定します。そのため、入力元テーブルから出力先テーブルへの書き込み順序は、メッセージのキー単位でのみ維持されます。

今回はキーにuser_idを使用するため、同じuser_idをもつメッセージの転送順序は維持されますが、別々のuser_id間では順序が維持されません。書き込み順序を完全に維持したい場合は、Partition数を1にする必要があります。

# Topic `dst_table01` を作成
/opt/kafka/bin/kafka-topics.sh \
--bootstrap-server broker01:9092,broker02:9092,broker03:9092 \
--create \
--topic dst_table01 \
--partitions 2

# Topic `dst_table02` を作成
/opt/kafka/bin/kafka-topics.sh \
--bootstrap-server broker01:9092,broker02:9092,broker03:9092 \
--create \
--topic dst_table02 \
--partitions 2

# Topicの一覧を確認
/opt/kafka/bin/kafka-topics.sh \
--bootstrap-server broker01:9092,broker02:9092,broker03:9092 \
--list
## ・・・
## dst_table01
## dst_table02

Kafka Connectの起動

Workerの起動

まずはnode01node02の2台で下記コマンドを実行して、Workerインスタンス2個起動します。残りのnode03は後ほどスケールアウトの検証時に起動します。

# Workerインスタンスを起動
/opt/kafka/bin/connect-distributed.sh worker.properties

同一のグループIDgroup.id=myclusterを持つWorker間で、自動的にWorkerグループ(クラスタ)が構成されます。

Workerクラスタを起動すると、下記3種類のコネクタ管理用Topicが作成されます。

  • mycluster-configs: コネクタの設定を保存
  • mycluster-status: コネクタの実行状態を保存
  • mycluster-offsets: Partitioned StreamのOffsetを保存(Sourceコネクタのみ利用)
# Topicの一覧を表示: 管理用Topicが作成されている
/opt/kafka/bin/kafka-topics.sh \
--bootstrap-server broker01:9092,broker02:9092,broker03:9092 \
--list
## ・・・
## mycluster-configs
## mycluster-offsets
## mycluster-status

コネクタ一覧の確認

いずれかのWorkerのREST APIにアクセスして、コネクタプラグインJdbcSourceConnectorJdbcSinkConnectorが読み込まれていることを確認します。

# 利用可能なコネクタプラグインの一覧を確認
curl -X GET http://node01:8083/connector-plugins

レスポンス(整形済):

[
    {
        "class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "type": "sink",
        "version": "10.8.0"
    },
    {
        "class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "type": "source",
        "version": "10.8.0"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
        "type": "source",
        "version": "3.8.1"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
        "type": "source",
        "version": "3.8.1"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "type": "source",
        "version": "3.8.1"
    }
]

コネクタの起動

Workerクラスタ上でJDBC Source ConnectorとJDBC Sink Connectorを起動します。

kc_03_run_connectors.png

JDBC Source Connectorの起動

コネクタ設定を記載したJSONペイロードを作成します。

PostgreSQLの接続情報、出力先Topicなどを指定します。また、transformsで各テーブルのuser_idをKafkaメッセージのキーに設定します。そのため、メッセージの転送順序はuser_id単位で維持されます。

設定の詳細は以下の公式ドキュメントをご参照ください。

jdbc_source_postgresql.json:

{
  "name": "jdbc_source_postgresql",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://pg01:5432/kcdb",
    "connection.user": "kcuser",
    "connection.password": "kcpw",
    "topic.prefix": "dst_",
    "poll.interval.ms" : "5000",
    "table.poll.interval.ms" : "60000",
    "table.whitelist" : "table01,table02",
    "mode":"incrementing",
    "incrementing.column.name": "id",
    "tasks.max": "5",
    "transforms":"createKey,extractInt",
    "transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields":"user_id",
    "transforms.extractInt.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractInt.field":"user_id"
  }
}

REST APIにコネクタ作成リクエストを送信して、コネクタを起動します。

# コネクタを作成
curl -X POST "http://node01:8083/connectors" \
-H 'Content-Type: application/json' \
-d @jdbc_source_postgresql.json

# 作成済みコネクタの一覧を確認
curl -X GET http://node01:8083/connectors
## ["jdbc_source_postgresql"]

起動したコネクタの状態を確認します。

# コネクタの状態を確認
curl -X GET http://node01:8083/connectors/jdbc_source_postgresql/status

レスポンス(整形済み):

{
    "name": "jdbc_source_postgresql",
    "connector": {
        "state": "RUNNING",
        "worker_id": "node01:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "node01:8083"
        },
        {
            "id": 1,
            "state": "RUNNING",
            "worker_id": "node02:8083"
        }
    ],
    "type": "source"
}

JDBC Source Connectorは取得対象のテーブル数と同じ2個のTaskを起動しました。

JDBC Sink Connectorの起動

コネクタ設定を記載したJSONペイロードを作成します。ここでデッドレターキューも有効化します。

設定の詳細は以下の公式ドキュメントをご参照ください。

jdbc_sink_postgresql.json:

{
  "name": "jdbc_sink_postgresql",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://pg01:5432/kcdb",
    "connection.user": "kcuser",
    "connection.password": "kcpw",
    "topics": "dst_table01,dst_table02",
    "tasks.max": "5",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq-jdbc-sink-postgresql",
    "errors.deadletterqueue.context.headers.enable": true
  }
}

REST APIにコネクタ作成リクエストを送信して、コネクタを起動します。

# コネクタを作成
curl -X POST "http://node01:8083/connectors" \
-H 'Content-Type: application/json' \
-d @jdbc_sink_postgresql.json

# 作成済みコネクタの一覧を確認
curl -X GET http://node01:8083/connectors
## ["jdbc_source_postgresql","jdbc_sink_postgresql"]

起動したコネクタの状態を確認します。

# コネクタの状態を確認
curl -X GET http://node01:8083/connectors/jdbc_sink_postgresql/status

レスポンス(整形済み):

{
    "name": "jdbc_sink_postgresql",
    "connector": {
        "state": "RUNNING",
        "worker_id": "node02:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "node01:8083"
        },
        {
            "id": 1,
            "state": "RUNNING",
            "worker_id": "node02:8083"
        },
        {
            "id": 2,
            "state": "RUNNING",
            "worker_id": "node01:8083"
        },
        {
            "id": 3,
            "state": "RUNNING",
            "worker_id": "node02:8083"
        },
        {
            "id": 4,
            "state": "RUNNING",
            "worker_id": "node01:8083"
        }
    ],
    "type": "sink"
}

JDBC Source Connectorはtasks.maxと同じ5個のTaskを起動しました。

ただし入力元Topicの最大Partition数である2個より多いTaskは、実際には使用されません。これはConsumer Groupの仕組みを利用しているためです。余剰Taskは最大Partition数が増えた時に備えて待機するだけです。

JDBCコネクタの動作検証

起動した2つのコネクタの動作確認を行います。

PostgreSQLテーブルにレコードを格納すると、JDBC Source ConnectorがPostgreSQLテーブルのレコードを読み出してTopicに格納します。その後、JDBC Sink ConnectorがTopicからメッセージを読み出して、PostgreSQLの別のテーブルに格納します。

kc_test.png

PostgreSQLにレコードを格納

PostgreSQLの入力元テーブルにレコードを挿入して、同じレコードが出力先テーブルに格納されることを確認します。

# PostgreSQLのインタラクティブターミナルを起動してデータベースにログイン
sudo -u kcuser psql -d kcdb
-- 入力元テーブルにレコードを格納
INSERT INTO table01 (user_id, item) 
VALUES ('user01', 'apple'),('user01', 'orange'),('user02', 'banana'),('user02', 'melon');

INSERT INTO table02 (user_id, point) 
VALUES ('user01', 3),('user01', 5),('user02', 6),('user02', 1);

-- 入力元テーブル`table01`のレコードを確認
SELECT * FROM table01;
--  id | user_id |  item  
-- ----+---------+--------
--   1 | user01  | apple
--   2 | user01  | orange
--   3 | user02  | banana
--   4 | user02  | melon
-- (4 行)

-- 入力元テーブル`table02`のレコードを確認
SELECT * FROM table02;
--  id | user_id | point 
-- ----+---------+-------
--   1 | user01  |     3
--   2 | user01  |     5
--   3 | user02  |     6
--   4 | user02  |     1
-- (4 行)

-- 出力先テーブル`dst_table01`のレコードを確認
SELECT * FROM dst_table01;
--  id | user_id |  item  
-- ----+---------+--------
--   1 | user01  | apple
--   2 | user01  | orange
--   3 | user02  | banana
--   4 | user02  | melon
-- (4 行)

-- 出力先テーブル`dst_table02`のレコードを確認:入力元テーブル`table01`と順序が入れ替わっている
SELECT * FROM dst_table02;
--  id | user_id | point 
-- ----+---------+-------
--   3 | user02  |     6
--   4 | user02  |     1
--   1 | user01  |     3
--   2 | user01  |     5
-- (4 行)

-- インタラクティブターミナルからログアウト
\q

今回は各TopicのPartition数が2個なので、上記の出力先テーブルのレコード順序は入れ替わる場合があります。

Topicに転送されたメッセージを確認

JDBC Source ConnectorがPostgreSQLテーブルからKafka Topicに転送したメッセージを確認します。

確認内容:

  • Topicdst_table01dst_table02に、それぞれ4件のメッセージが格納されていること
  • 同じキー(user01または``user02`)のメッセージは、同じPartitionに格納されていること
# コンソールConsumerでTopic `dst_table01` のメッセージを確認
/opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server broker01:9092,broker02:9092,broker03:9092 \
--topic dst_table01 \
--property print.partition=true \
--property print.offset=true \
--property print.key=true \
--property print.value=true \
--from-beginning

取得結果:

Partition:0     Offset:0        {"schema":{"type":"string","optional":true},"payload":"user01"} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"string","optional":true,"field":"item"}],"optional":false,"name":"table01"},"payload":{"id":1,"user_id":"user01","item":"apple"}}
Partition:0     Offset:1        {"schema":{"type":"string","optional":true},"payload":"user01"} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"string","optional":true,"field":"item"}],"optional":false,"name":"table01"},"payload":{"id":2,"user_id":"user01","item":"orange"}}
Partition:1     Offset:0        {"schema":{"type":"string","optional":true},"payload":"user02"} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"string","optional":true,"field":"item"}],"optional":false,"name":"table01"},"payload":{"id":3,"user_id":"user02","item":"banana"}}
Partition:1     Offset:1        {"schema":{"type":"string","optional":true},"payload":"user02"} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"string","optional":true,"field":"item"}],"optional":false,"name":"table01"},"payload":{"id":4,"user_id":"user02","item":"melon"}}

4件のメッセージが取得できました。またuser01のメッセージは全てPartition:0user02のメッセージは全てPartition:1に格納されていることを確認できました。

# コンソールConsumerでTopic `dst_table02` のメッセージを確認
/opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server broker01:9092,broker02:9092,broker03:9092 \
--topic dst_table02 \
--property print.partition=true \
--property print.offset=true \
--property print.key=true \
--property print.value=true \
--from-beginning

取得結果:

Partition:0     Offset:0        {"schema":{"type":"string","optional":true},"payload":"user01"} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"int32","optional":true,"field":"point"}],"optional":false,"name":"table02"},"payload":{"id":1,"user_id":"user01","point":3}}
Partition:0     Offset:1        {"schema":{"type":"string","optional":true},"payload":"user01"} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"int32","optional":true,"field":"point"}],"optional":false,"name":"table02"},"payload":{"id":2,"user_id":"user01","point":5}}
Partition:1     Offset:0        {"schema":{"type":"string","optional":true},"payload":"user02"} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"int32","optional":true,"field":"point"}],"optional":false,"name":"table02"},"payload":{"id":3,"user_id":"user02","point":6}}
Partition:1     Offset:1        {"schema":{"type":"string","optional":true},"payload":"user02"} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"int32","optional":true,"field":"point"}],"optional":false,"name":"table02"},"payload":{"id":4,"user_id":"user02","point":1}}

同様に4件のメッセージが取得できました。またuser01のメッセージは全てPartition:0user02のメッセージは全てPartition:1に格納されていることを確認できました。

デッドレターキュー(DLQ)の動作検証

Sinkコネクタが何らかの理由でメッセージを処理できなかった場合、そのメッセージをデッドレターキュー (Dead Letter Queue: DLQ) 用のTopicに隔離して処理をスキップします。

今回はJDBC SinkコネクタでDLQを有効化する下記設定を追加済みです。

"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq-jdbc-sink-postgresql",
"errors.deadletterqueue.context.headers.enable": true

PostgreSQLテーブルの作成で説明した通り、今回はDLQをテストするため入力元テーブルtable01に対応する出力先テーブルdst_table01item列の型を変えています。

table01item列はvarchar(20)ですが、dst_table01item列はvarchar(10)のため、table01に11-20文字の値を挿入すると、JDBC Sink Connectorの転送時にエラーが発生します。このエラーが発生したレコードがデッドレターキューに隔離されます。

dst_table01テーブルにitemが16文字のレコード挿入して、DLQに記録される内容を確認してみます。

# PostgreSQLのインタラクティブターミナルを起動してデータベースにログイン
sudo -u kcuser psql -d kcdb
-- 入力元テーブルに不正なレコード(`item`が16文字)を挿入
INSERT INTO table01 (user_id, item) VALUES ('user01', 'super-apple-0000');

-- 入力元テーブルには挿入された
SELECT * FROM table01;
 id | user_id |       item       
----+---------+------------------
  1 | user01  | apple
  2 | user01  | orange
  3 | user02  | banana
  4 | user02  | melon
  5 | user01  | super-apple-0000
(5 )

-- 出力先テーブルには転送されなかった
SELECT * FROM dst_table01;
 id | user_id |  item  
----+---------+--------
  1 | user01  | apple
  2 | user01  | orange
  3 | user02  | banana
  4 | user02  | melon
(4 )

-- インタラクティブターミナルからログアウト
\q

DLQに記録された内容を確認します。

# コンソールConsumerを起動してDLQのメッセージを読み出し
/opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server broker01:9092,broker02:9092,broker03:9092 \
--topic dlq-jdbc-sink-postgresql \
--property print.headers=true \
--property print.key=true \
--property print.value=true \
--from-beginning

DLQに以下のメッセージが格納されていました。メッセージのヘッダーにエラー内容ERROR: 値は型character varying(10)としては長すぎますが記録されています。

__connect.errors.topic:dst_table01,__connect.errors.partition:0,__connect.errors.offset:2,__connect.errors.connector.name:jdbc_sink_postgresql,__connect.errors.task.id:0,__connect.errors.stage:TASK_PUT,__connect.errors.class.name:org.apache.kafka.connect.sink.SinkTask,__connect.errors.exception.class.name:java.sql.SQLException,__connect.errors.exception.message:Exception chain:
java.sql.BatchUpdateException: バッチ 0 INSERT INTO "kcdb"."public"."dst_table01" ("id","user_id","item") VALUES (('5'::int4),('user01'),('super-apple-0000')) はアボートしました: ERROR: 値は型character varying(10)としては長すぎます このバッチの他のエラーは getNextException を呼び出すことで確認できます。
org.postgresql.util.PSQLException: ERROR: 値は型character varying(10)としては長すぎます
org.postgresql.util.PSQLException: ERROR: 値は型character varying(10)としては長すぎます
,__connect.errors.exception.stacktrace:java.sql.SQLException: Exception chain:
java.sql.BatchUpdateException: バッチ 0 INSERT INTO "kcdb"."public"."dst_table01" ("id","user_id","item") VALUES (('5'::int4),('user01'),('super-apple-0000')) はアボートしました: ERROR: 値は型character varying(10)としては長すぎます このバッチの他のエラーは getNextException を呼び出すことで確認できます。
org.postgresql.util.PSQLException: ERROR: 値は型character varying(10)としては長すぎます
org.postgresql.util.PSQLException: ERROR: 値は型character varying(10)としては長すぎます

        at io.confluent.connect.jdbc.sink.JdbcSinkTask.getAllMessagesException(JdbcSinkTask.java:167)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.unrollAndRetry(JdbcSinkTask.java:152)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:122)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:605)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:344)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:246)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:215)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:225)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:280)
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)
        {"schema":{"type":"string","optional":true},"payload":"user01"} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"string","optional":true,"field":"item"}],"optional":false,"name":"table01"},"payload":{"id":5,"user_id":"user01","item":"super-apple-0000"}}

スケールアウトの動作検証

Workerインスタンスを追加して以下の動作を確認します。

  1. サーバnode03上で新しいWorkerインスタンスを起動する
  2. 起動中のコネクタのConnectorとTaskの一部が、自動的にnode03上のWorkerに移動する

kc_scaleout.png

新しいサーバ上でWorkerインスタンスを起動すると、Workerクラスタに自動追加されます。

# Workerインスタンスを起動
/opt/kafka/bin/connect-distributed.sh worker.properties

しばらく待ってからコネクタの状態を確認します。

# JDBC Sourceコネクタの状態を確認
curl -X GET http://node01:8083/connectors/jdbc_source_postgresql/status

レスポンス(整形済み):

{
    "name": "jdbc_source_postgresql",
    "connector": {
        "state": "RUNNING",
        "worker_id": "node01:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "node03:8083"
        },
        {
            "id": 1,
            "state": "RUNNING",
            "worker_id": "node02:8083"
        }
    ],
    "type": "source"
}

tasks.id: 0worker_id: node03:8083に移動しました。

# JDBC Sinkコネクタの状態を確認
curl -X GET http://node01:8083/connectors/jdbc_sink_postgresql/status

レスポンス(整形済み):

{
    "name": "jdbc_sink_postgresql",
    "connector": {
        "state": "RUNNING",
        "worker_id": "node02:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "node03:8083"
        },
        {
            "id": 1,
            "state": "RUNNING",
            "worker_id": "node02:8083"
        },
        {
            "id": 2,
            "state": "RUNNING",
            "worker_id": "node01:8083"
        },
        {
            "id": 3,
            "state": "RUNNING",
            "worker_id": "node02:8083"
        },
        {
            "id": 4,
            "state": "RUNNING",
            "worker_id": "node01:8083"
        }
    ],
    "type": "sink"
}

tasks.id: 0worker_id: node03:8083に移動しました。

フォールトトレランスの動作検証

Workerインスタンスを停止して以下の動作を確認します。

  1. サーバnode02上のWorkerインスタンスを停止する
  2. node02上の各コネクタのConnectorとTaskが、自動的に別のWorkerに移動する

なお、クラスタからWorkerが離脱した際のリバランスは、実行前に最大scheduled.rebalance.max.delay.msだけ待機する場合があります。この待機時間はデフォルト設定では300000ミリ秒 (5分)です。リバランスの実行タイミングは様々な要因に影響されるため、待機する場合もあれば、即座にリバランスが実施される場合もあります。

詳しくは前回の投稿にある増分協調リバランス (Incremental Cooperative Rebalancing)の説明をご参照ください。

kc_fault_tolerance.png

node02のWorkerインスタンスを停止します。

# Workerインスタンスをkill(SIGKILLで強制終了)
jps -l | grep org.apache.kafka.connect.cli.ConnectDistributed | awk '{print $1}' | xargs kill -9

JDBC Sourceコネクタの状態を確認します。今回はリバランスの実行前に、約5分間の待機が発生しました。

Workerインスタンスの停止直後にコネクタの状態を確認するとConnector/Taskがnode02に配置されたままでしたが、5分後に再度確認するとConnector/Taskがnode02以外に移動していることが確認できました。

# JDBC Sourceコネクタの状態を確認
curl -X GET http://node01:8083/connectors/jdbc_source_postgresql/status

レスポンス(整形済み):

{
    "name": "jdbc_source_postgresql",
    "connector": {
        "state": "RUNNING",
        "worker_id": "node01:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "node03:8083"
        },
        {
            "id": 1,
            "state": "RUNNING",
            "worker_id": "node01:8083"
        }
    ],
    "type": "source"
}

tasks.id: 1worker_id: node01:8083に移動しました。

# JDBC Sinkコネクタの状態を確認
curl -X GET http://node01:8083/connectors/jdbc_sink_postgresql/status

レスポンス(整形済み):

{
    "name": "jdbc_sink_postgresql",
    "connector": {
        "state": "RUNNING",
        "worker_id": "node03:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "node03:8083"
        },
        {
            "id": 1,
            "state": "RUNNING",
            "worker_id": "node01:8083"
        },
        {
            "id": 2,
            "state": "RUNNING",
            "worker_id": "node01:8083"
        },
        {
            "id": 3,
            "state": "RUNNING",
            "worker_id": "node03:8083"
        },
        {
            "id": 4,
            "state": "RUNNING",
            "worker_id": "node01:8083"
        }
    ],
    "type": "sink"
}

connectortasks.id: 3worker_id: node03:8083に移動し、tasks.id: 1worker_id: node01:8083に移動しました。

おわりに

本稿ではKafka Connectを分散モードで動かして、JDBC Connectorを活用したPostgreSQLテーブル間のデータコピーの動作を確認しました。またコネクタのスケールアウトとフォールトトレランスの動作を確認しました。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?