10
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Oracle Advanced Queuing(AQ)使ってみませんか?

Posted at

Advent Calendar初参加、Qiita初投稿です!

この記事は、JPOUG Advent Calendar 2019 の16日目の記事です。
15日目はtomoさんの「Flex DiskGroup使ってみたよ!」でした。
ASMは私も好きな機能ですが、共有ディスクの調達・設定が面倒なので11g以降、実機を使った検証や情報のキャッチアップができていません。自分の勉強不足を改めて認識する記事でした。

さて先日2019年12月5日に、Oracle Databaseのライセンス緩和により一部オプションがStandard Editionでも使えるようになりました。

 Oracle 19cライセンス緩和 機械学習+地理データがStandard Editionで使用可能に

Oracle Databaseだけに限りませんが、いわゆる商用製品(OS、ミドルウェア)の多くにはエディションの考え方があり、エディションによって(有償または無償で)使える機能・使えない機能があります。

費用的な理由でエンタープライズ用途でもエントリークラスのエディションのことが多々あるので、上記のようなライセンス緩和は、技術が普及する観点から歓迎されることだと思います。

このエントリーではspatialや機械学習と同様、Oracle Databaseの全てのエディションで使える機能の中で、私のお気に入りのひとつ"Advanced Queuing"(AQ)の使い方を紹介します。(業務システムで使っていましたが、やってみた!的な内容です)

Oracle Advanced Queuing(AQ)とは

 簡単にいうと非同期でメッセージをやり取りする仕組みです。
メッセージは同一ノード内で受信・送信することも、(DBリンクを使用して)異なるノード間でも受信・送信することもできます。

 Oracle 9iでAQが導入される以前はPL/SQLパッケージ DBMS_PIPEを使用して非同期にメッセージ送信を行っていました。ただAQと異なりDBMS_PIPEでは同一ノード内(RAC環境)でしか処理ができず、受信側のプロセスを(RACの場合、全ノードで)起動させておく必要がありました。受信側のプロセスを起動し忘れると、もちろん非同期の処理は実行されません。

 詳細はマニュアルをみてください。

用途(どんな時に使うのか)

ユーザー・アプリケーションを作成していると、利用者の体感速度を落とさないために、いいかんじで遅延させて実行させたい処理があります。(人間と違って「後でやる」と言ってやらなかったということはコンピュータシステム上許されません)
例えばECサイトで、発注者や仕入れ先、運送会社などへのFAX送信処理や帳票作成処理などがその例です。(あくまで例です)

 またOracle Databaseの内部ではサーバー生成アラート(キュー名:ALERT_QUE)で使われているそうですが、エラー・ワーニングの発報も有用な用途のひとつだと思われます。

AQの課題

  • 知名度が低い(Oracle Databaseを使っている人でも知らない人が多い印象)
  • PL/SQLでしか設定できず導入の敷居が高い(GUIで設定できない)
  • 日本語情報が少ない(書籍だとこれくらい?。かなり参考にさせてもらいました)
  • 将来のバージョンで非推奨、デサポートされる不安がある。(個人の感想です)

などがあります。
今回、長年使ってきたAQの使い方をまとめてみましたので、ご参考ください。

メッセージのシナリオ

 異なるサーバ1とサーバ2で以下のメッセージを送信・受信するとします。

id mesg
1 こんにちは
2 今晩は
3 おはようございます

 ※今回は1インスタンスで試すため同一データベース内でユーザーを分けて受送信します。(USER1とUSER2)
 18c XE(18.4.0.0.0)で試しました。Always Freeで試そうと思いましたがいろいろ躓くと思ってやめました。

メッセージ連携する時に事前に決めることは以下の7点です。

  • スキーマ    (送信側スキーマ、受信側スキーマ)
  • DBリンク名  (送信側のみ。パブリックDBリンクの必要あり)
  • ユーザー定義型 (送信側と受信側は同じ定義・名前。メッセージの構造を定義)
  • キューテーブル名(送信側と受信側は同じ定義・名前)
  • キュー名    (送信側と受信側は同じ定義・名前)
  • サブスクライバ名(送信側と受信側は同じ定義・名前。送信先の識別子)
  • コールバック関数(受信側のみで定義。非同期で行う業務処理を定義)

 ユーザー定義型とコールバック関数以外は、任意の名称で問題ありませんが、大文字アルファベット、数字のみで定義することをお勧めします。
小文字が混ざっているとうまく設定できないことがありました。今回は以下のようにします。

  • スキーマ     USER1(送信側)、USER2(受信側)
  • DBリンク名   DBLINK_USER2
  • ユーザー定義型  MYMESG
  • キューテーブル名 MYMESG_TAB
  • キュー名     MYMESG_Q
  • サブスクライバ名 SUBSCR
  • コールバック関数 AUTO_RECV

 ※USER1/USER2は作成済であるとします。

