1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

TwitterとAiven for Apache Kafka®で独自のコネクタを使用する

Posted at

Use your own connector with Twitter and Aiven for Apache Kafka®の翻訳です。

2022年5月24日

Apache Kafka®のためのTwitterとAivenで独自のコネクタを使用する

Aivenがあなたの望むApache Kafka®コネクターを提供していない場合はどうしますか?TwitterのメッセージをKafkaに収集するために、外部コネクターを使用する方法を学びましょう。

Aiven for Apache Kafka®で独自のコネクタを使用する - Twitterソースの例

Apache Kafka®は、より多くの企業にとって中心的なデータバックボーンとなっており、豊富なApache Kafka®コネクトエコシステムがさまざまなプラグインを提供することで、Kafkaと多種多様なテクノロジーを簡単に統合することができます。

Apache Kafkaをセルフホスティングする場合、適切なプラグインと設定オプションを見つけることで、どのようなオープンソースのコネクタープラグインでも利用することができます。マネージドサービスを利用する場合、状況は変わります。サポートされているプラグインのリストが驚くほど多くても(Aivenが提供するすべてのプラグインをご覧ください)、あなたが探していた特定のsink-to-obscure-datatechが見つからないかもしれません。それでも、Aivenは提案を受け入れ、リストに含めるために継続的に新しいコネクタプラグインを評価します!

また、自己管理型のApache Kafka® Connectクラスタを作成し、Aiven for Apache Kafkaに接続することもできます。この方法では、Aivenが提供するマネージドKafkaサービスの恩恵を受けながら、オープンソースのコネクターを自由に選択することができます。弊社開発者ポータルでプロセスの例を読むことができます。このブログポストでは、この町で最高のコネクタの1つ、おそらく新規学習者の90%が使用しているコネクタ、Twitterソースコネクタの使用を開始するために必要な手順を説明します。

Apache Kafkaクラスタの作成

Aiven CLIと専用の service create 関数を使って、この部分を手短に説明しよう。利用可能なすべてのパラメータは専用ページをご覧ください。また、すべての高度なカスタマイズパラメータのリストもご覧ください。Aivenが提供しています。今回のブログでは以下のパラメータを使用します:

AVN SERVICE CREATE DEMO-KAFKA
 --service-type kafka
 --cloud google-europe-west3  \
 --plan business-4 ╱ -c kafka.auto_create_topics_enable
 -c kafka.auto_create_topics_enable=true ¦ -c kafka_rest=true
 -c kafka_rest=true` クリップボードにコピーする

上記のコマンドは、google-europe-west3リージョンにbusiness-4プランのAiven for Apache Kafkaインスタンスdemo-kafkaを作成します。また、トピックとREST APIの自動作成も有効にしている。このAPIは、トピックに格納されたデータを確認するために使用する。

サービスの開始中に、ローカルの Apache Kafka Connect クラスタを demo-kafka サービスに統合するために使用する Java keystore and truststore を生成することができる。

以下のAiven CLIコマンドで両方のストアを作成できる:

avn service user-kafka-java-creds demo-kafka ˶ -d certsfolder
 -d certsfolder
 -p STOREPASSWORD123
 --ユーザー名 avnadmin`Copy to clipboard

上記は必要な証明書を certsfolder という名前のフォルダにダウンロードし、同じフォルダに client.keystore.p12 という名前のキーストアファイルと client.truststore.jks という名前のトラストストアを作成します。ストアとキーのセキュリティのために異なるシークレットを設定することに熱心であれば、開発者専用ポータル ドキュメント を確認するとよいだろう。

自己管理型の Apache Kafka Connect クラスタを作成する。

さて、シェルのスキルを使う時が来た。唯一の前提条件は、JDKがインストールされていることだ。まずはApache Kafkaのバイナリを入手することから始めよう。

wget https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz`Copy to clipboard

次に解凍する。

tar -xzf kafka_2.13-3.1.0.tgz` クリップボードにコピーする。

これで、すべての Apache Kafka グッズが入った kafka_2.13-3.1.0 というフォルダが作成される。

Twitterソースコネクターの依存関係を追加する

Twitterデータの取得を開始するには、専用オープンソースコネクターを使用します。関連するコードは

wget https://github.com/jcustenborder/kafka-connect-twitter/releases/download/0.2.26/kafka-connect-twitter-0.2.26.tar.gz`Copy to clipboard

