LoginSignup
2
0

More than 3 years have passed since last update.

Oracle Streaming ServiceとAutonomous DBのKafka Connectによる動的連携

Last updated at Posted at 2021-01-12

はじめに

Oracle Blogsで紹介されていた『Using Kafka Connect With Oracle Streaming Service And Autonomous DB』を試してみたのですが、ちょっと試行錯誤したので内容をまとめておきます。内容はAutonomous DBのTESTテーブルにデータがinsertされると自動的にそのデータがStreamingへメッセージ通知されるというものです。

認証トークンの取得

こちらを参考にOCIコンソールから認証トークンを取得しておきます。(略)

VMインスタンスの作成

この記事では既に作成されているKafka connectのDockerイメージを使用するので、DockerがインストールされているVMインスタンスが必要です。今回は面倒くさいのでOracle Developer Imageを使いましたが、適当にインスタンスをプロビジョニングしておきます。(略)

Autonomous DBの作成

次にAutonomous DBをプロビジョニングします。
手順はこちらの「1. OCIコンソールからATPをプロビジョニングする」を参考にします。

Autonomous DBのセットアップ

次にAutonomous DBとの統合をテストするために使用するスキーマとテーブルを作成し、ウォレットの資格情報を取得します。

ATPの詳細画面から「サービス・コンソール」を開きます。
image.png

サービス・コンソールから「開発」→「SQL Developer」を開きます。
image.png

image.png

認証画面が出たら、ログインします。
ユーザー名、パスワードはAutonomous DBを作成した際のものです。(こちらを参考にした場合はadmin,TFWorkshop__2000)
image.png

ログインできたら以下を実行し、Kafka connectからアクセスするユーザーを作成します。

CREATE USER connectdemo IDENTIFIED BY "Str0ngP@$$word1234";
GRANT CONNECT, RESOURCE TO connectdemo;
GRANT UNLIMITED TABLESPACE TO connectdemo;

次に以下のSQLを実行し、テスト用のテーブルを作成します。

CREATE TABLE TEST 
(
  ID NUMBER(10,0) GENERATED BY DEFAULT ON NULL AS IDENTITY,
  USERNAME VARCHAR2(50) NOT NULL,
  FIRST_NAME VARCHAR2(50) NOT NULL,
  MIDDLE_NAME VARCHAR2(50),
  LAST_NAME VARCHAR2(50) NOT NULL,
  AGE NUMBER(5,0) DEFAULT 0 NOT NULL,
  CREATED_ON TIMESTAMP(9) NOT NULL,
  CONSTRAINT TEST_PK PRIMARY KEY 
  (
    ID 
  )
  ENABLE 
);

次にウォレットを取得します。
ウォレットはコンソールからダウンロードできます。
「DB接続」→「ウォレットのダウンロード」
image.png

image.png

後ほど使用するので、どこかに保存しておきます。

Streaming Poolの作成

次にStreaming Poolを作成します。
OCIのコンソールからハンバーガーメニューからアナリティクス-ストリーミングを選択し、Streamingの画面へ遷移します。

image.png

ストリームプールを選択し、
image.png

ストリーム・プールの作成を押します。
image.png

ストリーム・プール名に「demo-stream-pool」と入力し、拡張オプションの表示押下で中にある「トピックの自動作成」にチェックを入れて「作成」ボタンを押します。

image.png

作成できました。
image.png

次にKafka接続設定を開きます。
image.png

この情報も後ほど使うので、どこかにコピーしておきます。
image.png

Kafka接続構成の作成

次にKafka接続構成を作成します。
Streamingの画面から「Kafka接続構成」を開きます。
image.png

「Kafka接続構成の作成」を押します。
image.png

Kafka接続構成名に「demo-connect-config」を入力し、Kafka接続構成の作成ボタンを押します。
image.png

Kafka接続構成の詳細画面へ遷移し、構成情報をコピーしておきます。
image.png

必要ファイル(jdbcドライバ等)の配置

ここまで出来たらKafka Connectの設定をします。
作成していたVMインスタンスへTerminalからアクセスします。
今回はTeratermを使用しています。
アクセスできたら、以下のコマンドを実行し、プロジェクト用のディレクトリを作成します。

sudo mkdir -p /projects/connect-demo/drivers
sudo mkdir -p /projects/connect-demo/kafka-jdbc/connector
sudo mkdir -p /projects/connect-demo/wallet
sudo chmod -R 777 /projects/

次にドライバ、ウォレットを配置します。

Oracle JDBC Driver

Oracle JDBC Driversからダウンロードして、/projects/connect-demo/driversに配置します。
※今回はojdbc10-full.tar.gz

Kafka JDBC Connector

Kafka JDBC Connectorからダウンロードして、/projects/connect-demo/kafka-jdbc/connectorに配置し、解凍しておきます。
※今回はconfluentinc-kafka-connect-jdbc-10.0.1.zip

ATP Wallet

ATPのコンソールからダウンロードしておいたウォレットを/projects/connect-demo/walletに配置し、解凍しておきます。
※今回はWallet_tfOKEATPDB.zip

