1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

3-shakeAdvent Calendar 2024

Day 8

バッチ処理をCloud RunからCloud Run jobsに変更してみた話

Last updated at Posted at 2024-12-07

この記事は3-shake Advent Calendar 2024 シリーズ1の8日目の記事です

はじめましてあるいはこんにちは、@bayobayo0324 です。
株式会社スリーシェイクでクラウド型ETLツール「Reckoner(レコナー)」のプロダクトエンジニアしています。
https://reckoner.io/

2022年に入社して以来、2年連続でAdvent Calendarを書いていて正直今年は書かなくていいかな?と思っていたのですが、社内で盛り上がった空気にあてられて今年も書いてみようと筆を取りました。

何について書こうかと色々考えてみたんですが、結局例年通り仕事でやってみたことを備忘録代わりに書いてみようと思います。

どんな記事?

弊プロダクトで実行しているバッチ処理を、

  • Cloud Run上で動いているAPIをScheduler実行

から

  • Cloud Run jobsに作成したジョブをScheduler実行

に変えたお話です。

バッチ処理ってどんなもの?

弊プロダクトReckonerは各種連携先のAPIを叩いてデータを取得&加工&書き出しを行うETLツールです。
連携先にはOAuth認証で取得したアクセストークンを利用するものもあります。
一般的にアクセストークン、およびリフレッシュトークンには有効期限があり、アクセストークンを常に使い続けられるようにするにはリフレッシュトークンを用いたトークンリフレッシュ処理が必要です。
連携先サービスのトークン有効期限にあわせて、トークンリフレッシュ処理は定期的に実行しなければいけません。

今回はこの定期的に実行するトークンリフレッシュ処理(=バッチ処理)の処理方式を変更したというお話となります。

変更前

変更する前は、Cloud Run上にデプロイされたAPIをCloud Schedulerによってスケジュール実行していました。
なぜこうなっていたのか、私や他のチームメンバーが入社する前から実装されていたものなので理由はわかりませんが、Cloud Run jobsがGA前だったからでは?というのがチームメンバーの推測でした。
ちなみにGAは2023年4月となっています。
https://cloud.google.com/blog/products/serverless/cloud-run-jobs-and-second-generation-execution-environment-ga?hl=en

余談ですが変更する際にGoogleCloudのどのサービスが良いのか調べていたところ、弊社所属の @SatohJohn さんの記事が出てきたました笑
https://speakerdeck.com/satohjohn/gcptenohatutichu-li-hatanwokao-etemiru?slide=20
Johnさんも個人的おすすめでjobsを推しています✨

問題点

  1. バッチ処理用エンドポイントだけではなく、内部処理から呼ばれるAPIエンドポイントと同じサービスに乗っていたため、リソース/ログ監視やソース管理が複雑になってしまっていた
  2. バッチ処理なのにHTTPリクエストで実行しているのがなんか気持ち悪い()

2はお気持ち程度ですが、1については例えばバッチ処理がめちゃくちゃに重くなっているときに他のAPIリクエストが実行されて、Cloud Runの[最大同時実行数 × 最大インスタンス数]を超えたときにそのAPIリクエストが失敗してしまったり、その逆でバッチ処理実行時にAPIリクエスト側の処理によって負荷が高い状態となっている場合にトークンリフレッシュが実行できないしまうといったリスクも考えられるため、サービスとしての分割が必要な状態でした。

変更方法

変更前

変更前はAPIサービスとして起動しており、実装はGoのWebフレームワークGinを使っていました。
以下一部省略&マスキングしたルーティング設定です(xxxxxはマスキング)

