1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Apache Kafka を使った簡単な Pub/Sub メッセージング

Posted at

このドキュメントの内容

Apache Kafka を使った簡単な Pub/Sub メッセージングを実装してみます。

  • 特定のトピックに対してメッセージを発行する Publisher アプリケーション
  • 特定のトピックのメッセージを購読する Subscriber アプリケーション

アプリケーション構成.png

Kafka の動作環境を構築する

Docker を使用して Kafka の動作環境を構築することにしました。2020/12/31 時点のバージョンは ver 2.6.0 です。

docker-compose.yml を作成する

docker-compose.yml を作成します。

  • docker image は wurstmeister/kafka を使用することにしました。
  • トピックの自動生成を有効にしました。
    • Confluent.Kafka ver 1.5.3 で確認したところ、メッセージの発行時にはトピックが自動生成されましたが、メッセージの購読時には自動生成されずに「トピックが存在しない」例外が発生しました。
docker-compose.yml
version: '3'

services:

  test-zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"

  test-kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: test-zookeeper:2181
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

Dockerコンテナを起動する

docker-compose.yml が格納されているフォルダで powershell コンソールを開き、次のコマンドを実行します。

powershell
PS> docker-compose.yml up -d
Creating network "kafka_default" with the default driver
Creating kafka_test-zookeeper_1 ... done
Creating kafka_test-kafka_1     ... done

正常に起動できたかどうかは次のコマンドで確認できます。

powershell
PS> docker-compose.yml ps
         Name                       Command               State                         Ports
--------------------------------------------------------------------------------------------------------------------
kafka_test-kafka_1       start-kafka.sh                   Up      0.0.0.0:9092->9092/tcp
kafka_test-zookeeper_1   /bin/sh -c /usr/sbin/sshd  ...   Up      0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp

Dockerコンテナを停止(と削除)する

docker-compose.yml が格納されているフォルダで powershell コンソールを開き、次のコマンドを実行します。

powershell
PS> docker-compose.yml down

Kafka の状態を確認する

docker desktop のダッシュボードで Kafka の状態を確認します。

Kafkaの起動ログ.png

正常に起動できている場合、次のようなログが表示されているはずです。

test-kafka_1 | [2020-12-30 04:19:50,593] INFO [KafkaServer id=1001] started (kafka.server.KafkaServer)

CLIを開きます。Kafka のインスタンスページのアイコンから起動することができます。

CLIの起動.png

登録されているトピックの一覧を表示するには、次のコマンドを実行します。トピックが存在しない場合は何も表示されません。

/# kafka-topics.sh --zookeeper test-zookeeper:2181 --list

ここでトピックを登録するには、次のコマンドを実行します。

/# kafka-topics.sh --zookeeper test-zookeeper:2181 --create --topic test-topic --replication-factor 1 --partitions 1

Pub/Sub アプリケーションを実装する

.NET Core 3.1 コンソールアプリケーションとして実装します。
Kafka ライブラリには Confluent.Kafka ver 1.5.3 を使用しました。

完全なソースコードは GitHub にアップロードしてあります。

キーとメッセージ

次のキーとメッセージをアプリケーション間で連携します。

/// <summary>
/// キー
/// </summary>
public readonly struct SampleMessageKey : IEquatable<SampleMessageKey>
{
    public SampleMessageKey(string key)
    {
        Key = key;
    }

    public string Key { get; }

    public override bool Equals(object obj)
    {
        return obj is SampleMessageKey key && Equals(key);
    }
    public bool Equals(SampleMessageKey other)
    {
        return Key == other.Key;
    }
    public override int GetHashCode()
    {
        return 990326508 + EqualityComparer<string>.Default.GetHashCode(Key);
    }
    public override string ToString()
    {
        return Key;
    }
}

/// <summary>
/// メッセージ
/// </summary>
public class SampleMessageBody
{
    public SampleMessageBody(DateTimeOffset time, string message)
    {
        Time = time;
        Message = message;
    }

    public DateTimeOffset Time { get; }
    public string Message { get; }

    public override string ToString()
    {
        return Message;
    }
}

Publisher アプリケーション

Kafka に対して一定間隔でメッセージを発行するコンソールアプリケーションです。
発行先のブートストラップサーバーとトピックはコンソール入力から受け取ります。

エントリポイントの実装

Program.cs
class Program
{
    static async Task Main(string[] args)
    {
        try
        {
            await RunAsync().ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
        Console.ReadKey();
    }

    /// <summary>
    /// 処理を実行します。
    /// </summary>
    /// <returns></returns>
    static async Task RunAsync()
    {
        int process = System.Diagnostics.Process.GetCurrentProcess().Id;
        Console.WriteLine($"パブリッシャーをプロセス {process} で起動しました。");

        // コンソールからパラメーターを受け取る
        Console.WriteLine("bootstrap servers を入力してください(省略時 127.0.0.1):");
        var bootstrapServers = Console.ReadLine();
        if (string.IsNullOrEmpty(bootstrapServers)) { bootstrapServers = "127.0.0.1"; }

        Console.WriteLine($"トピックを入力してください(省略時 {Constants.DefaultTopic}):");
        var topic = Console.ReadLine();
        if (string.IsNullOrEmpty(topic)) { topic = Constants.DefaultTopic; }

        // キャンセルトークンを生成する
        using var cancelTokenSource = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true;
            cancelTokenSource.Cancel();
        };

        // 動作設定を生成する
        var publisherSetting = new MessagePublisherSetting()
        {
            BootstrapServers = bootstrapServers
        };

        // キーを生成するメソッド
        static SampleMessageKey GenerateKey()
        {
            return new SampleMessageKey(Guid.NewGuid().ToString());
        }

        // パブリッシャーを生成する
        var factory = new SampleMessagePublisherFactory(publisherSetting, new SampleLogger());

        using IMessagePublisher<SampleMessageBody> publisher
            = factory.CreatePublisher<SampleMessageKey, SampleMessageBody>(topic, GenerateKey);

        Console.WriteLine("メッセージの送信処理を開始します。終了するには Ctrl+C を押してください。");

        // 一定間隔でメッセージを発行する
        int sequence = 0;
        TimeSpan interval = TimeSpan.FromSeconds(5);

        while (true)
        {
            if (cancelTokenSource.Token.IsCancellationRequested) { break; }

            ++sequence;

            await publisher.PublishAsync(
                new SampleMessageBody(DateTimeOffset.UtcNow, $"{sequence}回目のメッセージ(プロセス{process})")
                , cancelTokenSource.Token
                ).ConfigureAwait(false);

            await Task.Delay(interval, cancelTokenSource.Token);
        }

        Console.WriteLine("メッセージの送信処理を終了しました。");
    }
}