必要ファイルの配置が完了すると以下のような構成になります。

projects
 └ connect-demo
     ├ drivers
     │ └ ojdbc10-full.tar.gz
     ├ kafka-jdbc
     │ └ connector
     │    ├ confluentinc-kafka-connect-jdbc-10.0.1.zip
     │    └ confluentinc-kafka-connect-jdbc-10.0.1
     │       ├ assets
     │       │ ├ confluent.png
     │       │ └ jdbc.jpg
     │       ├ doc
     │       │ ├ LICENSE
     │       │ ├ LICENSE-xmlparserv2-19.7.0.0.txt
     │       │ ├ licenses.html
     │       │ ├ NOTICE
     │       │ ├ README.md
     │       │ ├ version.txt
     │       │ └ licenses
     │       │    ├ LICENSE-kafka-connect-jdbc-10.0.0-SNAPSHOT.txt
     │       │    ├ LICENSE-mssql-jdbc-8.4.1.jre8.txt
     │       │    ├ LICENSE-ojdbc8-19.7.0.0.txt
     │       │    ├ LICENSE-ons-19.7.0.0.txt
     │       │    ├ LICENSE-oraclepki-19.7.0.0.txt
     │       │    ├ LICENSE-orai18n-19.7.0.0.txt
     │       │    ├ LICENSE-osdt_cert-19.7.0.0.txt
     │       │    ├ LICENSE-osdt_core-19.7.0.0.txt
     │       │    ├ LICENSE-postgresql-42.2.10.txt
     │       │    ├ LICENSE-simplefan-19.7.0.0.txt
     │       │    ├ LICENSE-sqlite-jdbc-3.25.2.txt
     │       │    ├ LICENSE-ucp-19.7.0.0.txt
     │       │    └ LICENSE-xdb-19.7.0.0.txt
     │       ├ etc
     │       │ ├ sink-quickstart-sqlite.properties
     │       │ └ source-quickstart-sqlite.properties
     │       └ lib
     │          ├ common-utils-6.0.0.jar
     │          ├ jtds-1.3.1.jar
     │          ├ kafka-connect-jdbc-10.0.1.jar
     │          ├ mssql-jdbc-8.4.1.jre8.jar
     │          ├ ojdbc8-19.7.0.0.jar
     │          ├ ojdbc8-production-19.7.0.0.pom
     │          ├ ons-19.7.0.0.jar
     │          ├ oraclepki-19.7.0.0.jar
     │          ├ orai18n-19.7.0.0.jar
     │          ├ osdt_cert-19.7.0.0.jar
     │          ├ osdt_core-19.7.0.0.jar
     │          ├ postgresql-42.2.10.jar
     │          ├ simplefan-19.7.0.0.jar
     │          ├ slf4j-api-1.7.30.jar
     │          ├ sqlite-jdbc-3.25.2.jar
     │          ├ ucp-19.7.0.0.jar
     │          ├ xdb-19.7.0.0.jar
     │          ├ xmlparserv2-19.7.0.0.jar
     │          └ manifest.json
     └ wallet
        ├ cwallet.sso
        ├ ewallet.p12
        ├ keystore.jks
        ├ ojdbc.properties
        ├ README
        ├ sqlnet.ora
        ├ tnsnames.ora
        ├ truststore.jks
        └ Wallet_tfOKEATPDB.zip

Kafka Connectの起動

ここまで来たら設定ファイルを作成し、Kafka Connectを起動します。
まずは以下のファイルを作成します。
<括弧>の部分はご自身の実際の値で置き換えてください。

/projects/connect-demo/connect-distributed.properties
group.id=connect-demo-group

bootstrap.servers=<streamPoolBootstrapServer>
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy>/<username>/<streamPoolId>" password="<authToken>";

producer.sasl.mechanism=PLAIN
producer.security.protocol=SASL_SSL
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy>/<username>/<streamPoolId>" password="<authToken>";

consumer.sasl.mechanism=PLAIN
consumer.security.protocol=SASL_SSL
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy>/<username>/<streamPoolId>" password="<authToken>";

config.storage.replication.factor=1
config.storage.partitions=1
config.storage.topic=<connectConfigOCID>-config

status.storage.replication.factor=1
status.storage.partitions=1
status.storage.topic=<connectConfigOCID>-status

offset.storage.replication.factor=1
offset.storage.partitions=1
offset.storage.topic=<connectConfigOCID>-offset
offset.flush.interval.ms=10000
offset.flush.timeout.ms=5000

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

task.shutdown.graceful.timeout.ms=10000
括弧内の項目 コピー元
streamPoolBootstrapServer ここでコピーした中にある
streamPoolId ここからコピー
username テナンシ名(ここ)
tenancy ユーザ名(ここ)
authToken ここで作成したもの
connectConfigOCID ここでコピーしたもの

次にDockerfileを作成します。

