あるオンラインサロンでチーム開発に参加しています。
私はチーム03のバックエンド側メンバーに加わりました。
チーム03のバックエンドは、Go+Gin+sqlc、DBはPostgreSQLを使うことになりました。
チーム開発に参加しながら、私の学習の軌跡を記事にしていきます。
本シリーズのリンク
- チーム開発参加の記録【2023-06~2023-08】(1) Go+Ginで画像をダウンロード/アップロードするAPIを作る
- チーム開発参加の記録【2023-06~2023-08】(2) sqlc + jackc/pgx/v5(v5.4.0)を使ってみた
- チーム開発参加の記録【2023-06~2023-08】(3) sqlc + jackc/pgx/v5(v5.4.1)からPostgreSQLの複合型の配列を使ってみた
- チーム開発参加の記録【2023-06~2023-08】(4) sqlc + jackc/pgx/v5 からPostgreSQLの複合型の配列を更新してみた
- チーム開発参加の記録【2023-06~2023-08】(5) gocronでスケジュール処理し、定期的にバッチジョブを起動してみた
- チーム開発参加の記録【2023-06~2023-08】(6) PostgreSQLの複合型の配列の更新について、もう少し煮詰める
※ 本記事のソースコードは主に学習・検証目的で書いたものであり、プロダクトにそのまま使用できる品質を目指していません。
本記事で行うこと
- DB設計をしていたら、定期的にバッチ処理を動かす必要があるかもしれないと思いました。
cron形式でスケジュール処理し、定期的にバッチジョブを動かすウェブアプリを作ってみました。 - スケジューラーとしてgocronを使用します。
- バッチ処理の内容として、sqlcを使用し、UPSERTとトランザクション処理を試してみます。
- バッチ処理の結果を、Webhookを利用してSlackに投稿してみます。
データベース側での準備
PostgreSQLで以下のSQLを実行して、fooテーブルをCREATEします。
CREATE TABLE foo(
id INTEGER NOT NULL PRIMARY KEY,
col_timestamp TIMESTAMP WITH TIME ZONE NOT NULL
);
Go側の実装
Go+Gin+sqlc+gocronを使用して、fooテーブルへのUPSERTをスケジュール実行するWebアプリを実装します。
Project Root
├── db/
│ ├── query/
│ │ ├── query.sql
│ │ └── schema.sql
│ └── sqlc/
│ ├── db.go
│ ├── models.go
│ └── query.sql.go
├── job/
│ └── foo_job.go
├── app.go
├── go.mod
└── sqlc.yaml
db/sqlc/配下のファイルは、sqlcのコードジェネレーターによって生成されるファイルで、人が以下の3ファイルを用意して「sqlc generate」コマンドを実行することにより生成されます。
- db/query/schema.sql
- db/query/query.sql
- sqlc.yaml
各ソースコード
db/query/schema.sql
CREATE TABLE foo(
id INTEGER NOT NULL PRIMARY KEY,
col_timestamp TIMESTAMP WITH TIME ZONE NOT NULL
);
db/query/query.sql
UPSERT処理です。
/* name: UpsertFoo :one */
INSERT INTO foo(
id,
col_timestamp
)
VALUES
(
@id,
@col_timestamp
)
ON CONFLICT (id) DO
UPDATE SET
col_timestamp = EXCLUDED.col_timestamp
RETURNING *;
sqlc.yaml
version: "2"
sql:
- engine: "postgresql"
queries: "./db/query/query.sql"
schema: "./db/query/schema.sql"
gen:
go:
out: "./db/sqlc"
package: "db"
sql_package: "pgx/v5"
emit_json_tags: true
json_tags_case_style: "camel"
db/sqlc/*.go(ジェネレーターで生成)
ここまでのファイルの編集が終わったら、Project Rootで以下を実行して、ジェネレーター生成ファイルを作ります。
sqlc generate
すると、db/sqlc/配下の以下の3つのgoファイルが生成されました。
- db/sqlc/db.go
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.18.0
package db
import (
"context"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
)
type DBTX interface {
Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
Query(context.Context, string, ...interface{}) (pgx.Rows, error)
QueryRow(context.Context, string, ...interface{}) pgx.Row
}
func New(db DBTX) *Queries {
return &Queries{db: db}
}
type Queries struct {
db DBTX
}
func (q *Queries) WithTx(tx pgx.Tx) *Queries {
return &Queries{
db: tx,
}
}
- db/sqlc/models.go
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.18.0
package db
import (
"github.com/jackc/pgx/v5/pgtype"
)
type Foo struct {
ID int32 `json:"id"`
ColTimestamp pgtype.Timestamptz `json:"colTimestamp"`
}
- db/sqlc/query.sql.go
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.18.0
// source: query.sql
package db
import (
"context"
"github.com/jackc/pgx/v5/pgtype"
)
const upsertFoo = `-- name: UpsertFoo :one
INSERT INTO foo(
id,
col_timestamp
)
VALUES
(
$1,
$2
)
ON CONFLICT (id) DO
UPDATE SET
col_timestamp = EXCLUDED.col_timestamp
RETURNING id, col_timestamp
`
type UpsertFooParams struct {
ID int32 `json:"id"`
ColTimestamp pgtype.Timestamptz `json:"colTimestamp"`
}
func (q *Queries) UpsertFoo(ctx context.Context, arg UpsertFooParams) (Foo, error) {
row := q.db.QueryRow(ctx, upsertFoo, arg.ID, arg.ColTimestamp)
var i Foo
err := row.Scan(&i.ID, &i.ColTimestamp)
return i, err
}
job/foo_job.go
バッチジョブを実装します。このgoファイル内の関数は以下の通りです。
- FooJob関数:バッチジョブ本体。
- fooテーブルのid=1のデータをUPSERTします。
- col_timestamp列にはNow()を格納します。
- トランザクションも使ってみました。
- postSlack関数:FooJob関数から呼ばれます。
- Webhookを利用して、バッチジョブの処理結果をSlackに投稿します。
package job
import (
"bytes"
"context"
db "cron_server/db/sqlc"
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
)
func postSlack(text string) {
const webhookURL = "your Webhook URL" // 書き換えてください
type webhookData struct {
Text string `json:"text"`
}
requestData := new(webhookData)
requestData.Text = text
requestJson, err := json.Marshal(requestData)
if err != nil {
fmt.Println(err.Error())
return
}
response, err := http.Post(webhookURL, "application/json", bytes.NewBuffer(requestJson))
if err != nil {
fmt.Println(err.Error())
return
}
defer response.Body.Close()
}
func FooJob(pool *pgxpool.Pool) (db.Foo, error) {
const id = 1
now := pgtype.Timestamptz{Time: time.Now(), Valid: true}
params := db.UpsertFooParams{ID: id, ColTimestamp: now}
ctx := context.TODO()
tx, err := pool.Begin(ctx)
if err != nil {
return db.Foo{}, err
}
defer tx.Rollback(ctx)
queries := db.New(pool)
txQuery := queries.WithTx(tx)
resultSet, err := txQuery.UpsertFoo(context.Background(), params)
if err == nil {
err = tx.Commit(ctx)
if err == nil {
buf, err := json.Marshal(resultSet)
if err == nil {
postSlack(string(buf))
}
}
}
return resultSet, err
}
app.go
スケジュール処理とWeb APIを実装します。
スケジュール処理は以下の仕様にしました。
- ウェブアプリを起動すると、自動的にスケジューラーも開始します。
- UTC時間で日付が変わった0時1分にジョブを起動したいのですが、動作確認しづらいので、一旦その処理をコメントアウトしました。
- 代わりに、3秒ごとにジョブを起動する処理を入れています。
APIは以下の仕様にしました。
- runNow API(POST):すぐにバッチジョブを手動実行します。
- pause(POST):スケジューラーを一時停止します。
- なんらかのきっかけで、デプロイ先でWebアプリが再起動されると、スケジューラーも勝手に再開されます。
- Webアプリが再起動されなければ、スケジューラーも停止したままです。
- スケジューラーを完全に停止させたい場合は、ウェブアプリそのものを停止させる必要があります。
- なんらかのきっかけで、デプロイ先でWebアプリが再起動されると、スケジューラーも勝手に再開されます。
- resume(POST):一時停止したスケジューラーを再開します。
- isRunning(GET):スケジューラーが起動中ならtrue、一時停止中ならfalseを返します。
package main
import (
"context"
"cron_server/job"
"net/http"
"os"
"strconv"
"time"
"github.com/gin-gonic/gin"
"github.com/go-co-op/gocron"
"github.com/jackc/pgx/v5/pgxpool"
)
type httpError struct {
Error string `json:"error"`
}
type httpMessage struct {
Message string `json:"message"`
}
var pool *pgxpool.Pool
var scheduler *gocron.Scheduler
func startScheduler() {
//scheduler.Cron("1 0 * * *").Do(job.FooJob, pool)
scheduler.CronWithSeconds("*/3 * * * * *").Do(job.FooJob, pool)
scheduler.StartAsync()
}
func stopScheduler() {
scheduler.Stop()
scheduler.Clear()
}
func runNow(c *gin.Context) {
if scheduler.IsRunning() {
c.JSON(http.StatusBadRequest, httpError{Error: "スケジューラーを一時停止してからCALLしてください"})
return
}
resultSet, err := job.FooJob(pool)
if err != nil {
c.JSON(http.StatusInternalServerError, httpError{Error: err.Error()})
return
}
c.JSON(http.StatusOK, resultSet)
}
func pause(c *gin.Context) {
if scheduler.IsRunning() {
stopScheduler()
}
c.JSON(http.StatusOK, httpMessage{Message: "ok"})
}
func resume(c *gin.Context) {
if !scheduler.IsRunning() {
startScheduler()
}
c.JSON(http.StatusOK, httpMessage{Message: "ok"})
}
func isRunning(c *gin.Context) {
isRunning := scheduler.IsRunning()
c.JSON(http.StatusOK, httpMessage{Message: strconv.FormatBool(isRunning)})
}
func main() {
connString := "user=postgres password=secret host=localhost port=5432 dbname=your_database sslmode=disable"
var err error pool, err = pgxpool.New(context.Background(), connString)
if err != nil {
print(err.Error())
os.Exit(1)
}
defer pool.Close()
scheduler = gocron.NewScheduler(time.UTC)
startScheduler()
router := gin.Default()
router.POST("/runNow", runNow)
router.POST("/pause", pause)
router.POST("/resume", resume)
router.GET("/isRunning", isRunning)
router.Run("0.0.0.0:8080")
}
go.mod
module cron_server
go 1.20
require (
github.com/gin-gonic/gin v1.9.1
github.com/go-co-op/gocron v1.30.1
github.com/jackc/pgx/v5 v5.4.1
)
require (
github.com/bytedance/sonic v1.9.1 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.14.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/leodido/go-urn v1.2.4 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
実行結果
Webアプリを起動すると、スケジューラーが自動起動して、3秒ごとにバッチ処理が実行され、Slackに以下のように3秒ごとにメッセージが投稿されました。
Postmanを使ってisRunning APIを呼ぶと、trueが返りました。
pause APIを呼ぶと、スケジューラーが一時停止され、以下の画面でSlackへの投稿が止まりました。
再びisRunning APIを呼ぶと、falseが返りました。
スケジューラーが止まった状態でrunNow APIを呼んでジョブを手動実行すると、以下のようにジョブが1回実行され、結果が返りました。
同じ1件がSlackにも追加されました。
resume APIを呼んで、スケジューラーを再開させました。
すると、Slackへの3秒ごとの投稿も再開しました。