watermill
でKafkaにメッセージを送受信して遊んでみましたが、よくある使い方としてはメッセージが送られてきたらそれを随時処理するアプリでストリーミング処理するケースです。
go言語だとwatermillというフレームワークを使うとこういう処理を簡単に書けます。
router構造体に
- subscriber(メッセージを取ってくるインターフェース)
- publisher(メッセージを送るインターフェース)
- messageHandler(自分がやりたいメッセージの処理を実装したインターフェース)
を渡し、router.Runを実行するだけでいいです。
ただ、入出力する先のコンポーネントはKafka以外にもRabbitMQなど色々と対応していて、自分が使うキューやデータベースに応じてwatermillパッケージの関数を使って
subscriber,publisher関数を自分で書かないといけません。
とはいえこんな感じで書けばいいのでそんなに面倒ではないと思います。
この記事ではwatermillを使ってストリーム処理をするアプリを作ってみたいと思います。
例
今回は時刻をinput側のKafkaにproduceすると下図のようにその時刻にあった挨拶のstringに変換してoutput側のKafkaにproduceしてくれる処理を作ってみました。
コード
Claude Codeに書いてもらいました。
https://github.com/hiro949/watermillTutorial/tree/main
コード一部抜粋
先に説明したwatermillを使った実装の箇所を抜粋しました。
greeter domain.Greeterは 今回やりたい処理を行う自作のインターフェース
watermillのメリット
上記の抜粋個所の'application.Run'関数で
router.AddMiddleware(
middleware.Recoverer,
middleware.CorrelationID,
middleware.Timeout(10*time.Second),
middleware.NewCircuitBreaker(gobreaker.Settings{
Name: "greeting_handler",
MaxRequests: 3,
Timeout: 30 * time.Second,
}).Middleware,
middleware.Retry{
MaxRetries: 3,
InitialInterval: 100 * time.Millisecond,
Logger: a.logger,
}.Middleware,
)
としていますが、このようにrouter構造体にAddMiddlewareとするだけでリトライやパニック時の復旧など安定稼働させるための色んな処理を追加してくれます。
こうした処理を自分で実装しなくていいのは楽ちんですね。
用意されているミドルウェアには以下のようなものがあります。
| ミドルウェア | 用途 |
|---|---|
| Circuit Breaker | エラー連続時にファストフェイル、カスケード障害を防止 |
| Correlation | メッセージ間の相関IDを追跡・伝播 |
| Deduplicator | 重複メッセージの排除 |
| Delay On Error | エラー時に指数バックオフで遅延 |
| Duplicator | メッセージを2回処理(冪等性テスト用) |
| Ignore Errors | 特定のエラーを無視 |
| Instant Ack | 処理前に即座に Ack を返す |
| Poison | 処理失敗メッセージを別トピック(poison queue)へ退避 |
| RandomFail | テスト用にランダムでエラーを発生 |
| Recoverer | パニックからの回復(スタックトレース付き) |
| Retry | 失敗時のリトライ(バックオフ対応) |
| Throttle | メッセージ処理のレート制限 |
| Timeout | ハンドラの実行時間制限 |
コードの実行方法
docker deamonを起動していれば、
make integration-test
でKafkaとアプリを起動して
最初の図の4例を試してくれます。
make integration-test
========================================
Integration Test Suite
========================================
[Step 1/5] Cleaning up environment...
:
:
[Step 2/5] Building Docker images...
:
:
[Step 3/5] Starting services...
:
:
[Step 4/5] Running tests...
Sending test messages...
Sending: {"time":"2024-01-01T02:00:00"}
Sending: {"time":"2024-01-01T09:00:00"}
Sending: {"time":"2024-01-01T14:00:00"}
Sending: {"time":"2024-01-01T19:00:00"}
Waiting for processing...
Checking results...
Results received:
Good night!
Good evening!
Good morning!
Good afternoon!
PASS: 02:00:00 -> Good night!
PASS: 09:00:00 -> Good morning!
PASS: 14:00:00 -> Good afternoon!
PASS: 19:00:00 -> Good evening!
[Step 5/5] Test Summary
========================================
✓ All tests passed! (4/4)
[Cleanup] Stopping services...
✓ [Cleanup] Done
自分で好きなメッセージを投げたいときは
$ make start
でアプリとKafkaを起動して
$ make test-producer
Starting Kafka Producer...
Type your messages and press Enter. Press Ctrl+C to exit.
>{"time":"2024-01-01T09:00:00"}
>^Cmake: *** [Makefile:44: test-producer] Error 130
でデータを送り
$ make test-consumer
Starting Kafka Consumer...
Listening for messages... Press Ctrl+C to exit.
Good morning!
^CProcessed a total of 1 messages
make: *** [Makefile:47: test-consumer] Error 130
で処理後のメッセージを確認できます。
make stop
で最後にアプリとKafkaを停止します。
参考