パブリッシャーの実装

シンプルなファクトリーパターンを採用しました。

SampleMessagePublisherFactory.cs
/// <summary>
/// パブリッシャーを生成します。
/// </summary>
internal class SampleMessagePublisherFactory : MessagePublisherFactoryBase
{
    /// <summary>
    /// インスタンスを生成します。
    /// </summary>
    /// <param name="publisherSetting">パブリッシャーの動作設定</param>
    /// <param name="logger">ロガー</param>
    internal SampleMessagePublisherFactory(MessagePublisherSetting publisherSetting, ILogger logger)
        : base(publisherSetting, logger)
    {
    }

    /// <summary>
    /// シリアライザを取得します。
    /// </summary>
    /// <typeparam name="T">シリアライズ対象オブジェクトの型</typeparam>
    /// <returns>シリアライザ</returns>
    protected override ISerializer<T> GetSerializer<T>()
    {
        return SampleSerializerFactory.Create<T>();
    }
}

/// <summary>
/// パブリッシャーの生成処理の基本実装。
/// </summary>
public abstract class MessagePublisherFactoryBase
{
    /// <summary>
    /// インスタンスを生成します。
    /// </summary>
    /// <param name="publisherSetting">パブリッシャーの動作設定</param>
    /// <param name="logger">ロガー</param>
    protected MessagePublisherFactoryBase(MessagePublisherSetting publisherSetting, ILogger logger)
    {
        PublisherSetting = publisherSetting;
        Logger = logger;
    }

    /// <summary>
    /// パブリッシャーの動作設定を取得します。
    /// </summary>
    protected MessagePublisherSetting PublisherSetting { get; }

    /// <summary>
    /// ロガーを取得します。
    /// </summary>
    private ILogger Logger { get; }

    /// <summary>
    /// パブリッシャーを生成します。
    /// </summary>
    /// <typeparam name="TKey">キーの型</typeparam>
    /// <typeparam name="TMessage">メッセージの型</typeparam>
    /// <param name="topic">トピック</param>
    /// <param name="keyGenerator">キーの生成処理</param>
    /// <returns>パブリッシャー</returns>
    public MessagePublisher<TKey, TMessage> CreatePublisher<TKey, TMessage>(string topic, Func<TKey> keyGenerator)
    {
        return new MessagePublisher<TKey, TMessage>(
            GetSerializer<TKey>()
            , GetSerializer<TMessage>()
            , PublisherSetting
            , topic
            , keyGenerator
            , Logger
            );
    }

    /// <summary>
    /// シリアライザを取得します。
    /// </summary>
    /// <typeparam name="T">シリアライズ対象オブジェクトの型</typeparam>
    /// <returns>シリアライザ</returns>
    protected abstract ISerializer<T> GetSerializer<T>();
}

パブリッシャーは Kafka に対するプロデューサー(IProducer<TKey, TMessage>)を内包し、指定されたメッセージを Kafka に送信します。

MessagePublisher.cs
/// <summary>
/// Kafka にメッセージを送信します。
/// </summary>
/// <typeparam name="TKey">キーの型</typeparam>
/// <typeparam name="TMessage">メッセージの型</typeparam>
public class MessagePublisher<TKey, TMessage> : IMessagePublisher<TMessage>
{
    /// <summary>
    /// インスタンスを生成します。
    /// </summary>
    /// <param name="keySerializer">キーに対するシリアライザ</param>
    /// <param name="messageSerializer">メッセージに対するシリアライザ</param>
    /// <param name="setting">プロデューサーの動作設定</param>
    /// <param name="keyGenerator">キーの生成処理</param>
    /// <param name="topic">トピック</param>
    /// <param name="logger">ロガー</param>
    public MessagePublisher(ISerializer<TKey> keySerializer, ISerializer<TMessage> messageSerializer, MessagePublisherSetting setting, string topic, Func<TKey> keyGenerator, ILogger logger)
    {
        KeySerializer = keySerializer;
        MessageSerializer = messageSerializer;
        Topic = topic;
        KeyGenerator = keyGenerator;
        Logger = logger;
        Producer = BuildProducer(GetProducerConfig(setting));
    }

    /// <summary>
    /// 使用しているリソースを解放します。
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// 使用しているリソースを解放します。
    /// </summary>
    protected virtual void Dispose(bool disposing)
    {
        TerminateProducer();
    }

    /// <summary>
    /// キーに対するシリアライザを取得します。
    /// </summary>
    private ISerializer<TKey> KeySerializer { get; }

    /// <summary>
    /// メッセージに対するシリアライザを取得します。
    /// </summary>
    private ISerializer<TMessage> MessageSerializer { get; }

    /// <summary>
    /// ロガーを取得します。
    /// </summary>
    private ILogger Logger { get; }

    /// <summary>
    /// トピックを取得します。
    /// </summary>
    private string Topic { get; }

    /// <summary>
    /// キーの生成処理を取得します。
    /// </summary>
    private Func<TKey> KeyGenerator { get; }

    #region プロデューサー

    /// <summary>
    /// プロデューサーを取得します。
    /// </summary>
    private IProducer<TKey, TMessage> Producer { get; }

    /// <summary>
    /// プロデューサーを解放します。
    /// </summary>
    private void TerminateProducer()
    {
        if (Producer == null) { return; }
        Producer.Flush(TimeSpan.FromMilliseconds(10000));
        Producer.Dispose();
    }

