Help us understand the problem. What is going on with this article?

goceleryを使ってみた

More than 1 year has passed since last update.

Celery の Go 版ライブラリがあったので使ってみました。

Celery とは

元々は Python のライブラリで、ジョブキューイングを簡単に実装できるフレームワークです( Ruby の世界で言う Sidekiq のようなもの)。
当社の主力商品いい物件 One のバックエンドにも使われています。
仕組みとしては以下のような感じです。

celery.png

  • Publisher は Broker に対して実行したいタスク情報を書き込む
  • Consumer は Broker をポーリングしてタスクが投げ込まれたらそれを処理
  • Consumer は処理が終わったら結果を Backend に返す(返さないことも可能)
  • Publisher は Backend をポーリングして結果が返ってきたらそれを処理

こうしておくと非同期で並列の処理ができたり、 Consumer を増やすことでスケールアップできたり、ジョブの流量制御を行ったりすることができます。

Broker には RabbitMQ や Amazon SQS 等の MQ を始め、 Redis などの KVS を、 Backend には Redis 等の KVS や RDBMS を始め、 RabbitMQ などの MQ も使えます。

Celery に入門したい人は以下をご覧ください。
http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html

gocelery とは

Celery の Go 版ライブラリです。
https://github.com/gocelery/gocelery

Celery はプロトコルさえ共通であれば Publisher 、 Consumer を Python 以外の言語で書いても良いはずなので、今回は Python との接続も試してみます。

インストール

go get github.com/gocelery/gocelery

ただし普通に入れると go.uuid のバグでエラーになるので、 go.uuid のバージョンに master を指定してください。
https://github.com/satori/go.uuid/issues/70

実装してみる

Publisher

  • Broker と Backend にはとりあえず Redis を使用
  • CeleryClient.Delay でタスクを投げる
  • 返り値は AsyncResult 型になり、 Get メソッドで結果を待ちます
app.go
package main

import (
    "github.com/gocelery/gocelery"
    "github.com/labstack/echo/v4"
    "github.com/labstack/echo/v4/middleware"
    "net/http"
    "time"
)

func callTask(c echo.Context) error {
    cli, err := gocelery.NewCeleryClient(
        gocelery.NewRedisCeleryBroker("redis://localhost:6379"),
        gocelery.NewRedisCeleryBackend("redis://localhost:6379"),
        1,
    )
    if err != nil {
        return err
    }

    asyncResult, err := cli.Delay("worker.execute", c.Param("key"))
    if err != nil {
        return err
    }

    res, err := asyncResult.Get(10 * time.Second)
    if err != nil {
        return err
    }

    return c.JSON(http.StatusOK, map[string]interface{}{"result": res})
}

func main() {
    e := echo.New()
    e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{
        Format: "request:\"${method} ${uri}\" status:${status} latency:${latency} (${latency_human}) bytes:${bytes_out}\n",
    }))
    e.GET("/:key", callTask)
    e.Start(":5000")
}

Consumer

  • CeleryClient.Register でタスク名と実行したいタスクのマッピングを登録
  • CeleryClient.StartWorker でワーカを起動
worker.go
package main

import (
    "fmt"
    "github.com/gocelery/gocelery"
    "os"
    "os/signal"
)

func execute(str string) string {
    fmt.Println("execute:", str)
    return str
}

func main() {
    concurrency := 3

    cli, _ := gocelery.NewCeleryClient(
        gocelery.NewRedisCeleryBroker("redis://localhost:6379"),
        gocelery.NewRedisCeleryBackend("redis://localhost:6379"),
        concurrency,
    )

    cli.Register("worker.execute", execute)

    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt)

    cli.StartWorker()
    defer cli.StopWorker()
    fmt.Printf("worker start: concurrency=%v\n", concurrency)

    select {
    case sig := <-c:
        fmt.Println("worker stopped by signal:", sig)
        return
    }
}

実行してみる

Publisher ログ

$ go run app.go 

   ____    __
  / __/___/ /  ___
 / _// __/ _ \/ _ \
/___/\__/_//_/\___/ v4.1.10
High performance, minimalist Go web framework
https://echo.labstack.com
____________________________________O/_______
                                    O\
⇨ http server started on [::]:5000
request:"GET /hoge" status:200 latency:51608351 (51.608351ms) bytes:18
request:"GET /fuga" status:200 latency:51921379 (51.921379ms) bytes:18
request:"GET /piyo" status:200 latency:52228218 (52.228218ms) bytes:18

Consumer ログ

$ go run worker.go 
worker start: concurrency=3
execute: hoge
execute: fuga
execute: piyo

実行ログ

$ curl http://localhost:5000/hoge | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    18  100    18    0     0    346      0 --:--:-- --:--:-- --:--:--   346
{
  "result": "hoge"
}
$ curl http://localhost:5000/fuga | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    18  100    18    0     0    346      0 --:--:-- --:--:-- --:--:--   346
{
  "result": "fuga"
}
$ curl http://localhost:5000/piyo | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    18  100    18    0     0    339      0 --:--:-- --:--:-- --:--:--   339
{
  "result": "piyo"
}

Backend

Backend の Redis の様子を見ると、 celery-task-meta-<UUID> というキーでタスク情報が登録されてます。

$ redis-cli
127.0.0.1:6379> keys *
 1) "celery-task-meta-6c416702-2efc-4a07-b4c3-450b2930364b"
 2) "celery-task-meta-6a5eac24-ef43-4836-a1b9-0b036211649e"
 3) "celery-task-meta-70a83f80-0890-4a30-a67a-051d86908f06"
127.0.0.1:6379> get celery-task-meta-6c416702-2efc-4a07-b4c3-450b2930364b
"{\"task_id\":\"\",\"status\":\"SUCCESS\",\"traceback\":null,\"result\":\"hoge\",\"children\":null}"

Publisher を Python にしてみる

gocelery はまだ CELERY_TASK_PROTOCOL=1 にしか対応してないので、最新の Celery を使う場合は引数に指定する必要があります。

app.py
from flask import Flask, jsonify
from celery import Celery

app = Flask(__name__)
cli = Celery(
    'worker',
    broker='redis://localhost:6379',
    backend='redis://localhost:6379',
)
cli.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],
    CELERY_RESULT_SERIALIZER='json',
    CELERY_ENABLE_UTC=True,
    CELERY_TASK_PROTOCOL=1,
    CELERY_REDIS_SOCKET_CONNECT_TIMEOUT=5,
    CELERY_REDIS_SOCKET_TIMEOUT=5,
)


@app.route('/<key>')
def call_task(key):
    async_res = cli.send_task('worker.execute', (key, ), serializer='json')
    res = async_res.get(timeout=10)
    return jsonify({'result': res})


if __name__ == '__main__':
    app.run(host='0.0.0.0', debug=True)

これで動いたんですが、 Celery のバージョンが4系だと async_res.get がたまにタイムアウトする謎の事象が発生。
バージョンを3系にすると上手く動きました。

es-h-sugihara
いい物件OneサーバAPIを開発しているサービスリード・エンジニア。プログラムを書くのも好きだが、ログやデータを可視化してユーザの動向を眺めたり、他チームの人と議論したりしつつ、「良いサービスとは」を考えながら開発していくことを生業としている。専攻は理論物理なので物理や数学の話にはだいたい食い付く。1日3杯ぐらい紅茶を飲む。自称ゆるふわアナログ系エンジニア。
e-seikatsu
巨大な不動産市場のIT化を力強く推進するクラウドサービスを開発、提供する「不動産テック」企業
https://www.e-seikatsu.info/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away