問題
Goのジョブキューであるmachineryを使っていて、
Delayed Taskによりタイマーで発動するような処理があるとき、テストでは即時に実行させたい。
machineryにはGetPendingTasksというインターフェースがあり、これでTaskを取得することができると思ったら、以下のIssueにあるように現状でDelayed Taskは実装的にGetPendingTasksで取得できないらしい。
GetPendingTasks does not return tasks having ETA · Issue #342 · RichardKnop/machinery
https://github.com/RichardKnop/machinery/issues/342
しょうがないのでgoのテストでタイムトラベルするときと同様のパターンで、
テストのときはETAがnilになるようにする
(もっとスジの良いやり方があったら教えてください!)
対処
1. ETAをフィルターする関数を定義
import (
"time"
)
var FilterETAFunc = FilterETA
func FilterETA(eta *time.Time) *time.Time {
return eta
}
func DisableETA() {
FilterETAFunc = func(eta *time.Time) *time.Time {
return nil
}
}
2. TaskにETAを指定するときに必ずフィルタをかます
import (
"github.com/RichardKnop/machinery/v1/tasks"
)
eta := time.Now().UTC().Add(time.Second * 5)
signature := &tasks.Signature{
Name: "add",
Args: []tasks.Arg{
{
Type: "int64",
Value: 1,
},
{
Type: "int64",
Value: 1,
},
},
ETA: FilterETAFunc(&eta)
}
asyncResult, err := server.SendTask(signature)
3. テストからはDisableETAを呼ぶ
some_test.go
func init() {
utils.DisableETA()
}
4. Taskを実行する
GetPendingTasksしてworker.Processで実行する
func ProcessJobTasks(server *machinery.Server) error {
worker := server.NewWorker("for_test", 1)
tasks, err := server.GetBroker().GetPendingTasks(server.GetConfig().DefaultQueue)
if err != nil {
return err
}
for _, task := range tasks {
err := worker.Process(task)
if err != nil {
return err
}
}
return nil
}