4
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

goceleryを使ってみた

Posted at

Celery の Go 版ライブラリがあったので使ってみました。

Celery とは

元々は Python のライブラリで、ジョブキューイングを簡単に実装できるフレームワークです( Ruby の世界で言う Sidekiq のようなもの)。
当社の主力商品いい物件 One のバックエンドにも使われています。
仕組みとしては以下のような感じです。

celery.png

  • Publisher は Broker に対して実行したいタスク情報を書き込む
  • Consumer は Broker をポーリングしてタスクが投げ込まれたらそれを処理
  • Consumer は処理が終わったら結果を Backend に返す(返さないことも可能)
  • Publisher は Backend をポーリングして結果が返ってきたらそれを処理

こうしておくと非同期で並列の処理ができたり、 Consumer を増やすことでスケールアップできたり、ジョブの流量制御を行ったりすることができます。

Broker には RabbitMQ や Amazon SQS 等の MQ を始め、 Redis などの KVS を、 Backend には Redis 等の KVS や RDBMS を始め、 RabbitMQ などの MQ も使えます。

Celery に入門したい人は以下をご覧ください。
http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html

gocelery とは

Celery の Go 版ライブラリです。
https://github.com/gocelery/gocelery

Celery はプロトコルさえ共通であれば Publisher 、 Consumer を Python 以外の言語で書いても良いはずなので、今回は Python との接続も試してみます。

インストール

go get github.com/gocelery/gocelery

ただし普通に入れると go.uuid のバグでエラーになるので、 go.uuid のバージョンに master を指定してください。
https://github.com/satori/go.uuid/issues/70

実装してみる

Publisher

  • Broker と Backend にはとりあえず Redis を使用
  • CeleryClient.Delay でタスクを投げる
  • 返り値は AsyncResult 型になり、 Get メソッドで結果を待ちます
app.go
package main

import (
	"github.com/gocelery/gocelery"
	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
	"net/http"
	"time"
)

func callTask(c echo.Context) error {
	cli, err := gocelery.NewCeleryClient(
		gocelery.NewRedisCeleryBroker("redis://localhost:6379"),
		gocelery.NewRedisCeleryBackend("redis://localhost:6379"),
		1,
	)
	if err != nil {
		return err
	}

	asyncResult, err := cli.Delay("worker.execute", c.Param("key"))
	if err != nil {
		return err
	}

	res, err := asyncResult.Get(10 * time.Second)
	if err != nil {
		return err
	}

	return c.JSON(http.StatusOK, map[string]interface{}{"result": res})
}

func main() {
	e := echo.New()
	e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{
		Format: "request:\"${method} ${uri}\" status:${status} latency:${latency} (${latency_human}) bytes:${bytes_out}\n",
	}))
	e.GET("/:key", callTask)
	e.Start(":5000")
}

Consumer

  • CeleryClient.Register でタスク名と実行したいタスクのマッピングを登録
  • CeleryClient.StartWorker でワーカを起動
worker.go
package main

import (
	"fmt"
	"github.com/gocelery/gocelery"
	"os"
	"os/signal"
)

func execute(str string) string {
	fmt.Println("execute:", str)
	return str
}

func main() {
	concurrency := 3

	cli, _ := gocelery.NewCeleryClient(
		gocelery.NewRedisCeleryBroker("redis://localhost:6379"),
		gocelery.NewRedisCeleryBackend("redis://localhost:6379"),
		concurrency,
	)

	cli.Register("worker.execute", execute)

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)

	cli.StartWorker()
	defer cli.StopWorker()
	fmt.Printf("worker start: concurrency=%v\n", concurrency)

	select {
	case sig := <-c:
		fmt.Println("worker stopped by signal:", sig)
		return
	}
}

実行してみる

Publisher ログ

$ go run app.go 

   ____    __
  / __/___/ /  ___
 / _// __/ _ \/ _ \
/___/\__/_//_/\___/ v4.1.10
High performance, minimalist Go web framework
https://echo.labstack.com
____________________________________O/_______
                                    O\
⇨ http server started on [::]:5000
request:"GET /hoge" status:200 latency:51608351 (51.608351ms) bytes:18
request:"GET /fuga" status:200 latency:51921379 (51.921379ms) bytes:18
request:"GET /piyo" status:200 latency:52228218 (52.228218ms) bytes:18

Consumer ログ

$ go run worker.go 
worker start: concurrency=3
execute: hoge
execute: fuga
execute: piyo

実行ログ

$ curl http://localhost:5000/hoge | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    18  100    18    0     0    346      0 --:--:-- --:--:-- --:--:--   346
{
  "result": "hoge"
}
$ curl http://localhost:5000/fuga | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    18  100    18    0     0    346      0 --:--:-- --:--:-- --:--:--   346
{
  "result": "fuga"
}
$ curl http://localhost:5000/piyo | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    18  100    18    0     0    339      0 --:--:-- --:--:-- --:--:--   339
{
  "result": "piyo"
}

Backend

Backend の Redis の様子を見ると、 celery-task-meta-<UUID> というキーでタスク情報が登録されてます。

$ redis-cli
127.0.0.1:6379> keys *
 1) "celery-task-meta-6c416702-2efc-4a07-b4c3-450b2930364b"
 2) "celery-task-meta-6a5eac24-ef43-4836-a1b9-0b036211649e"
 3) "celery-task-meta-70a83f80-0890-4a30-a67a-051d86908f06"
127.0.0.1:6379> get celery-task-meta-6c416702-2efc-4a07-b4c3-450b2930364b
"{\"task_id\":\"\",\"status\":\"SUCCESS\",\"traceback\":null,\"result\":\"hoge\",\"children\":null}"

Publisher を Python にしてみる

gocelery はまだ CELERY_TASK_PROTOCOL=1 にしか対応してないので、最新の Celery を使う場合は引数に指定する必要があります。

app.py
from flask import Flask, jsonify
from celery import Celery

app = Flask(__name__)
cli = Celery(
    'worker',
    broker='redis://localhost:6379',
    backend='redis://localhost:6379',
)
cli.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],
    CELERY_RESULT_SERIALIZER='json',
    CELERY_ENABLE_UTC=True,
    CELERY_TASK_PROTOCOL=1,
    CELERY_REDIS_SOCKET_CONNECT_TIMEOUT=5,
    CELERY_REDIS_SOCKET_TIMEOUT=5,
)


@app.route('/<key>')
def call_task(key):
    async_res = cli.send_task('worker.execute', (key, ), serializer='json')
    res = async_res.get(timeout=10)
    return jsonify({'result': res})


if __name__ == '__main__':
    app.run(host='0.0.0.0', debug=True)

これで動いたんですが、 Celery のバージョンが4系だと async_res.get がたまにタイムアウトする謎の事象が発生。
バージョンを3系にすると上手く動きました。

4
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?