Use your own connector with Twitter and Aiven for Apache Kafka®の翻訳です。
2022年5月24日
Apache Kafka®のためのTwitterとAivenで独自のコネクタを使用する
Aivenがあなたの望むApache Kafka®コネクターを提供していない場合はどうしますか?TwitterのメッセージをKafkaに収集するために、外部コネクターを使用する方法を学びましょう。
Aiven for Apache Kafka®で独自のコネクタを使用する - Twitterソースの例
Apache Kafka®は、より多くの企業にとって中心的なデータバックボーンとなっており、豊富なApache Kafka®コネクトエコシステムがさまざまなプラグインを提供することで、Kafkaと多種多様なテクノロジーを簡単に統合することができます。
Apache Kafkaをセルフホスティングする場合、適切なプラグインと設定オプションを見つけることで、どのようなオープンソースのコネクタープラグインでも利用することができます。マネージドサービスを利用する場合、状況は変わります。サポートされているプラグインのリストが驚くほど多くても(Aivenが提供するすべてのプラグインをご覧ください)、あなたが探していた特定のsink-to-obscure-datatech
が見つからないかもしれません。それでも、Aivenは提案を受け入れ、リストに含めるために継続的に新しいコネクタプラグインを評価します!
また、自己管理型のApache Kafka® Connectクラスタを作成し、Aiven for Apache Kafkaに接続することもできます。この方法では、Aivenが提供するマネージドKafkaサービスの恩恵を受けながら、オープンソースのコネクターを自由に選択することができます。弊社開発者ポータルでプロセスの例を読むことができます。このブログポストでは、この町で最高のコネクタの1つ、おそらく新規学習者の90%が使用しているコネクタ、Twitterソースコネクタの使用を開始するために必要な手順を説明します。
Apache Kafkaクラスタの作成
Aiven CLIと専用の service create
関数を使って、この部分を手短に説明しよう。利用可能なすべてのパラメータは専用ページをご覧ください。また、すべての高度なカスタマイズパラメータのリストもご覧ください。Aivenが提供しています。今回のブログでは以下のパラメータを使用します:
AVN SERVICE CREATE DEMO-KAFKA
--service-type kafka
--cloud google-europe-west3 \
--plan business-4 ╱ -c kafka.auto_create_topics_enable
-c kafka.auto_create_topics_enable=true ¦ -c kafka_rest=true
-c kafka_rest=true` クリップボードにコピーする
上記のコマンドは、google-europe-west3
リージョンにbusiness-4
プランのAiven for Apache Kafkaインスタンスdemo-kafka
を作成します。また、トピックとREST APIの自動作成も有効にしている。このAPIは、トピックに格納されたデータを確認するために使用する。
サービスの開始中に、ローカルの Apache Kafka Connect クラスタを demo-kafka
サービスに統合するために使用する Java keystore and truststore を生成することができる。
以下のAiven CLIコマンドで両方のストアを作成できる:
avn service user-kafka-java-creds demo-kafka ˶ -d certsfolder
-d certsfolder
-p STOREPASSWORD123
--ユーザー名 avnadmin`Copy to clipboard
上記は必要な証明書を certsfolder
という名前のフォルダにダウンロードし、同じフォルダに client.keystore.p12
という名前のキーストアファイルと client.truststore.jks
という名前のトラストストアを作成します。ストアとキーのセキュリティのために異なるシークレットを設定することに熱心であれば、開発者専用ポータル ドキュメント を確認するとよいだろう。
自己管理型の Apache Kafka Connect クラスタを作成する。
さて、シェルのスキルを使う時が来た。唯一の前提条件は、JDKがインストールされていることだ。まずはApache Kafkaのバイナリを入手することから始めよう。
wget https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz`Copy to clipboard
次に解凍する。
tar -xzf kafka_2.13-3.1.0.tgz` クリップボードにコピーする。
これで、すべての Apache Kafka グッズが入った kafka_2.13-3.1.0
というフォルダが作成される。
Twitterソースコネクターの依存関係を追加する
Twitterデータの取得を開始するには、専用オープンソースコネクターを使用します。関連するコードは
wget https://github.com/jcustenborder/kafka-connect-twitter/releases/download/0.2.26/kafka-connect-twitter-0.2.26.tar.gz`Copy to clipboard
tarファイルを解凍する
mkdir twitter-connector
tar -xvf kafka-connect-twitter-0.2.26.tar.gz -C twitter-connector`クリップボードにコピーする
上記のコマンドを実行すると、tarファイルが解凍され、twitter-connector
というフォルダに格納される。このフォルダには、Apache Kafka Connectクラスタが読み込む必要があるすべてのファイルが格納されたusr/share/kafka-connect/kafka-connect-twitter
というサブフォルダがある。これらのファイルは kafka_2.13-3.1.0
フォルダ内の plugin
サブフォルダに移動できる。
`mkdir kafka\_2.13-3.1.0/plugins
mv twitter-connector/usr/share/kafka-connect/kafka-connect-twitter kafka_2.13-3.1.0/plugins/lib``Copy to clipboard
Apache Kafka Connect設定ファイルを定義する
次に、ローカルのKafka ConnectクラスタがAiven for Apache Kafkaを指すようにするための設定ファイルを定義します。開発者ポータルテンプレートを使用して、my-connect-distributed.properties
という名前のファイルを作成し、代入します:
- PATH_TO_KAFKA_HOME` を Apache Kafka のバイナリがあるディレクトリに置き換える。
- APACHE_KAFKA_HOST:APACHE_KAFKA_PORT
を
demo-kafka` のホスト名とポートに置き換える。
avn service get demo-kafka --format '{service_uri}'`Copy to clipboard
- TRUSTSTORE_PATH
と
KEYSTORE_PATHには、キーストアとトラストストアのファイルが格納されているフォルダのパスを指定する(前のセクションで定義したコマンドを使用した場合は
certsfolder`)。 - KEY_TRUST_SECRET
にキーストアとトラストストアの秘密鍵 (前のセクションで定義したコマンドを使用した場合は
STOREPASSWORD123`) を指定する。
ローカルのConnectクラスタを起動する。
設定と必要なファイルがすべて揃ったので、ローカルのApache Kafka Connectクラスタを次のように起動します:
./kafka_2.13-3.1.0/bin/connect-distributed.sh ./my-connect-distributed.properties`Copy to clipboard
Twitterへのアクセスを設定する
Apache Kafka Connectクラスタが稼働しているので、Twitterの開発者ポータルページの開発者ポータルにアクセスして、コネクターがツイートのソーシングを開始するために必要な認証情報を提供する新しいアプリケーションを作成します。
1.メインダッシュボードのページで、v2エンドポイントを使用して新しいプロジェクトを作成します。プロジェクトには、以下を指定する必要があります:
* プロジェクト名
* APIの探索**、ボットの作成**、コンシューマ・ツールの作成**など、さまざまな選択肢の中から、目的に合ったものを選ぶことができます。
* プロジェクトの説明では、目的の概要を説明することができます。
* 既存のアプリを使うのか、新しく作るのかを定義します。私たちはTwitter APIの初心者なので、新しいアプリを作成します。
2.App Setup*セクションでは、新しいアプリの設定を定義します:
* アプリの**環境**、ブログ投稿の目的では*Development*を選択します。
* アプリに覚えやすい名前を付けることで、そのアプリが何のために使われているのかを追跡することができます。twitter-kafka-connect-<SUFFIX>`と呼ぶことができます。ここで、`<SUFFIX>`は一意な識別子である必要があります(すべてのアプリ名は一意である必要があります)。
* アプリの**キーとトークン**は、必要なキーを生成して取り出すことができる。このセクションから、後にApache Kafkaコネクタのセットアップで使用する**API Key**と**API Key Secret**をコピーする必要がある(これらを`TWITTER_API_KEY`と`TWITTER_API_SECRET`として参照する)。すべてのセットアップが完了したら、**App Settings**を選択すると、アプリとプロジェクトが正常にビルドされたことを伝える以下のような画面が表示されるはずだ。
3.次に、Keys and tokens タブに移動して、コネクタの動作に必要な追加シークレットを生成(または再生成)します。
4.アクセス・トークンとシークレットを生成します。
* アクセストークン: `TWITTER_ACCESS_TOKEN`
* アクセストークン・シークレット: `TWITTER_ACCESS_TOKEN_SECRET`
5.Twitter開発者ポータルでの最後の設定は、私たちのプロジェクトのためにElevatedアクセスをリクエストすることです。Twitter API v2がリリースされたため、デフォルトの(Essential)アクセスではv2としかやり取りできません。コーディング・スキル・レベルやプロジェクトの説明を含むいくつかの情報を入力し、Developer Agreementに同意する必要がある。リクエストはTwitterによって公式にチェックされ、すべてがうまくいけば、数時間かかることもあるが、すぐに昇格アカウントを手に入れることができるだろう。
ソースコネクタ設定ファイルの作成
上記で取得したツイッターの秘密を、twitter-source.json
という名前の設定ファイルに以下の内容で記述する。
`{
"name": "twitter_connector"、
"config":
{
"tasks.max": "1"、
"connector.class": "com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector"、
"process.delees": "false"、
"filter.keywords": "データベース"、
"kafka.status.topic": "twitter-topic"、
"twitter.auth.consumerKey": "TWITTER_API_KEY"、
"twitter.oauth.consumerSecret": "TWITTER_API_SECRET"、
"twitter.oauth.accessToken": "TWITTER_ACCESS_TOKEN"、
"twitter.oauth.accessTokenSecret": "TWITTER_ACCESS_TOKEN_SECRET"
}
クリップボードにコピーする
以下はカスタマイズ可能なパラメータである:
-
"process.deletes": "false"
: 簡単のため、ツイートの削除を処理しない。 -
"filter.keywords": "database"
:database
キーワードを含むツイートをフィルタリングする。このキーワードは、Apache Kafkaにデータが表示されるように、ツイートの中で発生する、読んだ瞬間に話題となるものに置き換えることをお勧めします。 -
"kafka.status.topic": "twitter-topic"
: 対象のトピックはtwitter-topic
という名前になる。 -
twitter.oauth
パラメータの値は、前のステップで取得した twitter の秘密情報で変更する必要がある。
Twitter ソースコネクタを起動する
Apache Kafka REST APIを使ってコネクタを起動します:
``curl -s -H "Content-Type: application/json" -X POST ˶ -d @twitter-source.json ˶.
-d @twitter-source.json \
http://localhost:8083/connectors/`クリップボードにコピー
Apache Kafkaで出力を確認する
上記の curl
コマンドは、twitter-source.json
コネクタ設定ファイルを渡す connectors
REST エンドポイントを使用する。これで、twitter-topic
Apache Kafka トピックに apachekafka
を含むツイートが流れてくるのが確認できるはずだ。KarapaceのREST APIを有効にしたので、Aiven Consoleにアクセスし、Topicタブの下にあるdemo-kafka
サービス名をクリックすることで、トピックデータを確認することができる。
これはストリーミング・ソリューションなので、定義したキーワードを含むツイートが書き込まれた場合のみ、トピックにデータが表示されることに注意してください。コネクターをテストしたい場合は、キーワードを含むツイートを自分で書いてみてください!
ビッグニュース:現在、Aiven for Apache KafkaでAivenがサポートしていないApache Kafka® Connectorプラグインを使用しています!Apache Kafka Connectクラスタを管理する負担はありますが、私たちのソース/ターゲット技術が利用可能なコネクタのどれにも対応していない場合に最適なオプションかもしれません。
クイック・ヒント: コネクタの設定を変更する必要がある場合は、まずコネクタを
curl -s -X DELETE http://localhost:8083/connectors/twitter_connector`Copy to clipboard
そして、更新された設定ファイルを curl
コマンドで PUSH
オプションを指定して送信します。
マネージド Apache Kafka サービスとコネクタプラグイン選択の自由度
Apache Kafkaクラスタを完全に管理するのは面倒な作業であり、AivenのようなマネージドサービスをApache Kafkaのために使用することは、通常、賢明なアイデアです。特殊なユースケースの場合、探している構成やコネクタがAivenでサポートされていないことに気づくかもしれません。Aiven for Apache Kafkaを使用すれば、特定の統合問題を解決するローカルのApache Kafka Connectクラスタを簡単に統合でき、全体的なApache Kafkaマネージドソリューションと豊富なオープンソースコネクタの両方のメリットを享受できます。
詳しくは以下のリソースをご覧ください: