前回までに、トークンのブリッジアプリとトークンの一斉配布機能を作成しました。
今回は、ブロックチェーンのスマートコントラクトのイベント処理とRabbitMQを使ったキュー管理について検証した点を紹介します。
前回までの課題と改善策
以前作成したトークンのブリッジアプリでは、送信元トークンから送信先トークンに変換する際、スマートコントラクトが持つTransferイベントの受信をトリガーとして使用していました。このイベント受信に基づいて処理が実行され、リクエストが増えるとそれに合わせて処理が複数回実行されました。
事象および問題点は以下:
- リクエストの重複送信問題: 多くのリクエストが発生すると、同じトランザクションが複数回送信されてしまい、EVMに送信されたトランザクションデータが拒否されるか、空振りします。
- 処理失敗後の再送信問題: SecondNetworkへのリクエストを送信した後、エラーが発生すると、そのリクエストが保存されずに失われるため、再送信できません。情報をログファイルなどに記録し、再処理の際にその情報を取得して再実行できる方法もありますが、この方法ではログファイルの管理が複雑になります。
処理の流れと問題発生タイミングのイメージは以下:
複数のユーザが1000FIRSTを送付後、SecondNetworkのトークン転送までの処理の間にエラーが発生します。
❶ 同時リクエストの上昇により、送信先へのリクエストの重複(Nonce値の重複問題など)が発生します。
❷ 上記❶の動作によりエラーになった場合、リクエストが空振りします。
❸ 上記❶の動作によりエラーになった場合、再送信のための情報が消失しリカバリができません。
そのため、今回はリクエストの数が増えても、トランザクションが確実に送信でき、さらに、トランザクションが拒否された場合でも再実行できるように、リクエストをキューで管理する方法(RabbitMQを使用)に変更しました。この新しい実装について、動作検証の結果と一緒にサンプルコードを紹介したいと思います。
改善方法と実装方針
送信するトランザクション量の制御と、エラー発生時の再処理の対応について実装しました。
実装方針は以下:
キューに格納する(スマートコントラクトのイベント受信処理)
受信したイベントから、後続の処理に必要な文字列を生成し、キューに格納します。
キューに格納する際に、「確認メッセージ」を使用するようにします。
「確認メッセージ」を有効にしてキューに格納すると、キューから情報を取得後、ACKもしくはNACKという応答をもとにキューから削除するか否か制御することができます。
キューから情報を取り出し後続の処理を行った結果をハンドリングし、処理が失敗した場合、当該キューに対して「NACK」を発行する実装にします。
ここで、NACKとして確認メッセージを受信したキューは、キュー管理から削除されず次回のキュー取得処理の対象になります。
これにより、再処理(リカバリ)が可能になります。
キューを取得する(DeQueueから後続のトランザクション送信処理)
キューからメッセージを取り出し、正常なメッセージの場合後続の処理を行います。
メッセージを取り出す際は、1件ずつ取得するようにします。
また、連続したリクエストにより高負荷にならないように、処理のインターバルを設けます。
インターバルは、経過時間と処理確率(30%)により制御するようにしました。
後続処理が正常終了したら「ACK」を発行すると、当該メッセージは管理対象から削除されます。
後続処理が異常終了したら、「NACK」を発行し再送対象にします。
改善後の処理イメージ
検証で実装したソースコード
今回実装したソースコードはこちらのGithubに置いていますので、参考にみていただければと思います。
アプリの詳細のセットアップについては、Github直下のREADME.txtを参照くださいませ。
上記Githubにあるソースコードは、トークンのブリッジのコードが含まれます。
ディレクトリ構成は以下:
- contracts : Smart contract ( ERC20 based Primary and Secondary Token.)
- backend : Backend java script (With RabbitMQ on the backend.)
- frontend : Frontend type script (Ionic Ver7/Angular Ver16)
Smart contractを目的のEVM上にデプロイ後、Backendの実行、フロントエンドを起動し動作を確認することができます。
ソースコードの詳細
送信するトランザクション量の制御と、エラー発生時の再処理の対応について修正した箇所を示します。
backend/events-producer.js (キューに格納する処理部分を一部抜粋)
Line 68 - 81
amqp.connect('amqp://localhost', function(connectErr, connection) {
if (connectErr) {
throw connectErr;
}
// `createConfirmChannel`の関数を用いて確認メッセージに対応しキューを格納します。
connection.createConfirmChannel(async function(createConfirmErr, channel) {
if (createConfirmErr) {
throw createConfirmErr;
}
var queue = PRIMARY_QUEUE;
var msg = from + '|' + to + '|' + value;
// `{ persistent: true }`はメッセージを永続的としてマークすることを示します。
// RabbitMQでは、メッセージを永続的としてマークすると、メッセージがディスクに保存され、
// RabbitMQサーバーが再起動してもメッセージが失われないことを保証します。
channel.sendToQueue(queue, Buffer.from(msg), { persistent: true }, function (err) {
console.log(" [x] Response: events-consumer >> ", err ? "NACK" : "ACK", err);
});
console.log(" [x] sendToQueue %s", msg);
console.log('[I] SUCCESS: Successfully executed the handlePrimaryEvent Queueing.');
// イベント発生量に必要に応じて`sleep`を定義します。
await sleep(1500, 1000);
});
});
backend/events-consumer.js(キューを取得する処理部分を一部抜粋)
Line 157 - 195
// キューを1件ずつ取得します。
channel.prefetch(1);
channel.consume(queuePrimaryEvent, async function(msg) {
console.log("[+] queuePrimaryEvent");
// キューを取得後、30%の確率で後続の処理をします。
// リクエスト量とEVMへのリクエスト応答状態に合わせて確率を変更します。
if (Math.random() > 0.7) {
let ret = false;
count+=1;
console.log(" [x] %s Received %s : count %d .", queuePrimaryEvent, msg.content.toString(), count );
const words = msg.content.toString().split('|');
if ( words.length === 3 ) {
console.log(" [x] Call consumePrimaryEvent");
// The approach to parsing the string depends on the format of the string in the producer's queue.
let from = words[0];
let to = words[1];
let value = words[2];
ret = await consumePrimaryEvent(secondaryWallet, from, to, value, secondaryRpcProvider, secondaryTokenContract)
}
if (!(ret)) {
// `consumePrimaryEvent`内の処理が異常終了した場合、NACKを発行します。
// ここで実行されたキューは、リトライ対象になります。
console.log(" [W] Result consumePrimaryEvent >> ", "NACK");
channel.nack(msg);
} else {
// `consumePrimaryEvent`内の処理が正常終了した場合、ACKを発行します。
// ここで実行されたキューは、キュー管理から削除されます。
console.log(" [x] Result consumePrimaryEvent >> ", "ACK");
channel.ack(msg);
}
} else {
// 処理対象にならなかったキューはリトライ対象になります。
console.log(" [I] Result consumePrimaryEvent >> ", "NACK");
channel.nack(msg);
}
// You will check and configure the state of the chain you have chosen.
await sleep(1000, 1000);
});
今回実装する際に使用したライブラリ群
検証で実装したソースコードを動作させるためのライブラリを以下に記します。
使用するアプリケーションとライブラリ
- Erlang 26.0.2
- RabbitMQ 3.12.4
- Node.js
- ethers
- amqplib
キュー管理ソフトウェア
キュー管理ソフトウェアにRabbitMQを選択しました。
このアプリはオープンソースであり、また、検証するにあたりJavaScriptで実装できるため、学習コスト実装コストがかからないという面からRabbitMQを採用しました。
ダウンロード
トークンの交換アプリを起動する環境に、ErlangとRabbitMQをインストールします。
検証環境はWindows10を使用します。動作確認用のアプリを使ってまずは挙動を確認します。
1. Erlangのダウンロード
https://www.erlang.org/downloads
画面の右側からダウンロードし、exeインストーラをダブルクリックしてインストールします。
インストールはシンプルです。画面の指示に従いインストールを進めます。
2. RabbitMQのダウンロード
https://www.rabbitmq.com/download.html
今回はWindowsを使用しているため、「Windows Installer」リンクをクリックします。
exeインストーラをダブルクリックしてインストールします。
インストールはシンプルです。画面の指示に従いインストールを進めます。
対策をしてみた結果
今回の問題を回避するために、RabbitMQを適用してみました。
キューイングするようにした結果、トランザクションをリクエストするために要する時間がふえましたが、問題点は解消されました。
RabbitMQ自体は色々な機能を備えていますが、今回はメッセージキューイングと可用性信頼性について、安定した動作になったことが確認できました。
メッセージキューイングは、分散システムでのデータ通信を効果的に管理し、処理を非同期に行う仕組みです。これにより、可用性と信頼性を向上させる重要な要素であると実感しました。
メッセージキューを使用することで、アプリケーションやマイクロサービス間でメッセージの安全な受け渡しと処理を実現できます。可用性の面では、メッセージはキューに保存され、受信者が利用可能なときに処理できるため、システムのダウンタイムや過負荷から保護されます。
信頼性に関しては、メッセージは失われることなく確実に処理されます。キューがデータを永続的に保存する機能により、データの一貫性と完全性が確保され、システムの信頼性が向上できたように思えます。
最後に
今回検証するにあたりブリッジ機能のソースコードをもとに改善、改修を行いました。
スマートコントラクトのイベントを契機にバックエンドでなんらかの処理をさせる場合、この考え方が適用できると考えています。
次回は、DAO関連のアプリについて検証したいと思います。