C#
RabbitMQ

RabbitMQでPublisherとConsumer(C#編)

始めに

前回「次はC#で書く」と書いていたことをかろうじて思い出したので書いてみる。ぶっちゃけ公式のtutorialsと大差ない。

※ RabbitMQ自体の解説はしません

構成

.Net Framework: 4.6.1(別に4.5系でもOK)
RabbitMQ.Client: 5.0
Newtonsoft.Json: 11.0

成果物

https://github.com/lightstaff/DotNetRabbitMQExample

メッセージモデル

例によってモデルを作っておきます(SendとConsumedに分けてるのに特別な理由はないです)。

Message.cs
namespace DotNetRabbitMQExample
{
    public class SendMessage
    {
        public string Message { get; set; }

        public long Timestamp { get; set; }
    }

    public class ConsumedMessage
    {
        public string Message { get; set; }

        public long Timestamp { get; set; }
    }
}

パブリッシャー&コンシューマー

面倒くさくなったから今回はパブリッシャーとコンシューマー共に載せてしまいます。

Program.cs
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace DotNetRabbitMQExample
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("please input rabbitmq hostname");

            // Get ホスト名
            var hostname = Console.ReadLine();

            // Taskキャンセルトークン
            var tokenSource = new CancellationTokenSource();

            Console.WriteLine($"start .Net RabbitMQ Example. Ctl+C to exit");

            // ファクトリ生成
            var factory = new ConnectionFactory()
            {
                HostName = hostname
            };

            // パブリッシャータスク
            var pTask = Task.Run(() => new Action<ConnectionFactory, CancellationToken>(async(f, cancel) => {
                // コネクション&チャンネル生成
                using (var conn = f.CreateConnection())
                using (var channel = conn.CreateModel())
                {
                    // Exchange生成
                    channel.ExchangeDeclare("test", "fanout", false, true);

                    while(true)
                    {
                        // キャンセル待ち
                        if (cancel.IsCancellationRequested)
                        {
                            break;
                        }

                        var msg = new SendMessage()
                        {
                            Message = "Hello",
                            Timestamp = DateTime.UtcNow.ToBinary()
                        };
                        var body = JsonConvert.SerializeObject(msg);

                        // Publish!!
                        try
                        {
                            channel.BasicPublish("test", "", null, Encoding.UTF8.GetBytes(body));
                            Console.WriteLine($"success send. message: {msg.Message}, timestamp: {msg.Timestamp}");
                        } catch (Exception ex)
                        {
                            Console.WriteLine($"failer send. reason: {ex.Message}");
                        }

                        await Task.Delay(10000);
                    }
                }
            })(factory, tokenSource.Token), tokenSource.Token);

            // コンシューマータスク
            var cTask = Task.Run(() => new Action<ConnectionFactory, CancellationToken>((f, cancel) => {
                // コネクション&チャンネル生成
                using (var conn = f.CreateConnection())
                using (var channel = conn.CreateModel())
                {
                    // Exchange生成
                    channel.ExchangeDeclare("test", "fanout", false, true);

                    // Queue生成
                    var queueName = channel.QueueDeclare().QueueName;

                    // Bind Queue
                    channel.QueueBind(queueName, "test", "");

                    // コンシューマー生成
                    var consumer = new EventingBasicConsumer(channel);

                    // 受信イベント定義
                    consumer.Received += (_, ea) =>
                    {
                        var msg = JsonConvert.DeserializeObject<ConsumedMessage>(Encoding.UTF8.GetString(ea.Body));
                        Console.WriteLine($"success consumed. message: {msg.Message}, timestamp: {msg.Timestamp}");
                    };

                    // コンシューマー登録
                    channel.BasicConsume(queueName, true, consumer);

                    while(true)
                    {
                        // キャンセル待ち   
                        if (cancel.IsCancellationRequested)
                        {
                            break;
                        }
                    }
                }
            })(factory, tokenSource.Token), tokenSource.Token);


            // Ctl+C待機
            Console.CancelKeyPress += (_, e) =>
            {
                e.Cancel = true;
                tokenSource.Cancel(); // Taskキャンセル
            };

            Task.WaitAll(pTask, cTask);

            Console.WriteLine("stop .Net RabbitMQ Example. press any key to close.");

            Console.ReadKey();
        }
    }
}

内容は前回のJavaクライアントと大差ありません。
var channel = conn.CreateModel()って何かチャンネルなのかモデルなのか直感的じゃないんですが・・・。あとAsyncEventingBasicConsumerなるものがソースにあるんですが、いまいち効果が分かんないので使ってません。

実行

実行するとこんな感じです。

please input rabbitmq hostname
your.rabbitmq.address // 例
start .Net RabbitMQ Example. Ctl+C to exit
success send. message: Hello, timestamp: 5248299395184651547
success consumed. message: Hello, timestamp: 5248299395184651547
success send. message: Hello, timestamp: 5248299395284769260
success consumed. message: Hello, timestamp: 5248299395284769260
success send. message: Hello, timestamp: 5248299395384921978
success consumed. message: Hello, timestamp: 5248299395384921978
stop .Net RabbitMQ Example. press any key to close.

終わりに

総じてクライアントを書く分にはKafkaより手軽感があります・・・くらいな感想しかない。