    /// <summary>
    /// プロデューサーを生成します。
    /// </summary>
    /// <param name="config">動作設定</param>
    /// <returns>プロデューサー</returns>
    protected virtual IProducer<TKey, TMessage> BuildProducer(IEnumerable<KeyValuePair<string, string>> config)
    {
        var producerBuilder = new ProducerBuilder<TKey, TMessage>(config)
            .SetKeySerializer(KeySerializer)
            .SetValueSerializer(MessageSerializer)
            .SetErrorHandler(OnError)
            ;

        return producerBuilder.Build();
    }

    /// <summary>
    /// プロデューサーの動作設定を取得します。
    /// </summary>
    /// <param name="producerSetting">プロデューサーの動作設定</param>
    /// <returns>動作設定のキーと値の組み合わせ</returns>
    protected virtual IEnumerable<KeyValuePair<string, string>> GetProducerConfig(MessagePublisherSetting producerSetting)
    {
        if (producerSetting.BootstrapServers == null || producerSetting.BootstrapServers == "")
        {
            throw new NullReferenceException("ブートストラップサーバーが設定されていません。");
        }

        return new ProducerConfig()
        {
            BootstrapServers = producerSetting.BootstrapServers,
        };
    }

    /// <summary>
    /// エラーが発生したときの処理を行います。
    /// </summary>
    /// <param name="producer"></param>
    /// <param name="error"></param>
    protected virtual void OnError(IProducer<TKey, TMessage> producer, Error error)
    {
        WriteLog(LogLevel.Error, () => BuildLogMessage(error));
    }

    #endregion

    #region メッセージ発行

    /// <summary>
    /// 指定されたメッセージを発行します。
    /// </summary>
    /// <param name="message">メッセージ</param>
    /// <returns></returns>
    public Task PublishAsync(TMessage message, CancellationToken cancellationToken)
    {
        var kafkaMessage = new Message<TKey, TMessage>()
        {
            Key = GenerateNewKey(),
            Value = message,
            Timestamp = new Timestamp(DateTimeOffset.UtcNow)
        };

        return Producer.ProduceAsync(Topic, kafkaMessage, cancellationToken)
            .ContinueWith(t => OnPublished(t.Result));
    }

    /// <summary>
    /// メッセージを発行したときの処理を行います。
    /// </summary>
    /// <param name="result">発行の結果</param>
    protected virtual void OnPublished(DeliveryResult<TKey, TMessage> result)
    {
        WriteLog(LogLevel.Debug, () => BuildLogMessage(result));
    }

    /// <summary>
    /// 新しいキーを生成します。
    /// </summary>
    /// <returns>キー</returns>
    private TKey GenerateNewKey()
    {
        return KeyGenerator();
    }

    #endregion

    #region ロギング

    /// <summary>
    /// 指定されたログを出力します。
    /// </summary>
    /// <param name="level">ログレベル</param>
    /// <param name="messageBuilder">ログメッセージを生成するメソッド</param>
    /// <param name="exception">例外</param>
    private void WriteLog(LogLevel level, Func<string> messageBuilder, Exception? exception = null)
    {
        if (!Logger.IsEnabled(level)) { return; }

        if (exception == null)
        {
            Logger.Log(level, messageBuilder());
        }
        else
        {
            Logger.Log(level, exception, messageBuilder());
        }
    }

    private string BuildLogMessage(DeliveryResult<TKey, TMessage> result)
    {
        return $"メッセージを発行しました。[{result.Topic}:{result.Offset}] {result.Message.Value}";
    }

    private string BuildLogMessage(Error error)
    {
        return error.Reason;
    }

    #endregion
}

Subscriber アプリケーション

Kafka からメッセージを購読するコンソールアプリケーションです。
購読先のブートストラップサーバーとトピックはコンソール入力から受け取ります。
ReactiveExtensions(System.Reactive)を使用した observable パターンを採用しました。

エントリポイントの実装

Program.cs
class Program
{
    static async Task Main(string[] args)
    {
        try
        {
            await RunAsync().ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
        Console.ReadKey();
    }

    static async Task RunAsync()
    {
        int process = System.Diagnostics.Process.GetCurrentProcess().Id;
        Console.WriteLine($"サブスクライバーをプロセス {process} で起動しました。");

        // コンソールからパラメーターを受け取る
        Console.WriteLine("bootstrap servers を入力してください(省略時 127.0.0.1):");
        var bootstrapServers = Console.ReadLine();
        if (string.IsNullOrEmpty(bootstrapServers)) { bootstrapServers = "127.0.0.1"; }

        Console.WriteLine($"コンシューマーグループIDを入力してください(省略時 {Constants.DefaultComsumerGroupID}):");
        var groupID = Console.ReadLine();
        if (string.IsNullOrEmpty(groupID)) { groupID = Constants.DefaultComsumerGroupID; }

        Console.WriteLine($"トピックを入力してください(省略時 {Constants.DefaultTopic}):");
        var topic = Console.ReadLine();
        if (string.IsNullOrEmpty(topic)) { topic = Constants.DefaultTopic; }

        // キャンセルトークンを生成する
        using var cancelTokenSource = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true;
            cancelTokenSource.Cancel();
        };

        // 動作設定を生成する
        var subscriberSetting = new MessageSubscriberSetting()
        {
            BootstrapServers = bootstrapServers,
            ConsumerGroupID = groupID
        };

        // observable パターンでメッセージを監視する
        var factory = new SampleMessageSubscriberFactory(subscriberSetting, new SampleLogger());

        Console.WriteLine($"メッセージの受信処理を開始します。終了するには Ctrl+C を押してください。");

        var subscriber = factory.CreateSubscriber<SampleMessageKey, SampleMessageBody>(topic);
        using var releaser = subscriber.Subscribe(new SampleMessageObserver());

        await subscriber.SubscribeAsync(cancelTokenSource.Token).ConfigureAwait(false);

        Console.WriteLine("メッセージの受信処理を終了しました。");
    }
}

サブスクライバーの実装

シンプルなファクトリーパターンを採用しました。

SampleMessagePublisherFactory.cs
/// <summary>
/// サブスクライバーを生成します。
/// </summary>
internal class SampleMessageSubscriberFactory : MessageSubscriberFactoryBase
{
    /// <summary>
    /// インスタンスを生成します。
    /// </summary>
    /// <param name="subscriberSetting">サブスクライバーの動作設定</param>
    /// <param name="logger">ロガー</param>
    internal SampleMessageSubscriberFactory(MessageSubscriberSetting subscriberSetting, ILogger logger)
        : base(subscriberSetting, logger)
    {
    }

