LoginSignup
0
0

More than 5 years have passed since last update.

amqplibを使ったTopicのメッセージングでハマったのと解決策

Last updated at Posted at 2018-03-17

背景

Hubotを使ったBot開発をしている際にBot間でのメッセージングをするためにRabbitMQを利用しようと思い、Topicsのチュートリアルをベースに開発したところうまくいかなかった。

そこで色々試行錯誤とamqplibのソースコードを読んで解決に至った。

結論

先に結論を言っておくとamqplibでTopicを使う際には

  1. ExchangeにPublishする際はPromiseで書く
  2. QueueにKeyをbindingしてConsumeしてくるところはcallback関数で書く

ハマったところ

1. ExchangeにPublishする

チュートリアルではcallback関数で記載されているが、手元の環境ではExchangeにうまくデータが飛んでおらずrabbitmqctl list_queuesをしてもbindingされたQueueにデータは入っていない状態だった。

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var ex = 'topic_logs';
    var args = process.argv.slice(2);
    var key = (args.length > 0) ? args[0] : 'anonymous.info';
    var msg = args.slice(1).join(' ') || 'Hello World!';

    ch.assertExchange(ex, 'topic', {durable: false});
    ch.publish(ex, key, new Buffer(msg));
    console.log(" [x] Sent %s:'%s'", key, msg);
  });

  setTimeout(function() { conn.close(); process.exit(0) }, 500);
});

なんとなく原因が非同期処理のような気配があったので、promiseを使って同期処理にしたところ無事にメッセージが飛ばせることがわかった。
(HubotなのでCoffeeScriptで書いていた)

#!/usr/bin/env node

amqp = require('amqplib')
amqp.connect('amqp://localhost')
  .then((conn) ->
    conn.createChannel().then((ch) ->
      ex = 'topic_logs'
      args = process.argv.slice(2);
      key = (args.length > 0) ? args[0] : 'anonymous.info';
      msg = args.slice(1).join(' ') || 'Hello World!';

      ch.assertExchange(ex, 'topic', {durable: false});
      ch.publish(ex, key, new Buffer(msg));
      console.log(" [x] Sent %s:'%s'", key, msg);

2. QueueにKeyをBindingしてConsumeする

先程1(ExchangeにPublish)する際にpromiseでうまく行ったので、今度もPromiseで書きたくなるところですが、実際書いてみたところ動かない・・・。
色々デバッグしてみたりソースを読んだところ、どうもPromiseのところがうまく動いていないようなのでCallback関数で書き直す(というかチュートリアルほぼそのまま)。

なおソースには

// NB we want the callback to be run synchronously, so that we've
// registered the consumerTag before any messages can arrive.

とMessageが来る前にconsumerTagを登録するって言ってるけど登録できてなくて"Unknown consumer"って言われた。。。

なのでこちらはRabbitMQのチュートリアルで動くと思われる。

思われるっていうのは自分のやつはチュートリアルを真似してCallbackで書いたけど下のソースでは動かしてないから。(自分の書いた動くソースはこちら


#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

var args = process.argv.slice(2);

if (args.length == 0) {
  console.log("Usage: receive_logs_topic.js <facility>.<severity>");
  process.exit(1);
}

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var ex = 'topic_logs';

    ch.assertExchange(ex, 'topic', {durable: false});

    ch.assertQueue('', {exclusive: true}, function(err, q) {
      console.log(' [*] Waiting for logs. To exit press CTRL+C');

      args.forEach(function(key) {
        ch.bindQueue(q.queue, ex, key);
      });

      ch.consume(q.queue, function(msg) {
        console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
      }, {noAck: true});
    });
  });
});

まとめ

amqplibでTopicを使う際には

  1. ExchangeにPublishする際はPromiseで書く
  2. QueueにKeyをbindingしてConsumeしてくるところはcallback関数で書く

あと非同期処理のデバッグ難しい。。。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0