1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

kcctlでApache Kafka® Connectコネクタを管理する

Posted at

Manage Apache Kafka® Connect connectors with kcctlの翻訳です。

2021年10月13日

kcctl で Apache Kafka® Connect コネクタを管理する

kcctlを使用すると、オンプレム、Aiven、その他のクラウドを問わず、1つのコマンドラインツールですべてのApache Kafka Connectインスタンスを管理できます。使い方をご覧ください。

Apache Kafkaは、Kafka Connectがブリッジとして機能する企業データのバックボーンとして広く使用されています。このようにKafkaは、他のテクノロジーと簡単に、信頼性高く、スケーラブルに統合することができます。Kafka Connect REST APIは、Webコールでコネクタを管理する方法を提供しますが、ターミナルでURLを作成するのは時に厄介です。

このブログポストでは、Kafka Connect用の新しいオープンソースのコマンドラインツールであるkcctlについて説明します。Apache Kafkaと統合し、他のシステムとの接続を管理する方法がわかります。

AivenはAiven Command Line Interfaceで同様の機能を提供しています。CLIを使用して、Aivenサービス上で動作するKafka Connectコネクタの作成、削除、変更、検証、一時停止、復元を行うことができます。すべてのKafka ConnectインスタンスがAivenサービスである場合、Aivenコマンドラインインターフェイスがすべて必要です。

しかし、オンプレミス、Aiven、他のクラウドプロバイダーなど、どのKafka Connectインスタンスでも同じツールを使いたい場合は、kcctlが便利です。

Kafka ConnectでApache Kafkaインスタンスを作成する

この記事のプロセスに従うには、Kafka Connectを備えたApache Kafka環境がすでに稼働していることを確認してください。もしお持ちでない場合でもご安心ください。Aiven Console、またはAiven Command Line Interfaceで、以下のコマンドを実行するだけです:

AVN SERVICE CREATE DEMO-KAFKA ⇦ -SERVICE-TYPE KAFKA
 --service-type kafka
 --cloud google-europe-west3  \
 --plan business-4
 -c kafka_connect=true ¦ -c kafka.auto_create=true
 -c kafka.auto_create_topics_enable=true` クリップボードにコピーする

このコマンドは、google-europe-west3リージョンに3ノード(business-4プランを使用)のdemo-kafkaという名前のAiven for Apache Kafkaクラスタを作成し、トピックとKafka Connectの自動作成を有効にします。Aivenでは、ビジネスプランとプレミアムプランのKafkaクラスタの一部として、または独立したスタンドアロンのクラスタとしてKafka Connectを導入できます。Aiven for Apache Kafkaおよび関連するKafka Connectの詳細については、専用ページをご覧ください。

サービスの準備が整うまで待ちましょう:

avn service wait demo-kafka`クリップボードにコピーする

kcctl をインストールする

この記事を書いている時点では、kcctl は早期アクセスリリースである。現在のインストール手順は GitHub リポジトリ にあります。

kcctl がインストールされたら、binサブディレクトリをPATH` に追加して実行し、ターミナルで動作することを確認する:

kcctl`クリップボードにコピーする

すべて正しくできていれば、利用状況が表示されるはずだ。いよいよKafka Connectクラスタに接続する。

接続する

Kafka Connectに接続するには、まずクラスタのURLを取得します。Aiven CLIとjqを使って、JSON出力を解析します:

avn service get demo-kafka --json | jq '.connection_info.kafka_connect_uri'`Copy to clipboard

次のコマンドを実行して kcctl 設定コンテキストを作成し、cluster パラメータを適宜置き換える。

kcctl config set-context ¦ --cluster @demo-kafka-<PROJECT
 --cluster https://avnadmin:PASSWORD@demo-kafka-<PROJECT_NAME>.aivencloud.com:443(日本語)
 my_kafka_cluster`Copy to clipboard

上記により、demo-kafkaインスタンスを指すmy_kafka_clusterという名前のコンテキストが作成される。設定を確認する

kcctl info`クリップボードにコピーする

現在の kcctl 設定コンテキストの定義を取得する:

URL: https://avnadmin:PASSWORD@demo-kafka-<PROJECT_NAME>.aivencloud.com:443
バージョン: 2.7.2-SNAPSHOT
コミット: d15ddddd3ef3f5ef
Kafka Cluster ID: -DvILyiXQxSpnFSK9M1qgQ`Copy to clipboard

PostgreSQLでデータソースを作成する

コネクタの動作を確認するために、PostgreSQLデータベースを作成し、データをKafkaに取り込むKafka Connect JDBC source connectorを設定する。コネクタはPostgreSQLデータベースに格納されたpastaという名前のテーブルからデータを取得し、Kafkaトピックに含める。

PostgreSQLデータベースが手元にない場合は、以下のAiven CLIコマンドでAivenで作成できます:

avn service create demo-pg ˶ --service-type pg ˶.
 --サービスタイプ pg
 --cloud google-europe-west3  \
 --plan hobbyist`Copy to clipboard

demo-pgのPostgreSQLインスタンスが稼働したら(待機するにはavn service wait demo-pg`を使用する)、接続する:

