Celery の Go 版ライブラリがあったので使ってみました。
Celery とは
元々は Python のライブラリで、ジョブキューイングを簡単に実装できるフレームワークです( Ruby の世界で言う Sidekiq のようなもの)。
当社の主力商品いい物件 One のバックエンドにも使われています。
仕組みとしては以下のような感じです。
- Publisher は Broker に対して実行したいタスク情報を書き込む
- Consumer は Broker をポーリングしてタスクが投げ込まれたらそれを処理
- Consumer は処理が終わったら結果を Backend に返す(返さないことも可能)
- Publisher は Backend をポーリングして結果が返ってきたらそれを処理
こうしておくと非同期で並列の処理ができたり、 Consumer を増やすことでスケールアップできたり、ジョブの流量制御を行ったりすることができます。
Broker には RabbitMQ や Amazon SQS 等の MQ を始め、 Redis などの KVS を、 Backend には Redis 等の KVS や RDBMS を始め、 RabbitMQ などの MQ も使えます。
Celery に入門したい人は以下をご覧ください。
http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html
gocelery とは
Celery の Go 版ライブラリです。
https://github.com/gocelery/gocelery
Celery はプロトコルさえ共通であれば Publisher 、 Consumer を Python 以外の言語で書いても良いはずなので、今回は Python との接続も試してみます。
インストール
go get github.com/gocelery/gocelery
ただし普通に入れると go.uuid のバグでエラーになるので、 go.uuid のバージョンに master を指定してください。
https://github.com/satori/go.uuid/issues/70
実装してみる
Publisher
- Broker と Backend にはとりあえず Redis を使用
-
CeleryClient.Delay
でタスクを投げる - 返り値は
AsyncResult
型になり、Get
メソッドで結果を待ちます
package main
import (
"github.com/gocelery/gocelery"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"net/http"
"time"
)
func callTask(c echo.Context) error {
cli, err := gocelery.NewCeleryClient(
gocelery.NewRedisCeleryBroker("redis://localhost:6379"),
gocelery.NewRedisCeleryBackend("redis://localhost:6379"),
1,
)
if err != nil {
return err
}
asyncResult, err := cli.Delay("worker.execute", c.Param("key"))
if err != nil {
return err
}
res, err := asyncResult.Get(10 * time.Second)
if err != nil {
return err
}
return c.JSON(http.StatusOK, map[string]interface{}{"result": res})
}
func main() {
e := echo.New()
e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{
Format: "request:\"${method} ${uri}\" status:${status} latency:${latency} (${latency_human}) bytes:${bytes_out}\n",
}))
e.GET("/:key", callTask)
e.Start(":5000")
}
Consumer
-
CeleryClient.Register
でタスク名と実行したいタスクのマッピングを登録 -
CeleryClient.StartWorker
でワーカを起動
package main
import (
"fmt"
"github.com/gocelery/gocelery"
"os"
"os/signal"
)
func execute(str string) string {
fmt.Println("execute:", str)
return str
}
func main() {
concurrency := 3
cli, _ := gocelery.NewCeleryClient(
gocelery.NewRedisCeleryBroker("redis://localhost:6379"),
gocelery.NewRedisCeleryBackend("redis://localhost:6379"),
concurrency,
)
cli.Register("worker.execute", execute)
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
cli.StartWorker()
defer cli.StopWorker()
fmt.Printf("worker start: concurrency=%v\n", concurrency)
select {
case sig := <-c:
fmt.Println("worker stopped by signal:", sig)
return
}
}
実行してみる
Publisher ログ
$ go run app.go
____ __
/ __/___/ / ___
/ _// __/ _ \/ _ \
/___/\__/_//_/\___/ v4.1.10
High performance, minimalist Go web framework
https://echo.labstack.com
____________________________________O/_______
O\
⇨ http server started on [::]:5000
request:"GET /hoge" status:200 latency:51608351 (51.608351ms) bytes:18
request:"GET /fuga" status:200 latency:51921379 (51.921379ms) bytes:18
request:"GET /piyo" status:200 latency:52228218 (52.228218ms) bytes:18
Consumer ログ
$ go run worker.go
worker start: concurrency=3
execute: hoge
execute: fuga
execute: piyo
実行ログ
$ curl http://localhost:5000/hoge | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 18 100 18 0 0 346 0 --:--:-- --:--:-- --:--:-- 346
{
"result": "hoge"
}
$ curl http://localhost:5000/fuga | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 18 100 18 0 0 346 0 --:--:-- --:--:-- --:--:-- 346
{
"result": "fuga"
}
$ curl http://localhost:5000/piyo | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 18 100 18 0 0 339 0 --:--:-- --:--:-- --:--:-- 339
{
"result": "piyo"
}
Backend
Backend の Redis の様子を見ると、 celery-task-meta-<UUID>
というキーでタスク情報が登録されてます。
$ redis-cli
127.0.0.1:6379> keys *
1) "celery-task-meta-6c416702-2efc-4a07-b4c3-450b2930364b"
2) "celery-task-meta-6a5eac24-ef43-4836-a1b9-0b036211649e"
3) "celery-task-meta-70a83f80-0890-4a30-a67a-051d86908f06"
127.0.0.1:6379> get celery-task-meta-6c416702-2efc-4a07-b4c3-450b2930364b
"{\"task_id\":\"\",\"status\":\"SUCCESS\",\"traceback\":null,\"result\":\"hoge\",\"children\":null}"
Publisher を Python にしてみる
gocelery はまだ CELERY_TASK_PROTOCOL=1
にしか対応してないので、最新の Celery を使う場合は引数に指定する必要があります。
from flask import Flask, jsonify
from celery import Celery
app = Flask(__name__)
cli = Celery(
'worker',
broker='redis://localhost:6379',
backend='redis://localhost:6379',
)
cli.conf.update(
CELERY_TASK_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'],
CELERY_RESULT_SERIALIZER='json',
CELERY_ENABLE_UTC=True,
CELERY_TASK_PROTOCOL=1,
CELERY_REDIS_SOCKET_CONNECT_TIMEOUT=5,
CELERY_REDIS_SOCKET_TIMEOUT=5,
)
@app.route('/<key>')
def call_task(key):
async_res = cli.send_task('worker.execute', (key, ), serializer='json')
res = async_res.get(timeout=10)
return jsonify({'result': res})
if __name__ == '__main__':
app.run(host='0.0.0.0', debug=True)
これで動いたんですが、 Celery のバージョンが4系だと async_res.get
がたまにタイムアウトする謎の事象が発生。
バージョンを3系にすると上手く動きました。