LoginSignup
1
0

More than 3 years have passed since last update.

キューワーカーの標準化コンセプト CQI: Common Queue Interface inspired by CGI

Posted at

CGIのキュー版を作ってみた

各種キューメッセージを標準入出力に引き渡す仕組みにより、言語を選ばず手軽にキューメッセージ駆動のワーカーアプリケーションを実装できます。

とりあえず自分が使うものをコンセプトとして公表してみたという感じですが、新たなアイデアや実装のきっかけになれば幸いです。

NodeJSがインストールされていれば次のコマンドでCLIコマンドをインストールできます。

npm i -g cqi-cli

動機

SQSをよく使うのですが、ワーカーアプリケーションの実装をいろいろな言語で手軽にできたらいいなと思ったのが発端です。
モチーフは古き良きCGIです。

  • 標準入出力にマッピングすれば言語を選ばない
  • ライブラリ不要でもキューとの連携ができる
  • メッセージキューシステムの切り替えも容易

概要

メッセージモデル

メッセージは構造化データをJSONでシリアライズした1行のテキストであることを前提とします。

{"text":"the message"}

キューのシステムによってさまざまなメタデータが使えますが、標準化のため極力シンプルにしてます。

パターン1: キューメッセージごとに単発でプロセスを起動

例えば次のように標準入力が受け取ったJSONのtextというエントリを抜き出し、/tmp/log.txtに追記するプログラムを記述します。
メッセージを正常に消費したことを終了ステータスコード0で示します。

もし終了ステータスコードが0以外の場合はエラーが起きたとしてメッセージを削除せず、リトライなどに回されます。

exec.sh
#!/bin/sh

MESSAGE=$(cat - | jq -r '.text')
echo "$(basename $0) received .text: $MESSAGE" >> /tmp/log.txt
# exit 0

このプログラムをSQSに繋げるには次のように記述します。-lListener-dDispatcherの略で、どこから受け取ったメッセージをどこで処理するかをパラメータとして記述します。

フォーマットは、<コンポーネント名> <jsonicによる設定値>としています。

cqi \
  -l "sqs region: ap-northeast-1, queueUrl: https://SQS/QUEUE" \
  -d "exec programPath: /path/to/exec.sh"

次のように-lオプションを変更すると、SQSではなく、REPLインターフェースを入力にできます。デバッグに便利です。

cqi \
  -l "repl" \
  -d "exec programPath: /path/to/exec.sh"
# -l のデフォルトはreplなので -l "repl" は省略可

逆に、-dオプションを次のように変更するとSQSからのメッセージを標準エラーログに出力します。これもデバッグ用です。

cqi \
  -l "sqs region: ap-northeast-1, queueUrl: https://SQS/QUEUE" \
  -d "echo"
# -d のデフォルトはechoなので -d "echo" は省略可

パターン2 サブプロセスと標準入出力で連続通信

メッセージごとに都度プロセスを起動するのは実行コストが高いので、事前にサブプロセスを起動しておき、メッセージを標準入力に1行ずつ渡す方法もあります。

パターン1がCGIだとしたら、こちらはFastCGIです。消費側をPerlで書くと次のような感じです。while(<STDIN>)で標準入力からデータを受け続け、同じく/tmp/log.txtに追記するだけのプログラムです。

stdio.pl
#!/usr/bin/perl

use File::Basename;
use JSON 'decode_json';

chdir dirname($0);
my $basename = basename($0);

local $| = 1;
while(<STDIN>) {
  chomp;
  my $values = decode_json($_);
  my $message = $values->{text};

  open(F, '>> /tmp/log.txt');
  print F "$basename received .text: $message\n";
  close(F);
  print STDOUT "\n";
}

単発プロセスのときは処理の成功と失敗を終了ステータスコード0と0以外で判断しましたが、このパターンでは標準入力1行に標準出力1行が対応し、標準出力が空行だと処理を成功、それ以外だとエラーとみなします。

このプログラムをSQSと繋げます(パターン1との違いは-dオプションがexecからstdioになった部分です)。

cqi \
  -l "sqs region: ap-northeast-1, queueUrl: https://SQS/QUEUE" \
  -d "stdio programPath: /path/to/stdio.pl"

コンポーネント

Listener / Dispatcher

キューからメッセージを受け付けるListenerと、処理するプログラムに引き渡すDispatcherに抽象化しています。
Listenerはメッセージをひとつずつ受信し、Dispatcherに渡します。
Dispatcherはメッセージを処理済みとして削除してよいかtrue、削除せずリトライに回すかfalseを返します。

Container

ListenerDispatcherの組み合わせをContainerと呼びます。
ListenerDispatcherのライフサイクル管理や、メッセージの経路制御と標準化などコントローラー的な役割を果たします。

現在はこのContainerが並列化の単位にもなっています。
次の例ではメッセージを処理するプロセスが最大4本、同時に稼動します。

cqi \
  -l "sqs region: ap-northeast-1, queueUrl: https://SQS/QUEUE" \
  -d "exec programPath: /path/to/exec.sh"
  --containers 4

ListenerDispatcherContainerあとLoggerは拡張可能になってます。
詳しくはソースコードを見てみてください!

所感や負け惜しみ

  • TypeScriptを始めてまともに使いましたが、想像以上にすごい! これはJavaScriptに戻れない。
  • Lernaによる単一リポジトリに挑戦しましたが挫折しました。
  • ほんとはgolangとかで作った方がよさげ。
  • そもそも似たようなものがすでにあったのかも。
  • テストコードもこれから。
  • 並列化はClusterによるフォークと迷いましたが、処理は外部プロセス前提だからいいのかな。
1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0