    /// <summary>
    /// シリアライザを取得します。
    /// </summary>
    /// <typeparam name="T">シリアライズ対象オブジェクトの型</typeparam>
    /// <returns>シリアライザ</returns>
    protected override IDeserializer<T> GetDeserializer<T>()
    {
        return SampleSerializerFactory.Create<T>();
    }
}

/// <summary>
/// サブスクライバーの生成処理の基本実装。
/// </summary>
public abstract class MessageSubscriberFactoryBase
{
    /// <summary>
    /// インスタンスを生成します。
    /// </summary>
    /// <param name="subscriberSetting">サブスクライバーの動作設定</param>
    /// <param name="logger">ロガー</param>
    protected MessageSubscriberFactoryBase(MessageSubscriberSetting subscriberSetting, ILogger logger)
    {
        SubscriberSetting = subscriberSetting;
        Logger = logger;
    }

    /// <summary>
    /// サブスクライバーの動作設定を取得します。
    /// </summary>
    private MessageSubscriberSetting SubscriberSetting { get; }

    /// <summary>
    /// ロガーを取得します。
    /// </summary>
    private ILogger Logger { get; }

    /// <summary>
    /// サブスクライバーを生成します。
    /// </summary>
    /// <typeparam name="TKey">キーの型</typeparam>
    /// <typeparam name="TMessage">メッセージの型</typeparam>
    /// <returns>サブスクライバー</returns>
    public MessageSubscriber<TKey, TMessage> CreateSubscriber<TKey, TMessage>(string topic)
    {
        return new MessageSubscriber<TKey, TMessage>(
            GetDeserializer<TKey>()
            , GetDeserializer<TMessage>()
            , SubscriberSetting
            , topic
            , Logger
            );
    }

    /// <summary>
    /// シリアライザを取得します。
    /// </summary>
    /// <typeparam name="T">シリアライズ対象オブジェクトの型</typeparam>
    /// <returns>シリアライザ</returns>
    protected abstract IDeserializer<T> GetDeserializer<T>();
}

サブスクライバーは Kafka に対するコンシューマー(IConsumer<TKey, TMessage>)を内包し、指定されたトピックに対して発行されたメッセージを Kafka から受信します。

MessageSubscriber.cs
/// <summary>
/// Kafka からのメッセージを監視します。
/// </summary>
/// <typeparam name="TKey">メッセージのキーの型</typeparam>
/// <typeparam name="TMessage">メッセージの型</typeparam>
public class MessageSubscriber<TKey, TMessage> : System.Reactive.ObservableBase<TMessage>
{
    /// <summary>
    /// インスタンスを生成します。
    /// </summary>
    /// <param name="keyDeserializer">キーに対するデシリアライザ</param>
    /// <param name="messageDeserializer">メッセージに対するデシリアライザ</param>
    /// <param name="subscriberSetting">動作設定</param>
    /// <param name="topic">トピック</param>
    /// <param name="logger">ロガー</param>
    public MessageSubscriber(IDeserializer<TKey> keyDeserializer, IDeserializer<TMessage> messageDeserializer, MessageSubscriberSetting subscriberSetting, string topic, ILogger logger) : base()
    {
        KeyDeserializer = keyDeserializer;
        MessageDeserializer = messageDeserializer;
        SubscriberSetting = subscriberSetting;
        Topic = topic;
        Logger = logger ?? Microsoft.Extensions.Logging.Abstractions.NullLogger.Instance;
    }

    /// <summary>
    /// ロガーを取得します。
    /// </summary>
    private ILogger Logger { get; }

    /// <summary>
    /// 動作設定を取得します。
    /// </summary>
    private MessageSubscriberSetting SubscriberSetting { get; }

    /// <summary>
    /// トピックを取得します。
    /// </summary>
    private string Topic { get; }

    #region 受信

    /// <summary>
    /// キーに対するデシリアライザを取得します。
    /// </summary>
    private IDeserializer<TKey> KeyDeserializer { get; }

    /// <summary>
    /// メッセージに対するデシリアライザを取得します。
    /// </summary>
    private IDeserializer<TMessage> MessageDeserializer { get; }

    /// <summary>
    /// メッセージ受信を開始します。
    /// </summary>
    /// <param name="cancellation">キャンセルトークン</param>
    public Task SubscribeAsync(CancellationToken cancellation)
    {
        Task.Yield();

        TimeSpan interval = SubscriberSetting.ConsumeInterval;

        using var consumer = BuildConsumer(GetConsumerConfig(SubscriberSetting));
        consumer.Subscribe(Topic);

        while (true)
        {
            if (cancellation.IsCancellationRequested) { break; }
            try
            {
                if (m_Observers.Count == 0) { continue; }

                ConsumeResult<TKey, TMessage> result = consumer.Consume(interval);

                if (result == null) { continue; }
                // TODO: 今回の確認では IsPartitionEOF を発生させることができなかった。
                if (result.IsPartitionEOF) { continue; }

                WriteLog(LogLevel.Debug, () => BuildLogMessage(result));
                NotifyMessage(result.Message.Value);

                consumer.Commit(result);
            }
            catch (Exception ex)
            {
                OnException(consumer, ex);
                break;
            }
        }

        NotifyComplated();

        return Task.CompletedTask;
    }

    /// <summary>
    /// コンシューマーを生成します。
    /// </summary>
    /// <param name="config">動作設定</param>
    /// <returns>コンシューマー</returns>
    protected IConsumer<TKey, TMessage> BuildConsumer(IEnumerable<KeyValuePair<string, string>> config)
    {
        var consumerBuilder = new ConsumerBuilder<TKey, TMessage>(config)
            .SetKeyDeserializer(KeyDeserializer)
            .SetValueDeserializer(MessageDeserializer)
            .SetErrorHandler(OnError)
            .SetLogHandler(OnLogging)
            ;

        return consumerBuilder.Build();
    }

