はじめに
IBM App Connect Enterprise(以下 ACEと略します)は、IBMが提供するシステム間連携のソフトウェアで、システム間のさまざまなギャップを整理して連携を実現します。
メッセージ連携は「1メッセージを変換、編集して連携する」という使い方が中心ですが「1件のメッセージをn件メッセージに分割」「n件メッセージを1件に統合」といったニーズもあります。
1件のメッセージをn件に分割する方法の一例は、別の記事で触れています。
当記事では、n件のメッセージを1件にまとめる手順について確認していきます。この記事ではMQからメッセージを取得するケースを想定しています。
技術要素としてはACEのCollectorノード、およびMQのメッセージグループを使用します。
Collectorノード
Collector ノードは、メッセージフロー内で複数のメッセージを受け取り、それらを単一のメッセージコレクション(集合)にまとめる機能を提供します。このノードを使うことで、後続のノードで複数のメッセージを一括して処理できるようになります。
フローの動作イメージ

メッセージフロー上にCollectorノードがある場合、処理の流れは一旦Collectorノードでストップします。指定された条件に基づき他のメッセージの到着を待ち、条件が達成されたら次ノードに処理が移ります。
入力ターミナルの追加
多くのノードでは「In」という単一の入力ターミナルが提供されますが、Collectorは複数のノードからのメッセージ入力・制御を想定しており、入力元ノードを識別するために入力ターミナルを手動で設定します。この入力ターミナルは複数設置することができます。

入力ターミナルの追加は、Collectorノード上で右クリックメニューを開き、ここから選択します。
集約条件の設定
Collectionノードでメッセージを受け取る際に、
- どのメッセージを集約するか
- どのように集約完了を判断するか
の2点を、入力ターミナル毎に指定できます。
偶然同じタイミングで到着した無関係なメッセージを集約対象に含めないように「これが同じメッセージ群」と指定します。
また、「ここからここまでが一群のメッセージ」を区切る条件を付加して、適切なタイミングで次ノードに伝搬するタイミングを指定します。
集約条件

Collectorノードのプロパティにある「Correlation Path」「Correlation pattern」の2つで指定します。
Correlation Pathは、メッセージ中のどの要素をもとに判断するかを指定します。
Correlation patternは、pathで指定した要素について、同一メッセージ群か否かを判断する条件を指定します。上の例では何も指定していません、この場合はpathに指定された項目が完全一致したら同一メッセージ群と判断します。
出力条件

メッセージを次ノードに進める条件としてQuantity(何件メッセージが到着したとき)、Timeout(最初のメッセージが到着した後、何秒待機したとき)を指定することができます。
メッセージ仕様上、集約する件数が固定の場合はQuantityを指定する。それ以外の場合は設計上どの程度の時間間隔で到着するかを考慮のうえTimeoutを指定する。という使い分けが考えられます。
MQ メッセージグループ
Collectorノードの設定で、メッセージの集約件数が不定の場合にはTimeoutを指定することになります。ただ時間間隔が明確に指定できない場合もあると思います。
この場合に、MQのメッセージグループという機能が役に立ちます。
これは、物理的には複数のメッセージでありながら、論理的にそれらが1つのグループに属することを示す仕組みです。これにより、アプリケーションは関連する一連のメッセージをまとめて扱うことができます。
グループメッセージは、MQ メッセージヘッダー (MQMD) の以下のフィールドによって制御されます。
MQMDフィールド | 説明 |
---|---|
GroupId | 同じグループに属するメッセージは、すべて同じ GroupId を持ちます。これにより、どのメッセージが同じグループかを識別できます。 |
MsgFlags | メッセージがグループに属しているか、またグループの最後のメッセージかを示すフラグが含まれます。 MQMF_MSG_IN_GROUP: メッセージがグループの一部であることを示します。 MQMF_LAST_MSG_IN_GROUP: メッセージがグループの最後のメッセージであることを示します。グループの最後のメッセージには、MQMF_MSG_IN_GROUP と MQMF_LAST_MSG_IN_GROUP の両方が設定されます。 |
MsgSeqNumber | グループ内でのメッセージの順序を示す番号です。通常は 1 から始まり、メッセージごとに 1 ずつ増加します。これにより、受信側でメッセージの順序性を保証できます。 |
MQ グループメッセージを利用することで、送信側は関連するメッセージ群を明確に定義でき、受信側はグループ全体のメッセージが揃ったことを確実に認識できます。
Collector ノードと MQ グループメッセージの連携
これら2つの機能を連携させることで、「関連するメッセージがすべて揃うまで待ち、揃ったらまとめて処理する」という動作を実現できます。
MQInput ノードの設定

プロパティのAdvancedタブにあるAll messages availableという項目をチェックします。これにより、グループに含まれるメッセージはグループ最後のメッセージ(同一GroupIdのMQMF_LAST_MSG_IN_GROUPフラグを持つメッセージ)が到着するまで受信しません。
Collector ノードの設定
次に、MQInput ノードの後続に配置した Collector ノードで、どのメッセージを同じコレクションにまとめるかを指定します。

Correlation path には、MQメッセージの仕様にあわせて「$Root/MQMD/GroupId」というフィールドを指定します。Timeoutはプロジェクト毎に設定してください。全件揃った段階でメッセージが到着するため、あまり長いインターバルを設定する必要はないでしょう。
ESQL での統合メッセージの参照
Collector ノードから出力されるメッセージは、特殊な構造を持っています。ルート要素 (例: InputRoot) の下に、収集された個々のメッセージが要素の配列として格納されます。デフォルトでは、この配列は Collection という名前を持ち、各要素は 入力ターミナルの名前が割り当てられます。
例えば、入力ターミナルの名前が「InMsg」である場合は次のような構造になります。
InputRoot
Properties
MQMD
...
Collection
InMsg[1]
MQMD (元のメッセージ1のMQMD)
DFDL (元のメッセージ1のボディ)
InMsg[2]
MQMD (元のメッセージ2のMQMD)
DFDL (元のメッセージ2のボディ)
InMsg[3]
MQMD (元のメッセージ3のMQMD)
DFDL (元のメッセージ3のボディ)
Computeノードでコレクションメッセージを処理する場合、次のように全件を参照することができます(一例です、もっとスマートな記述はありそうです)。
DECLARE msgData CHARACTER '';
DECLARE index Integer 1;
DECLARE myref REFERENCE TO InputRoot.Collection.InMsg[1];
MOVE myref TO InputRoot.Collection.InMsg[1];
WHILE LASTMOVE(myref) = TRUE DO
SET msgData = msgData || InputRoot.Collection.InMsg[index].DFDL.msgModel.record.msgdata;
SET index = index + 1;
MOVE myref TO InputRoot.Collection.InMsg[index];
END WHILE;
おわりに
今回は、ACEで複数メッセージを統合する方法として、CollectorノードとMQメッセージグループを連携させるアプローチをご紹介しました。
当記事が、ACEを利用したメッセージフロー開発の一助となれば幸いです。
さらに詳しい情報をお探しの方へ
IBMの最新情報、イベント情報、さらに役立つ資料は、以下のIBM Communityでも発信・格納されています。
最新のトレンドや有益な情報をチェックするために、ぜひご覧ください!
-
WebSphere: IBM Community - WebSphere
WebSphere関連の最新情報やディスカッション、イベント情報、技術資料を公開中! -
ELM (Engineering Lifecycle Management): IBM Community - ELM
ELMに関する最新のイベント情報、ナレッジ共有、便利なドキュメントをチェック! -
Integration: IBM Community - Integration
Integrationに関する最新のイベント情報、ナレッジ共有、便利なドキュメントをチェック!