8
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Oracle Cloud Infrastructure Advent Calendar 2022

Day 20

Oracle DatabaseのTransactional Event Queues(TEQ)を使ってデータベースでメッセージ・キューイングを行う

Last updated at Posted at 2022-12-20

はじめに

本記事は、Oracle Cloud Infrastructure Advent Calendar 2022 のDay 20の記事として書かれています。
Oracle Transactional Event Queues(TEQ)は、大規模なデータの収集、処理、保存、統合に使われるイベントストリーミング・プラットフォームです。

データベース内にあるため、分散トランザクションを必要とせずに、データベース・トランザクションにエンキュー・デキューを組み込むことができます。メッセージは標準的なSQLで問合せ可能ですし、Oracle Databaseの高可用性、拡張性、信頼性はキューデータにも適用されます。

今回は、Spring Bootで書かれた2つのマイクロサービス間のイベント駆動型通信を確立するために、Apache Kafka brokerをデプロイします。さらに、Kafkaライブラリの代わりに、Oracle TEQへのKafka Javaクライアントの互換性を持つOkafkaを使い、Kafka brokerの代わりにTEQをデータベースで使用します。そして最後に、KafkaとTEQ broker間を接続し、Kafka側でメッセージを生成し、TEQ側で消費されるのを確認します。

なお、本記事ではOracle Cloud Infrastructure(OCI)のCloud Shell上にアプリケーションをデプロイし、データベースはAutonomous Database(ADB)のAlways Freeを作成し使用します。ですので全ての手順をOCIのFree Tierアカウントで行うことができます。

アーキテクチャの全体像はこのようになります。

  • Kafka brokerと、Kafkaトピック上でメッセージを生成および消費するマイクロサービス
  • Oracle TEQ brokerと、TEQキューでメッセージを生成および消費するマイクロサービス
  • KafkaとOracle TEQ間のコネクターを利用したKafkaトピックからTEQキューへのメッセージフロー
    eventmesh-txeventq-kafka.drawio.png

1. 環境のセットアップ

まずは、使用する環境のセットアップを行います。

  1. OCIコンソールのCloud Shellを開きます。
    image.png

  2. 作業用のディレクトリを作成し、移動します。

    mkdir txeventqlab
    cd txeventqlab
    
  3. GitHubのリポジトリをクローンします。

    git clone -b 22.10.1 --single-branch https://github.com/oracle/microservices-datadriven.git
    
  4. .bashrcのファイルを編集し、LAB_HOMEを定義します。

    source ./microservices-datadriven/workshops/txeventq-kafka/cloud-setup/env.sh
    source ${HOME}/.bashrc
    
  5. セットアップ用のスクリプトを実行します。

    source $LAB_HOME/cloud-setup/setup.sh
    

    セットアップでは、ユーザーOCIDを入力します。
    image.png
    このユーザーOCIDからテナント内に、今回使用するコンパートメントやAlways FreeのAutonomous Database(ADB)などの各リソースをOCI CLIから作成するコードが含まれています。

  6. Always FreeのADBインスタンスを1つ使用するため、確認します。

    Do you want to use Always Free Autonomous Database Instance? (y/N)
    
  7. ADBのパスワードを決め、セットアップ完了です。txeventqlabというコンパートメント内に、Always FreeのADBと、DB接続用のウォレットを格納したオブジェクトストレージのバケットが作成されているはずです。

2. Kafkaを使ったマイクロサービス(Spring Boot)の構築

まずは、Kafkaを使い、通信するマイクロサービスを構築してみます。

1. Kafka brokerの実行

  1. 以下のコマンドでKafkaの状態を確認します。
    ※Dockerコンテナ環境で動いているので、実際はdocker-compose psを実行しています。

    kafka-status
    

    このような結果が表示されるはずです。

    NAME                COMMAND                  SERVICE             STATUS              PORTS
    broker              "/etc/confluent/dock…"   broker              created
    connect             "/etc/confluent/dock…"   connect             created
    schema-registry     "/etc/confluent/dock…"   schema-registry     created
    zookeeper           "/etc/confluent/dock…"   zookeeper           created
    
  2. 以下のコマンドでKafkaを起動します。

    kafka-start
    
  3. しばらく待って再度状態を確認すると、このようにConnectサービスがhealthyになるはずです。

    kafka-status
    

    image.png

2. トピックの作成

Kafkaの準備ができたので、次にProducerから送り、Consumerで受け取るトピックを作成します。
以下のコマンドでTXEVENTQTOPIC1というトピックを作成します。

kafka-add-topic TXEVENTQTOPIC1

※実際は以下のコマンドを実行しています。

docker exec broker kafka-topics --bootstrap-server broker:9092 --create --topic "TXEVENTQTOPIC1"

3. ProducerとConsumerマイクロサービスのビルド

今回はSpring Boot+Spring Kafkaを使ってコード化したProducerとConsumerをMavenでビルドしています。

cd $LAB_HOME/springboot-kafka
./kafka-ms-build