func NewHTTPWorkflowPrivateHandlerV2(g *gin.Engine, uc usecase.WorkflowPrivateUsecase) {
	h := &HTTPWorkflowPrivateHandlerV2{
		workflowPrivateUC: uc,
	}

	v2 := g.Group("/api/v2")

	// tokens
	v2.GET("/token/xxxxx/:id", h.GetxxxxxToken)
	v2.POST("/token/xxxxx/refresh", h.RefreshxxxxxToken)
	v2.GET("/token/xxxxx/:id", h.GetxxxxxToken)
	v2.POST("/token/xxxxx/refresh", h.RefreshxxxxxToken)
	v2.GET("/token/xxxxx/:id", h.GetxxxxxToken)
	v2.POST("/token/xxxxx/refresh", h.RefreshxxxxxToken)
	v2.GET("/token/xxxxx/:id", h.GetxxxxxToken)
	v2.POST("/token/xxxxx/refresh", h.RefreshxxxxxToken)
	v2.GET("/token/xxxxx/:id", h.GetxxxxxToken)
	v2.GET("/token/xxxxx/:id", h.GetxxxxxToken)
	v2.GET("/token/xxxxx/:id", h.GetxxxxxToken)
	v2.GET("/token/xxxxx/:id", h.GetxxxxxToken)
}

ご覧のように、エンドポイントに/token/xxxxx/refreshがあり、handlerFuncのRefreshXXXXXTokenが呼び出される設定となっているのがわかると思います。
Cloude Schedulerからはこの/token/xxxxx/refreshに向かって定期的にPOSTリクエストを送っている設定となっていました。
ちなみにGetXXXXXTokenは内部処理用に使っているエンドポイント用のhandlerFuncです。
GetRefreshのセットで定義されていたものが途中からGetだけになっていますが、ここからRefreshをバッチ側に移したということになります。
変更前のもののバッチへの移動はまだされていないということですね…😇

変更後

バッチとしてCLI実行できるように、cobraを使って以下のように変更しました。
※こちらも一部省略&マスキングしています

batch.go
package cmd

import (
	"github.com/spf13/cobra"

	"github.com/3-shake/reckoner-cdp/server/cmd/batch"
)

var batchCmd = &cobra.Command{
	Use: "batch",
}

func init() {
	batchCmd.AddCommand(batch.NewCommandRefreshToken())
	rootCmd.AddCommand(batchCmd)
}

refrech_token.go
package batch

import (
	"github.com/3-shake/reckoner-cdp/server/cmd/batch/token"
	"github.com/spf13/cobra"
)

func NewCommandRefreshToken() *cobra.Command {
	cmd := &cobra.Command{
		Use: "refresh_token",
	}

	cmd.AddCommand(token.NewCommandxxxxx())
	return cmd
}
xxxxx.go
package token

import (
	"fmt"

	tokenHandler "github.com/3-shake/reckoner-cdp/server/batch/token/handler"
	"github.com/3-shake/reckoner-cdp/server/cmd/initialize"
	"github.com/3-shake/reckoner-cdp/server/project/logger"
	"github.com/spf13/cobra"
)

func NewCommandxxxxx() *cobra.Command {
	cmd := &cobra.Command{
		Use: "xxxxx",
		RunE: func(cmd *cobra.Command, args []string) error {
			date, err := cmd.Flags().GetString("datetime")
			if err != nil {
				return err
			}

			teamIDs, err := cmd.Flags().GetIntSlice("team_ids")
			if err != nil {
				return err
			}

			return RefreshxxxxxToken(date, teamIDs)
		},
	}

	cmd.Flags().String("datetime", "", "refresh target datetime format `YYYYYYMMDDHHMISS`")
	cmd.Flags().IntSlice("team_ids", []int{}, "refresh target team_ids, comma-separated ints")
	return cmd
}

func RefreshxxxxxToken(date string, teamIDs []int) error {
	uc := initialize.RefreshTokenUsecase()
	h := tokenHandler.NewBatchRefreshTokenHandler(uc)

	logger.Logger.Info(fmt.Sprintf("refresh_token target datetime: %s, teamIDs: %v", date, teamIDs))

	errs := h.Runxxxxx(date, teamIDs)

	if len(errs) != 0 {
		for _, err := range errs {
			logger.Logger.Error(err.Error())
		}

		return fmt.Errorf("refresh_token %d errors occurred", len(errs))
	} else {
		logger.Logger.Info("refresh_token finished")
		return nil
	}
}

