LoginSignup
1
0

チーム開発参加の記録【2023-06~2023-08】(5) gocronでスケジュール処理し、定期的にバッチジョブを起動してみた

Last updated at Posted at 2023-07-05

あるオンラインサロンでチーム開発に参加しています。
私はチーム03のバックエンド側メンバーに加わりました。
チーム03のバックエンドは、Go+Gin+sqlc、DBはPostgreSQLを使うことになりました。
チーム開発に参加しながら、私の学習の軌跡を記事にしていきます。

本シリーズのリンク

※ 本記事のソースコードは主に学習・検証目的で書いたものであり、プロダクトにそのまま使用できる品質を目指していません。

本記事で行うこと

  • DB設計をしていたら、定期的にバッチ処理を動かす必要があるかもしれないと思いました。
    cron形式でスケジュール処理し、定期的にバッチジョブを動かすウェブアプリを作ってみました。
  • スケジューラーとしてgocronを使用します。
  • バッチ処理の内容として、sqlcを使用し、UPSERTとトランザクション処理を試してみます。
  • バッチ処理の結果を、Webhookを利用してSlackに投稿してみます。

データベース側での準備

PostgreSQLで以下のSQLを実行して、fooテーブルをCREATEします。


CREATE TABLE foo(
    id            INTEGER NOT NULL PRIMARY KEY,
    col_timestamp TIMESTAMP WITH TIME ZONE NOT NULL
);

Go側の実装

Go+Gin+sqlc+gocronを使用して、fooテーブルへのUPSERTをスケジュール実行するWebアプリを実装します。


Project Root
  ├── db/
  │     ├── query/
  │     │     ├── query.sql
  │     │     └── schema.sql
  │     └── sqlc/
  │            ├── db.go
  │            ├── models.go
  │            └── query.sql.go
  ├── job/
  │     └── foo_job.go
  ├── app.go
  ├── go.mod
  └── sqlc.yaml

db/sqlc/配下のファイルは、sqlcのコードジェネレーターによって生成されるファイルで、人が以下の3ファイルを用意して「sqlc generate」コマンドを実行することにより生成されます。

  • db/query/schema.sql
  • db/query/query.sql
  • sqlc.yaml

各ソースコード

db/query/schema.sql

db/query/schema.sql

CREATE TABLE foo(
    id            INTEGER NOT NULL PRIMARY KEY,
    col_timestamp TIMESTAMP WITH TIME ZONE NOT NULL
);

db/query/query.sql

UPSERT処理です。

db/query/query.sql

/* name: UpsertFoo :one */
INSERT INTO foo(
    id,
    col_timestamp
)
VALUES
(
    @id,
    @col_timestamp
)
ON CONFLICT (id) DO
UPDATE SET
    col_timestamp = EXCLUDED.col_timestamp
RETURNING *;

sqlc.yaml

sqlc.yaml

version: "2"
sql:
  - engine: "postgresql"
    queries: "./db/query/query.sql"
    schema: "./db/query/schema.sql"
    gen:
      go:
        out: "./db/sqlc"
        package: "db"
        sql_package: "pgx/v5"
        emit_json_tags: true
        json_tags_case_style: "camel"

