対象読者
2つの疎結合なシステム間を、メモリを大量に使っちゃうような大がかりな仕組みなしで、ほぼリアルタイムにデータロスなしに同期させたい、そういうアーキテクチャが欲しい方。
動機
Microservicesを疎結合に保ちつつ、バッチ処理やレポート機能でのクエリの高速化のため、データを同期させたいことがままあります。このあたりの話は、テキストの「5章 モノリスの分割」に載っています。
あるサービスから、データを定期的に吸い上げて、別のサービス、ツールに送るデータポンプという仕組みがあります。
イベント駆動でメッセージを関連サービスに送るイベントデータポンプや、バックアップツールを使って高速にデータ転送するバックアップデータポンプが紹介されていますが、結果整合性だけどなるべくリアルタイム(Near Real-Time)で同期させたいとなると、イベントデータポンプな仕組みしか選択肢にはならないでしょう。
これをどう実現するか考えてみます。
同期の方式
メッセージキューを使ってデータを送る
イベント駆動で外にメッセージを送るのは、メッセージキュー(通称えむきゅー)を使うのがふつうかと思いますが、大がかりなアーキテクチャになるし、障害点も増えるし結構大変です。
このあたりの話は、Enterprise Integration Patterns(EIP)が詳しいです。
http://www.enterpriseintegrationpatterns.com/patterns/messaging/
多数のパターンの存在が示すとおり、ただリモートのアプリケーションとのメッセージのやりとりといっても、多くの設計考慮点があります。
- メッセージ送るときのトランザクションは?
- メッセージの順番をどう担保するか?
- メッセージ送るときのフォーマットをどうするか?
- メッセージの受信側が落ちているときに、どうやってメッセージを保管し再送するか?
などなど… 特に再送の問題は厄介で、通常は送信が失敗するとDead Letter Queueに入りますが、受信側が長期間接続されないと、これが溢れてデータのロストにつながる可能性があるため、そのキャパシティプランニングは慎重におこなわなければなりません。
送受信方向の逆転
これは2年前のシステムエンジニア アドベントカレンダーで書いたのですが、接続方向を逆転させるとうまくいきます。
メッセージを受信したい側から送信側に、データを取りに行くようにします。そうすることで、送信側のと受信側で関連をもたせることなく、互いに疎にトランザクションを完結できます。中立なメッセージキューを必要とせず、障害ポイントも増やさずにすみます。
実装 StrongZero
2年前の実装はWebSocketを使ったClojureの実装でしたが、メッセージ送信処理が直列処理になっており、スループットの面で実用性が低かったので、Java+ØMQで作り直しました。ØMQ(ZeroMQ; ぜろえむきゅー)は、メッセージをやりとりする独立したプロセスを立てる必要なく(もちろんそうすることもできます)動作可能なメッセージングの仕組みです。プロトコルがシンプルなので、多くの言語バインディングがあります。今回はJavaの実装であるjeromq(じぇろえむきゅー)を使ってみました。
ØMQは、シンプルかつ高速なデータ転送の仕組みですが、メッセージの永続化や再送機能は省かれているので、フォールトトレラントな仕組みを作ろうとすると、自前でその辺りを用意する必要があります。
これを前述の送受信方向を逆転させる形で実装しました。ゼロエムキューのフォールトトレラントな部分を大幅に強化した仕組みなので、このたびストロングゼロと名付けてみました。
アーキテクチャは以下のようになります。一見複雑に見えますが、PumpはProducerと同じインスタンスで動作させることもできるので、実際は送信側にStrongZeroProducerを、受信側にStrognZeroConsumerを立てておくだけです。GitHub上のExampleを見ていただければ、その簡単さは伝わるかと思います。
StrongZeroを使ったサービス連携
まずSenderですが、以下の例のように自DBの更新と同一トランザクションでStrongZeroSender#sendを呼び、更新ジャーナルとして記録します。トランザクションがコミットされたら、変更があったことをStrongZeroPumpに知らせるため、StrongZeroSender#updatedを呼びます。システム間を連携させるために、既存のコードに影響あるのはこれだけです。
StrongZeroSender sender = new StrongZeroSender(notificationAddress, appConfig.getDataSource());
User user = new User();
user.setName("kawasima");
user.setEmail("kawasima@example.org");
UserDao userDao = new UserDaoImpl(appConfig);
appConfig.getTransactionManager().required(() -> {
userDao.insert(user);
sender.send("USER", user);
});
sender.updated();
送信側は、あとProducerとPumpを動かしておけば、自動的に送信がおこなわれます。下記例だとポート5959(ごくごく)でConsumerを待ち受けます。
String frontendAddress = "tcp://127.0.0.1:5959";
String backendAddress = "ipc://backend";
String notificationAddress = "ipc://notification";
ObjectMapper mapper = new ObjectMapper(new MessagePackFactory());
StrongZeroProducer producer = new StrongZeroProducer(frontendAddress, backendAddress);
producer.setObjectMapper(mapper);
producer.start();
workerThreadPool.submit(new StrongZeroPump(backendAddress, notificationAddress, datasource);
受信側は、取り込みロジックを登録したConsumerを動かしておけば、自動的にProducerからデータが同期されます。
String producerAddress ="tcp://127.0.0.1:5959";
ObjectMapper mapper = new ObjectMapper(new MessagePackFactory());
StrongZeroConsumer strongZeroConsumer = new StrongZeroConsumer("producer1", producerAddress, appConfig.getDataSource());
strongZeroConsumer.setObjectMapper(mapper);
MemberDao memberDao = new MemberDaoImpl(appConfig);
strongZeroConsumer.resisterHandler("USER", (id, msg) -> {
Member user = mapper.readValue(msg, Member.class);
appConfig.getTransactionManager().required(() -> {
if (memberDao.update(user) == 0) {
memberDao.insert(user);
}
strongZeroConsumer.consumed(id);
});
});
appConfig.getTransactionManager().required(strongZeroConsumer::start);
非常に簡単、かつほぼリアルタイム、かつデータのロスの心配なく、サービス間の同期が可能です。どんなマイクロサービスにもあいます!
更新データの順序性
StrongZeroのアーキテクチャ上、受信側から送信側に時系列にデータを取りに行くので、更新イベントの時系列性が肝になります。
ここではID生成大全にしたがい、Flakeを使って複数ノードからの更新でも、ミリ秒解像度で時系列に発行されるIDを元にソートしてデータを取り出すことによって、順序性を保っています。
ØMQについて
ØMQはメッセージキューというよりは、ネットワークソケットの再発明という感じの代物です。
- クライアント、サーバのどちらを先に起動しても良い。(両方が立ち上がったら自動的にコネクションが作られるし、切断されたら自動的に再接続に動く)
- ソケットをバインドするアドレスには、TCPやプロセス間通信、スレッド間通信などがあって、アドレス表記の違いだけで区別される。オペレーションは共通であるので、最初はスレッド間通信でやっていたものをスケールさせるために、プロセス間通信やTCPに変えていくのが非常に簡単である。
- ソケットには、メッセージのやりとりのルール(前述のEIPに載っているパターン)によって、いくつかの種類がある。
- REQ/REP - メッセージ送信/受信を繰り返す単純なパターン
- DEALER/ROUTER - REQ/REPを複数のクライアント、複数のサーバとの接続を想定して拡張したパターン
- PUB/SUB - あるサーバから送ったメッセージを、その種類によってフィルタしながら複数のクライアントが受け取るパターン
- PULL/PUSH
- PAIR
ExecutorService executor = Executors.newCachedThreadPool();
ZContext context = new ZContext();
Socket reqSocket = context.createSocket(REQ);
Socket repSocket = context.createSocket(REP);
repSocket.bind("inproc://reqrep");
reqSocket.connect("inproc://reqrep");
executor.submit(() -> {
LOG.info("{}", repSocket.recvStr());
repSocket.send("Fine, thank you!");
});
executor.submit(() -> {
reqSocket.send("How are you?");
LOG.info("{}", reqSocket.recvStr());
});
特にØMQのスレッド間通信を使って書くと、非同期プログラミングがコールバックではなく、ソケットを経由したメッセージの受け取りになるので、また違ったパラダイムでコードが書けて楽しみがえられます。
なお、ØMQについて深く知りたい方は、ØMQガイドブックの素晴らしい和訳があるので、そちらを読んでみてください。
http://www.cuspy.org/diary/2015-05-07-zmq/zguide-ja.pdf
まとめ
StrongZeroは、Web APIでは性能が出にくい場合のマイクロサービスアーキテクチャにおける一服の清涼剤です。即効性と常習性があり、簡単に導入できるので、ぜひみなさんも使ってみてください!