はじめに
この記事はGo3 Advent Calendar 201712日目の記事です。
アドベントカレンダーは初参加になります。よろしくお願いします
GoでWebアプリケーションを開発している時、何かのリクエストを受け取った際にリクエストは200 OKで返却し、処理自体はバックグラウンドで行うみたいなことをしたくて調べていました。(Railsとかだとdelayed_jobとかそういうの)
その際にmachineryというOSSを見つけたので、その紹介をしたいと思います。
machineryとは
詳細はgithubに書いてありますが、以下に一部抜粋します。
Machinery is an asynchronous task queue/job queue based on distributed message passing.
Machineryは分散メッセージを使用した非同期タスクのキュー/実行基盤です。
書いてある通り、バックグラウンドへは分散メッセージ(broker)を通して行われます。
現状サポートしているものとしては以下のようです。
- AMQP
- Redis
また、バックグラウンド処理のステータス並びに結果を格納するものとしては以下をサポートしているようです。
- Redis
- Memcache
- AMQP (書いてありますが、AMQPは非推奨のようです Keeping Results)
- MongoDB
やってみた
チュートリアルですが、自前の環境で動かしてみました。
以下にサンプルがあるので、そちらを使って動作確認を実施します。
machinery.go
また、当然ですが、事前準備としてbrokerや結果の格納先を準備しておく必要があります。(今回は、AMQP並びにMongoDBを使用します。)
そちらについては、今回は省略させていただきます。
設定変更
machinery.goと同一ディレクトリにconfig.ymlという設定ファイルがあるので、これを修正します。
---
broker: 'amqp://guest:guest@localhost:5672/'
default_queue: machinery_tasks
result_backend: 'redis://127.0.0.1:6379'
results_expire_in: 3600000
amqp:
binding_key: machinery_task
exchange: machinery_exchange
exchange_type: direct
prefetch_count: 3
Worker起動
まず、Workerを起動します。
以下のコマンドによりWorkerを起動することが出来ます。
go run example/machinery.go -c config.yml worker
起動すると
$ go run machinery.go -c config.yml worker
INFO: 2017/12/05 17:30:51 file.go:30 Successfully loaded config from file for the first time
INFO: 2017/12/05 17:30:51 worker.go:31 Launching a worker with the following settings:
INFO: 2017/12/05 17:30:51 worker.go:32 - Broker: amqp://guest:guest@localhost:5672/
INFO: 2017/12/05 17:30:51 worker.go:33 - DefaultQueue: machinery_tasks
INFO: 2017/12/05 17:30:51 worker.go:34 - ResultBackend: mongodb://127.0.0.1:27017/taskresults
INFO: 2017/12/05 17:30:51 worker.go:36 - AMQP: machinery_exchange
INFO: 2017/12/05 17:30:51 worker.go:37 - Exchange: machinery_exchange
INFO: 2017/12/05 17:30:51 worker.go:38 - ExchangeType: direct
INFO: 2017/12/05 17:30:51 worker.go:39 - BindingKey: machinery_task
INFO: 2017/12/05 17:30:51 worker.go:40 - PrefetchCount: 3
INFO: 2017/12/05 17:30:52 amqp.go:72 [*] Waiting for messages. To exit press CTRL+C
こんな感じで起動したことを確認できます。
Send実行
次にタスクを送信します。
もう一つコンソールを立ち上げ、以下のコマンドを実行します。
$ go run machinery.go -c config.yml send
INFO: 2017/12/05 17:40:07 file.go:30 Successfully loaded config from file for the first time
INFO: 2017/12/05 17:40:07 machinery.go:183 Single task:
INFO: 2017/12/05 17:40:07 mongodb.go:315 state index already exist, skipping create step
INFO: 2017/12/05 17:40:08 mongodb.go:315 lock index already exist, skipping create step
INFO: 2017/12/05 17:40:08 machinery.go:194 1 + 1 = 2
INFO: 2017/12/05 17:40:08 machinery.go:202 Group of tasks (parallel execution):
INFO: 2017/12/05 17:40:08 machinery.go:215 1 + 1 = 2
INFO: 2017/12/05 17:40:08 machinery.go:215 2 + 2 = 4
INFO: 2017/12/05 17:40:08 machinery.go:215 5 + 6 = 11
INFO: 2017/12/05 17:40:08 machinery.go:225 Group of tasks with a callback (chord):
INFO: 2017/12/05 17:40:08 machinery.go:238 (1 + 1) * (2 + 2) * (5 + 6) = 88
INFO: 2017/12/05 17:40:08 machinery.go:242 Chain of tasks:
INFO: 2017/12/05 17:40:08 machinery.go:254 (((1 + 1) + (2 + 2)) + (5 + 6)) * 4 = 68
INFO: 2017/12/05 17:40:08 machinery.go:267 Task panicked and returned error = oops
何やら動いてそうなログが表示されます。
結果確認
mongodbに実行結果が入っているか確認します。
> db.tasks.find()
{ "_id" : "task_6fc5c094-15e3-40f2-9041-66e515fd5a8b", "state" : "SUCCESS", "results" : [ { "type" : "int64", "value" : NumberLong(2) } ] }
{ "_id" : "task_f6f2e9ca-160c-4eae-80b7-5a1b1ee49661", "state" : "SUCCESS", "results" : [ { "type" : "int64", "value" : NumberLong(2) } ] }
{ "_id" : "task_5a039267-67f8-456d-ba70-671fd54283c9", "state" : "SUCCESS", "results" : [ { "type" : "int64", "value" : NumberLong(4) } ] }
{ "_id" : "task_73f6979d-40a2-4502-b653-24848214b64d", "state" : "SUCCESS", "results" : [ { "type" : "int64", "value" : NumberLong(11) } ] }
{ "_id" : "task_b16b18b7-7664-4243-8066-7c46af1a0cb8", "state" : "SUCCESS", "results" : [ { "type" : "int64", "value" : NumberLong(2) } ] }
{ "_id" : "task_392b62b0-d6d4-4d57-8343-42b9f59b0e90", "state" : "SUCCESS", "results" : [ { "type" : "int64", "value" : NumberLong(4) } ] }
{ "_id" : "task_568e0abf-ac15-4aa3-a324-0e7ffae9a347", "state" : "SUCCESS", "results" : [ { "type" : "int64", "value" : NumberLong(11) } ] }
{ "_id" : "chord_c5908175-f86e-4201-8980-f425a9bd4229", "state" : "SUCCESS", "results" : [ { "type" : "int64", "value" : NumberLong(88) } ] }
{ "_id" : "task_295b91c9-186a-4be9-89bd-9eed6186bb6b", "state" : "SUCCESS", "results" : [ { "type" : "int64", "value" : NumberLong(2) } ] }
{ "_id" : "task_d82fa155-3f73-46c6-bd27-dab6b3c870cd", "state" : "SUCCESS", "results" : [ { "type" : "int64", "value" : NumberLong(6) } ] }
{ "_id" : "task_26718081-43d4-43f2-ab42-2f3dae4f2174", "state" : "SUCCESS", "results" : [ { "type" : "int64", "value" : NumberLong(17) } ] }
{ "_id" : "task_15c0b946-8027-41c8-9cc0-b76f00f46f0d", "state" : "SUCCESS", "results" : [ { "type" : "int64", "value" : NumberLong(68) } ] }
{ "_id" : "task_e2e53f8a-ec85-463c-bb63-717cd14d84ec", "state" : "FAILURE", "error" : "oops" }
確かに、処理結果が登録されていますね!
(ちなみに、最後のタスクは明示的にエラーが発生するようになってます。)
処理内容について
簡単にサンプルコードを見ていきたいと思います。
と思ってましたが、時間がなく別記事にまとめさせて下さい。
まとめ
この記事ではmachineryという非同期キュー/実行基盤のOSSについてご紹介しました。
ご興味があれば是非使ってみて下さい