h.Runxxxxx以下は変更前のhandlerFuncとほぼ同等の処理となっています

実行方法は下記のようになります。

$ batch refresh_token xxxxx

※xxxxxは連携先サービス名を入れています

バッチ処理に移すにあたり、テックリードとの相談やプラクティス記事を読んだりして考慮したのは下記です(参考:バッチ処理 プラクティス

  • 失敗したときに再実行可能か
  • 手動実行時に対象を限定できるか

再実行可能性は処理の中身になってしまうので割愛しますが、例えば処理の途中で失敗したときに、手動で続きから実行したいという場合はあると思います。
弊プロダクトはマルチテナントなので、データ上でテナントを識別するIDを指定することで、失敗したテナントから手動で再実行をかけられるように下記のコマンド引数を追加しています。

			teamIDs, err := cmd.Flags().GetIntSlice("team_ids")
			if err != nil {
				return err
			}

また、エラー等何らかの理由でデフォルトのリフレッシュ対象しきい値から外れてしまったものを救うために、リフレッシュ対象日時しきい値も指定できるようにしました。

			date, err := cmd.Flags().GetString("datetime")
			if err != nil {
				return err
			}

上記コマンド引数はまだ活躍の機会はないですが、いずれ必要になるときのために基本構成として入れておこう、という方針になっています。

そしてCloud Run jobsで実行するためにジョブとしてデプロイしているのがこちらです

deploy.yaml
  - id: refresh-token-xxxxx
    name: gcr.io/cloud-builders/gcloud
    entrypoint: bash
    args:
      - -c
      - |
        set -euo pipefail
        apt-get update
        apt-get install wget -y
        wget https://github.com/mikefarah/yq/releases/download/v4.30.3/yq_linux_amd64 -O /usr/bin/yq
        chmod +x /usr/bin/yq

        gcloud run jobs deploy refresh-token-xxxxx \
          --region=$(yq '.region' infra/server/cloudrun/prd.yaml) \
          --vpc-connector=$(yq '.vpc-connector' infra/server/cloudrun/prd.yaml) \
          --vpc-egress=$(yq '.vpc-egress' infra/server/cloudrun/prd.yaml) \
          --service-account=$(yq '.service-account' infra/server/cloudrun/prd.yaml) \
          --image=asia-docker.pkg.dev/${PROJECT_NAME}/reckoner/server:$TAG_NAME \
          --task-timeout=$(yq '.server-timeout' infra/server/cloudrun/prd.yaml) \
          --tasks=1 \
          --parallelism=1 \
          --max-retries=0 \
          --command=./batch \
          --args=refresh_token,xxxxx \
          $(for key in $(yq '(keys)[]' infra/server/env.yaml); do echo "--set-env-vars=^---^$key=$(yq .$key infra/server/env.yaml)"; done)
    waitFor:
      - "setup-server-env"

gcloud run jobs deployを使い、--imageや各種設定を追加しつつ、--commandbatch--argsrefresh_tokenxxxxx(連携先サービス名)を指定して起動しているのがわかると思います。

既にこの処理方式で2つのトークンリフレッシュバッチが動いていますが、今後追加されるものも全てこちらに寄せていく予定となっています。

終わりに

変更してみてですが、まずはやっぱりログ監視設定や確認が既存のAPIサービスと分離されたのが1番良かった点ですね🙌🏻
あとはソース上でも「バッチだからここ見ればいい」がすぐ分かるようになったのも嬉しかったです!
APIサービス側に実装された既存のバッチ処理を移行するのも時間を見つけて実施したいですが、なかなか機能開発の合間の時間を取れずにいます…ので、最後に宣伝させてください😂

スリーシェイクでは一緒に働く仲間を募集しています!
ぜひ私と一緒にReckonerを盛り上げていきましょう✨
Reckonerの他にもSRE支援やエージェントサービス、セキュリティSaas事業もやっています、ご興味ある方いたらぜひ!❤️
https://jobs-3-shake.com/

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?