関連する記事 : http://qiita.com/muzudho1/items/5a36ecdcd9b0ba444577
雑で手短なコードを置いておく。じゃあ、MIT License で。https://opensource.org/licenses/MIT
サンプル・プログラム
概要
- Windows 10 と Ubuntu16.04 で動いた
- これを2つ起動させれば 手入力でプロセス間通信を使った文字列送受信のテストができる
- このプログラムが対応していない操作をすると、間違ったまま進んだり、強制終了したりする
操作の流れ
- Step 1. 削除したいキューAの名前と寿命を指定し、キューを削除するループ。空文字列入力で抜ける
- Step 2. エンキューしたいキューBの名前と寿命を入力
- Step 3. デキューしたいキューCの名前と寿命を入力。以降、キューCは監視、非同期割込み
- Step 4. キューBにメッセージをエンキューするループ。[Ctrl]+[C]で抜ける
tamesi34_cs.cs
// OS : Windows 10
// IDE : Visual Studio 2015
// Install : NuGet : Install-Package RabbitMQ.Client -Version 4.1.1
//
// OS : Ubuntu 16.04
// Compile : Command : mcs /r:RabbitMQ.Client.dll -define:UBUNTU tamesi34_cs.cs
// : Command : chmod 755 tamesi34_cs.cs
// Execute : Command : // フォアグラウンドで実行する
// : ./tamesi34_cs.exe
// Check : Command : // キューの中身の数を調べる
// : rabbitmqctl list_queues
//
// Library : RabbitMQ
// Refference : Website : RabbitMQ http://www.rabbitmq.com/
// : Website : RabbitMQ管理コマンド(rabbitmqctl)使い方 (Qiita) http://qiita.com/tamikura@github/items/5293cda4c0026b2d7022
// : Website : amqpを使ってRabbitMQのキューを操作する (Qiita) http://qiita.com/tamikura@github/items/a268afa51c5537ca4fe6
//--------------------------------------------------------------------------------
// tamesi34_cs.cs
// Ubuntu の RabbitMQ はソースのバージョンが古いのか、API が異なった。
// #define UBUNTU
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace UsagiMQ
{
/// <summary>
/// メッセージを エンキューします。
/// キューの名前は指定してください。
/// デキューは割込みを受け付けます。
///
/// 参照 : QueueDeclare (v1.0) http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
/// 参照 : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
/// 参照 : QueueDelete (v1.4) https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.QueueDelete(System.UInt16,System.String,System.Boolean,System.Boolean,System.Boolean)
/// 参照 : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
/// 参照 : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
/// </summary>
class Program
{
const int ENQUEUE_INDEX = 0;
const int DEQUEUE_INDEX = 1;
const int DELETEQUEUE_INDEX = 2;
const int NUM_INDEX = 3;
const string HOST_NAME = "localhost";
static string[] QUEUE_NAMES = new string[NUM_INDEX];
/// <summary>
/// キューの寿命
/// (0) durable : RabbitMQが止まってもキューを残す
/// (1) autodelete : コンシューマーが1人も接続していなかったら消す
/// (2) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
/// (3) exclusive : この接続でだけ使える。この接続が切れたら消す
/// </summary>
static int[] lifeSpans_queue = new int[NUM_INDEX];
static bool[]
durable_lifeSpans = new bool[NUM_INDEX],
autodelete_lifeSpans = new bool[NUM_INDEX],
passive_lifeSpans = new bool[NUM_INDEX],
exclusive_lifeSpans = new bool[NUM_INDEX];
/// <summary>
///
/// </summary>
/// <param name="index_queue"></param>
/// <param name="name_queue"></param>
/// <param name="lifeSpan">
/// (0) durable
/// (1) autodelete
/// (2) passive
/// (3) exclusive
/// </param>
static void SetLifeSpan(int index_queue, string name_queue, int lifeSpan)
{
QUEUE_NAMES[index_queue] = name_queue;
lifeSpans_queue[index_queue] = lifeSpan;
// 一旦クリアー
durable_lifeSpans[index_queue] = false;
autodelete_lifeSpans[index_queue] = false;
passive_lifeSpans[index_queue] = false;
exclusive_lifeSpans[index_queue] = false;
switch (lifeSpan)
{
case 0: // durable
durable_lifeSpans[index_queue] = true;
break;
case 1: // autodelete
autodelete_lifeSpans[index_queue] = true;
break;
case 3: // exclusive
exclusive_lifeSpans[index_queue] = true;
break;
default: // passive
passive_lifeSpans[index_queue] = true;
break;
}
}
public static ConnectionFactory GetFactory()
{
if (null == m_factory_)
{
m_factory_ = new ConnectionFactory() { HostName = HOST_NAME };
}
return m_factory_;
}
static ConnectionFactory m_factory_;
public static IConnection GetConnection()
{
if (null == m_connection_)
{
m_connection_ = GetFactory().CreateConnection();
}
return m_connection_;
}
static IConnection m_connection_;
public static IModel GetChannel(int index)
{
if (null == m_channels_[index])
{
m_channels_[index] = GetConnection().CreateModel();
#if UBUNTU
// Ubuntuでは何故か Spring.Messaging.Amqp.Rabbit の引数 7 つのやつになっている。
m_channels_[index].QueueDeclare(QUEUE_NAMES[index], passive_lifeSpans[index], durable_lifeSpans[index], exclusive_lifeSpans[index], autodelete_lifeSpans[index], false, null);
#else
m_channels_[index].QueueDeclare(QUEUE_NAMES[index], durable_lifeSpans[index], exclusive_lifeSpans[index], autodelete_lifeSpans[index], null);
#endif
}
return m_channels_[index];
}
static IModel[] m_channels_ = new IModel[NUM_INDEX];
public static EventingBasicConsumer GetConsumer(int index)
{
if (null == m_consumers_[index])
{
#if UBUNTU
// Ubuntuでは何故か v1.4.0 の引数が 0 個のやつになっている。調べたが引数が1個~6個のものは無かった。
m_consumers_[index] = new EventingBasicConsumer();
#else
m_consumers_[index] = new EventingBasicConsumer(GetChannel(index));
#endif
}
return m_consumers_[index];
}
static EventingBasicConsumer[] m_consumers_ = new EventingBasicConsumer[NUM_INDEX];
/// <summary>
/// 受信できたときに割り込んでくる処理
/// </summary>
#if UBUNTU
public static BasicDeliverEventHandler GetDequeueHandler()
#else
public static EventHandler<BasicDeliverEventArgs> GetDequeueHandler()
#endif
{
if (null == m_dequeueHandler_)
{
#if UBUNTU
m_dequeueHandler_ = new BasicDeliverEventHandler((model, ea) =>
#else
m_dequeueHandler_ = new EventHandler<BasicDeliverEventArgs>((model, ea) =>
#endif
{
byte[] body = ea.Body;
string message = Encoding.UTF8.GetString(body);
Console.WriteLine("<---- [interrupt!] Dequeue(^q^) {0}", message);
});
}
return m_dequeueHandler_;
}
#if UBUNTU
static BasicDeliverEventHandler m_dequeueHandler_;
#else
static EventHandler<BasicDeliverEventArgs> m_dequeueHandler_;
#endif
/// <summary>
/// 対応するオープンは無いけれど、開けたら閉める、を完璧に対応する必要がある。
/// </summary>
static void CloseConnection()
{
if (null != m_connection_)
{
m_connection_.Close();
m_connection_ = null;
}
}
/// <summary>
/// 対応するオープンは無いけれど、開けたら閉める、を完璧に対応する必要がある。
/// </summary>
static void CloseChannel(int index)
{
if (null != m_channels_[index])
{
m_channels_[index].Close();
m_channels_[index] = null;
}
}
static void Main(string[] args)
{
//----------------------------------------
// Delete
//----------------------------------------
for (;;)
{
Console.Write(@"削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力 : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? > ");
string queueName_delete = Console.ReadLine();
if (""== queueName_delete.Trim())
{
break;
}
Console.Write(@"削除するメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > ");
int lifeSpan_delete = int.Parse(Console.ReadLine());
SetLifeSpan(DELETEQUEUE_INDEX, queueName_delete, lifeSpan_delete);
uint result = DeleteQueue();
Console.WriteLine(@"["+ queueName_delete + "]キューを削除したはずだぜ☆(^~^) result=["+ result + "]");
}
//----------------------------------------
// Enqueue settings
//----------------------------------------
for (;;)
{
Console.Write(@"エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > ");
string queueName_enqueue = Console.ReadLine();
Console.Write(@"エンキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > ");
int lifeSpan_enqueue;
if(int.TryParse(Console.ReadLine(),out lifeSpan_enqueue))
{
SetLifeSpan(ENQUEUE_INDEX, queueName_enqueue, lifeSpan_enqueue);
break;
}
}
//----------------------------------------
// Enqueue settings
//----------------------------------------
for (;;)
{
Console.Write(@"デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > ");
string queueName_dequeue = Console.ReadLine();
Console.Write(@"デキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > ");
int lifeSpan_dequeue;
if(int.TryParse(Console.ReadLine(),out lifeSpan_dequeue))
{
StartConsume(queueName_dequeue, lifeSpan_dequeue);
break;
}
}
Console.Write(@"終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > ");
for (;;)
{
// "Hello World!" などを入力
string line = Console.ReadLine();
Enqueue(line);
Console.Write(@"Enqueue? > ");
}
// ここには来ない
// CloseConnection();
}
static uint DeleteQueue()
{
IModel channel = GetChannel(DELETEQUEUE_INDEX);
#if UBUNTU
// Ubuntuでは何故か 昔の .Net 用の引数 5 つのやつの変則版になっている。
// uint QueueDelete(ushort ticket, string queue, bool ifUnused, bool ifEmpty, bool nowait)
uint result = channel.QueueDelete( QUEUE_NAMES[DELETEQUEUE_INDEX],true,true,true);
#else
uint result = channel.QueueDelete(QUEUE_NAMES[DELETEQUEUE_INDEX],true,true);
#endif
// 対応するオープンは無いが、ちゃんと閉じないと、レシーブしてくれない。
CloseChannel(DELETEQUEUE_INDEX);
return result;
}
static void Enqueue(string message)
{
IModel channel = GetChannel(ENQUEUE_INDEX);
byte[] body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", QUEUE_NAMES[ENQUEUE_INDEX], null, body);
Console.WriteLine(" Enqueue(^q^) {0}", message);
// 対応するオープンは無いが、ちゃんと閉じないと、レシーブしてくれない。
CloseChannel(ENQUEUE_INDEX);
}
/// <summary>
///
/// </summary>
/// <param name="name_queue">メッセージ・キューの名前</param>
/// <param name="lifeSpan_queue">既存のメッセージ・キューの場合、メッセージ・キューの設定は合わせる必要がある。
/// (0) durable : RabbitMQが止まってもキューを残す
/// (1) autodelete : コンシューマーが1人も接続していなかったら消す
/// (2) passive : キューが存在するかどうかチェックするだけで、中身を見ない時はこれ
/// (3) exclusive : この接続でだけ使える。この接続が切れたら消す
/// </param>
static void StartConsume(string name_queue, int lifeSpan_queue)
{
SetLifeSpan(DEQUEUE_INDEX, name_queue, lifeSpan_queue);
IModel channel = GetChannel(DEQUEUE_INDEX);
EventingBasicConsumer consumer = GetConsumer(DEQUEUE_INDEX);
// 受信できたときに割り込んでくる処理
consumer.Received += GetDequeueHandler();
#if UBUNTU
// Ubuntuでは何故か引数が 5 個のやつになっている。
channel.BasicConsume( QUEUE_NAMES[DEQUEUE_INDEX], true, "", null, consumer);
#else
channel.BasicConsume( QUEUE_NAMES[DEQUEUE_INDEX], true, consumer);
#endif
// 終了はさせない
// consumer.Received -= GetReceiveHandler();
// CloseChannel(DEQUEUE_INDEX);
}
}
}