LoginSignup
1
0

More than 3 years have passed since last update.

machineryのDelayed Taskをテストで即時実行する

Last updated at Posted at 2019-06-27

問題

Goのジョブキューであるmachineryを使っていて、
Delayed Taskによりタイマーで発動するような処理があるとき、テストでは即時に実行させたい。

machineryにはGetPendingTasksというインターフェースがあり、これでTaskを取得することができると思ったら、以下のIssueにあるように現状でDelayed Taskは実装的にGetPendingTasksで取得できないらしい。

GetPendingTasks does not return tasks having ETA · Issue #342 · RichardKnop/machinery
https://github.com/RichardKnop/machinery/issues/342

しょうがないのでgoのテストでタイムトラベルするときと同様のパターンで、
テストのときはETAがnilになるようにする
(もっとスジの良いやり方があったら教えてください!)

対処

1. ETAをフィルターする関数を定義

import (
    "time"
)

var FilterETAFunc = FilterETA

func FilterETA(eta *time.Time) *time.Time {
    return eta
}

func DisableETA() {
    FilterETAFunc = func(eta *time.Time) *time.Time {
        return nil
    }
}

2. TaskにETAを指定するときに必ずフィルタをかます

import (
  "github.com/RichardKnop/machinery/v1/tasks"
)

eta := time.Now().UTC().Add(time.Second * 5)
signature := &tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 1,
    },
    {
      Type:  "int64",
      Value: 1,
    },
  },
  ETA: FilterETAFunc(&eta)
}

asyncResult, err := server.SendTask(signature)

3. テストからはDisableETAを呼ぶ

some_test.go
func init() {
    utils.DisableETA()
}

4. Taskを実行する

GetPendingTasksしてworker.Processで実行する

func ProcessJobTasks(server *machinery.Server) error {
    worker := server.NewWorker("for_test", 1)
    tasks, err := server.GetBroker().GetPendingTasks(server.GetConfig().DefaultQueue)
    if err != nil {
        return err
    }
    for _, task := range tasks {
        err := worker.Process(task)
        if err != nil {
            return err
        }
    }
    return nil
}
1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0