avn service cli demo-pg`クリップボードにコピーする

次に、ターミナルで以下のステートメントを使用して、サンプルの pasta テーブルを作成し、そのテーブルをデータで埋めます:

テーブルpasta (id serial, name varchar, cooking_minutes int)を作成する;
insert into pasta (name, cooking_minutes) values ('spaghetti', 8);
insert into pasta (name, cooking_minutes) values ('spaghettini', 6);
insert into pasta (name, cooking_minutes) values ('fusilli', 9);
insert into pasta (name, cooking_minutes) values ('trofie', 5);`Copy to clipboard

Kafka Connectコネクタを新規作成する

ソースデータが利用可能になったら、Kafka Connect JDBC ソースコネクタを作成し、id カラムに基づいて pasta テーブルを incremental モードでソーシングする。ホスト名、ポート、ユーザー、パスワードなど、必要な PostgreSQL 接続の詳細を取得するには、次のコマンドを使用する:

avn service get demo-pg --format '{service_uri_params}'`Copy to clipboard

以下の JSON コンテンツを含む my_jdbc_connect_source.json という名前のファイルを作成する(<HOST><PORT><PASSWORD> は前のステップで取得した実際の情報に置き換える):

`{
 "connector.class":"io.aiven.connect.jdbc.JdbcSourceConnector"、
 "connection.url":"jdbc:postgresql://<HOST>:<PORT>/defaultdb?sslmode=require"、
 "connection.user":"avnadmin"、
 "connection.password":"<PASSWORD>"、
 "table.whitelist":"pasta"、
 "mode":"インクリメント"、
 "incrementing.column.name": "id"、
 "poll.interval.ms":"2000",
 "topic.prefix":"pg_source_"
クリップボードにコピーする

ここで、新しいターミナルウィンドウで kcctl を使ってコネクタ作成を呼び出す:

kcctl apply -f my_jdbc_connect_source.json --name pg-incremental-source`Copy to clipboard

コネクタが正常に作成されたことを確認します:

kcctl describe connector pg-incremental-source`Copy to clipboard

コマンドの出力は RUNNING 状態の pg-incremental-source コネクタと、それに関連するすべての詳細を表示します。

kcctl で Apache Kafka のデータを確認する。

kcatを使って、PostgreSQLに同じデータが保存されているpg_source_pastaという新しいKafkaトピックを確認することもできる。まず、必要な証明書をダウンロードすることから始めよう:

avnサービスuser-creds-download demo-kafka  \
 --ユーザー名 avnadmin
 -d certs`Copy to clipboard

次に、以下のエントリを含む kcat.config ファイルを作成する:

`bootstrap.servers=<HOST>:<PORT>
security.protocol=ssl
ssl.key.location=certs/service.key
ssl.certificate.location=certs/service.cert
ssl.ca.location=certs/ca.pem`クリップボードにコピーする

そして、以下の kcat を呼び出して pg_source_pasta トピックから読み込む:

kcat -F kcat.config -C -t pg_source_pasta`Copy to clipboard

ここで、PostgreSQL の pasta テーブルにいくつかの行を挿入すると、kcat を介して Kafka に同じ変更が表示される:

kcat経由でKafkaに流れるPostgresqlの挿入と関連行を示すGif

kcctl による Kafka Connect コネクタの管理

コネクタの作成は kcctl を使ったゲームの一部に過ぎません!コネクタを管理することもできます。以下のコマンドを実行するだけです:

kcctl get connectors`クリップボードにコピーする

コネクタを一時停止したり、再開したりする必要がありますか?例えば、以下のコードでは pg-incremental-source という名前のコネクタを一時停止している:

kcctl pause connector pg-incremental-source`Copy to clipboard

どのようなコネクタを作成できますか?ご質問ありがとうございます!プラグインの完全なリストは

kcctl get plugins`クリップボードにコピーする

このコマンドは、利用可能なすべてのコネクタプラグインを、関連するタイプ (source または sink) とバージョンと共に表示します。このコマンドを使用すると、Aiven for Apache Kafkaで作成できるKafka Connectコネクタタイプの管理リストを確認できます。

タイプ クラス バージョン
 ソース com.couchbase.connect.kafka.CouchbaseSourceConnector 4.0.6
 ソース com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceCon 2.1.3
 ネクター
 ソース com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector 2.1.3
 ソース com.google.pubsub.kafka.source.CloudPubSubSourceConnector 2.7.2-SNAPSHOT
 ソース com.google.pubsublite.kafka.source.PubSubLiteSourceConnector 2.7.2-SNAPSHOT

 ...
 sink io.aiven.kafka.connect.gcs.GcsSinkConnector 0.9.0
 シンク io.aiven.kafka.connect.http.HttpSinkConnector 0.4.0
 シンク io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector 2.12.0
 ...`クリップボードにコピー

ラッピング

ターミナルから Kafka Connect コネクタを管理できるようになった。kcctl` を使えば、私たちの環境にあるあらゆるコネクタの検査、デプロイ、更新、一時停止、リストアが簡単にできる。これにより、オンプレミス、セルフホスト、AivenにデプロイされたApache Kafkaインスタンスのエンドユーザーエクスペリエンスが統一される。

**さらに読む

まだAivenのサービスをご利用になっていませんか?https://console.aiven.io/signupから今すぐ無料トライアルにお申し込みください!

また、changelogblogのRSSフィード、またはLinkedInTwitterのアカウントをフォローし、製品や機能関連の最新情報をご確認ください。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?