正しくビルドできていれば、以下のように表示されます。
image.png

4. Producerからイベントを生成

アプリケーションがビルドできたのでデプロイします。

  1. 以下のコマンドを実行してイメージをビルドし、Docker Engine(Kafkaクラスタが動作しているのと同じ)内にProducerをデプロイします。 docker build

    cd $LAB_HOME/springboot-kafka
    ./kafka-ms-deploy-producer
    
  2. イメージのビルドができたら、続いてコンテナを起動して、Producerのマイクロサービスを実行します。 docker run

    cd $LAB_HOME/springboot-kafka
    ./kafka-ms-launch-producer
    
  3. 実行したら試しに一度テストしてみます。Producerにcurlで直接HTTPリクエストをし、イベントを送信してみます。

    curl -s -X POST -H "Content-Type: application/json"  \
         -d '{ "id": "id1", "message": "message1" }'  \
         http://localhost:8080/placeMessage | jq
    

    以下のように返ってくれば成功です。

    {
     "id": "0",
     "statusMessage": "Successful"
    }
    

5. Consumerでイベントを消費

同様の操作をConsumerでも行います。

  1. 以下のコマンドを実行してイメージをビルドし、Docker Engine(Kafkaクラスタが動作しているのと同じ)内にConsumerをデプロイします。docker build

    cd $LAB_HOME/springboot-kafka
    ./kafka-ms-deploy-consumer
    
  2. コンテナを起動して、Consumerのマイクロサービスを実行します。docker run

    cd $LAB_HOME/springboot-kafka
    ./kafka-ms-launch-consumer
    
  3. テストしてみます。Consumerマイクロサービスは、起動後、Kafka brokerからメッセージをデキューします。デキューに成功すると、先ほどProducerによって送信されたイベントをログで見ることができます。以下のコマンドでコンテナログから後半の6行をリストアップします。

    container-logs kafka-consumer 6
    

    以下のように{"id": "0", "message": "message1"}.と表示されていたら成功です。
    image.png

    Kafkaによって、メッセージが受け取れることがわかりました。

3. Transactional Event Queuesを使ったマイクロサービスの構築

今度は、Oracle Transactional Event Queues(TEQ)を使い、非同期に通信するマイクロサービスを構築してみます。

1. トピックの作成

以下のコマンドで、セットアップで作成したADBインスタンスに、TXEVENTQTOPIC1というトピックを作成します。

txeventq-add-topic TXEVENTQTOPIC1

ここでは、実際は以下のようなSQLを実行しています。

declare
  txeventq_topic      varchar2(30) := '&1' ;
  txeventq_subscriber varchar2(30) := '&2' ;
  subscriber     sys.aq$_agent;
begin
    if txeventq_topic is not null and txeventq_subscriber is not null
    then
        -- Creating a JMS type sharded queue:
        dbms_aqadm.create_sharded_queue(
            queue_name => txeventq_topic,
            multiple_consumers => TRUE);

        dbms_aqadm.start_queue(txeventq_topic);

        --- Create the subscriber agent
        subscriber := sys.aq$_agent(txeventq_subscriber, NULL, NULL);

        dbms_aqadm.add_subscriber(
            queue_name => txeventq_topic,
            subscriber => subscriber);
    else
        DBMS_OUTPUT.put_line('ERR : at least one of the variables is null !');
    end if;
end;
/
commit;

2. ProducerとConsumerマイクロサービスのビルド

先ほどと同様にProducerとConsumerをビルドします。一点、こちらではspring-kafkaではなく、TEQ用のKafkaJavaクライアントであるKafka Java Client for Oracle Transactional Event Queues Kafka (OKafka)を使用します。

cd $LAB_HOME/springboot-txeventq
./txeventq-ms-build

3. Producerからイベントを生成

  1. 以下のコマンドを実行してイメージをビルドし、Docker Engine内にProducerをデプロイします。docker build

    cd $LAB_HOME/springboot-txeventq
    ./txeventq-ms-deploy-producer
    
  2. コンテナを起動して、Producerのマイクロサービスを実行します。docker run

    cd $LAB_HOME/springboot-txeventq
    ./txeventq-ms-launch-producer
    
  3. 一度テストしてみます。Producerにcurlで直接HTTPリクエストをし、イベントを送信してみます。

    curl -s -X POST -H "Content-Type: application/json"  \
         -d '{ "id": "id1", "message": "TxEventQ message1" }'  \
         http://localhost:8090/placeMessage | jq
    

    以下のように返ってくれば成功です。

    {
     "id": "0",
     "statusMessage": "Successful"
    }
    

4. Consumerでイベントを消費

