LoginSignup
3
4

More than 5 years have passed since last update.

[C#] RabbitMQ サンプルプログラム

Last updated at Posted at 2017-03-12

関連する記事 : http://qiita.com/muzudho1/items/5a36ecdcd9b0ba444577

雑で手短なコードを置いておく。じゃあ、MIT License で。https://opensource.org/licenses/MIT

サンプル・プログラム

概要

  • Windows 10 と Ubuntu16.04 で動いた
  • これを2つ起動させれば 手入力でプロセス間通信を使った文字列送受信のテストができる
  • このプログラムが対応していない操作をすると、間違ったまま進んだり、強制終了したりする

操作の流れ

  • Step 1. 削除したいキューAの名前と寿命を指定し、キューを削除するループ。空文字列入力で抜ける
  • Step 2. エンキューしたいキューBの名前と寿命を入力
  • Step 3. デキューしたいキューCの名前と寿命を入力。以降、キューCは監視、非同期割込み
  • Step 4. キューBにメッセージをエンキューするループ。[Ctrl]+[C]で抜ける

tamesi34_cs.cs

// OS  : Windows 10
// IDE : Visual Studio 2015
//       Install : NuGet   : Install-Package RabbitMQ.Client -Version 4.1.1
//
// OS  : Ubuntu 16.04
//       Compile : Command : mcs /r:RabbitMQ.Client.dll -define:UBUNTU tamesi34_cs.cs
//               : Command : chmod 755 tamesi34_cs.cs
//       Execute : Command : // フォアグラウンドで実行する
//                         : ./tamesi34_cs.exe
//       Check   : Command : // キューの中身の数を調べる
//                         : rabbitmqctl list_queues
//
// Library : RabbitMQ
//           Refference : Website : RabbitMQ http://www.rabbitmq.com/
//                      : Website : RabbitMQ管理コマンド(rabbitmqctl)使い方 (Qiita) http://qiita.com/tamikura@github/items/5293cda4c0026b2d7022
//                      : Website : amqpを使ってRabbitMQのキューを操作する (Qiita) http://qiita.com/tamikura@github/items/a268afa51c5537ca4fe6
//--------------------------------------------------------------------------------
// tamesi34_cs.cs

// Ubuntu の RabbitMQ はソースのバージョンが古いのか、API が異なった。
// #define UBUNTU

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace UsagiMQ
{
    /// <summary>
    /// メッセージを エンキューします。
    /// キューの名前は指定してください。
    /// デキューは割込みを受け付けます。
    /// 
    /// 参照 : QueueDeclare (v1.0) http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
    /// 参照 : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
    /// 参照 : QueueDelete (v1.4) https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.QueueDelete(System.UInt16,System.String,System.Boolean,System.Boolean,System.Boolean)
    /// 参照 : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
    /// 参照 : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
    /// </summary>
    class Program
    {
        const int ENQUEUE_INDEX = 0;
        const int DEQUEUE_INDEX = 1;
        const int DELETEQUEUE_INDEX = 2;
        const int NUM_INDEX = 3;
        const string HOST_NAME = "localhost";
        static string[] QUEUE_NAMES = new string[NUM_INDEX];
        /// <summary>
        /// キューの寿命
        /// (0) durable    : RabbitMQが止まってもキューを残す
        /// (1) autodelete : コンシューマーが1人も接続していなかったら消す
        /// (2) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
        /// (3) exclusive  : この接続でだけ使える。この接続が切れたら消す
        /// </summary>
        static int[] lifeSpans_queue = new int[NUM_INDEX];
        static bool[]
            durable_lifeSpans = new bool[NUM_INDEX],
            autodelete_lifeSpans = new bool[NUM_INDEX],
            passive_lifeSpans = new bool[NUM_INDEX],
            exclusive_lifeSpans = new bool[NUM_INDEX];
        /// <summary>
        /// 
        /// </summary>
        /// <param name="index_queue"></param>
        /// <param name="name_queue"></param>
        /// <param name="lifeSpan">
        /// (0) durable
        /// (1) autodelete
        /// (2) passive
        /// (3) exclusive
        /// </param>
        static void SetLifeSpan(int index_queue, string name_queue, int lifeSpan)
        {
            QUEUE_NAMES[index_queue] = name_queue;
            lifeSpans_queue[index_queue] = lifeSpan;

            // 一旦クリアー
            durable_lifeSpans[index_queue] = false;
            autodelete_lifeSpans[index_queue] = false;
            passive_lifeSpans[index_queue] = false;
            exclusive_lifeSpans[index_queue] = false;

            switch (lifeSpan)
            {
                case 0: // durable
                    durable_lifeSpans[index_queue] = true;
                    break;
                case 1: // autodelete
                    autodelete_lifeSpans[index_queue] = true;
                    break;
                case 3: // exclusive
                    exclusive_lifeSpans[index_queue] = true;
                    break;
                default: // passive
                    passive_lifeSpans[index_queue] = true;
                    break;
            }
        }


        public static ConnectionFactory GetFactory()
        {
            if (null == m_factory_)
            {
                m_factory_ = new ConnectionFactory() { HostName = HOST_NAME };
            }
            return m_factory_;
        }
        static ConnectionFactory m_factory_;

        public static IConnection GetConnection()
        {
            if (null == m_connection_)
            {
                m_connection_ = GetFactory().CreateConnection();
            }
            return m_connection_;
        }
        static IConnection m_connection_;

        public static IModel GetChannel(int index)
        {
            if (null == m_channels_[index])
            {
                m_channels_[index] = GetConnection().CreateModel();
#if UBUNTU
                // Ubuntuでは何故か Spring.Messaging.Amqp.Rabbit の引数 7 つのやつになっている。
                m_channels_[index].QueueDeclare(QUEUE_NAMES[index], passive_lifeSpans[index], durable_lifeSpans[index], exclusive_lifeSpans[index], autodelete_lifeSpans[index], false, null);
#else
                m_channels_[index].QueueDeclare(QUEUE_NAMES[index], durable_lifeSpans[index], exclusive_lifeSpans[index], autodelete_lifeSpans[index], null);
#endif
            }
            return m_channels_[index];
        }
        static IModel[] m_channels_ = new IModel[NUM_INDEX];

        public static EventingBasicConsumer GetConsumer(int index)
        {
            if (null == m_consumers_[index])
            {
#if UBUNTU
                // Ubuntuでは何故か v1.4.0 の引数が 0 個のやつになっている。調べたが引数が1個~6個のものは無かった。
                m_consumers_[index] = new EventingBasicConsumer();
#else
                m_consumers_[index] = new EventingBasicConsumer(GetChannel(index));
#endif

            }
            return m_consumers_[index];
        }
        static EventingBasicConsumer[] m_consumers_ = new EventingBasicConsumer[NUM_INDEX];

        /// <summary>
        /// 受信できたときに割り込んでくる処理
        /// </summary>
#if UBUNTU
        public static BasicDeliverEventHandler GetDequeueHandler()
#else
        public static EventHandler<BasicDeliverEventArgs> GetDequeueHandler()
#endif
        {
            if (null == m_dequeueHandler_)
            {
#if UBUNTU
                m_dequeueHandler_ = new BasicDeliverEventHandler((model, ea) =>
#else
                m_dequeueHandler_ = new EventHandler<BasicDeliverEventArgs>((model, ea) =>
#endif
                {
                    byte[] body = ea.Body;
                    string message = Encoding.UTF8.GetString(body);
                    Console.WriteLine("<---- [interrupt!] Dequeue(^q^) {0}", message);
                });
            }

            return m_dequeueHandler_;
        }

#if UBUNTU
        static BasicDeliverEventHandler m_dequeueHandler_;
#else
        static EventHandler<BasicDeliverEventArgs> m_dequeueHandler_;
#endif

        /// <summary>
        /// 対応するオープンは無いけれど、開けたら閉める、を完璧に対応する必要がある。
        /// </summary>
        static void CloseConnection()
        {
            if (null != m_connection_)
            {
                m_connection_.Close();
                m_connection_ = null;
            }
        }
        /// <summary>
        /// 対応するオープンは無いけれど、開けたら閉める、を完璧に対応する必要がある。
        /// </summary>
        static void CloseChannel(int index)
        {
            if (null != m_channels_[index])
            {
                m_channels_[index].Close();
                m_channels_[index] = null;
            }
        }

        static void Main(string[] args)
        {
            //----------------------------------------
            // Delete
            //----------------------------------------
            for (;;)
            {
                Console.Write(@"削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力    : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? > ");
                string queueName_delete = Console.ReadLine();
                if (""== queueName_delete.Trim())
                {
                    break;
                }

                Console.Write(@"削除するメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable    : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > ");
                int lifeSpan_delete = int.Parse(Console.ReadLine());
                SetLifeSpan(DELETEQUEUE_INDEX, queueName_delete, lifeSpan_delete);
                uint result = DeleteQueue();
                Console.WriteLine(@"["+ queueName_delete + "]キューを削除したはずだぜ☆(^~^) result=["+ result + "]");
            }

            //----------------------------------------
            // Enqueue settings
            //----------------------------------------
            for (;;)
            {
                Console.Write(@"エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > ");
                string queueName_enqueue = Console.ReadLine();

                Console.Write(@"エンキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable    : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > ");
                int lifeSpan_enqueue;
                if(int.TryParse(Console.ReadLine(),out lifeSpan_enqueue))
                {
                    SetLifeSpan(ENQUEUE_INDEX, queueName_enqueue, lifeSpan_enqueue);
                    break;
                }
            }

            //----------------------------------------
            // Enqueue settings
            //----------------------------------------
            for (;;)
            {
                Console.Write(@"デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > ");
                string queueName_dequeue = Console.ReadLine();

                Console.Write(@"デキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable    : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > ");
                int lifeSpan_dequeue;
                if(int.TryParse(Console.ReadLine(),out lifeSpan_dequeue))
                {
                    StartConsume(queueName_dequeue, lifeSpan_dequeue);
                    break;
                }
            }

            Console.Write(@"終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > ");
            for (;;)
            {
                // "Hello World!" などを入力
                string line = Console.ReadLine();
                Enqueue(line);
                Console.Write(@"Enqueue? > ");
            }

            // ここには来ない
            // CloseConnection();
        }

        static uint DeleteQueue()
        {
            IModel channel = GetChannel(DELETEQUEUE_INDEX);

#if UBUNTU
            // Ubuntuでは何故か 昔の .Net 用の引数 5 つのやつの変則版になっている。
            // uint QueueDelete(ushort ticket, string queue, bool ifUnused, bool ifEmpty, bool nowait)
            uint result = channel.QueueDelete( QUEUE_NAMES[DELETEQUEUE_INDEX],true,true,true);
#else
            uint result = channel.QueueDelete(QUEUE_NAMES[DELETEQUEUE_INDEX],true,true);
#endif

            // 対応するオープンは無いが、ちゃんと閉じないと、レシーブしてくれない。
            CloseChannel(DELETEQUEUE_INDEX);
            return result;
        }

        static void Enqueue(string message)
        {
            IModel channel = GetChannel(ENQUEUE_INDEX);

            byte[] body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish("", QUEUE_NAMES[ENQUEUE_INDEX], null, body);

            Console.WriteLine(" Enqueue(^q^) {0}", message);

            // 対応するオープンは無いが、ちゃんと閉じないと、レシーブしてくれない。
            CloseChannel(ENQUEUE_INDEX);
        }

        /// <summary>
        /// 
        /// </summary>
        /// <param name="name_queue">メッセージ・キューの名前</param>
        /// <param name="lifeSpan_queue">既存のメッセージ・キューの場合、メッセージ・キューの設定は合わせる必要がある。
        /// (0) durable    : RabbitMQが止まってもキューを残す
        /// (1) autodelete : コンシューマーが1人も接続していなかったら消す
        /// (2) passive    : キューが存在するかどうかチェックするだけで、中身を見ない時はこれ
        /// (3) exclusive  : この接続でだけ使える。この接続が切れたら消す
        /// </param>
        static void StartConsume(string name_queue, int lifeSpan_queue)
        {
            SetLifeSpan(DEQUEUE_INDEX, name_queue, lifeSpan_queue);

            IModel channel = GetChannel(DEQUEUE_INDEX);
            EventingBasicConsumer consumer = GetConsumer(DEQUEUE_INDEX);

            // 受信できたときに割り込んでくる処理
            consumer.Received += GetDequeueHandler();

#if UBUNTU
            // Ubuntuでは何故か引数が 5 個のやつになっている。
            channel.BasicConsume( QUEUE_NAMES[DEQUEUE_INDEX], true, "", null, consumer);
#else
            channel.BasicConsume( QUEUE_NAMES[DEQUEUE_INDEX], true, consumer);
#endif
            // 終了はさせない
            // consumer.Received -= GetReceiveHandler();
            // CloseChannel(DEQUEUE_INDEX);
        }
    }
}
3
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
4