設定手順例

送信側の設定

(送信側)パブリックDBリンクを作成

SQL> CREATE PUBLIC DATABASE LINK DBLINK_USER2 CONNECT TO USER2 IDENTIFIED BY USER2 USING 'ORCL';

(送信側)権限付与

SQL> GRANT AQ_ADMINISTRATOR_ROLE TO USER1;

(送信側)ユーザー定義型、キュー表、キューを作成しキューを開始する

SQL> CREATE TYPE MYMESG AS OBJECT (
           ID    NUMBER(10,0),
           MESG  VARCHAR2(30)
         )
/
SQL> exec DBMS_AQADM.CREATE_QUEUE_TABLE( -
           queue_table        => 'MYMESG_TAB', -
           queue_payload_type => 'MYMESG', -
           multiple_consumers => TRUE -
       );
SQL> exec DBMS_AQADM.CREATE_QUEUE( -
           queue_name  => 'MYMESG_Q', -
           queue_table => 'MYMESG_TAB' -
       );
SQL> exec DBMS_AQADM.START_QUEUE ( -
           queue_name  => 'MYMESG_Q' -
       );

受信側の設定

(受信側)権限付与

SQL> GRANT AQ_ADMINISTRATOR_ROLE TO USER2;
SQL> GRANT EXECUTE ON DBMS_AQ    TO USER2;

(受信側)ユーザー定義型、キュー表、キューを作成しキューを開始する(※送信側と同じです。)

SQL> CREATE TYPE MYMESG AS OBJECT (
           ID    NUMBER(10,0),
           MESG  VARCHAR2(30)
         )
/
SQL> exec DBMS_AQADM.CREATE_QUEUE_TABLE( -
                queue_table        => 'MYMESG_TAB', -
                queue_payload_type => 'MYMESG', -
                multiple_consumers => TRUE -
         );
SQL> exec DBMS_AQADM.CREATE_QUEUE( -
              queue_name  => 'MYMESG_Q', -
              queue_table => 'MYMESG_TAB' -
     );
SQL> exec DBMS_AQADM.START_QUEUE ( -
              queue_name  => 'MYMESG_Q' -
     );

(受信側)サブスクライバ(購読者)を登録

SQL> exec DBMS_AQADM.ADD_SUBSCRIBER( -
                queue_name  => 'MYMESG_Q', -
                subscriber  => SYS.AQ$_AGENT('SUBSCR',NULL,NULL), -
                rule        => NULL -
         );

送信側の設定

(送信側)サブスクライバ(購読者)を登録

SQL> exec DBMS_AQADM.ADD_SUBSCRIBER( -
                queue_name  => 'MYMESG_Q', -
                subscriber  => SYS.AQ$_AGENT('SUBSCR','USER2.MYMESG_Q@DBLINK_USER2',NULL), -
                rule        => NULL -
         );

(送信側)伝播(リモートインスタンスへの転送スケジュール)の開始

SQL> exec DBMS_AQADM.SCHEDULE_PROPAGATION( -
           queue_name    => 'MYMESG_Q', -
           destination   => 'DBLINK_USER2', -
           duration      => NULL, -
           latency       => 0 -
         );

  ※伝播の開始は、DBリンク先毎に最初の1回実行すればよいです。

受信側の設定

(受信側)受信側でメッセージを受け取った時に、自動でデキューするプロシージャを作成。

SQL> CREATE OR REPLACE PROCEDURE USER2.AUTO_RECV(
             context    IN RAW,
             reginfo    IN SYS.AQ$_REG_INFO,
             descr      IN SYS.AQ$_DESCRIPTOR,
             payload    IN VARCHAR2,
             payloadl   IN NUMBER
         )IS
             mesg    MYMESG;
             dequeue_options     DBMS_AQ.DEQUEUE_OPTIONS_T;
             message_properties  DBMS_AQ.MESSAGE_PROPERTIES_T;
             message_handle      RAW(16);
         BEGIN
             -- サブスクライバ名を設定
             dequeue_options.CONSUMER_NAME := 'SUBSCR';
             DBMS_AQ.DEQUEUE(queue_name         => 'SCOTT.MYMESG_Q',
                             dequeue_options    => dequeue_options,
                             message_properties => message_properties,
                             payload            => mesg,
                             msgid              => message_handle);
             --
             -- ここから業務処理を実装する。今回はINSERTのみする。
             --
             INSERT INTO LOG_MESG VALUES(mesg.ID,  mesg.MESG);
             COMMIT;
         EXCEPTION
             WHEN OTHERS THEN Null;
         END AUTO_RECV;