Consumerも同様にデプロイし、実行します。

  1. 以下のコマンドを実行してイメージをビルドし、Docker Engine内にConsumerをデプロイします。docker build

    cd $LAB_HOME/springboot-txeventq
    ./txeventq-ms-deploy-consumer
    
  2. コンテナを起動して、Consumerのマイクロサービスを実行します。docker run

    cd $LAB_HOME/springboot-txeventq
    ./txeventq-ms-launch-consumer
    
  3. テストしてみます。Consumerマイクロサービスは、起動後、TEQ brokerからメッセージをデキューします。デキューに成功すると、先ほどProducerによって送信されたイベントをログで見ることができます。以下のコマンドでコンテナログから後半の6行をリストアップします。

    container-logs txeventq-consumer 6
    

    {"id": "0", "message": "TxEventQ message1"}.と表示されていたら成功です。
    このように、Kafka brokerと同様に、TEQ brokerでもイベントの生成と消費を行うことができることがわかります。

4. KafkaとTEQの接続

ここでは、KafkaとTEQの互換性を確認してみます。TEQはKafkaとの双方向の通信が可能なので、変更をほぼリアルタイムで反映させることができます。

1. Kafka Connectのセットアップ

今回は、Apache Kafkaに含まれる、Kafkaと他のシステムを統合するフレームワークであるApache Kafka Connectを使用します。

  1. ADBのパスワードを指定して、以下のコマンドを実行します。DBユーザーやホスト名などのパラメータが入力され、KafkaトピックとTEQの間にConnect Sinkが設定されます。

    cd $LAB_HOME/kafka-connect-txeventq
    ./setup-kafka2txeventq-connect.sh
    
  2. Connect Sinkの状態を以下のコマンドで確認します。

    kafka-connect-status
    

    正しく接続できていれば、以下のような結果になります。

    {
     "name": "JmsConnectSink_txeventqlab",
     "connector": {
         "state": "RUNNING",
         "worker_id": "connect:8083"
     },
     "tasks": [
      {
         "id": 0,
         "state": "RUNNING",
         "worker_id": "connect:8083"
      }
     ],
     "type": "sink"
    }
    

2. Kafka brokerからメッセージをエンキュー

Connectorを実行したので、メッセージを生成して転送をテストします。Kafka Producerからエンキューし、TEQからデキューしてみます。

Kafka Producerが動いている状態で、curlでProducer APIにメッセージを送信します。

curl -s -X POST -H "Content-Type: application/json"  \
     -d '{ "id": "Sink1", "message": "Sink Message from Kafka to TxEventQ #1" }'  \
     http://localhost:8080/placeMessage | jq

以下のように返ってくれば、送信は成功です。

{
    "id": "1",
    "statusMessage": "Successful"
}

3. PL/SQLを使ってTEQからメッセージをデキュー

Producerからメッセージをエンキューすると、その後Connect SinkエージェントがKafkaトピックからメッセージを消費し、TEQにエンキューします。そして、OKafka ConsumerマイクロサービスやPL/SQLプロシージャを使って、TEQからメッセージをデキューすることができます。

以下のコマンドでTEQからデキューします。

txeventq-dequeue

実際は以下のようなPL/SQLを実行しています。

declare
  txeventq_topic      varchar2(30) := '&1' ;
  txeventq_subscriber varchar2(30) := '&2' ;

  dequeue_options    DBMS_AQ.dequeue_options_t;
  message_properties DBMS_AQ.message_properties_t;
  message_id         RAW(2000);
  my_message         SYS.AQ$_JMS_TEXT_MESSAGE;
  msg_text           varchar2(32767);
begin
    DBMS_OUTPUT.ENABLE (20000);

    if txeventq_topic is not null and txeventq_subscriber is not null
    then
        -- Dequeue Options
        dequeue_options.dequeue_mode  := DBMS_AQ.REMOVE;
        dequeue_options.wait          := DBMS_AQ.NO_WAIT;
        dequeue_options.navigation    := DBMS_AQ.FIRST_MESSAGE;
        dequeue_options.wait          := 1;
        dequeue_options.consumer_name := txeventq_subscriber;

      DBMS_AQ.DEQUEUE(
        queue_name => txeventq_topic,
        dequeue_options => dequeue_options,
        message_properties => message_properties,
        payload => my_message,
        msgid => message_id);
        commit;
        my_message.get_text(msg_text);
        DBMS_OUTPUT.put_line('TxEventQ message: ' || msg_text);
    else
        DBMS_OUTPUT.put_line('ERR : at least one of the variables is null !');
    end if;
end;
/

以下のような結果が返ってくるはずです。

TxEventQ message: {"id": "1", "message": "Sink Message from Kafka to TxEventQ #1"}

PL/SQL procedure successfully completed.

まとめ

本記事では、Apache KafkaとOracle Transactional Event Queues(TEQ)という2つのメッセージ・ブローカーを使用したメッセージ・キューイングをご紹介しました。
TEQはコンバージド・データベースであるOracle Databaseの一機能でかつ、Kafkaとも互換性があるので、エンタープライズクラスのデータ中心のマイクロサービス・アーキテクチャを構築することができます。
本記事でご紹介したものは、全てOCIの無料のリソースで完結しているので、お時間があればぜひ試してみてください。

参考

8
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?