tarファイルを解凍する

mkdir twitter-connector
tar -xvf kafka-connect-twitter-0.2.26.tar.gz -C twitter-connector`クリップボードにコピーする

上記のコマンドを実行すると、tarファイルが解凍され、twitter-connectorというフォルダに格納される。このフォルダには、Apache Kafka Connectクラスタが読み込む必要があるすべてのファイルが格納されたusr/share/kafka-connect/kafka-connect-twitterというサブフォルダがある。これらのファイルは kafka_2.13-3.1.0 フォルダ内の plugin サブフォルダに移動できる。

`mkdir kafka\_2.13-3.1.0/plugins
mv twitter-connector/usr/share/kafka-connect/kafka-connect-twitter kafka_2.13-3.1.0/plugins/lib``Copy to clipboard

Apache Kafka Connect設定ファイルを定義する

次に、ローカルのKafka ConnectクラスタがAiven for Apache Kafkaを指すようにするための設定ファイルを定義します。開発者ポータルテンプレートを使用して、my-connect-distributed.propertiesという名前のファイルを作成し、代入します:

  • PATH_TO_KAFKA_HOME` を Apache Kafka のバイナリがあるディレクトリに置き換える。
  • APACHE_KAFKA_HOST:APACHE_KAFKA_PORTdemo-kafka` のホスト名とポートに置き換える。