/

(受信側)上記のプロシージャを自動デキューするように登録。

SQL> DECLARE
             REGINFO  SYS.AQ$_REG_INFO;
             REGINFOS SYS.AQ$_REG_INFO_LIST;
     BEGIN
         REGINFO := SYS.AQ$_REG_INFO(
                     'USER2.MYMESG_Q:SUBSCR',
                      DBMS_AQ.NAMESPACE_AQ,
                      'plsql://USER2.AUTO_RECV?PR=1',
                      NULL
                    );
         REGINFOS := SYS.AQ$_REG_INFO_LIST(REGINFO);
         DBMS_AQ.REGISTER( REGINFOS,1);
     END;
/

エンキュー(送信)の例

以上で、設定は終了です。
エンキュー(メッセージ送信)は、以下のようなPL/SQLを実行します。
実際は、この処理をラップするプロシージャ、ファンクションを作ると便利です。

コマンド例

DECLARE
    mesg MYMESG;
    enqueue_options     dbms_aq.enqueue_options_t;
    message_properties  dbms_aq.message_properties_t;
    message_handle      RAW(16);
BEGIN
	-- ひとつめのメッセージを送信
    mesg := MYMESG( 1 ,'こんにちは' );
    dbms_aq.enqueue(queue_name         => 'MYMESG_Q',
                    enqueue_options    => enqueue_options,
                    message_properties => message_properties,
                    payload            => mesg,
                    msgid              => message_handle
    );
	-- ふたつめのメッセージを送信
    mesg := MYMESG( 2 ,'今晩は' );
    dbms_aq.enqueue(queue_name         => 'MYMESG_Q',
                    enqueue_options    => enqueue_options,
                    message_properties => message_properties,
                    payload            => mesg,
                    msgid              => message_handle
    );
    -- トランザクションを終了(COMMIT or ROLLBACK)しないとキューに溜められない
    COMMIT;
END;
/

上記を実行すると、自動デキューのプロシージャが実行されLOG_MESG表にメッセージが登録されます。

まとめ

 本格的な非同期処理を行いたい場合は、専用のミドルウェアを導入すると思いますが、ちょっとしたシステムの場合、Oracle Databaseの標準機能AQを使って実装することができます。
9iからの機能でありDatabase内部でも使われているため枯れた機能に入ると思います。

Oracle DatabaseにはAQ以外にもRDBMSの範囲を超えた様々な機能があります。有償オプションの機能は最大限活用されていると思いますが、それ以外の無償機能も使ってみてはいかがでしょうか?。開発生産性が向上すると思います。

(注意) 昔(9i)の知識で動作確認した程度なので、実際に使用する場合は、最新のマニュアルを確認して使ってください。

明日は実行計画は、SQL文のレントゲン写真だ! Oracle Database編 (全部俺) Advent Calendar 2019を書かれているHiroshi Sekiguchiさんです。

おまけ

自動デキュー設定を確認する

SQL> SELECT * FROM SYS.REG$;  

自動デキューを解除する

DECLARE
    REGINFO  SYS.AQ$_REG_INFO;
    REGINFOS SYS.AQ$_REG_INFO_LIST;
BEGIN
    REGINFO := SYS.AQ$_REG_INFO(
                 'USER2.MYMESG_Q:SUBSCR',
                 DBMS_AQ.NAMESPACE_AQ,
                 'plsql://USER2.AUTO_RECV?PR=1',
                 ''
               );
    REGINFOS := SYS.AQ$_REG_INFO_LIST(REGINFO);
    DBMS_AQ.UNREGISTER( REGINFOS,1);
END;
/

手動デキュー

DECLARE
    mesg MYMESG;
    dequeue_options     dbms_aq.dequeue_options_t;
    message_properties  dbms_aq.message_properties_t;
    message_handle      RAW(16);
BEGIN
    dequeue_options.CONSUMER_NAME := 'SUBSCR';

    DBMS_AQ.DEQUEUE(queue_name         => 'MYMESG_Q',
                    dequeue_options    => dequeue_options,
                    message_properties => message_properties,
                    payload            => mesg,
                    msgid              => message_handle);

    DBMS_OUTPUT.PUT_LINE ('Message[' || mesg.id || '][' || mesg.mesg || ']');
    COMMIT;
END;
/

AQをPostgreSQLに移行する場合

OracleからPostgreSQLに移行する場合、PostgreSQLのdblinkとNOTIFY/LISTEN機能を組み合わせることでAQと同様のことができそうです。まだ試していませんが、AQよりDBMS_PIPEに似た機能になります。

10
6
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?