CGIのキュー版を作ってみた
各種キューメッセージを標準入出力に引き渡す仕組みにより、言語を選ばず手軽にキューメッセージ駆動のワーカーアプリケーションを実装できます。
とりあえず自分が使うものをコンセプトとして公表してみたという感じですが、新たなアイデアや実装のきっかけになれば幸いです。
NodeJSがインストールされていれば次のコマンドでCLIコマンドをインストールできます。
npm i -g cqi-cli
動機
SQSをよく使うのですが、ワーカーアプリケーションの実装をいろいろな言語で手軽にできたらいいなと思ったのが発端です。
モチーフは古き良きCGIです。
- 標準入出力にマッピングすれば言語を選ばない
- ライブラリ不要でもキューとの連携ができる
- メッセージキューシステムの切り替えも容易
概要
メッセージモデル
メッセージは構造化データをJSONでシリアライズした1行のテキストであることを前提とします。
{"text":"the message"}
キューのシステムによってさまざまなメタデータが使えますが、標準化のため極力シンプルにしてます。
パターン1: キューメッセージごとに単発でプロセスを起動
例えば次のように標準入力が受け取ったJSONのtext
というエントリを抜き出し、/tmp/log.txt
に追記するプログラムを記述します。
メッセージを正常に消費したことを終了ステータスコード0で示します。
もし終了ステータスコードが0以外の場合はエラーが起きたとしてメッセージを削除せず、リトライなどに回されます。
#!/bin/sh
MESSAGE=$(cat - | jq -r '.text')
echo "$(basename $0) received .text: $MESSAGE" >> /tmp/log.txt
# exit 0
このプログラムをSQSに繋げるには次のように記述します。-l
はListener
、-d
はDispatcher
の略で、どこから受け取ったメッセージをどこで処理するかをパラメータとして記述します。
フォーマットは、<コンポーネント名> <jsonicによる設定値>
としています。
cqi \
-l "sqs region: ap-northeast-1, queueUrl: https://SQS/QUEUE" \
-d "exec programPath: /path/to/exec.sh"
次のように-l
オプションを変更すると、SQSではなく、REPLインターフェースを入力にできます。デバッグに便利です。
cqi \
-l "repl" \
-d "exec programPath: /path/to/exec.sh"
# -l のデフォルトはreplなので -l "repl" は省略可
逆に、-d
オプションを次のように変更するとSQSからのメッセージを標準エラーログに出力します。これもデバッグ用です。
cqi \
-l "sqs region: ap-northeast-1, queueUrl: https://SQS/QUEUE" \
-d "echo"
# -d のデフォルトはechoなので -d "echo" は省略可
パターン2 サブプロセスと標準入出力で連続通信
メッセージごとに都度プロセスを起動するのは実行コストが高いので、事前にサブプロセスを起動しておき、メッセージを標準入力に1行ずつ渡す方法もあります。
パターン1がCGI
だとしたら、こちらはFastCGI
です。消費側をPerlで書くと次のような感じです。while(<STDIN>)
で標準入力からデータを受け続け、同じく/tmp/log.txt
に追記するだけのプログラムです。
#!/usr/bin/perl
use File::Basename;
use JSON 'decode_json';
chdir dirname($0);
my $basename = basename($0);
local $| = 1;
while(<STDIN>) {
chomp;
my $values = decode_json($_);
my $message = $values->{text};
open(F, '>> /tmp/log.txt');
print F "$basename received .text: $message\n";
close(F);
print STDOUT "\n";
}
単発プロセスのときは処理の成功と失敗を終了ステータスコード0と0以外で判断しましたが、このパターンでは標準入力1行に標準出力1行が対応し、標準出力が空行だと処理を成功、それ以外だとエラーとみなします。
このプログラムをSQSと繋げます(パターン1との違いは-d
オプションがexec
からstdio
になった部分です)。
cqi \
-l "sqs region: ap-northeast-1, queueUrl: https://SQS/QUEUE" \
-d "stdio programPath: /path/to/stdio.pl"
コンポーネント
Listener / Dispatcher
キューからメッセージを受け付けるListener
と、処理するプログラムに引き渡すDispatcher
に抽象化しています。
Listener
はメッセージをひとつずつ受信し、Dispatcher
に渡します。
Dispatcher
はメッセージを処理済みとして削除してよいかtrue
、削除せずリトライに回すかfalse
を返します。
Container
Listener
とDispatcher
の組み合わせをContainer
と呼びます。
Listener
とDispatcher
のライフサイクル管理や、メッセージの経路制御と標準化などコントローラー的な役割を果たします。
現在はこのContainer
が並列化の単位にもなっています。
次の例ではメッセージを処理するプロセスが最大4本、同時に稼動します。
cqi \
-l "sqs region: ap-northeast-1, queueUrl: https://SQS/QUEUE" \
-d "exec programPath: /path/to/exec.sh"
--containers 4
Listener
、Dispatcher
、Container
あとLogger
は拡張可能になってます。
詳しくはソースコードを見てみてください!
所感や負け惜しみ
- TypeScriptを始めてまともに使いましたが、想像以上にすごい! これはJavaScriptに戻れない。
- Lernaによる単一リポジトリに挑戦しましたが挫折しました。
- ほんとはgolangとかで作った方がよさげ。
- そもそも似たようなものがすでにあったのかも。
- テストコードもこれから。
- 並列化はClusterによるフォークと迷いましたが、処理は外部プロセス前提だからいいのかな。