/projects/connect-demo/Dockerfile
FROM debezium/connect:0.10
USER root
RUN mkdir /wallet
USER kafka
COPY drivers/* /kafka/libs/
COPY kafka-jdbc/connector/confluentinc-kafka-connect-jdbc-10.0.1/lib/* /kafka/libs/
COPY wallet/* /wallet/

次にdockerイメージを起動するshellを作成します。
こちらも<括弧>の部分はご自身の実際の値で置き換えてください。

/projects/connect-demo/connect-demo.sh
#!/usr/bin/env bash
CONNECT_CONFIG_ID=<connectConfigOCID>

CONFIG_STORAGE_TOPIC=$CONNECT_CONFIG_ID-config
OFFSET_STORAGE_TOPIC=$CONNECT_CONFIG_ID-offset
STATUS_STORAGE_TOPIC=$CONNECT_CONFIG_ID-status

docker build -t connect .

docker run -it --rm --name connect -p 8083:8083 \
-e GROUP_ID=connect-demo-group \
-e BOOTSTRAP_SERVERS=<streamPoolBootstrapServer> \
-e CONFIG_STORAGE_TOPIC=$CONFIG_STORAGE_TOPIC \
-e OFFSET_STORAGE_TOPIC=$OFFSET_STORAGE_TOPIC \
-e STATUS_STORAGE_TOPIC=$STATUS_STORAGE_TOPIC \
-v `pwd -P`/connect-distributed.properties:/kafka/config.orig/connect-distributed.properties \
connect

作成出来たら、実行します。

cd /projects/connect-demo
chmod +x connect-demo.sh
./connect-demo.sh

エラーがなければ、以下のようにコンソールに表示されます。

[org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig]
2020-12-30 01:17:29,915 INFO || Attempting to open connection #1 to Oracle [io.confluent.connect.jdbc.util.CachedConnectionProvider]
2020-12-30 01:17:32,323 INFO || [Worker clientId=connect-1, groupId=connect-demo-group] Finished starting connectors and tasks [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2020-12-30 01:17:32,427 INFO || Found offset {{protocol=1, table=ADMIN.TEST}={incrementing=1}, {table=TEST}=null} for partition {protocol=1, table=ADMIN.TEST} [io.confluent.connect.jdbc.source.JdbcSourceTask]
2020-12-30 01:17:32,432 INFO || Started JDBC source task [io.confluent.connect.jdbc.source.JdbcSourceTask]
2020-12-30 01:17:32,433 INFO || WorkerSourceTask{id=oss-atp-connector-0} Source task finished initialization and start [org.apache.kafka.connect.runtime.WorkerSourceTask]
2020-12-30 01:17:32,440 INFO || Begin using SQL query: SELECT * FROM "ADMIN"."TEST" WHERE "ADMIN"."TEST"."ID" > ? ORDER BY "ADMIN"."TEST"."ID" ASC [io.confluent.connect.jdbc.source.TableQuerier]

これでKafka connectのインスタンスが起動し、REST APIでConnectorを作成できるようになりました。
REST APIでConnectorを作成するためにjsonで設定ファイルを作成します。
以下のファイルを作成します。
の値はご自身の値に書き換えてください。(こちらを参考にした場合はtfokeatpdb)

/projects/connect-demo/connector-config.json
{
 "name": "oss-atp-connector",
 "config": {
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "tasks.max": "100",
  "connection.url": "jdbc:oracle:thin:@<dbname>_high?TNS_ADMIN=/wallet",
  "connection.user": "coannectdemo",
  "connection.password": "Str0ngP@$$word1234",
  "mode": "incrementing",
  "incrementing.column.name": "ID",
  "topic.prefix": "demo-stream-",
  "table.whitelist": "TEST",
  "numeric.mapping": "best_fit"
 }
}

REST APIでこの設定ファイルを POST してSource Connectorを作成します。

curl -iX POST -H "Accept:application/json" -H "Content-Type:application/json" -d @connector-config.json http://localhost:8083/connectors

以下のコマンドで作成されているか確認します。
oss-atp-connectorというコネクタが作成されていることが確認できます。
```
curl -i http://localhost:8083/connectors
HTTP/1.1 200 OK
Date: Wed, 30 Dec 2020 01:20:45 GMT
Content-Type: application/json
Content-Length: 21
Server: Jetty(9.4.18.v20190429)

["oss-atp-connector"]
```

ちなみに、削除する場合は以下のコマンドで削除できます。

curl -i -X DELETE http://localhost:8083/connectors/[connector-name]

テスト

これで準備は整いました。
では、テストしてみます。

SQLDeveloperへログインし、TESTテーブルへデータをinsertしてみます。
以下のSQLを実行します。

INSERT INTO TEST (username, first_name, middle_name, last_name, age, created_on) VALUES ('taro', 'taro', null, 'tanaka', 10, sysdate);
COMMIT;

データが登録出来たら、Streamingコンソールを確認します。
自動的にストリームが作成されていることが確認できます。
image.png

ストリームの詳細を見てみるとメッセージが作成されていることが確認できます。
image.png

参考URL

Using Kafka Connect With Oracle Streaming Service And Autonomous DB
https://blogs.oracle.com/developers/using-kafka-connect-with-oracle-streaming-service-and-autonomous-db

以上。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0