    /// <summary>
    /// コンシューマーの動作設定を取得します。
    /// </summary>
    /// <param name="consumerSetting">コンシューマーの動作設定</param>
    /// <returns>動作設定のキーと値の組み合わせ</returns>
    protected IEnumerable<KeyValuePair<string, string>> GetConsumerConfig(MessageSubscriberSetting consumerSetting)
    {
        if (consumerSetting.BootstrapServers == null || consumerSetting.BootstrapServers == "")
        {
            throw new NullReferenceException("ブートストラップサーバーが設定されていません。");
        }

        if (consumerSetting.ConsumerGroupID == null || consumerSetting.ConsumerGroupID == "")
        {
            throw new NullReferenceException("コンシューマーグループIDが設定されていません。");
        }

        return new ConsumerConfig()
        {
            BootstrapServers = consumerSetting.BootstrapServers,
            GroupId = consumerSetting.ConsumerGroupID,
            EnableAutoCommit = false,
            AutoOffsetReset = AutoOffsetReset.Earliest
        };
    }

    /// <summary>
    /// ログを出力します。
    /// </summary>
    /// <param name="consumer"></param>
    /// <param name="log"></param>
    private void OnLogging(IConsumer<TKey, TMessage> consumer, LogMessage log)
    {
        LogLevel logLevel = log.Level.ToLogLevel();
        WriteLog(logLevel, () => BuildLogMessage(log));
    }

    /// <summary>
    /// エラーが発生したときの処理を行います。
    /// </summary>
    /// <param name="consumer"></param>
    /// <param name="error"></param>
    private void OnError(IConsumer<TKey, TMessage> consumer, Error error)
    {
        WriteLog(LogLevel.Error, () => BuildLogMessage(error));
        NotifyError(new Exception(error.Reason));
    }

    /// <summary>
    /// 例外が発生したときの処理を行います。
    /// </summary>
    /// <param name="consumer"></param>
    /// <param name="exception"></param>
    private void OnException(IConsumer<TKey, TMessage> consumer, Exception exception)
    {
        WriteLog(LogLevel.Critical, () => BuildLogMessage(exception), exception);
        NotifyError(exception);
    }

    #endregion

    #region 通知

    /// <summary>
    /// 指定されたメッセージを通知します。
    /// </summary>
    /// <param name="message">メッセージ</param>
    private void NotifyMessage(TMessage message)
    {
        if (m_Observers.Count == 0) { return; }
        lock (m_Observers)
        {
            for (int i = 0; i < m_Observers.Count; ++i)
            {
                m_Observers[i].OnNext(message);
            }
        }
    }

    /// <summary>
    /// 指定されたメッセージを通知します。
    /// </summary>
    /// <param name="exception">例外</param>
    private void NotifyError(Exception exception)
    {
        if (m_Observers.Count == 0) { return; }
        lock (m_Observers)
        {
            for (int i = 0; i < m_Observers.Count; ++i)
            {
                m_Observers[i].OnError(exception);
            }
        }
    }

    /// <summary>
    /// 完了を通知します。
    /// </summary>
    private void NotifyComplated()
    {
        if (m_Observers.Count == 0) { return; }
        lock (m_Observers)
        {
            for (int i = 0; i < m_Observers.Count; ++i)
            {
                m_Observers[i].OnCompleted();
            }
        }
    }

    #endregion

    #region オブザーバー

    /// <summary>
    /// 指定されたオブザーバーによる購読を開始します。
    /// </summary>
    /// <param name="observer"></param>
    /// <returns></returns>
    protected override IDisposable SubscribeCore(IObserver<TMessage> observer)
    {
        AddObserver(observer);
        return System.Reactive.Disposables.Disposable.Create(() => RemoveObserver(observer));
    }

    private readonly List<IObserver<TMessage>> m_Observers = new List<IObserver<TMessage>>();

    /// <summary>
    /// 指定されたオブザーバーを追加します。
    /// </summary>
    /// <param name="observer"></param>
    private void AddObserver(IObserver<TMessage> observer)
    {
        lock (m_Observers)
        {
            m_Observers.Add(observer);
        }
    }

    /// <summary>
    /// 指定されたオブザーバーを削除します。
    /// </summary>
    /// <param name="observer"></param>
    private void RemoveObserver(IObserver<TMessage> observer)
    {
        if (m_Observers.Contains(observer))
        {
            lock (m_Observers)
            {
                m_Observers.Remove(observer);
            }
        }
    }

    #endregion

    #region ロギング

    /// <summary>
    /// 指定されたログを出力します。
    /// </summary>
    /// <param name="level">ログレベル</param>
    /// <param name="messageBuilder">ログメッセージを生成するメソッド</param>
    /// <param name="exception">例外</param>
    private void WriteLog(LogLevel level, Func<string> messageBuilder, Exception? exception = null)
    {
        if (!Logger.IsEnabled(level)) { return; }

        if (exception == null)
        {
            Logger.Log(level, messageBuilder());
        }
        else
        {
            Logger.Log(level, exception, messageBuilder());
        }
    }

    private string BuildLogMessage(ConsumeResult<TKey, TMessage> result)
    {
        return $"メッセージを受け取りました。[{result.Topic}:{result.Offset}] {result.Message.Value}";
    }

    private string BuildLogMessage(LogMessage log)
    {
        return log.Message;
    }

    private string BuildLogMessage(Error error)
    {
        return error.Reason;
    }

    private string BuildLogMessage(Exception exception)
    {
        return exception.Message;
    }

    #endregion
}

アプリケーションの動作確認

コンシューマーグループとトピックによる送受信制御

パブリッシャーとサブスクライバーを多重起動し、どのようにメッセージの送受信が行われるかを確認します。

パブリッシャー 動作
Publisher1 "Topic-A" に対してメッセージを発行します。
Publisher2 "Topic-B" に対してメッセージを発行します。
Publisher3 "Topic-B" に対してメッセージを発行します。
サブスクライバー 動作
SubScriber1 コンシューマーグループ "Group1" に属するコンシューマーで "Topic-A" のメッセージを購読します。
SubScriber2 コンシューマーグループ "Group1" に属するコンシューマーで "Topic-A" のメッセージを購読します。
SubScriber3 コンシューマーグループ "Group2" に属するコンシューマーで "Topic-A" のメッセージを購読します。
SubScriber4 コンシューマーグループ "Group1" に属するコンシューマーで "Topic-B" のメッセージを購読します。

