LoginSignup
1
3

More than 5 years have passed since last update.

KafkaでProducerとConsumer(C#編)

Posted at

始めに

Kafkaを使うことになりそうなのでいくつかの言語ごとにProducerとConsumerを記述してみる。

前々回はGolang、前回はScalaを試してみました。

今回はC#編です。C#ではGolang編で試せなかったconfluent製のconfluent-kafka-dotnetを利用してみます(こちらはNuGetで組み込まれるためlibrdkafkaの直接インストールが不要です)。

*今回もKafka自体の解説はしません。

構成

VisualStudio: 2017
.NET Framework: 4.5
confluent-kafka-dotnet: 0.11
Newtonsoft.Json: 11.0

成果物

プロジェクト作成&NuGet

まずは標準のコンソールアプリを新規作成し、NuGetパッケージの管理から「Confluent.Kafka」とメッセージのシリアライズ・デシリアライズ用に「Newtonsoft.Json」をインストールします。

メッセージオブジェクト

送受信メッセージを定義しておきます。中身は前回までと一緒です。

Message.cs
namespace DotNetKafkaExample
{
    // 送信メッセージ
    class SendMessage
    {
        public string Message { get; set; }

        public long Timestamp { get; set; }

    }

    // 受信メッセージ
    class ConsumedMessage
    {
        public string Message { get; set; }

        public long Timestamp { get; set; }

    }
}

Producer

まずはプロデューサーからexampleを参考に書いてみます。

Program.cs
// usingは省略

namespace DotNetKafkaExample
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("please input bootstrap servers.");

            var bootstrapServers = Console.ReadLine();

            // Taskキャンセルトークン
            var tokenSource = new CancellationTokenSource();

            Console.WriteLine($"start .Net Kafka Example. Ctl+C to exit");

            // プロデューサータスク
            var pTask = Task.Run(() => new Action<string, CancellationToken>(async (bs, cancel) =>
            {
                var cf = new Dictionary<string, object> {
                    { "bootstrap.servers", bs }
                };

                using (var producer = new Producer<string, string>(cf, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8)))
                {
                    producer.OnError += (_, error) => Console.WriteLine($"fail send. reason: {error.Reason}");

                    while (true)
                    {
                        if (cancel.IsCancellationRequested)
                        {
                            break;
                        }

                        var timestamp = DateTime.UtcNow.ToBinary();

                        var pa = producer.ProduceAsync("test.C", timestamp.ToString(), JsonConvert.SerializeObject(new SendMessage
                        {
                            Message = "Hello",
                            Timestamp = timestamp
                        }));

                        await pa.ContinueWith(t => Console.WriteLine($"success send. message: {t.Result.Value}"));
                        await Task.Delay(10000);
                    }

                    // 停止前処理
                    producer.Flush(TimeSpan.FromMilliseconds(10000));
                }
            })(bootstrapServers, tokenSource.Token), tokenSource.Token);

            // Ctl+C待機
            Console.CancelKeyPress += (_, e) =>
            {
                e.Cancel = true;
                tokenSource.Cancel(); // Taskキャンセル
            };

            Task.WaitAll(pTask, cTask);

            Console.WriteLine("stop .Net Kafka Example. press any key to close.");

            Console.ReadKey();
        }
    }
}

前回までと違い、Kafkaのアドレスはコンソールから受け付ける形にしています。
プロデューサーの動作をTask化して10秒に一度メッセージを送信しています。コンソールのCtl+Cを待ってCancellationTokenSourceからCancelを通知します。
久しぶりにC#を書いたんですがクロージャーの扱いってどうなんだっけって感じです・・・。

Consumer

続いてコンシューマーを同じくexampleを参考に書いてみます。

Program.cs
// usingは省略

namespace DotNetKafkaExample
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("please input bootstrap servers.");

            var bootstrapServers = Console.ReadLine();

            // Taskキャンセルトークン
            var tokenSource = new CancellationTokenSource();

            Console.WriteLine($"start .Net Kafka Example. Ctl+C to exit");

            // コンシューマータスク
            var cTask = Task.Run(() => new Action<string, CancellationToken>((bs, cancel) =>
            {
                var cf = new Dictionary<string, object> {
                    { "bootstrap.servers", bs },
                    { "group.id", "test" },
                    { "enable.auto.commit", false },
                    { "default.topic.config", new Dictionary<string, object>()
                        {
                            { "auto.offset.reset", "earliest" }
                        }
                    }
                };

                using (var consumer = new Consumer<string, string>(cf, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8)))
                {
                    consumer.OnError += (_, error) => Console.WriteLine($"consumer error. reason: {error.Reason}");

                    consumer.OnConsumeError += (_, error) => Console.WriteLine($"fail consume. reason: {error.Error}");

                    consumer.OnPartitionsAssigned += (_, partitions) => consumer.Assign(partitions);

                    consumer.OnPartitionsRevoked += (_, partitions) => consumer.Unassign();

                    consumer.Subscribe("test.C");

                    while (true)
                    {
                        if (cancel.IsCancellationRequested)
                        {
                            break;
                        }

                        Message<string, string> msg;
                        if (!consumer.Consume(out msg, TimeSpan.FromMilliseconds(100)))
                        {
                            continue;
                        }

                        var cm = JsonConvert.DeserializeObject<ConsumedMessage>(msg.Value);
                        Console.WriteLine($"success consumed. message: {cm.Message}, timestamp: {cm.Timestamp}");

                        consumer.CommitAsync(msg);
                    }
                }
            })(bootstrapServers, tokenSource.Token), tokenSource.Token);

            // Ctl+C待機
            Console.CancelKeyPress += (_, e) =>
            {
                e.Cancel = true;
                tokenSource.Cancel(); // Taskキャンセル
            };

            Task.WaitAll(pTask, cTask);

            Console.WriteLine("stop .Net Kafka Example. press any key to close.");

            Console.ReadKey();
        }
    }
}

大まかにはプロデューサーと似たような感じですが、コンシューマーの方が拾えるイベントが多いみたいです。
consumer.Consume(out msg, TimeSpan.FromMilliseconds(100))が受信部分です。

実行

前回まで同様にプロデューサーとコンシューマーを組み合わせて実行してみます(Program.cs)。
どうもプロデューサーの処理結果の出力より、コンシューマーの受信処理の方が早く出てしまうこともあるみたいです。
Your.Kafka.Server:9092はダミーです。

please input bootstrap servers.
Your.Kafka.Server:9092 // ダミー
start .Net Kafka Example. Ctl+C to exit
success send. message: {"Message":"Hello","Timestamp":5248282179105326175}
success consumed. message: Hello, timestamp: 5248282179105326175
success consumed. message: Hello, timestamp: 5248282179235972765
success send. message: {"Message":"Hello","Timestamp":5248282179235972765}
success consumed. message: Hello, timestamp: 5248282179346466796
success send. message: {"Message":"Hello","Timestamp":5248282179346466796}
stop .Net Kafka Example. press any key to close.

終わりに

これまで同様に閉じた世界でやってますが、外につなげたい時、C#ではどうすればいいだろうか?RXあたりを使ってObservableなプロデューサー・コンシューマーを作ってみるのも有りかも・・・。

1
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
3