はじめに
Oracle Blogsで紹介されていた『Using Kafka Connect With Oracle Streaming Service And Autonomous DB』を試してみたのですが、ちょっと試行錯誤したので内容をまとめておきます。内容はAutonomous DBのTESTテーブルにデータがinsertされると自動的にそのデータがStreamingへメッセージ通知されるというものです。
認証トークンの取得
こちらを参考にOCIコンソールから認証トークンを取得しておきます。(略)
VMインスタンスの作成
この記事では既に作成されているKafka connectのDockerイメージを使用するので、DockerがインストールされているVMインスタンスが必要です。今回は面倒くさいのでOracle Developer Imageを使いましたが、適当にインスタンスをプロビジョニングしておきます。(略)
Autonomous DBの作成
次にAutonomous DBをプロビジョニングします。
手順はこちらの「1. OCIコンソールからATPをプロビジョニングする」を参考にします。
Autonomous DBのセットアップ
次にAutonomous DBとの統合をテストするために使用するスキーマとテーブルを作成し、ウォレットの資格情報を取得します。
サービス・コンソールから「開発」→「SQL Developer」を開きます。
認証画面が出たら、ログインします。
ユーザー名、パスワードはAutonomous DBを作成した際のものです。(こちらを参考にした場合はadmin,TFWorkshop__2000)
ログインできたら以下を実行し、Kafka connectからアクセスするユーザーを作成します。
CREATE USER connectdemo IDENTIFIED BY "Str0ngP@$$word1234";
GRANT CONNECT, RESOURCE TO connectdemo;
GRANT UNLIMITED TABLESPACE TO connectdemo;
次に以下のSQLを実行し、テスト用のテーブルを作成します。
CREATE TABLE TEST
(
ID NUMBER(10,0) GENERATED BY DEFAULT ON NULL AS IDENTITY,
USERNAME VARCHAR2(50) NOT NULL,
FIRST_NAME VARCHAR2(50) NOT NULL,
MIDDLE_NAME VARCHAR2(50),
LAST_NAME VARCHAR2(50) NOT NULL,
AGE NUMBER(5,0) DEFAULT 0 NOT NULL,
CREATED_ON TIMESTAMP(9) NOT NULL,
CONSTRAINT TEST_PK PRIMARY KEY
(
ID
)
ENABLE
);
次にウォレットを取得します。
ウォレットはコンソールからダウンロードできます。
「DB接続」→「ウォレットのダウンロード」
後ほど使用するので、どこかに保存しておきます。
Streaming Poolの作成
次にStreaming Poolを作成します。
OCIのコンソールからハンバーガーメニューからアナリティクス-ストリーミングを選択し、Streamingの画面へ遷移します。
ストリーム・プール名に「demo-stream-pool」と入力し、拡張オプションの表示押下で中にある「トピックの自動作成」にチェックを入れて「作成」ボタンを押します。
Kafka接続構成の作成
次にKafka接続構成を作成します。
Streamingの画面から「Kafka接続構成」を開きます。
Kafka接続構成名に「demo-connect-config」を入力し、Kafka接続構成の作成ボタンを押します。
Kafka接続構成の詳細画面へ遷移し、構成情報をコピーしておきます。
必要ファイル(jdbcドライバ等)の配置
ここまで出来たらKafka Connectの設定をします。
作成していたVMインスタンスへTerminalからアクセスします。
今回はTeratermを使用しています。
アクセスできたら、以下のコマンドを実行し、プロジェクト用のディレクトリを作成します。
sudo mkdir -p /projects/connect-demo/drivers
sudo mkdir -p /projects/connect-demo/kafka-jdbc/connector
sudo mkdir -p /projects/connect-demo/wallet
sudo chmod -R 777 /projects/
次にドライバ、ウォレットを配置します。
Oracle JDBC Driver
Oracle JDBC Driversからダウンロードして、/projects/connect-demo/drivers
に配置します。
※今回はojdbc10-full.tar.gz
Kafka JDBC Connector
Kafka JDBC Connectorからダウンロードして、/projects/connect-demo/kafka-jdbc/connector
に配置し、解凍しておきます。
※今回はconfluentinc-kafka-connect-jdbc-10.0.1.zip
ATP Wallet
ATPのコンソールからダウンロードしておいたウォレットを/projects/connect-demo/wallet
に配置し、解凍しておきます。
※今回はWallet_tfOKEATPDB.zip
必要ファイルの配置が完了すると以下のような構成になります。
projects
└ connect-demo
├ drivers
│ └ ojdbc10-full.tar.gz
├ kafka-jdbc
│ └ connector
│ ├ confluentinc-kafka-connect-jdbc-10.0.1.zip
│ └ confluentinc-kafka-connect-jdbc-10.0.1
│ ├ assets
│ │ ├ confluent.png
│ │ └ jdbc.jpg
│ ├ doc
│ │ ├ LICENSE
│ │ ├ LICENSE-xmlparserv2-19.7.0.0.txt
│ │ ├ licenses.html
│ │ ├ NOTICE
│ │ ├ README.md
│ │ ├ version.txt
│ │ └ licenses
│ │ ├ LICENSE-kafka-connect-jdbc-10.0.0-SNAPSHOT.txt
│ │ ├ LICENSE-mssql-jdbc-8.4.1.jre8.txt
│ │ ├ LICENSE-ojdbc8-19.7.0.0.txt
│ │ ├ LICENSE-ons-19.7.0.0.txt
│ │ ├ LICENSE-oraclepki-19.7.0.0.txt
│ │ ├ LICENSE-orai18n-19.7.0.0.txt
│ │ ├ LICENSE-osdt_cert-19.7.0.0.txt
│ │ ├ LICENSE-osdt_core-19.7.0.0.txt
│ │ ├ LICENSE-postgresql-42.2.10.txt
│ │ ├ LICENSE-simplefan-19.7.0.0.txt
│ │ ├ LICENSE-sqlite-jdbc-3.25.2.txt
│ │ ├ LICENSE-ucp-19.7.0.0.txt
│ │ └ LICENSE-xdb-19.7.0.0.txt
│ ├ etc
│ │ ├ sink-quickstart-sqlite.properties
│ │ └ source-quickstart-sqlite.properties
│ └ lib
│ ├ common-utils-6.0.0.jar
│ ├ jtds-1.3.1.jar
│ ├ kafka-connect-jdbc-10.0.1.jar
│ ├ mssql-jdbc-8.4.1.jre8.jar
│ ├ ojdbc8-19.7.0.0.jar
│ ├ ojdbc8-production-19.7.0.0.pom
│ ├ ons-19.7.0.0.jar
│ ├ oraclepki-19.7.0.0.jar
│ ├ orai18n-19.7.0.0.jar
│ ├ osdt_cert-19.7.0.0.jar
│ ├ osdt_core-19.7.0.0.jar
│ ├ postgresql-42.2.10.jar
│ ├ simplefan-19.7.0.0.jar
│ ├ slf4j-api-1.7.30.jar
│ ├ sqlite-jdbc-3.25.2.jar
│ ├ ucp-19.7.0.0.jar
│ ├ xdb-19.7.0.0.jar
│ ├ xmlparserv2-19.7.0.0.jar
│ └ manifest.json
└ wallet
├ cwallet.sso
├ ewallet.p12
├ keystore.jks
├ ojdbc.properties
├ README
├ sqlnet.ora
├ tnsnames.ora
├ truststore.jks
└ Wallet_tfOKEATPDB.zip
Kafka Connectの起動
ここまで来たら設定ファイルを作成し、Kafka Connectを起動します。
まずは以下のファイルを作成します。
<括弧>の部分はご自身の実際の値で置き換えてください。
group.id=connect-demo-group
bootstrap.servers=<streamPoolBootstrapServer>
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy>/<username>/<streamPoolId>" password="<authToken>";
producer.sasl.mechanism=PLAIN
producer.security.protocol=SASL_SSL
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy>/<username>/<streamPoolId>" password="<authToken>";
consumer.sasl.mechanism=PLAIN
consumer.security.protocol=SASL_SSL
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy>/<username>/<streamPoolId>" password="<authToken>";
config.storage.replication.factor=1
config.storage.partitions=1
config.storage.topic=<connectConfigOCID>-config
status.storage.replication.factor=1
status.storage.partitions=1
status.storage.topic=<connectConfigOCID>-status
offset.storage.replication.factor=1
offset.storage.partitions=1
offset.storage.topic=<connectConfigOCID>-offset
offset.flush.interval.ms=10000
offset.flush.timeout.ms=5000
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
task.shutdown.graceful.timeout.ms=10000
括弧内の項目 | コピー元 |
---|---|
streamPoolBootstrapServer | ここでコピーした中にある |
streamPoolId | ここからコピー |
username | テナンシ名(ここ) |
tenancy | ユーザ名(ここ) |
authToken | ここで作成したもの |
connectConfigOCID | ここでコピーしたもの |
次にDockerfileを作成します。
FROM debezium/connect:0.10
USER root
RUN mkdir /wallet
USER kafka
COPY drivers/* /kafka/libs/
COPY kafka-jdbc/connector/confluentinc-kafka-connect-jdbc-10.0.1/lib/* /kafka/libs/
COPY wallet/* /wallet/
次にdockerイメージを起動するshellを作成します。
こちらも<括弧>の部分はご自身の実際の値で置き換えてください。
#!/usr/bin/env bash
CONNECT_CONFIG_ID=<connectConfigOCID>
CONFIG_STORAGE_TOPIC=$CONNECT_CONFIG_ID-config
OFFSET_STORAGE_TOPIC=$CONNECT_CONFIG_ID-offset
STATUS_STORAGE_TOPIC=$CONNECT_CONFIG_ID-status
docker build -t connect .
docker run -it --rm --name connect -p 8083:8083 \
-e GROUP_ID=connect-demo-group \
-e BOOTSTRAP_SERVERS=<streamPoolBootstrapServer> \
-e CONFIG_STORAGE_TOPIC=$CONFIG_STORAGE_TOPIC \
-e OFFSET_STORAGE_TOPIC=$OFFSET_STORAGE_TOPIC \
-e STATUS_STORAGE_TOPIC=$STATUS_STORAGE_TOPIC \
-v `pwd -P`/connect-distributed.properties:/kafka/config.orig/connect-distributed.properties \
connect
作成出来たら、実行します。
cd /projects/connect-demo
chmod +x connect-demo.sh
./connect-demo.sh
エラーがなければ、以下のようにコンソールに表示されます。
[org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig]
2020-12-30 01:17:29,915 INFO || Attempting to open connection #1 to Oracle [io.confluent.connect.jdbc.util.CachedConnectionProvider]
2020-12-30 01:17:32,323 INFO || [Worker clientId=connect-1, groupId=connect-demo-group] Finished starting connectors and tasks [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2020-12-30 01:17:32,427 INFO || Found offset {{protocol=1, table=ADMIN.TEST}={incrementing=1}, {table=TEST}=null} for partition {protocol=1, table=ADMIN.TEST} [io.confluent.connect.jdbc.source.JdbcSourceTask]
2020-12-30 01:17:32,432 INFO || Started JDBC source task [io.confluent.connect.jdbc.source.JdbcSourceTask]
2020-12-30 01:17:32,433 INFO || WorkerSourceTask{id=oss-atp-connector-0} Source task finished initialization and start [org.apache.kafka.connect.runtime.WorkerSourceTask]
2020-12-30 01:17:32,440 INFO || Begin using SQL query: SELECT * FROM "ADMIN"."TEST" WHERE "ADMIN"."TEST"."ID" > ? ORDER BY "ADMIN"."TEST"."ID" ASC [io.confluent.connect.jdbc.source.TableQuerier]
これでKafka connectのインスタンスが起動し、REST APIでConnectorを作成できるようになりました。
REST APIでConnectorを作成するためにjsonで設定ファイルを作成します。
以下のファイルを作成します。
の値はご自身の値に書き換えてください。(こちらを参考にした場合はtfokeatpdb)
{
"name": "oss-atp-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "100",
"connection.url": "jdbc:oracle:thin:@<dbname>_high?TNS_ADMIN=/wallet",
"connection.user": "coannectdemo",
"connection.password": "Str0ngP@$$word1234",
"mode": "incrementing",
"incrementing.column.name": "ID",
"topic.prefix": "demo-stream-",
"table.whitelist": "TEST",
"numeric.mapping": "best_fit"
}
}
REST APIでこの設定ファイルを POST してSource Connectorを作成します。
curl -iX POST -H "Accept:application/json" -H "Content-Type:application/json" -d @connector-config.json http://localhost:8083/connectors
以下のコマンドで作成されているか確認します。
oss-atp-connectorというコネクタが作成されていることが確認できます。
```
curl -i http://localhost:8083/connectors
HTTP/1.1 200 OK
Date: Wed, 30 Dec 2020 01:20:45 GMT
Content-Type: application/json
Content-Length: 21
Server: Jetty(9.4.18.v20190429)
["oss-atp-connector"]
```
ちなみに、削除する場合は以下のコマンドで削除できます。
curl -i -X DELETE http://localhost:8083/connectors/[connector-name]
テスト
これで準備は整いました。
では、テストしてみます。
SQLDeveloperへログインし、TESTテーブルへデータをinsertしてみます。
以下のSQLを実行します。
INSERT INTO TEST (username, first_name, middle_name, last_name, age, created_on) VALUES ('taro', 'taro', null, 'tanaka', 10, sysdate);
COMMIT;
データが登録出来たら、Streamingコンソールを確認します。
自動的にストリームが作成されていることが確認できます。
ストリームの詳細を見てみるとメッセージが作成されていることが確認できます。
参考URL
Using Kafka Connect With Oracle Streaming Service And Autonomous DB
https://blogs.oracle.com/developers/using-kafka-connect-with-oracle-streaming-service-and-autonomous-db
以上。