はじめに
突然ですが、goroutineを扱うのって難しくないですか?
処理が並行で走るので、「データ競合大丈夫なのか?」「このchannelどの関数で使われてるんだっけ?」など、いろいろ把握するのが大変だなと思っています。そのためgoroutineはできるだけ使わないようにしています。ただ、それでも使わざるを得ない場面が出てくるのは事実です。
そこで今回は、goroutineを使うべきユースケースについて考えてみようと思います。要件によってどのように使っていくのかを見ていこうと思います。この記事はあくまで私僕の個人的な意見モリモリな内容ですので、1個人のエッセイというかポエムとして気軽に楽しんでいただければ!
ユースケース
今回はバッチ処理について考えてみましょう。
サービスAから定期的に顧客情報を取得し、自社のDBに反映させたい。
図にするとこのような形になります。現段階ではシンプルですが、要件が増えると複雑になりそうです。まずは、このシンプルなバッチ処理を実装してみます。
ユースケース1: シンプルなバッチ処理
まずはデータの取り込みに焦点を当てた実装を行います。
必要な構造体やメソッドを定義します。DB関連の処理、外部APIを叩くクライアント、そしてドメインモデルです。
※ 実際には別のファイルに定義したり、設計を丁寧に行うべきですが、今回は本題から外れるためその部分は割愛します。
// ----------------------------
// DB関連の処理
// ----------------------------
type customerRepository struct {
}
func (r *customerRepository) Save(ctx context.Context, userID string, cd []customer) error {
// データ保存の処理
return nil
}
type externalCredentialRepository struct {
}
func (repo *externalCredentialRepository) fetchUserKeys(ctx context.Context) (map[string]string, error) {
// 本来はDBからUserのKeyを取得する処理が入る
return map[string]string{
"user1": "key1",
"user2": "key2",
"user3": "key3",
}, nil
}
// ----------------------------
// 外部API関連の処理
// ----------------------------
type systemClient struct {
key string
}
func (c *systemClient) fetchData(ctx context.Context) ([]customer, error) {
// 外部APIからの取得処理
return nil, nil
}
// ----------------------------
// ドメインモデル
// ----------------------------
type customer struct {
ID int
Name string
// その他顧客に関するフィールドが存在する。
}
それでは、今回の要件を満たす実装をしてみましょう。
非常にシンプルで、外部APIを順番に叩いてDBに保存するだけの処理です。
package main
import (
"context"
"log"
)
func main() {
ctx := context.Background()
customerRepo := customerRepository{}
credRepo := externalCredentialRepository{}
// DBからAPIをたたくのに必要なkeyをuserごとに取得
userKeys, err := credRepo.fetchUserKeys(ctx)
if err != nil {
return
}
// 外部APIを叩いて、自社DBに保存
for id, key := range userKeys {
client := systemClient{key: key}
data, err := client.fetchData(ctx)
if err != nil {
log.Fatal(err)
continue
}
if err := customerRepo.Save(ctx, id, data); err != nil {
log.Fatal(err)
}
}
}
ユースケース2: 処理速度に制限のあるバッチ処理
前のユースケースではデータ量が増えてきた場合、処理速度を上げる必要が出てくるかもしれません。
サービスAから定期的に顧客情報を取得し、自社のDBに反映させたい。処理は○○秒以内に完了すること。
速度向上のために、どこがボトルネックかを考えます。外部APIにはレートリミットがある可能性があるため、改善の余地が少ないですができるのであれば検討すべきです。コントローラブルな実装で高速化できそうな箇所としては、以下の2つが考えられます。
- ユーザーごとに並行処理ができそう
- Saveメソッド内の処理がN + 1問題になっているなら、バルクインサートに切り替えられる
今回は、Saveメソッドのさらなる高速化が難しいと仮定し、ユーザーごとの並行処理による高速化を検討してみます。
イメージとしては以下のようになります。
DBやAPIのClient等の構造体はユースケース1と変わらないため、mainの処理のみ記載しています。
func main() {
ctx := context.Background()
customerRepo := customerRepository{}
credRepo := externalCredentialRepository{}
// DBからAPIをたたくのに必要なkeyをuserごとに取得
userKeys, err := credRepo.fetchUserKeys(ctx)
if err != nil {
log.Println(err)
return
}
wg := sync.WaitGroup{}
// 外部APIを叩いて、自社DBに保存
for id, key := range userKeys {
wg.Add(1)
go func(id, key string) {
defer wg.Done()
defer func() {
if err := recover(); err != nil {
log.Println(err)
}
}()
// ここで外部APIを叩いて、データを取得し、DBに保存する
client := systemClient{key: key}
data, err := client.fetchData(ctx)
if err != nil {
log.Println(err)
return
}
if err := customerRepo.Save(ctx, id, data); err != nil {
log.Println(err)
}
}(id, key)
}
wg.Wait()
}
エラーをまとめて処理したい場合は、errgroupパッケージを使用するのも良いでしょう。
この方法で、ユーザーごとの処理を並行して進めることができました。
(要件の秒数以内にも収まったとしましょう笑)
ユースケース3: 複数システムからのデータ取り込みと重複解消
これまで、外部システムが1つだけでしたが、新たに複数のシステムからデータを取り込む必要が出てきました。さらに、複数システムから取り込んだ顧客データに重複があった場合、それを解消してデータを保存するという要件も加わりました。
サービスA, B, Cから定期的に顧客情報を取得し、自社のDBに反映させたい。各サービスから取り込んだ顧客データは重複を取り除くこと。○○秒以内に処理を完了させたい。
このような要件が加わると、従来のソースコードでは処理がさらに遅くなりますし、将来的にシステムが増える可能性があるため、並行で処理するのが良さそうです。
そこで、この状況に対応する実装を検討します。並行処理の方法はいくつか考えられます。
- ユーザーごとのgoroutine内で、システムごとのgoroutineを実行する
- システムごとのgoroutine内で、ユーザーごとのgoroutineを実行する
- ユーザー × システムごとにgoroutineを実行する
1と2の方法では、goroutineがネストされてしまい、処理が複雑化する可能性があります。そのため、今回は3つ目の方法で効率的に処理できるか検討してみます。
最終的には取得したデータからシステムごとの重複を除外したいため、ユーザーごとに顧客のデータを保持しておくと良さそうです。イメージとしてはこんな感じ。
具体的な実装に移る前に、関連する構造体を一部変更したので、その点を記述します。
// ----------------------------
// DB関連の処理
// ----------------------------
type customerRepository struct {
}
func (r *customerRepository) Save(ctx context.Context, userID string, cd []customer) error {
// データ保存の処理
return nil
}
type externalCredentialRepository struct {
}
func (repo *externalCredentialRepository) fetchUserSystemKeys(ctx context.Context) ([]SystemKey, error) {
// 本来はDBから情報を取得
return []SystemKey{
{UserID: "user1", SystemID: SystemA, Key: "key1-a"},
{UserID: "user1", SystemID: SystemB, Key: "key1-b"},
{UserID: "user1", SystemID: SystemA, Key: "key1-c"},
{UserID: "user2", SystemID: SystemB, Key: "key2-a"},
{UserID: "user3", SystemID: SystemC, Key: "key3-a"},
{UserID: "user3", SystemID: SystemC, Key: "key3-b"},
}, nil
}
type SystemKey struct {
UserID string
SystemID SystemID
Key string
}
type SystemID int
const (
SystemA SystemID = iota + 1
SystemB
SystemC
)
// ----------------------------
// 外部API関連の処理
// ----------------------------
type systemAClient struct {
key string
}
func (c *systemAClient) fetchData(ctx context.Context, userChan chan<- map[string][]customer) error {
// 本当は外部APIを叩いてデータを取得するが、今回は擬似的なデータをチャネルに送信
userChan <- map[string][]customer{"user1": {{ID: 1, Name: "Alice"}}}
return nil
}
type systemBClient struct {
key string
}
func (c *systemBClient) fetchData(ctx context.Context, userChan chan<- map[string][]customer) error {
// 本当は外部APIを叩いてデータを取得するが、今回は擬似的なデータをチャネルに送信
userChan <- map[string][]customer{"user2": {{ID: 2, Name: "Bob"}}}
return nil
}
type systemCClient struct {
key string
}
func (c *systemCClient) fetchData(ctx context.Context, userChan chan<- map[string][]customer) error {
// 本当は外部APIを叩いてデータを取得するが、今回は擬似的なデータをチャネルに送信
userChan <- map[string][]customer{"user3": {{ID: 3, Name: "Charlie"}}}
return nil
}
type systemClient interface {
fetchData(ctx context.Context, userChan chan<- map[string][]customer) error
}
func newSystemClient(systemID SystemID, key string) (systemClient, error) {
switch systemID {
case SystemA:
return &systemAClient{key: key}, nil
case SystemB:
return &systemBClient{key: key}, nil
case SystemC:
return &systemCClient{key: key}, nil
default:
return nil, errors.New("invalid system id")
}
}
// ----------------------------
// ドメインモデル
// ----------------------------
type customer struct {
ID int
Name string
}
type customers []customer
func (c customers) Unique() customers {
// 重複を削除する処理
return []customer{}
}
主に変更した点としては以下の部分です。
- 各サービスごとに異なるクライアントを作成
- 各クライアントのfetchDataメソッドにチャネルを引数として追加
- ユーザーの認証キー取得処理の型を変更
この実装で、複数システムからのデータ取得と重複除外を効率的に行う準備が整いました。それでは、この構造を活用した並行処理の実装を見ていきましょう。
func main() {
ctx := context.Background()
customerRepo := customerRepository{}
credRepo := externalCredentialRepository{}
// DBからAPIをたたくのに必要なkeyをuserごとに取得
usKeys, err := credRepo.fetchUserSystemKeys(ctx)
if err != nil {
log.Println(err)
return
}
wg := sync.WaitGroup{}
userChan := make(chan map[string][]customer, len(usKeys))
for _, usKey := range usKeys {
wg.Add(1)
go func(usKey SystemKey) {
defer wg.Done()
defer func() {
if r := recover(); r != nil {
log.Println(r)
return
}
}()
client, err := newSystemClient(usKey.SystemID, usKey.Key)
if err != nil {
log.Println(err)
return
}
if err := client.fetchData(ctx, userChan); err != nil {
log.Println(err)
}
}(usKey)
}
// 全ての取得処理が完了したらチャネルをclose
go func() {
wg.Wait()
close(userChan)
}()
// チャネルからデータを受信して集約
var mu sync.Mutex
userDataMap := make(map[string][]customer)
for userData := range userChan {
mu.Lock()
for id, data := range userData {
userDataMap[id] = append(userDataMap[id], data...)
}
mu.Unlock()
}
// ユニーク処理と保存
for id, data := range userDataMap {
if err := customerRepo.Save(ctx, id, (customers)(data).Unique()); err != nil {
log.Println(err)
}
}
}
この実装では、全てのキーに対して並行で処理を実行し、fetchData
メソッド内でユーザーごとの顧客データを保持しています。それをuserDataMap
に格納し、最終的に重複を除外した後でSave
メソッドに渡します。これで要件を満たす実装ができました。
※
おわりに
まだ深掘りできる点はいくつかありますが、「ユースケースから考えるgoroutine」は終わりにしようと思います。
ちなみに、深堀りできるポイントとしては、
- goroutineの並行処理の数に制限を設けたい場合
- 外部システムのレートリミットがAPI単位でなかった場合
- 外部APIの処理でエラーが起きた際に、諸々検討したい場合
などなど。。
新しい要件が加わると当然新しい課題が出てきますし、何かを実装するとそこには新しい課題が生まれることもあります。
そのような課題に対して、できるだけシンプルに実装できると良いな〜と思う今日この頃です。
ということで最後まで読んでいただきありがとうございました!!