db/sqlc/*.go(ジェネレーターで生成)

ここまでのファイルの編集が終わったら、Project Rootで以下を実行して、ジェネレーター生成ファイルを作ります。


sqlc generate

すると、db/sqlc/配下の以下の3つのgoファイルが生成されました。

  • db/sqlc/db.go
db/sqlc/db.go

// Code generated by sqlc. DO NOT EDIT.
// versions:
//   sqlc v1.18.0

package db

import (
	"context"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgconn"
)

type DBTX interface {
	Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
	Query(context.Context, string, ...interface{}) (pgx.Rows, error)
	QueryRow(context.Context, string, ...interface{}) pgx.Row
}

func New(db DBTX) *Queries {
	return &Queries{db: db}
}

type Queries struct {
	db DBTX
}

func (q *Queries) WithTx(tx pgx.Tx) *Queries {
	return &Queries{
		db: tx,
	}
}
  • db/sqlc/models.go
db/sqlc/models.go

// Code generated by sqlc. DO NOT EDIT.
// versions:
//   sqlc v1.18.0

package db

import (
	"github.com/jackc/pgx/v5/pgtype"
)

type Foo struct {
	ID           int32              `json:"id"`
	ColTimestamp pgtype.Timestamptz `json:"colTimestamp"`
}
  • db/sqlc/query.sql.go
db/sqlc/query.sql.go

// Code generated by sqlc. DO NOT EDIT.
// versions:
//   sqlc v1.18.0
// source: query.sql

package db

import (
	"context"

	"github.com/jackc/pgx/v5/pgtype"
)

const upsertFoo = `-- name: UpsertFoo :one
INSERT INTO foo(
    id,
    col_timestamp
)
VALUES
(
    $1,
    $2
)
ON CONFLICT (id) DO
UPDATE SET
    col_timestamp = EXCLUDED.col_timestamp
RETURNING id, col_timestamp
`

type UpsertFooParams struct {
	ID           int32              `json:"id"`
	ColTimestamp pgtype.Timestamptz `json:"colTimestamp"`
}

func (q *Queries) UpsertFoo(ctx context.Context, arg UpsertFooParams) (Foo, error) {
	row := q.db.QueryRow(ctx, upsertFoo, arg.ID, arg.ColTimestamp)
	var i Foo
	err := row.Scan(&i.ID, &i.ColTimestamp)
	return i, err
}

job/foo_job.go

バッチジョブを実装します。このgoファイル内の関数は以下の通りです。

  • FooJob関数:バッチジョブ本体。
    • fooテーブルのid=1のデータをUPSERTします。
    • col_timestamp列にはNow()を格納します。
    • トランザクションも使ってみました。
  • postSlack関数:FooJob関数から呼ばれます。
    • Webhookを利用して、バッチジョブの処理結果をSlackに投稿します。
job/foo_job.go

package job

import (
	"bytes"
	"context"
	db "cron_server/db/sqlc"
	"encoding/json"
	"fmt"
	"net/http"
	"time"

	"github.com/jackc/pgx/v5/pgtype"
	"github.com/jackc/pgx/v5/pgxpool"
)

func postSlack(text string) {
	const webhookURL = "your Webhook URL" // 書き換えてください

	type webhookData struct {
		Text string `json:"text"`
	}
	requestData := new(webhookData)
	requestData.Text = text
	requestJson, err := json.Marshal(requestData)
	if err != nil {
		fmt.Println(err.Error())
		return
	}

	response, err := http.Post(webhookURL, "application/json", bytes.NewBuffer(requestJson))
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	defer response.Body.Close()
}

func FooJob(pool *pgxpool.Pool) (db.Foo, error) {
	const id = 1
	now := pgtype.Timestamptz{Time: time.Now(), Valid: true}
	params := db.UpsertFooParams{ID: id, ColTimestamp: now}

	ctx := context.TODO()
	tx, err := pool.Begin(ctx)
	if err != nil {
		return db.Foo{}, err
	}
	defer tx.Rollback(ctx)

	queries := db.New(pool)
	txQuery := queries.WithTx(tx)

	resultSet, err := txQuery.UpsertFoo(context.Background(), params)
	if err == nil {
		err = tx.Commit(ctx)
		if err == nil {
			buf, err := json.Marshal(resultSet)
			if err == nil {
				postSlack(string(buf))
			}
		}
	}
	return resultSet, err
}

app.go

スケジュール処理とWeb APIを実装します。

スケジュール処理は以下の仕様にしました。

  • ウェブアプリを起動すると、自動的にスケジューラーも開始します。
  • UTC時間で日付が変わった0時1分にジョブを起動したいのですが、動作確認しづらいので、一旦その処理をコメントアウトしました。
    • 代わりに、3秒ごとにジョブを起動する処理を入れています。

APIは以下の仕様にしました。

  • runNow API(POST):すぐにバッチジョブを手動実行します。
  • pause(POST):スケジューラーを一時停止します。
    • なんらかのきっかけで、デプロイ先でWebアプリが再起動されると、スケジューラーも勝手に再開されます。
      • Webアプリが再起動されなければ、スケジューラーも停止したままです。
    • スケジューラーを完全に停止させたい場合は、ウェブアプリそのものを停止させる必要があります。
  • resume(POST):一時停止したスケジューラーを再開します。
  • isRunning(GET):スケジューラーが起動中ならtrue、一時停止中ならfalseを返します。
app.go

package main

import (
	"context"
	"cron_server/job"
	"net/http"
	"os"
	"strconv"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/go-co-op/gocron"
	"github.com/jackc/pgx/v5/pgxpool"
)

type httpError struct {
	Error string `json:"error"`
}

type httpMessage struct {
	Message string `json:"message"`
}

var pool *pgxpool.Pool
var scheduler *gocron.Scheduler

func startScheduler() {
	//scheduler.Cron("1 0 * * *").Do(job.FooJob, pool)
	scheduler.CronWithSeconds("*/3 * * * * *").Do(job.FooJob, pool)
	scheduler.StartAsync()
}

