はじめに
MQを使ったリアルタイムなワーカー処理をやりたかった。
そして多少の運用ならほぼ無料で使えるAmazonSQSをワーカーのMQとして使ってみたかった。
そんな気持ちになったので試してみる。
実現方法について
多分tomcatなりでAPIサーバーを立ててAWSのElastic BeanstalkのWorker tier使うのがベスト。SQSのめんどくさいところもよしなにやってくれてAPIの文脈でリトライの機構も実装できるのでめっちゃ楽。
※結局Beanstalkは検証したわけでもないので上記は全て個人的な感想です
けれども、
- インフラ管理者が導入を認めてくれない
- SQSやEC2インスタンスを利用するわけではない(本題関係なくなる……)
- 1インスタンスに色んな機能を詰め込んでコストを下げたい、という上からの要望が入る
- 共通の監査ツールが動かせない
といった様々な要因でBeanstalkを使えないパターンは往々にしてある。今回は番号は伏せるけどそんな状況にブチ当たったので、普通にEC2の上にワーカープロセスを動かす方法で実現してみる。
その際色んな紆余曲折があったけど、SQSをキューイングするワーカーを作る部分にのみ着目。
Amazon SQSの注意点
SQSは以下の特性があるのと、Javaで各MQでそれを吸収したライブラリ的なものは(当時見た感じ)なかったので自前でなんとかする必要がある。
- Fi-Foではない。順番を意識する必要のある要件には使えない
- 同じメッセージが複数届くことがある
- reseiveした後deleteを渡さないと同じメッセージが設定された時間後にまた届く
SQSを扱うクラスを用意する
とりあえずコネクションを取る部分と受信、削除の機能だけ用意する。
AWS系は共通で、ProfileCredentialsProvider
を使うことで.aws/credentials
の認証情報を利用出来るようになるのでそれを使う。リポジトリに鍵情報を置いておくとgithubに上げた時に悲惨なことになる。
受信の部分は↓のソースのような感じで、上限である10件をまとめて拾って、取れるまでずっとreceiveのrequestを投げ続けるような感じで作ってる。
public class SQSService{
private String url;
private AmazonSQSClient client;
public SQSService(String url){
AWSCredentials awsCredentials = new ProfileCredentialsProvider().getCredentials();
this.client = new AmazonSQSClient(awsCredentials);
this.url = url;
}
public List<Message> receiveMessages(String queue) {
String queueUrl = createQueueUrl(queue);
ReceiveMessageResult result = null;
while (result == null || result.getMessages().size() == 0) {
// ※例外系は行数の都合スルー
ReceiveMessageRequest req = new ReceiveMessageRequest(queueUrl);
req.withMaxNumberOfMessages(10);
result = this.client.receiveMessage(req);
}
return result.getMessages();
}
public void deleteMessage(String queue, Message message){
String queueUrl = createQueueUrl(queue);
DeleteMessageRequest delMsgRequest = new DeleteMessageRequest(queueUrl, message.getReceiptHandle());
this.client.deleteMessage(delMsgRequest);
}
}
呼び出しを書く
while(true)でひたすら回し続ける。SIGHUPとかSIGTERMとか受け取ったときにフラグをfalseにするようなことをしたい。
isNotDuplicateMessageは上述の重複回避。Redisのsetnxを使って重複チェックをしている。成功した場合期限を設定するのが筋だが、ここでは割愛。
this.executorとTaskに関しては後述する。
public void execute(String url, String queue){
SQSService service = new SQSService(url);
while(true){
List<Message> messages = service.receiveMessages(queue);
for(Message message : messages){
if (isNotDuplicateMessage(message)) {
this.executor.execute(new Task(message, service, queue));
}
}
}
}
private boolean isNotDuplicateMessage(Message message) {
String value = Long.toString(System.currentTimeMillis());
try(Jedis jedis = pool.getResource()) {
return (jedis.setnx("lock::" + message.getMessageId, value) == 1L);
}
}
複数スレッドで処理させる
理想としては1スレッドでキューイングして、メッセージごとにスレッドを割り当てる、ということをやりたい。受けたメッセージの分スレッドを作っているとまとめて降ってきた時にシステムが落ちることが容易に考えられるので、スレッドプールを用意する。
スレッド全部処理中で新規にタスクを割り当てできなかった時に待ちたかったので、詰まった場合SynchronousQueueにputして待つような実装にしてみた。
this.executor = new ThreadPoolExecutor(
threadMin,threadMax,0,
TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(),
Executors.defaultThreadFactory(),
new RejectedExecutionHandler(){
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor){
try{
executor.getQueue().put(r);
}catch(InterruptedException e){
//なにか
}
}
}
);
最初はExecutorServiceを使って実装していたけど、この場合プロセスがうっかり止まってSQSにメッセージがいっぱい溜まった時にまとめてキューイングしてメモリ食い潰して死ぬ可能性があった。こわい。
メッセージの削除と再送を決める
SQSではdeleteMessageを送ることで一般的なMQにおけるackをすることができる。ということで、実処理を行った結果を見てackするか否かを決める。
ということで、executorに渡したRunnable拡張クラスにその辺を持たせる。各種スレッドタスクでSQS扱う1インスタンスを使い回すような形になっているのが不安。
public class Task implements Runnable {
final private Message message;
final private SQSService service;
final private String queue;
public Task(Message message, SQSService service, String queue){
this.message = message; this.service = service; this.queue = queue;
}
@Override
public void run(){
try{
do(message.getBody());// 実処理
service.deleteMessage(queue, message);
}catch(RetryableException e){
// 何もしない
logger.warn(e);
}catch(FatalException e){
service.deleteMessage(queue, message);
logger.error(e);
}
}
}
動かした
ソースがいろいろバラバラになってるけど、こんなソースを組み合わせてとりあえず動くことは動いた。少なくとも1年は妙なエラーなく動き続けてる。
本当にそれでいいのか……?という疑問を載せつつ。