Posted at

goceleryを使ってみた

Celery の Go 版ライブラリがあったので使ってみました。


Celery とは

元々は Python のライブラリで、ジョブキューイングを簡単に実装できるフレームワークです( Ruby の世界で言う Sidekiq のようなもの)。

当社の主力商品いい物件 One のバックエンドにも使われています。

仕組みとしては以下のような感じです。


  • Publisher は Broker に対して実行したいタスク情報を書き込む

  • Consumer は Broker をポーリングしてタスクが投げ込まれたらそれを処理

  • Consumer は処理が終わったら結果を Backend に返す(返さないことも可能)

  • Publisher は Backend をポーリングして結果が返ってきたらそれを処理

こうしておくと非同期で並列の処理ができたり、 Consumer を増やすことでスケールアップできたり、ジョブの流量制御を行ったりすることができます。

Broker には RabbitMQ や Amazon SQS 等の MQ を始め、 Redis などの KVS を、 Backend には Redis 等の KVS や RDBMS を始め、 RabbitMQ などの MQ も使えます。

Celery に入門したい人は以下をご覧ください。

http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html


gocelery とは

Celery の Go 版ライブラリです。

https://github.com/gocelery/gocelery

Celery はプロトコルさえ共通であれば Publisher 、 Consumer を Python 以外の言語で書いても良いはずなので、今回は Python との接続も試してみます。


インストール

go get github.com/gocelery/gocelery

ただし普通に入れると go.uuid のバグでエラーになるので、 go.uuid のバージョンに master を指定してください。

https://github.com/satori/go.uuid/issues/70


実装してみる


Publisher


  • Broker と Backend にはとりあえず Redis を使用


  • CeleryClient.Delay でタスクを投げる

  • 返り値は AsyncResult 型になり、 Get メソッドで結果を待ちます


app.go

package main

import (
"github.com/gocelery/gocelery"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"net/http"
"time"
)

func callTask(c echo.Context) error {
cli, err := gocelery.NewCeleryClient(
gocelery.NewRedisCeleryBroker("redis://localhost:6379"),
gocelery.NewRedisCeleryBackend("redis://localhost:6379"),
1,
)
if err != nil {
return err
}

asyncResult, err := cli.Delay("worker.execute", c.Param("key"))
if err != nil {
return err
}

res, err := asyncResult.Get(10 * time.Second)
if err != nil {
return err
}

return c.JSON(http.StatusOK, map[string]interface{}{"result": res})
}

func main() {
e := echo.New()
e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{
Format: "request:\"${method} ${uri}\" status:${status} latency:${latency} (${latency_human}) bytes:${bytes_out}\n",
}))
e.GET("/:key", callTask)
e.Start(":5000")
}



Consumer



  • CeleryClient.Register でタスク名と実行したいタスクのマッピングを登録


  • CeleryClient.StartWorker でワーカを起動


worker.go

package main

import (
"fmt"
"github.com/gocelery/gocelery"
"os"
"os/signal"
)

func execute(str string) string {
fmt.Println("execute:", str)
return str
}

func main() {
concurrency := 3

cli, _ := gocelery.NewCeleryClient(
gocelery.NewRedisCeleryBroker("redis://localhost:6379"),
gocelery.NewRedisCeleryBackend("redis://localhost:6379"),
concurrency,
)

cli.Register("worker.execute", execute)

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)

cli.StartWorker()
defer cli.StopWorker()
fmt.Printf("worker start: concurrency=%v\n", concurrency)

select {
case sig := <-c:
fmt.Println("worker stopped by signal:", sig)
return
}
}



実行してみる


Publisher ログ

$ go run app.go 

____ __
/ __/___/ / ___
/ _// __/ _ \/ _ \
/___/\__/_//_/\___/ v4.1.10
High performance, minimalist Go web framework
https://echo.labstack.com
____________________________________O/_______
O\
⇨ http server started on [::]:5000
request:"GET /hoge" status:200 latency:51608351 (51.608351ms) bytes:18
request:"GET /fuga" status:200 latency:51921379 (51.921379ms) bytes:18
request:"GET /piyo" status:200 latency:52228218 (52.228218ms) bytes:18


Consumer ログ

$ go run worker.go 

worker start: concurrency=3
execute: hoge
execute: fuga
execute: piyo


実行ログ

$ curl http://localhost:5000/hoge | jq

% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 18 100 18 0 0 346 0 --:--:-- --:--:-- --:--:-- 346
{
"result": "hoge"
}
$ curl http://localhost:5000/fuga | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 18 100 18 0 0 346 0 --:--:-- --:--:-- --:--:-- 346
{
"result": "fuga"
}
$ curl http://localhost:5000/piyo | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 18 100 18 0 0 339 0 --:--:-- --:--:-- --:--:-- 339
{
"result": "piyo"
}


Backend

Backend の Redis の様子を見ると、 celery-task-meta-<UUID> というキーでタスク情報が登録されてます。

$ redis-cli

127.0.0.1:6379> keys *
1) "celery-task-meta-6c416702-2efc-4a07-b4c3-450b2930364b"
2) "celery-task-meta-6a5eac24-ef43-4836-a1b9-0b036211649e"
3) "celery-task-meta-70a83f80-0890-4a30-a67a-051d86908f06"
127.0.0.1:6379> get celery-task-meta-6c416702-2efc-4a07-b4c3-450b2930364b
"{\"task_id\":\"\",\"status\":\"SUCCESS\",\"traceback\":null,\"result\":\"hoge\",\"children\":null}"


Publisher を Python にしてみる

gocelery はまだ CELERY_TASK_PROTOCOL=1 にしか対応してないので、最新の Celery を使う場合は引数に指定する必要があります。


app.py

from flask import Flask, jsonify

from celery import Celery

app = Flask(__name__)
cli = Celery(
'worker',
broker='redis://localhost:6379',
backend='redis://localhost:6379',
)
cli.conf.update(
CELERY_TASK_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'],
CELERY_RESULT_SERIALIZER='json',
CELERY_ENABLE_UTC=True,
CELERY_TASK_PROTOCOL=1,
CELERY_REDIS_SOCKET_CONNECT_TIMEOUT=5,
CELERY_REDIS_SOCKET_TIMEOUT=5,
)

@app.route('/<key>')
def call_task(key):
async_res = cli.send_task('worker.execute', (key, ), serializer='json')
res = async_res.get(timeout=10)
return jsonify({'result': res})

if __name__ == '__main__':
app.run(host='0.0.0.0', debug=True)


これで動いたんですが、 Celery のバージョンが4系だと async_res.get がたまにタイムアウトする謎の事象が発生。

バージョンを3系にすると上手く動きました。