1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

今日からは話題がガラリと変わりまして、Kafkaを使った非同期処理についてみていきます。

これまでgRPCを使って同期的にコミュニケーションし、データベースにデータを保存するという言ってしまえばよくあるパターンをみてきました。
一方で弊社ではKafkaに代表されるようなストリーム処理基盤を使ったイベントドリブンなアーキテクチャを採用しています。
ログとしてキューにプッシュしたデータをSource of Truthとし、たとえDBが飛んでもログを初めから読み直せば元に戻せるような設計を心がけています。
またマイクロサービス間のコミュニケーションとしてもイベントキューを介した非同期な方法を非常に多く用います。

基盤とするソフトウェアはApache Kafkaを使うチームが多いですがこれも特に決まりはなく、Benthosやその他のツールを使うチームもあります。

ログやそれを応用したイベントドリブンアーキテクチャについてはKafkaの開発元でもあるLinkedInのこちらの記事がとても参考になるので、少し長いですが興味のある方は読んでみてください。ログの考え方が変わるとてもいい記事です。

イベントフォーマット

さてではイベントキューにどういったフォーマットでデータを残すかですがここでもprotocol bufferを使いたいと思います。

protocol bufferを使うと言っても、フォーマットがチームごとにバラバラだと、"イベントが送信された時間"などドメインに関わらず共通で残したいデータがあるときなどあまり利便性がよくありません。
そんなとき各チームが自由に定義する実際のイベントを包みこみ、メタデータなどを与える共通のフォーマットがあると便利です。

弊社で使っているフォーマットはオープンソース化されていないのでそのまま紹介することができませんが、以下のような雰囲気です。

proto/envelope.proto
syntax = "proto3";

import "google/protobuf/timestamp.proto";
import "google/protobuf/any.proto";

message Event {
    string id = 1;
    google.protobuf.Timestamp timestamp = 2;
    google.protobuf.Any payload = 3;
}

実際のイベントはpayloadとして渡します。
今回はこちらを使っていきたいと思います。

Envelopeのコンパイル

以下の記述をMakefileのprotosタスクに追加してコンパイルします。

diff --git a/Makefile b/Makefile
index 0510df4..f5b531d 100644
--- a/Makefile
+++ b/Makefile
@@ -6,15 +6,22 @@ LINKFLAGS := -X main.gitHash=$(GIT_HASH)
 PROTO_DIR := ./proto
 GENERATED_DIR := ./internal/pb
 GENERATED_SERVICE_DIR := $(GENERATED_DIR)/service
+GENERATED_ENVELOPE_DIR := $(GENERATED_DIR)/envelope
+ENVELOPE_PROTO_MAPPINGS := Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types
 
 .PHONY: protos
 protos:
-	mkdir -pv $(GENERATED_DIR) $(GENERATED_SERVICE_DIR)
+	mkdir -pv $(GENERATED_DIR) $(GENERATED_SERVICE_DIR) $(GENERATED_ENVELOPE_DIR)
 	protoc \
 		-I $(PROTO_DIR) \
 		-I $(GOPATH)/src:$(GOPATH)/src/github.com/gogo/protobuf/protobuf \
 		--gogoslick_out=plugins=grpc:$(GENERATED_SERVICE_DIR) \
 		service.proto
+	protoc \
+	  	-I $(PROTO_DIR) \
+		-I $(GOPATH)/src:$(GOPATH)/src/github.com/gogo/protobuf/protobuf \
+		--gogoslick_out=paths=source_relative,$(ENVELOPE_PROTO_MAPPINGS):$(GENERATED_ENVELOPE_DIR) \
+		envelope.proto
 
 
 mockgen-install:

無事envelope.pb.goファイルがinternal/pb/envelope以下に作成されていればOKです。


今日は導入ということでここまで。
明日はKafkaをデプロイしたいと思います。

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?