avn service get demo-kafka --format '{service_uri}'`Copy to clipboard
  • TRUSTSTORE_PATHKEYSTORE_PATHには、キーストアとトラストストアのファイルが格納されているフォルダのパスを指定する(前のセクションで定義したコマンドを使用した場合はcertsfolder`)。
  • KEY_TRUST_SECRETにキーストアとトラストストアの秘密鍵 (前のセクションで定義したコマンドを使用した場合はSTOREPASSWORD123`) を指定する。

ローカルのConnectクラスタを起動する。

設定と必要なファイルがすべて揃ったので、ローカルのApache Kafka Connectクラスタを次のように起動します:

./kafka_2.13-3.1.0/bin/connect-distributed.sh ./my-connect-distributed.properties`Copy to clipboard

Twitterへのアクセスを設定する

Apache Kafka Connectクラスタが稼働しているので、Twitterの開発者ポータルページ開発者ポータルにアクセスして、コネクターがツイートのソーシングを開始するために必要な認証情報を提供する新しいアプリケーションを作成します。

1.メインダッシュボードのページで、v2エンドポイントを使用して新しいプロジェクトを作成します。プロジェクトには、以下を指定する必要があります:

* プロジェクト名
* APIの探索**、ボットの作成**、コンシューマ・ツールの作成**など、さまざまな選択肢の中から、目的に合ったものを選ぶことができます。
* プロジェクトの説明では、目的の概要を説明することができます。
* 既存のアプリを使うのか、新しく作るのかを定義します。私たちはTwitter APIの初心者なので、新しいアプリを作成します。

2.App Setup*セクションでは、新しいアプリの設定を定義します:

* アプリの**環境**、ブログ投稿の目的では*Development*を選択します。
* アプリに覚えやすい名前を付けることで、そのアプリが何のために使われているのかを追跡することができます。twitter-kafka-connect-<SUFFIX>`と呼ぶことができます。ここで、`<SUFFIX>`は一意な識別子である必要があります(すべてのアプリ名は一意である必要があります)。
* アプリの**キーとトークン**は、必要なキーを生成して取り出すことができる。このセクションから、後にApache Kafkaコネクタのセットアップで使用する**API Key**と**API Key Secret**をコピーする必要がある(これらを`TWITTER_API_KEY`と`TWITTER_API_SECRET`として参照する)。すべてのセットアップが完了したら、**App Settings**を選択すると、アプリとプロジェクトが正常にビルドされたことを伝える以下のような画面が表示されるはずだ。

Twitter App and Project build
3.次に、Keys and tokens タブに移動して、コネクタの動作に必要な追加シークレットを生成(または再生成)します。
4.アクセス・トークンとシークレットを生成します。

* アクセストークン: `TWITTER_ACCESS_TOKEN`
* アクセストークン・シークレット: `TWITTER_ACCESS_TOKEN_SECRET`

5.Twitter開発者ポータルでの最後の設定は、私たちのプロジェクトのためにElevatedアクセスをリクエストすることです。Twitter API v2がリリースされたため、デフォルトの(Essential)アクセスではv2としかやり取りできません。コーディング・スキル・レベルやプロジェクトの説明を含むいくつかの情報を入力し、Developer Agreementに同意する必要がある。リクエストはTwitterによって公式にチェックされ、すべてがうまくいけば、数時間かかることもあるが、すぐに昇格アカウントを手に入れることができるだろう。

ソースコネクタ設定ファイルの作成

上記で取得したツイッターの秘密を、twitter-source.jsonという名前の設定ファイルに以下の内容で記述する。

`{
 "name": "twitter_connector"、
 "config":
 {
 "tasks.max": "1"、
 "connector.class": "com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector"、
 "process.delees": "false"、
 "filter.keywords": "データベース"、
 "kafka.status.topic": "twitter-topic"、
 "twitter.auth.consumerKey": "TWITTER_API_KEY"、
 "twitter.oauth.consumerSecret": "TWITTER_API_SECRET"、
 "twitter.oauth.accessToken": "TWITTER_ACCESS_TOKEN"、
 "twitter.oauth.accessTokenSecret": "TWITTER_ACCESS_TOKEN_SECRET"
 }
クリップボードにコピーする

以下はカスタマイズ可能なパラメータである:

  • "process.deletes": "false": 簡単のため、ツイートの削除を処理しない。
  • "filter.keywords": "database": databaseキーワードを含むツイートをフィルタリングする。このキーワードは、Apache Kafkaにデータが表示されるように、ツイートの中で発生する、読んだ瞬間に話題となるものに置き換えることをお勧めします。
  • "kafka.status.topic": "twitter-topic": 対象のトピックは twitter-topic という名前になる。
  • twitter.oauth パラメータの値は、前のステップで取得した twitter の秘密情報で変更する必要がある。

Twitter ソースコネクタを起動する

Apache Kafka REST APIを使ってコネクタを起動します:

``curl -s -H "Content-Type: application/json" -X POST ˶ -d @twitter-source.json ˶.
 -d @twitter-source.json \
 http://localhost:8083/connectors/`クリップボードにコピー

Apache Kafkaで出力を確認する

上記の curl コマンドは、twitter-source.json コネクタ設定ファイルを渡す connectors REST エンドポイントを使用する。これで、twitter-topic Apache Kafka トピックに apachekafka を含むツイートが流れてくるのが確認できるはずだ。KarapaceのREST APIを有効にしたので、Aiven Consoleにアクセスし、Topicタブの下にあるdemo-kafkaサービス名をクリックすることで、トピックデータを確認することができる。

Aivenコンソールに表示されるデータベースに関するツイート

これはストリーミング・ソリューションなので、定義したキーワードを含むツイートが書き込まれた場合のみ、トピックにデータが表示されることに注意してください。コネクターをテストしたい場合は、キーワードを含むツイートを自分で書いてみてください!

ビッグニュース:現在、Aiven for Apache KafkaでAivenがサポートしていないApache Kafka® Connectorプラグインを使用しています!Apache Kafka Connectクラスタを管理する負担はありますが、私たちのソース/ターゲット技術が利用可能なコネクタのどれにも対応していない場合に最適なオプションかもしれません。

クイック・ヒント: コネクタの設定を変更する必要がある場合は、まずコネクタを

curl -s -X DELETE http://localhost:8083/connectors/twitter_connector`Copy to clipboard

そして、更新された設定ファイルを curl コマンドで PUSH オプションを指定して送信します。

マネージド Apache Kafka サービスとコネクタプラグイン選択の自由度

Apache Kafkaクラスタを完全に管理するのは面倒な作業であり、AivenのようなマネージドサービスをApache Kafkaのために使用することは、通常、賢明なアイデアです。特殊なユースケースの場合、探している構成やコネクタがAivenでサポートされていないことに気づくかもしれません。Aiven for Apache Kafkaを使用すれば、特定の統合問題を解決するローカルのApache Kafka Connectクラスタを簡単に統合でき、全体的なApache Kafkaマネージドソリューションと豊富なオープンソースコネクタの両方のメリットを享受できます。

詳しくは以下のリソースをご覧ください:

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?