背景
Hubotを使ったBot開発をしている際にBot間でのメッセージングをするためにRabbitMQを利用しようと思い、Topicsのチュートリアルをベースに開発したところうまくいかなかった。
そこで色々試行錯誤とamqplibのソースコードを読んで解決に至った。
結論
先に結論を言っておくとamqplibでTopicを使う際には
- ExchangeにPublishする際はPromiseで書く
- QueueにKeyをbindingしてConsumeしてくるところはcallback関数で書く
ハマったところ
1. ExchangeにPublishする
チュートリアルではcallback関数で記載されているが、手元の環境ではExchangeにうまくデータが飛んでおらずrabbitmqctl list_queues
をしてもbindingされたQueueにデータは入っていない状態だった。
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(err, conn) {
conn.createChannel(function(err, ch) {
var ex = 'topic_logs';
var args = process.argv.slice(2);
var key = (args.length > 0) ? args[0] : 'anonymous.info';
var msg = args.slice(1).join(' ') || 'Hello World!';
ch.assertExchange(ex, 'topic', {durable: false});
ch.publish(ex, key, new Buffer(msg));
console.log(" [x] Sent %s:'%s'", key, msg);
});
setTimeout(function() { conn.close(); process.exit(0) }, 500);
});
なんとなく原因が非同期処理のような気配があったので、promiseを使って同期処理にしたところ無事にメッセージが飛ばせることがわかった。
(HubotなのでCoffeeScriptで書いていた)
#!/usr/bin/env node
amqp = require('amqplib')
amqp.connect('amqp://localhost')
.then((conn) ->
conn.createChannel().then((ch) ->
ex = 'topic_logs'
args = process.argv.slice(2);
key = (args.length > 0) ? args[0] : 'anonymous.info';
msg = args.slice(1).join(' ') || 'Hello World!';
ch.assertExchange(ex, 'topic', {durable: false});
ch.publish(ex, key, new Buffer(msg));
console.log(" [x] Sent %s:'%s'", key, msg);
2. QueueにKeyをBindingしてConsumeする
先程1(ExchangeにPublish)する際にpromiseでうまく行ったので、今度もPromiseで書きたくなるところですが、実際書いてみたところ動かない・・・。
色々デバッグしてみたりソースを読んだところ、どうもPromiseのところがうまく動いていないようなのでCallback関数で書き直す(というかチュートリアルほぼそのまま)。
なおソースには
// NB we want the callback to be run synchronously, so that we've
// registered the consumerTag before any messages can arrive.
とMessageが来る前にconsumerTagを登録するって言ってるけど登録できてなくて"Unknown consumer"って言われた。。。
なのでこちらはRabbitMQのチュートリアルで動くと思われる。
思われるっていうのは自分のやつはチュートリアルを真似してCallbackで書いたけど下のソースでは動かしてないから。(自分の書いた動くソースはこちら)
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
var args = process.argv.slice(2);
if (args.length == 0) {
console.log("Usage: receive_logs_topic.js <facility>.<severity>");
process.exit(1);
}
amqp.connect('amqp://localhost', function(err, conn) {
conn.createChannel(function(err, ch) {
var ex = 'topic_logs';
ch.assertExchange(ex, 'topic', {durable: false});
ch.assertQueue('', {exclusive: true}, function(err, q) {
console.log(' [*] Waiting for logs. To exit press CTRL+C');
args.forEach(function(key) {
ch.bindQueue(q.queue, ex, key);
});
ch.consume(q.queue, function(msg) {
console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
}, {noAck: true});
});
});
});
まとめ
amqplibでTopicを使う際には
- ExchangeにPublishする際はPromiseで書く
- QueueにKeyをbindingしてConsumeしてくるところはcallback関数で書く
あと非同期処理のデバッグ難しい。。。