LoginSignup
3
3

More than 5 years have passed since last update.

RabbitMQでPublisherとConsumer(C#編)

Last updated at Posted at 2018-05-08

始めに

前回「次はC#で書く」と書いていたことをかろうじて思い出したので書いてみる。ぶっちゃけ公式のtutorialsと大差ない。

※ RabbitMQ自体の解説はしません

構成

.Net Framework: 4.6.1(別に4.5系でもOK)
RabbitMQ.Client: 5.0
Newtonsoft.Json: 11.0

成果物

メッセージモデル

例によってモデルを作っておきます(SendとConsumedに分けてるのに特別な理由はないです)。

Message.cs
namespace DotNetRabbitMQExample
{
    public class SendMessage
    {
        public string Message { get; set; }

        public long Timestamp { get; set; }
    }

    public class ConsumedMessage
    {
        public string Message { get; set; }

        public long Timestamp { get; set; }
    }
}

パブリッシャー&コンシューマー

面倒くさくなったから今回はパブリッシャーとコンシューマー共に載せてしまいます。

Program.cs
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace DotNetRabbitMQExample
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("please input rabbitmq hostname");

            // Get ホスト名
            var hostname = Console.ReadLine();

            // Taskキャンセルトークン
            var tokenSource = new CancellationTokenSource();

            Console.WriteLine($"start .Net RabbitMQ Example. Ctl+C to exit");

            // ファクトリ生成
            var factory = new ConnectionFactory()
            {
                HostName = hostname
            };

            // パブリッシャータスク
            var pTask = Task.Run(() => new Action<ConnectionFactory, CancellationToken>(async(f, cancel) => {
                // コネクション&チャンネル生成
                using (var conn = f.CreateConnection())
                using (var channel = conn.CreateModel())
                {
                    // Exchange生成
                    channel.ExchangeDeclare("test", "fanout", false, true);

                    while(true)
                    {
                        // キャンセル待ち
                        if (cancel.IsCancellationRequested)
                        {
                            break;
                        }

                        var msg = new SendMessage()
                        {
                            Message = "Hello",
                            Timestamp = DateTime.UtcNow.ToBinary()
                        };
                        var body = JsonConvert.SerializeObject(msg);

                        // Publish!!
                        try
                        {
                            channel.BasicPublish("test", "", null, Encoding.UTF8.GetBytes(body));
                            Console.WriteLine($"success send. message: {msg.Message}, timestamp: {msg.Timestamp}");
                        } catch (Exception ex)
                        {
                            Console.WriteLine($"failer send. reason: {ex.Message}");
                        }

                        await Task.Delay(10000);
                    }
                }
            })(factory, tokenSource.Token), tokenSource.Token);

            // コンシューマータスク
            var cTask = Task.Run(() => new Action<ConnectionFactory, CancellationToken>((f, cancel) => {
                // コネクション&チャンネル生成
                using (var conn = f.CreateConnection())
                using (var channel = conn.CreateModel())
                {
                    // Exchange生成
                    channel.ExchangeDeclare("test", "fanout", false, true);

                    // Queue生成
                    var queueName = channel.QueueDeclare().QueueName;

                    // Bind Queue
                    channel.QueueBind(queueName, "test", "");

                    // コンシューマー生成
                    var consumer = new EventingBasicConsumer(channel);

                    // 受信イベント定義
                    consumer.Received += (_, ea) =>
                    {
                        var msg = JsonConvert.DeserializeObject<ConsumedMessage>(Encoding.UTF8.GetString(ea.Body));
                        Console.WriteLine($"success consumed. message: {msg.Message}, timestamp: {msg.Timestamp}");
                    };

                    // コンシューマー登録
                    channel.BasicConsume(queueName, true, consumer);

                    while(true)
                    {
                        // キャンセル待ち   
                        if (cancel.IsCancellationRequested)
                        {
                            break;
                        }
                    }
                }
            })(factory, tokenSource.Token), tokenSource.Token);


            // Ctl+C待機
            Console.CancelKeyPress += (_, e) =>
            {
                e.Cancel = true;
                tokenSource.Cancel(); // Taskキャンセル
            };

            Task.WaitAll(pTask, cTask);

            Console.WriteLine("stop .Net RabbitMQ Example. press any key to close.");

            Console.ReadKey();
        }
    }
}

内容は前回のJavaクライアントと大差ありません。
var channel = conn.CreateModel()って何かチャンネルなのかモデルなのか直感的じゃないんですが・・・。あとAsyncEventingBasicConsumerなるものがソースにあるんですが、いまいち効果が分かんないので使ってません。

実行

実行するとこんな感じです。

please input rabbitmq hostname
your.rabbitmq.address // 例
start .Net RabbitMQ Example. Ctl+C to exit
success send. message: Hello, timestamp: 5248299395184651547
success consumed. message: Hello, timestamp: 5248299395184651547
success send. message: Hello, timestamp: 5248299395284769260
success consumed. message: Hello, timestamp: 5248299395284769260
success send. message: Hello, timestamp: 5248299395384921978
success consumed. message: Hello, timestamp: 5248299395384921978
stop .Net RabbitMQ Example. press any key to close.

終わりに

総じてクライアントを書く分にはKafkaより手軽感があります・・・くらいな感想しかない。

3
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3