0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

watermillを使ってイベント駆動処理をしてみる

Last updated at Posted at 2026-01-30

watermill

でKafkaにメッセージを送受信して遊んでみましたが、よくある使い方としてはメッセージが送られてきたらそれを随時処理するアプリでストリーミング処理するケースです。

go言語だとwatermillというフレームワークを使うとこういう処理を簡単に書けます。

router構造体に

  • subscriber(メッセージを取ってくるインターフェース)
  • publisher(メッセージを送るインターフェース)
  • messageHandler(自分がやりたいメッセージの処理を実装したインターフェース)

を渡し、router.Runを実行するだけでいいです。

ただ、入出力する先のコンポーネントはKafka以外にもRabbitMQなど色々と対応していて、自分が使うキューやデータベースに応じてwatermillパッケージの関数を使って
subscriber,publisher関数を自分で書かないといけません。

とはいえこんな感じで書けばいいのでそんなに面倒ではないと思います。

この記事ではwatermillを使ってストリーム処理をするアプリを作ってみたいと思います。

今回は時刻をinput側のKafkaにproduceすると下図のようにその時刻にあった挨拶のstringに変換してoutput側のKafkaにproduceしてくれる処理を作ってみました。

コード

Claude Codeに書いてもらいました。
https://github.com/hiro949/watermillTutorial/tree/main

コード一部抜粋

先に説明したwatermillを使った実装の箇所を抜粋しました。

greeter domain.Greeterは 今回やりたい処理を行う自作のインターフェース

watermillのメリット

上記の抜粋個所の'application.Run'関数で

router.AddMiddleware(
		middleware.Recoverer,
		middleware.CorrelationID,
		middleware.Timeout(10*time.Second),
		middleware.NewCircuitBreaker(gobreaker.Settings{
			Name:        "greeting_handler",
			MaxRequests: 3,
			Timeout:     30 * time.Second,
		}).Middleware,
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: 100 * time.Millisecond,
			Logger:          a.logger,
		}.Middleware,
	)

としていますが、このようにrouter構造体にAddMiddlewareとするだけでリトライやパニック時の復旧など安定稼働させるための色んな処理を追加してくれます。

こうした処理を自分で実装しなくていいのは楽ちんですね。
用意されているミドルウェアには以下のようなものがあります。

ミドルウェア 用途
Circuit Breaker エラー連続時にファストフェイル、カスケード障害を防止
Correlation メッセージ間の相関IDを追跡・伝播
Deduplicator 重複メッセージの排除
Delay On Error エラー時に指数バックオフで遅延
Duplicator メッセージを2回処理(冪等性テスト用)
Ignore Errors 特定のエラーを無視
Instant Ack 処理前に即座に Ack を返す
Poison 処理失敗メッセージを別トピック(poison queue)へ退避
RandomFail テスト用にランダムでエラーを発生
Recoverer パニックからの回復(スタックトレース付き)
Retry 失敗時のリトライ(バックオフ対応)
Throttle メッセージ処理のレート制限
Timeout ハンドラの実行時間制限

コードの実行方法

docker deamonを起動していれば、

make integration-test

でKafkaとアプリを起動して
最初の図の4例を試してくれます。

 make integration-test
========================================
     Integration Test Suite
========================================

[Step 1/5] Cleaning up environment...
:
:

[Step 2/5] Building Docker images...
:
:

[Step 3/5] Starting services...
:
:

[Step 4/5] Running tests...

Sending test messages...
  Sending: {"time":"2024-01-01T02:00:00"}
  Sending: {"time":"2024-01-01T09:00:00"}
  Sending: {"time":"2024-01-01T14:00:00"}
  Sending: {"time":"2024-01-01T19:00:00"}

Waiting for processing...

Checking results...

Results received:
Good night!
Good evening!
Good morning!
Good afternoon!

  PASS: 02:00:00 -> Good night!
  PASS: 09:00:00 -> Good morning!
  PASS: 14:00:00 -> Good afternoon!
  PASS: 19:00:00 -> Good evening!

[Step 5/5] Test Summary
========================================
✓ All tests passed! (4/4)

[Cleanup] Stopping services...
✓ [Cleanup] Done

自分で好きなメッセージを投げたいときは

$ make start

でアプリとKafkaを起動して

$ make test-producer
Starting Kafka Producer...
Type your messages and press Enter. Press Ctrl+C to exit.

>{"time":"2024-01-01T09:00:00"}
>^Cmake: *** [Makefile:44: test-producer] Error 130

でデータを送り

$ make test-consumer
Starting Kafka Consumer...
Listening for messages... Press Ctrl+C to exit.

Good morning!
^CProcessed a total of 1 messages
make: *** [Makefile:47: test-consumer] Error 130

で処理後のメッセージを確認できます。

make stop

で最後にアプリとKafkaを停止します。

参考

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?