はじめに
本記事は、Oracle Cloud Infrastructure Advent Calendar 2022 のDay 20の記事として書かれています。
Oracle Transactional Event Queues(TEQ)は、大規模なデータの収集、処理、保存、統合に使われるイベントストリーミング・プラットフォームです。
データベース内にあるため、分散トランザクションを必要とせずに、データベース・トランザクションにエンキュー・デキューを組み込むことができます。メッセージは標準的なSQLで問合せ可能ですし、Oracle Databaseの高可用性、拡張性、信頼性はキューデータにも適用されます。
今回は、Spring Bootで書かれた2つのマイクロサービス間のイベント駆動型通信を確立するために、Apache Kafka brokerをデプロイします。さらに、Kafkaライブラリの代わりに、Oracle TEQへのKafka Javaクライアントの互換性を持つOkafkaを使い、Kafka brokerの代わりにTEQをデータベースで使用します。そして最後に、KafkaとTEQ broker間を接続し、Kafka側でメッセージを生成し、TEQ側で消費されるのを確認します。
なお、本記事ではOracle Cloud Infrastructure(OCI)のCloud Shell上にアプリケーションをデプロイし、データベースはAutonomous Database(ADB)のAlways Freeを作成し使用します。ですので全ての手順をOCIのFree Tierアカウントで行うことができます。
アーキテクチャの全体像はこのようになります。
- Kafka brokerと、Kafkaトピック上でメッセージを生成および消費するマイクロサービス
- Oracle TEQ brokerと、TEQキューでメッセージを生成および消費するマイクロサービス
- KafkaとOracle TEQ間のコネクターを利用したKafkaトピックからTEQキューへのメッセージフロー
1. 環境のセットアップ
まずは、使用する環境のセットアップを行います。
-
作業用のディレクトリを作成し、移動します。
mkdir txeventqlab cd txeventqlab
-
GitHubのリポジトリをクローンします。
git clone -b 22.10.1 --single-branch https://github.com/oracle/microservices-datadriven.git
-
.bashrcのファイルを編集し、LAB_HOMEを定義します。
source ./microservices-datadriven/workshops/txeventq-kafka/cloud-setup/env.sh source ${HOME}/.bashrc
-
セットアップ用のスクリプトを実行します。
source $LAB_HOME/cloud-setup/setup.sh
セットアップでは、ユーザーOCIDを入力します。
このユーザーOCIDからテナント内に、今回使用するコンパートメントやAlways FreeのAutonomous Database(ADB)などの各リソースをOCI CLIから作成するコードが含まれています。 -
Always FreeのADBインスタンスを1つ使用するため、確認します。
Do you want to use Always Free Autonomous Database Instance? (y/N)
-
ADBのパスワードを決め、セットアップ完了です。txeventqlabというコンパートメント内に、Always FreeのADBと、DB接続用のウォレットを格納したオブジェクトストレージのバケットが作成されているはずです。
2. Kafkaを使ったマイクロサービス(Spring Boot)の構築
まずは、Kafkaを使い、通信するマイクロサービスを構築してみます。
1. Kafka brokerの実行
-
以下のコマンドでKafkaの状態を確認します。
※Dockerコンテナ環境で動いているので、実際はdocker-compose ps
を実行しています。kafka-status
このような結果が表示されるはずです。
NAME COMMAND SERVICE STATUS PORTS broker "/etc/confluent/dock…" broker created connect "/etc/confluent/dock…" connect created schema-registry "/etc/confluent/dock…" schema-registry created zookeeper "/etc/confluent/dock…" zookeeper created
-
以下のコマンドでKafkaを起動します。
kafka-start
-
しばらく待って再度状態を確認すると、このようにConnectサービスがhealthyになるはずです。
kafka-status
2. トピックの作成
Kafkaの準備ができたので、次にProducerから送り、Consumerで受け取るトピックを作成します。
以下のコマンドでTXEVENTQTOPIC1
というトピックを作成します。
kafka-add-topic TXEVENTQTOPIC1
※実際は以下のコマンドを実行しています。
docker exec broker kafka-topics --bootstrap-server broker:9092 --create --topic "TXEVENTQTOPIC1"
3. ProducerとConsumerマイクロサービスのビルド
今回はSpring Boot+Spring Kafkaを使ってコード化したProducerとConsumerをMavenでビルドしています。
cd $LAB_HOME/springboot-kafka
./kafka-ms-build
4. Producerからイベントを生成
アプリケーションがビルドできたのでデプロイします。
-
以下のコマンドを実行してイメージをビルドし、Docker Engine(Kafkaクラスタが動作しているのと同じ)内にProducerをデプロイします。
docker build
cd $LAB_HOME/springboot-kafka ./kafka-ms-deploy-producer
-
イメージのビルドができたら、続いてコンテナを起動して、Producerのマイクロサービスを実行します。
docker run
cd $LAB_HOME/springboot-kafka ./kafka-ms-launch-producer
-
実行したら試しに一度テストしてみます。Producerにcurlで直接HTTPリクエストをし、イベントを送信してみます。
curl -s -X POST -H "Content-Type: application/json" \ -d '{ "id": "id1", "message": "message1" }' \ http://localhost:8080/placeMessage | jq
以下のように返ってくれば成功です。
{ "id": "0", "statusMessage": "Successful" }
5. Consumerでイベントを消費
同様の操作をConsumerでも行います。
-
以下のコマンドを実行してイメージをビルドし、Docker Engine(Kafkaクラスタが動作しているのと同じ)内にConsumerをデプロイします。
docker build
cd $LAB_HOME/springboot-kafka ./kafka-ms-deploy-consumer
-
コンテナを起動して、Consumerのマイクロサービスを実行します。
docker run
cd $LAB_HOME/springboot-kafka ./kafka-ms-launch-consumer
-
テストしてみます。Consumerマイクロサービスは、起動後、Kafka brokerからメッセージをデキューします。デキューに成功すると、先ほどProducerによって送信されたイベントをログで見ることができます。以下のコマンドでコンテナログから後半の6行をリストアップします。
container-logs kafka-consumer 6
以下のように
{"id": "0", "message": "message1"}.
と表示されていたら成功です。
Kafkaによって、メッセージが受け取れることがわかりました。
3. Transactional Event Queuesを使ったマイクロサービスの構築
今度は、Oracle Transactional Event Queues(TEQ)を使い、非同期に通信するマイクロサービスを構築してみます。
1. トピックの作成
以下のコマンドで、セットアップで作成したADBインスタンスに、TXEVENTQTOPIC1
というトピックを作成します。
txeventq-add-topic TXEVENTQTOPIC1
ここでは、実際は以下のようなSQLを実行しています。
declare
txeventq_topic varchar2(30) := '&1' ;
txeventq_subscriber varchar2(30) := '&2' ;
subscriber sys.aq$_agent;
begin
if txeventq_topic is not null and txeventq_subscriber is not null
then
-- Creating a JMS type sharded queue:
dbms_aqadm.create_sharded_queue(
queue_name => txeventq_topic,
multiple_consumers => TRUE);
dbms_aqadm.start_queue(txeventq_topic);
--- Create the subscriber agent
subscriber := sys.aq$_agent(txeventq_subscriber, NULL, NULL);
dbms_aqadm.add_subscriber(
queue_name => txeventq_topic,
subscriber => subscriber);
else
DBMS_OUTPUT.put_line('ERR : at least one of the variables is null !');
end if;
end;
/
commit;
2. ProducerとConsumerマイクロサービスのビルド
先ほどと同様にProducerとConsumerをビルドします。一点、こちらではspring-kafkaではなく、TEQ用のKafkaJavaクライアントであるKafka Java Client for Oracle Transactional Event Queues Kafka (OKafka)を使用します。
cd $LAB_HOME/springboot-txeventq
./txeventq-ms-build
3. Producerからイベントを生成
-
以下のコマンドを実行してイメージをビルドし、Docker Engine内にProducerをデプロイします。
docker build
cd $LAB_HOME/springboot-txeventq ./txeventq-ms-deploy-producer
-
コンテナを起動して、Producerのマイクロサービスを実行します。
docker run
cd $LAB_HOME/springboot-txeventq ./txeventq-ms-launch-producer
-
一度テストしてみます。Producerにcurlで直接HTTPリクエストをし、イベントを送信してみます。
curl -s -X POST -H "Content-Type: application/json" \ -d '{ "id": "id1", "message": "TxEventQ message1" }' \ http://localhost:8090/placeMessage | jq
以下のように返ってくれば成功です。
{ "id": "0", "statusMessage": "Successful" }
4. Consumerでイベントを消費
Consumerも同様にデプロイし、実行します。
-
以下のコマンドを実行してイメージをビルドし、Docker Engine内にConsumerをデプロイします。
docker build
cd $LAB_HOME/springboot-txeventq ./txeventq-ms-deploy-consumer
-
コンテナを起動して、Consumerのマイクロサービスを実行します。
docker run
cd $LAB_HOME/springboot-txeventq ./txeventq-ms-launch-consumer
-
テストしてみます。Consumerマイクロサービスは、起動後、TEQ brokerからメッセージをデキューします。デキューに成功すると、先ほどProducerによって送信されたイベントをログで見ることができます。以下のコマンドでコンテナログから後半の6行をリストアップします。
container-logs txeventq-consumer 6
{"id": "0", "message": "TxEventQ message1"}.
と表示されていたら成功です。
このように、Kafka brokerと同様に、TEQ brokerでもイベントの生成と消費を行うことができることがわかります。
4. KafkaとTEQの接続
ここでは、KafkaとTEQの互換性を確認してみます。TEQはKafkaとの双方向の通信が可能なので、変更をほぼリアルタイムで反映させることができます。
1. Kafka Connectのセットアップ
今回は、Apache Kafkaに含まれる、Kafkaと他のシステムを統合するフレームワークであるApache Kafka Connectを使用します。
-
ADBのパスワードを指定して、以下のコマンドを実行します。DBユーザーやホスト名などのパラメータが入力され、KafkaトピックとTEQの間にConnect Sinkが設定されます。
cd $LAB_HOME/kafka-connect-txeventq ./setup-kafka2txeventq-connect.sh
-
Connect Sinkの状態を以下のコマンドで確認します。
kafka-connect-status
正しく接続できていれば、以下のような結果になります。
{ "name": "JmsConnectSink_txeventqlab", "connector": { "state": "RUNNING", "worker_id": "connect:8083" }, "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "connect:8083" } ], "type": "sink" }
2. Kafka brokerからメッセージをエンキュー
Connectorを実行したので、メッセージを生成して転送をテストします。Kafka Producerからエンキューし、TEQからデキューしてみます。
Kafka Producerが動いている状態で、curlでProducer APIにメッセージを送信します。
curl -s -X POST -H "Content-Type: application/json" \
-d '{ "id": "Sink1", "message": "Sink Message from Kafka to TxEventQ #1" }' \
http://localhost:8080/placeMessage | jq
以下のように返ってくれば、送信は成功です。
{
"id": "1",
"statusMessage": "Successful"
}
3. PL/SQLを使ってTEQからメッセージをデキュー
Producerからメッセージをエンキューすると、その後Connect SinkエージェントがKafkaトピックからメッセージを消費し、TEQにエンキューします。そして、OKafka ConsumerマイクロサービスやPL/SQLプロシージャを使って、TEQからメッセージをデキューすることができます。
以下のコマンドでTEQからデキューします。
txeventq-dequeue
実際は以下のようなPL/SQLを実行しています。
declare
txeventq_topic varchar2(30) := '&1' ;
txeventq_subscriber varchar2(30) := '&2' ;
dequeue_options DBMS_AQ.dequeue_options_t;
message_properties DBMS_AQ.message_properties_t;
message_id RAW(2000);
my_message SYS.AQ$_JMS_TEXT_MESSAGE;
msg_text varchar2(32767);
begin
DBMS_OUTPUT.ENABLE (20000);
if txeventq_topic is not null and txeventq_subscriber is not null
then
-- Dequeue Options
dequeue_options.dequeue_mode := DBMS_AQ.REMOVE;
dequeue_options.wait := DBMS_AQ.NO_WAIT;
dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE;
dequeue_options.wait := 1;
dequeue_options.consumer_name := txeventq_subscriber;
DBMS_AQ.DEQUEUE(
queue_name => txeventq_topic,
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => my_message,
msgid => message_id);
commit;
my_message.get_text(msg_text);
DBMS_OUTPUT.put_line('TxEventQ message: ' || msg_text);
else
DBMS_OUTPUT.put_line('ERR : at least one of the variables is null !');
end if;
end;
/
以下のような結果が返ってくるはずです。
TxEventQ message: {"id": "1", "message": "Sink Message from Kafka to TxEventQ #1"}
PL/SQL procedure successfully completed.
まとめ
本記事では、Apache KafkaとOracle Transactional Event Queues(TEQ)という2つのメッセージ・ブローカーを使用したメッセージ・キューイングをご紹介しました。
TEQはコンバージド・データベースであるOracle Databaseの一機能でかつ、Kafkaとも互換性があるので、エンタープライズクラスのデータ中心のマイクロサービス・アーキテクチャを構築することができます。
本記事でご紹介したものは、全てOCIの無料のリソースで完結しているので、お時間があればぜひ試してみてください。