ログに出力されている "[Topic-A:0]" は、トピックとそのトピックのオフセット値(メッセージを発行するたびにインクリメントされる連番)を表しています。

Publisher1
パブリッシャーをプロセス 5040 で起動しました。
bootstrap servers を入力してください(省略時 127.0.0.1):
トピックを入力してください(省略時 test-topic):
Topic-A
メッセージの送信処理を開始します。終了するには Ctrl+C を押してください。
11:33:13.426 [Debug] メッセージを発行しました。[Topic-A:0] 1回目のメッセージ(プロセス5040)
11:33:18.453 [Debug] メッセージを発行しました。[Topic-A:1] 2回目のメッセージ(プロセス5040)
11:33:23.491 [Debug] メッセージを発行しました。[Topic-A:2] 3回目のメッセージ(プロセス5040)
11:33:28.508 [Debug] メッセージを発行しました。[Topic-A:3] 4回目のメッセージ(プロセス5040)
11:33:33.530 [Debug] メッセージを発行しました。[Topic-A:4] 5回目のメッセージ(プロセス5040)
11:33:38.546 [Debug] メッセージを発行しました。[Topic-A:5] 6回目のメッセージ(プロセス5040)
11:33:43.565 [Debug] メッセージを発行しました。[Topic-A:6] 7回目のメッセージ(プロセス5040)
11:33:48.595 [Debug] メッセージを発行しました。[Topic-A:7] 8回目のメッセージ(プロセス5040)
(以下割愛)

二つのプロセスから Tobic-B に対してメッセージを発行しています。オフセット値が重複することなくインクリメントされていることがわかります。

Publisher2
パブリッシャーをプロセス 15532 で起動しました。
bootstrap servers を入力してください(省略時 127.0.0.1):
トピックを入力してください(省略時 test-topic):
Topic-B
メッセージの送信処理を開始します。終了するには Ctrl+C を押してください。
11:33:21.558 [Debug] メッセージを発行しました。[Topic-B:0] 1回目のメッセージ(プロセス15532)
11:33:26.592 [Debug] メッセージを発行しました。[Topic-B:1] 2回目のメッセージ(プロセス15532)
11:33:31.621 [Debug] メッセージを発行しました。[Topic-B:2] 3回目のメッセージ(プロセス15532)
11:33:36.640 [Debug] メッセージを発行しました。[Topic-B:4] 4回目のメッセージ(プロセス15532)
11:33:41.653 [Debug] メッセージを発行しました。[Topic-B:6] 5回目のメッセージ(プロセス15532)
11:33:46.672 [Debug] メッセージを発行しました。[Topic-B:8] 6回目のメッセージ(プロセス15532)
11:33:51.687 [Debug] メッセージを発行しました。[Topic-B:10] 7回目のメッセージ(プロセス15532)
11:33:56.714 [Debug] メッセージを発行しました。[Topic-B:12] 8回目のメッセージ(プロセス15532)
(以下割愛)
Piblisher3
パブリッシャーをプロセス 4928 で起動しました。
bootstrap servers を入力してください(省略時 127.0.0.1):
トピックを入力してください(省略時 test-topic):
Topic-B
メッセージの送信処理を開始します。終了するには Ctrl+C を押してください。
11:33:33.667 [Debug] メッセージを発行しました。[Topic-B:3] 1回目のメッセージ(プロセス4928)
11:33:38.698 [Debug] メッセージを発行しました。[Topic-B:5] 2回目のメッセージ(プロセス4928)
11:33:43.718 [Debug] メッセージを発行しました。[Topic-B:7] 3回目のメッセージ(プロセス4928)
11:33:48.738 [Debug] メッセージを発行しました。[Topic-B:9] 4回目のメッセージ(プロセス4928)
11:33:53.764 [Debug] メッセージを発行しました。[Topic-B:11] 5回目のメッセージ(プロセス4928)
11:33:58.790 [Debug] メッセージを発行しました。[Topic-B:13] 6回目のメッセージ(プロセス4928)
11:34:03.800 [Debug] メッセージを発行しました。[Topic-B:15] 7回目のメッセージ(プロセス4928)
11:34:08.809 [Debug] メッセージを発行しました。[Topic-B:17] 8回目のメッセージ(プロセス4928)
(以下割愛)

コンシューマーグループ Group1 で Topic-A のメッセージを購読するプロセスを二つ起動すると、何れか一つのみがメッセージを受け取ります。メッセージを受け取っていたサブスクライバーを停止させると、他方のサブスクライバーがメッセージを受け取るようになります。