func stopScheduler() {
	scheduler.Stop()
	scheduler.Clear()
}

func runNow(c *gin.Context) {
	if scheduler.IsRunning() {
		c.JSON(http.StatusBadRequest, httpError{Error: "スケジューラーを一時停止してからCALLしてください"})
		return
	}

	resultSet, err := job.FooJob(pool)
	if err != nil {
		c.JSON(http.StatusInternalServerError, httpError{Error: err.Error()})
		return
	}
	c.JSON(http.StatusOK, resultSet)
}

func pause(c *gin.Context) {
	if scheduler.IsRunning() {
		stopScheduler()
	}
	c.JSON(http.StatusOK, httpMessage{Message: "ok"})
}

func resume(c *gin.Context) {
	if !scheduler.IsRunning() {
		startScheduler()
	}
	c.JSON(http.StatusOK, httpMessage{Message: "ok"})
}

func isRunning(c *gin.Context) {
	isRunning := scheduler.IsRunning()
	c.JSON(http.StatusOK, httpMessage{Message: strconv.FormatBool(isRunning)})
}

func main() {
	connString := "user=postgres password=secret host=localhost port=5432 dbname=your_database sslmode=disable"
	var err error	pool, err = pgxpool.New(context.Background(), connString)
	if err != nil {
		print(err.Error())
		os.Exit(1)
	}
	defer pool.Close()

	scheduler = gocron.NewScheduler(time.UTC)
	startScheduler()

	router := gin.Default()
	router.POST("/runNow", runNow)
	router.POST("/pause", pause)
	router.POST("/resume", resume)
	router.GET("/isRunning", isRunning)

	router.Run("0.0.0.0:8080")
}

go.mod

go.mod

module cron_server

go 1.20

require (
	github.com/gin-gonic/gin v1.9.1
	github.com/go-co-op/gocron v1.30.1
	github.com/jackc/pgx/v5 v5.4.1
)

require (
	github.com/bytedance/sonic v1.9.1 // indirect
	github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
	github.com/gabriel-vasile/mimetype v1.4.2 // indirect
	github.com/gin-contrib/sse v0.1.0 // indirect
	github.com/go-playground/locales v0.14.1 // indirect
	github.com/go-playground/universal-translator v0.18.1 // indirect
	github.com/go-playground/validator/v10 v10.14.0 // indirect
	github.com/goccy/go-json v0.10.2 // indirect
	github.com/jackc/pgpassfile v1.0.0 // indirect
	github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
	github.com/jackc/puddle/v2 v2.2.0 // indirect
	github.com/json-iterator/go v1.1.12 // indirect
	github.com/klauspost/cpuid/v2 v2.2.4 // indirect
	github.com/leodido/go-urn v1.2.4 // indirect
	github.com/mattn/go-isatty v0.0.19 // indirect
	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
	github.com/modern-go/reflect2 v1.0.2 // indirect
	github.com/pelletier/go-toml/v2 v2.0.8 // indirect
	github.com/robfig/cron/v3 v3.0.1 // indirect
	github.com/rogpeppe/go-internal v1.11.0 // indirect
	github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
	github.com/ugorji/go/codec v1.2.11 // indirect
	go.uber.org/atomic v1.9.0 // indirect
	golang.org/x/arch v0.3.0 // indirect
	golang.org/x/crypto v0.9.0 // indirect
	golang.org/x/net v0.10.0 // indirect
	golang.org/x/sync v0.1.0 // indirect
	golang.org/x/sys v0.8.0 // indirect
	golang.org/x/text v0.9.0 // indirect
	google.golang.org/protobuf v1.30.0 // indirect
	gopkg.in/yaml.v3 v3.0.1 // indirect
)

実行結果

Webアプリを起動すると、スケジューラーが自動起動して、3秒ごとにバッチ処理が実行され、Slackに以下のように3秒ごとにメッセージが投稿されました。

image.png

Postmanを使ってisRunning APIを呼ぶと、trueが返りました。

image.png

pause APIを呼ぶと、スケジューラーが一時停止され、以下の画面でSlackへの投稿が止まりました。

image.png

再びisRunning APIを呼ぶと、falseが返りました。

image.png

スケジューラーが止まった状態でrunNow APIを呼んでジョブを手動実行すると、以下のようにジョブが1回実行され、結果が返りました。

image.png

同じ1件がSlackにも追加されました。

image.png

resume APIを呼んで、スケジューラーを再開させました。

image.png

すると、Slackへの3秒ごとの投稿も再開しました。

image.png

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0