Manage Apache Kafka® Connect connectors with kcctlの翻訳です。
2021年10月13日
kcctl で Apache Kafka® Connect コネクタを管理する
kcctlを使用すると、オンプレム、Aiven、その他のクラウドを問わず、1つのコマンドラインツールですべてのApache Kafka Connectインスタンスを管理できます。使い方をご覧ください。
Apache Kafkaは、Kafka Connectがブリッジとして機能する企業データのバックボーンとして広く使用されています。このようにKafkaは、他のテクノロジーと簡単に、信頼性高く、スケーラブルに統合することができます。Kafka Connect REST APIは、Webコールでコネクタを管理する方法を提供しますが、ターミナルでURLを作成するのは時に厄介です。
このブログポストでは、Kafka Connect用の新しいオープンソースのコマンドラインツールであるkcctlについて説明します。Apache Kafkaと統合し、他のシステムとの接続を管理する方法がわかります。
AivenはAiven Command Line Interfaceで同様の機能を提供しています。CLIを使用して、Aivenサービス上で動作するKafka Connectコネクタの作成、削除、変更、検証、一時停止、復元を行うことができます。すべてのKafka ConnectインスタンスがAivenサービスである場合、Aivenコマンドラインインターフェイスがすべて必要です。
しかし、オンプレミス、Aiven、他のクラウドプロバイダーなど、どのKafka Connectインスタンスでも同じツールを使いたい場合は、kcctlが便利です。
Kafka ConnectでApache Kafkaインスタンスを作成する
この記事のプロセスに従うには、Kafka Connectを備えたApache Kafka環境がすでに稼働していることを確認してください。もしお持ちでない場合でもご安心ください。Aiven Console、またはAiven Command Line Interfaceで、以下のコマンドを実行するだけです:
AVN SERVICE CREATE DEMO-KAFKA ⇦ -SERVICE-TYPE KAFKA
--service-type kafka
--cloud google-europe-west3 \
--plan business-4
-c kafka_connect=true ¦ -c kafka.auto_create=true
-c kafka.auto_create_topics_enable=true` クリップボードにコピーする
このコマンドは、google-europe-west3
リージョンに3ノード(business-4
プランを使用)のdemo-kafka
という名前のAiven for Apache Kafkaクラスタを作成し、トピックとKafka Connectの自動作成を有効にします。Aivenでは、ビジネスプランとプレミアムプランのKafkaクラスタの一部として、または独立したスタンドアロンのクラスタとしてKafka Connectを導入できます。Aiven for Apache Kafkaおよび関連するKafka Connectの詳細については、専用ページをご覧ください。
サービスの準備が整うまで待ちましょう:
avn service wait demo-kafka`クリップボードにコピーする
kcctl
をインストールする
この記事を書いている時点では、kcctl
は早期アクセスリリースである。現在のインストール手順は GitHub リポジトリ にあります。
kcctl がインストールされたら、
binサブディレクトリを
PATH` に追加して実行し、ターミナルで動作することを確認する:
kcctl`クリップボードにコピーする
すべて正しくできていれば、利用状況が表示されるはずだ。いよいよKafka Connectクラスタに接続する。
接続する
Kafka Connectに接続するには、まずクラスタのURLを取得します。Aiven CLIとjqを使って、JSON出力を解析します:
avn service get demo-kafka --json | jq '.connection_info.kafka_connect_uri'`Copy to clipboard
次のコマンドを実行して kcctl
設定コンテキストを作成し、cluster
パラメータを適宜置き換える。
kcctl config set-context ¦ --cluster @demo-kafka-<PROJECT
--cluster https://avnadmin:PASSWORD@demo-kafka-<PROJECT_NAME>.aivencloud.com:443(日本語)
my_kafka_cluster`Copy to clipboard
上記により、demo-kafka
インスタンスを指すmy_kafka_cluster
という名前のコンテキストが作成される。設定を確認する
kcctl info`クリップボードにコピーする
現在の kcctl
設定コンテキストの定義を取得する:
URL: https://avnadmin:PASSWORD@demo-kafka-<PROJECT_NAME>.aivencloud.com:443
バージョン: 2.7.2-SNAPSHOT
コミット: d15ddddd3ef3f5ef
Kafka Cluster ID: -DvILyiXQxSpnFSK9M1qgQ`Copy to clipboard
PostgreSQLでデータソースを作成する
コネクタの動作を確認するために、PostgreSQLデータベースを作成し、データをKafkaに取り込むKafka Connect JDBC source connectorを設定する。コネクタはPostgreSQLデータベースに格納されたpasta
という名前のテーブルからデータを取得し、Kafkaトピックに含める。
PostgreSQLデータベースが手元にない場合は、以下のAiven CLIコマンドでAivenで作成できます:
avn service create demo-pg ˶ --service-type pg ˶.
--サービスタイプ pg
--cloud google-europe-west3 \
--plan hobbyist`Copy to clipboard
demo-pgのPostgreSQLインスタンスが稼働したら(待機するには
avn service wait demo-pg`を使用する)、接続する:
avn service cli demo-pg`クリップボードにコピーする
次に、ターミナルで以下のステートメントを使用して、サンプルの pasta
テーブルを作成し、そのテーブルをデータで埋めます:
テーブルpasta (id serial, name varchar, cooking_minutes int)を作成する;
insert into pasta (name, cooking_minutes) values ('spaghetti', 8);
insert into pasta (name, cooking_minutes) values ('spaghettini', 6);
insert into pasta (name, cooking_minutes) values ('fusilli', 9);
insert into pasta (name, cooking_minutes) values ('trofie', 5);`Copy to clipboard
Kafka Connectコネクタを新規作成する
ソースデータが利用可能になったら、Kafka Connect JDBC ソースコネクタを作成し、id
カラムに基づいて pasta
テーブルを incremental
モードでソーシングする。ホスト名、ポート、ユーザー、パスワードなど、必要な PostgreSQL 接続の詳細を取得するには、次のコマンドを使用する:
avn service get demo-pg --format '{service_uri_params}'`Copy to clipboard
以下の JSON コンテンツを含む my_jdbc_connect_source.json
という名前のファイルを作成する(<HOST>
、<PORT>
、<PASSWORD>
は前のステップで取得した実際の情報に置き換える):
`{
"connector.class":"io.aiven.connect.jdbc.JdbcSourceConnector"、
"connection.url":"jdbc:postgresql://<HOST>:<PORT>/defaultdb?sslmode=require"、
"connection.user":"avnadmin"、
"connection.password":"<PASSWORD>"、
"table.whitelist":"pasta"、
"mode":"インクリメント"、
"incrementing.column.name": "id"、
"poll.interval.ms":"2000",
"topic.prefix":"pg_source_"
クリップボードにコピーする
ここで、新しいターミナルウィンドウで kcctl
を使ってコネクタ作成を呼び出す:
kcctl apply -f my_jdbc_connect_source.json --name pg-incremental-source`Copy to clipboard
コネクタが正常に作成されたことを確認します:
kcctl describe connector pg-incremental-source`Copy to clipboard
コマンドの出力は RUNNING
状態の pg-incremental-source
コネクタと、それに関連するすべての詳細を表示します。
kcctl で Apache Kafka のデータを確認する。
kcatを使って、PostgreSQLに同じデータが保存されているpg_source_pasta
という新しいKafkaトピックを確認することもできる。まず、必要な証明書をダウンロードすることから始めよう:
avnサービスuser-creds-download demo-kafka \
--ユーザー名 avnadmin
-d certs`Copy to clipboard
次に、以下のエントリを含む kcat.config
ファイルを作成する:
`bootstrap.servers=<HOST>:<PORT>
security.protocol=ssl
ssl.key.location=certs/service.key
ssl.certificate.location=certs/service.cert
ssl.ca.location=certs/ca.pem`クリップボードにコピーする
そして、以下の kcat
を呼び出して pg_source_pasta
トピックから読み込む:
kcat -F kcat.config -C -t pg_source_pasta`Copy to clipboard
ここで、PostgreSQL の pasta
テーブルにいくつかの行を挿入すると、kcat
を介して Kafka に同じ変更が表示される:
kcctl による Kafka Connect コネクタの管理
コネクタの作成は kcctl
を使ったゲームの一部に過ぎません!コネクタを管理することもできます。以下のコマンドを実行するだけです:
kcctl get connectors`クリップボードにコピーする
コネクタを一時停止したり、再開したりする必要がありますか?例えば、以下のコードでは pg-incremental-source
という名前のコネクタを一時停止している:
kcctl pause connector pg-incremental-source`Copy to clipboard
どのようなコネクタを作成できますか?ご質問ありがとうございます!プラグインの完全なリストは
kcctl get plugins`クリップボードにコピーする
このコマンドは、利用可能なすべてのコネクタプラグインを、関連するタイプ (source
または sink
) とバージョンと共に表示します。このコマンドを使用すると、Aiven for Apache Kafkaで作成できるKafka Connectコネクタタイプの管理リストを確認できます。
タイプ クラス バージョン
ソース com.couchbase.connect.kafka.CouchbaseSourceConnector 4.0.6
ソース com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceCon 2.1.3
ネクター
ソース com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector 2.1.3
ソース com.google.pubsub.kafka.source.CloudPubSubSourceConnector 2.7.2-SNAPSHOT
ソース com.google.pubsublite.kafka.source.PubSubLiteSourceConnector 2.7.2-SNAPSHOT
...
sink io.aiven.kafka.connect.gcs.GcsSinkConnector 0.9.0
シンク io.aiven.kafka.connect.http.HttpSinkConnector 0.4.0
シンク io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector 2.12.0
...`クリップボードにコピー
ラッピング
ターミナルから Kafka Connect コネクタを管理できるようになった。kcctl` を使えば、私たちの環境にあるあらゆるコネクタの検査、デプロイ、更新、一時停止、リストアが簡単にできる。これにより、オンプレミス、セルフホスト、AivenにデプロイされたApache Kafkaインスタンスのエンドユーザーエクスペリエンスが統一される。
**さらに読む
まだAivenのサービスをご利用になっていませんか?https://console.aiven.io/signupから今すぐ無料トライアルにお申し込みください!
また、changelogやblogのRSSフィード、またはLinkedInやTwitterのアカウントをフォローし、製品や機能関連の最新情報をご確認ください。