Subscriber1
サブスクライバーをプロセス 11948 で起動しました。
bootstrap servers を入力してください(省略時 127.0.0.1):
コンシューマーグループIDを入力してください(省略時 test-group):
Group1
トピックを入力してください(省略時 test-topic):
Topic-A
メッセージの受信処理を開始します。終了するには Ctrl+C を押してください。
11:33:13.493 [Debug] メッセージを受け取りました。[Topic-A:0] 1回目のメッセージ(プロセス5040)
11:33:18.450 [Debug] メッセージを受け取りました。[Topic-A:1] 2回目のメッセージ(プロセス5040)
11:33:23.487 [Debug] メッセージを受け取りました。[Topic-A:2] 3回目のメッセージ(プロセス5040)
11:33:28.510 [Debug] メッセージを受け取りました。[Topic-A:3] 4回目のメッセージ(プロセス5040)
11:33:33.531 [Debug] メッセージを受け取りました。[Topic-A:4] 5回目のメッセージ(プロセス5040)
11:33:38.547 [Debug] メッセージを受け取りました。[Topic-A:5] 6回目のメッセージ(プロセス5040)
11:33:43.566 [Debug] メッセージを受け取りました。[Topic-A:6] 7回目のメッセージ(プロセス5040)
11:33:48.595 [Debug] メッセージを受け取りました。[Topic-A:7] 8回目のメッセージ(プロセス5040)
11:33:53.615 [Debug] メッセージを受け取りました。[Topic-A:8] 9回目のメッセージ(プロセス5040)
11:33:58.633 [Debug] メッセージを受け取りました。[Topic-A:9] 10回目のメッセージ(プロセス5040)
11:34:03.660 [Debug] メッセージを受け取りました。[Topic-A:10] 11回目のメッセージ(プロセス5040)
11:34:08.685 [Debug] メッセージを受け取りました。[Topic-A:11] 12回目のメッセージ(プロセス5040)
11:34:13.701 [Debug] メッセージを受け取りました。[Topic-A:12] 13回目のメッセージ(プロセス5040)
11:34:18.719 [Debug] メッセージを受け取りました。[Topic-A:13] 14回目のメッセージ(プロセス5040)
11:34:23.743 [Debug] メッセージを受け取りました。[Topic-A:14] 15回目のメッセージ(プロセス5040)
11:34:28.765 [Debug] メッセージを受け取りました。[Topic-A:15] 16回目のメッセージ(プロセス5040)
メッセージの受信処理を終了しました。
Subscriber2
サブスクライバーをプロセス 13144 で起動しました。
bootstrap servers を入力してください(省略時 127.0.0.1):
コンシューマーグループIDを入力してください(省略時 test-group):
Group1
トピックを入力してください(省略時 test-topic):
Topic-A
メッセージの受信処理を開始します。終了するには Ctrl+C を押してください。
11:34:42.843 [Debug] メッセージを受け取りました。[Topic-A:16] 17回目のメッセージ(プロセス5040)
11:34:42.854 [Debug] メッセージを受け取りました。[Topic-A:17] 18回目のメッセージ(プロセス5040)
11:34:43.810 [Debug] メッセージを受け取りました。[Topic-A:18] 19回目のメッセージ(プロセス5040)
11:34:48.821 [Debug] メッセージを受け取りました。[Topic-A:19] 20回目のメッセージ(プロセス5040)

コンシューマーグループが異なれば全てのメッセージを受け取ります。

Subscriber3
サブスクライバーをプロセス 11192 で起動しました。
bootstrap servers を入力してください(省略時 127.0.0.1):
コンシューマーグループIDを入力してください(省略時 test-group):
Group2
トピックを入力してください(省略時 test-topic):
Topic-A
メッセージの受信処理を開始します。終了するには Ctrl+C を押してください。
11:33:13.493 [Debug] メッセージを受け取りました。[Topic-A:0] 1回目のメッセージ(プロセス5040)
11:33:18.451 [Debug] メッセージを受け取りました。[Topic-A:1] 2回目のメッセージ(プロセス5040)
11:33:23.484 [Debug] メッセージを受け取りました。[Topic-A:2] 3回目のメッセージ(プロセス5040)
11:33:28.511 [Debug] メッセージを受け取りました。[Topic-A:3] 4回目のメッセージ(プロセス5040)
11:33:33.530 [Debug] メッセージを受け取りました。[Topic-A:4] 5回目のメッセージ(プロセス5040)
11:33:38.547 [Debug] メッセージを受け取りました。[Topic-A:5] 6回目のメッセージ(プロセス5040)
11:33:43.565 [Debug] メッセージを受け取りました。[Topic-A:6] 7回目のメッセージ(プロセス5040)
11:33:48.594 [Debug] メッセージを受け取りました。[Topic-A:7] 8回目のメッセージ(プロセス5040)
11:33:53.616 [Debug] メッセージを受け取りました。[Topic-A:8] 9回目のメッセージ(プロセス5040)
11:33:58.633 [Debug] メッセージを受け取りました。[Topic-A:9] 10回目のメッセージ(プロセス5040)
11:34:03.661 [Debug] メッセージを受け取りました。[Topic-A:10] 11回目のメッセージ(プロセス5040)
11:34:08.684 [Debug] メッセージを受け取りました。[Topic-A:11] 12回目のメッセージ(プロセス5040)
11:34:13.702 [Debug] メッセージを受け取りました。[Topic-A:12] 13回目のメッセージ(プロセス5040)
11:34:18.717 [Debug] メッセージを受け取りました。[Topic-A:13] 14回目のメッセージ(プロセス5040)
11:34:23.743 [Debug] メッセージを受け取りました。[Topic-A:14] 15回目のメッセージ(プロセス5040)
11:34:28.765 [Debug] メッセージを受け取りました。[Topic-A:15] 16回目のメッセージ(プロセス5040)
11:34:33.786 [Debug] メッセージを受け取りました。[Topic-A:16] 17回目のメッセージ(プロセス5040)
11:34:38.797 [Debug] メッセージを受け取りました。[Topic-A:17] 18回目のメッセージ(プロセス5040)
11:34:43.809 [Debug] メッセージを受け取りました。[Topic-A:18] 19回目のメッセージ(プロセス5040)
11:34:48.820 [Debug] メッセージを受け取りました。[Topic-A:19] 20回目のメッセージ(プロセス5040)

Topic-B には二つのプロセスからメッセージが発行されています。発行された順番で受け取ることができています。

Subscriber4
サブスクライバーをプロセス 13912 で起動しました。
bootstrap servers を入力してください(省略時 127.0.0.1):
コンシューマーグループIDを入力してください(省略時 test-group):
Group1
トピックを入力してください(省略時 test-topic):
Topic-B
メッセージの受信処理を開始します。終了するには Ctrl+C を押してください。
11:33:21.625 [Debug] メッセージを受け取りました。[Topic-B:0] 1回目のメッセージ(プロセス15532)
11:33:26.589 [Debug] メッセージを受け取りました。[Topic-B:1] 2回目のメッセージ(プロセス15532)
11:33:31.623 [Debug] メッセージを受け取りました。[Topic-B:2] 3回目のメッセージ(プロセス15532)
11:33:33.650 [Debug] メッセージを受け取りました。[Topic-B:3] 1回目のメッセージ(プロセス4928)
11:33:36.639 [Debug] メッセージを受け取りました。[Topic-B:4] 4回目のメッセージ(プロセス15532)
11:33:38.698 [Debug] メッセージを受け取りました。[Topic-B:5] 2回目のメッセージ(プロセス4928)
11:33:41.655 [Debug] メッセージを受け取りました。[Topic-B:6] 5回目のメッセージ(プロセス15532)
11:33:43.718 [Debug] メッセージを受け取りました。[Topic-B:7] 3回目のメッセージ(プロセス4928)
11:33:46.675 [Debug] メッセージを受け取りました。[Topic-B:8] 6回目のメッセージ(プロセス15532)
11:33:48.735 [Debug] メッセージを受け取りました。[Topic-B:9] 4回目のメッセージ(プロセス4928)
11:33:51.689 [Debug] メッセージを受け取りました。[Topic-B:10] 7回目のメッセージ(プロセス15532)
(以下割愛)

メッセージの到達保証

サブスクライバーが存在しない間に発行されたメッセージを受信できることを確認します。

パブリッシャー 動作
Publisher1 "Topic-A" に対してメッセージを発行します。
サブスクライバー 動作
SubScriber1 Publisher1 からいくつかメッセージが発行された後で起動します。メッセージを受信した後、終了させます。
SubScriber2 SubScriber1 の終了後、Publisher1 からいくつかメッセージが発行された後で起動します。

前述の検証から Kafuka のインスタンスを稼働させ続けているため、Topic-A のオフセット値は 20 から始まっています。

Publisher1
パブリッシャーをプロセス 5096 で起動しました。
bootstrap servers を入力してください(省略時 127.0.0.1):
トピックを入力してください(省略時 test-topic):
Topic-A
メッセージの送信処理を開始します。終了するには Ctrl+C を押してください。
12:21:50.114 [Debug] メッセージを発行しました。[Topic-A:20] 1回目のメッセージ(プロセス5096)
12:21:55.149 [Debug] メッセージを発行しました。[Topic-A:21] 2回目のメッセージ(プロセス5096)
12:22:00.164 [Debug] メッセージを発行しました。[Topic-A:22] 3回目のメッセージ(プロセス5096)
12:22:05.183 [Debug] メッセージを発行しました。[Topic-A:23] 4回目のメッセージ(プロセス5096)
12:22:10.201 [Debug] メッセージを発行しました。[Topic-A:24] 5回目のメッセージ(プロセス5096)
12:22:15.215 [Debug] メッセージを発行しました。[Topic-A:25] 6回目のメッセージ(プロセス5096)
12:22:20.239 [Debug] メッセージを発行しました。[Topic-A:26] 7回目のメッセージ(プロセス5096)
12:22:25.255 [Debug] メッセージを発行しました。[Topic-A:27] 8回目のメッセージ(プロセス5096)
12:22:30.280 [Debug] メッセージを発行しました。[Topic-A:28] 9回目のメッセージ(プロセス5096)
12:22:35.295 [Debug] メッセージを発行しました。[Topic-A:29] 10回目のメッセージ(プロセス5096)
12:22:40.311 [Debug] メッセージを発行しました。[Topic-A:30] 11回目のメッセージ(プロセス5096)
12:22:45.327 [Debug] メッセージを発行しました。[Topic-A:31] 12回目のメッセージ(プロセス5096)
12:22:50.344 [Debug] メッセージを発行しました。[Topic-A:32] 13回目のメッセージ(プロセス5096)
12:22:55.360 [Debug] メッセージを発行しました。[Topic-A:33] 14回目のメッセージ(プロセス5096)

サブスクライバーの起動直後、Kafka に保持されている未購読のメッセージ(20, 21)がまとめて受信されています。

SubScriber1
サブスクライバーをプロセス 17640 で起動しました。
bootstrap servers を入力してください(省略時 127.0.0.1):
コンシューマーグループIDを入力してください(省略時 test-group):
Group1
トピックを入力してください(省略時 test-topic):
Topic-A
メッセージの受信処理を開始します。終了するには Ctrl+C を押してください。
12:21:58.386 [Debug] メッセージを受け取りました。[Topic-A:20] 1回目のメッセージ(プロセス5096)
12:21:58.397 [Debug] メッセージを受け取りました。[Topic-A:21] 2回目のメッセージ(プロセス5096)
12:22:00.165 [Debug] メッセージを受け取りました。[Topic-A:22] 3回目のメッセージ(プロセス5096)
12:22:05.182 [Debug] メッセージを受け取りました。[Topic-A:23] 4回目のメッセージ(プロセス5096)
12:22:10.202 [Debug] メッセージを受け取りました。[Topic-A:24] 5回目のメッセージ(プロセス5096)
メッセージの受信処理を終了しました。

サブスクライバーの起動直後、Kafka に保持されている未購読のメッセージ(25, 26, 27, 28)がまとめて受信されています。

SubScriber2
bootstrap servers を入力してください(省略時 127.0.0.1):
コンシューマーグループIDを入力してください(省略時 test-group):
Group1
トピックを入力してください(省略時 test-topic):
Topic-A
メッセージの受信処理を開始します。終了するには Ctrl+C を押してください。
12:22:32.537 [Debug] メッセージを受け取りました。[Topic-A:25] 6回目のメッセージ(プロセス5096)
12:22:32.548 [Debug] メッセージを受け取りました。[Topic-A:26] 7回目のメッセージ(プロセス5096)
12:22:32.550 [Debug] メッセージを受け取りました。[Topic-A:27] 8回目のメッセージ(プロセス5096)
12:22:32.551 [Debug] メッセージを受け取りました。[Topic-A:28] 9回目のメッセージ(プロセス5096)
12:22:35.298 [Debug] メッセージを受け取りました。[Topic-A:29] 10回目のメッセージ(プロセス5096)
12:22:40.314 [Debug] メッセージを受け取りました。[Topic-A:30] 11回目のメッセージ(プロセス5096)
12:22:58.543 [Debug] メッセージを受け取りました。[Topic-A:31] 12回目のメッセージ(プロセス5096)
12:22:58.551 [Debug] メッセージを受け取りました。[Topic-A:32] 13回目のメッセージ(プロセス5096)
12:22:58.560 [Debug] メッセージを受け取りました。[Topic-A:33] 14回目のメッセージ(プロセス5096)
メッセージの受信処理を終了しました。

まとめ

簡単に Pub/Sub を実装することができました。
プロダクトで利用するには足りない機能がありますが、パーティションやレプリカの動作検証に使おうと考えています。

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?