hueyとは?
Pythonで利用できる軽量タスクキュー。
よく使われているタスクキューとして celery があるが、複雑なワークフローやバックエンドが必要なければHueyのほうがシンプルで使いやすいと感じる。celeryの謎挙動(プロセスは生きているもののタスクは処理されないなど)に遭遇した人には特に。
特徴
- 軽量、シンプル
- Redis, SQLite, インメモリのバックエンド
- 定期実行
- 遅延実行
- 自動リトライ
- Djangoとの統合(このケースで書いています)
タスク
タスクの作成
関数を task
デコレータでくるむだけで非同期タスクとなる。この関数は TaskWrapper
クラスとなり、リトライの設定などを同時に保持する。 some_app/tasks.py
に書くことでhueyのワーカープロセスが自動的に発見してくれる。 crontab
による定期実行もできる。
from huey import crontab
from huey.contrib.djhuey import task, periodic_task
# 非同期タスク
@task()
def async_task(a, b):
return a / b
# 定期実行タスク。5分に1回実行
@periodic_task(crontab(minute="*/5"))
def heartbeat():
print("alive")
ワーカープロセスの実行
Djangoインテグレーションを使っている場合、 run_huey
というマネジメントコマンドが追加されるので実行する。
python manage.py run_huey
登録されているタスクが表示される。
[2019-11-02 14:52:28,774] INFO:huey.consumer:MainThread:Huey consumer started with 1 thread, PID 65988 at 2019-11-02 14:52:28.774934
[2019-11-02 14:52:28,775] INFO:huey.consumer:MainThread:Scheduler runs every 1 second(s).
[2019-11-02 14:52:28,775] INFO:huey.consumer:MainThread:Periodic tasks are enabled.
[2019-11-02 14:52:28,775] INFO:huey.consumer:MainThread:The following commands are available:
+ task_example.tasks.async_task
+ task_example.tasks.heartbeat
タスクの実行
タスクを実行すると huey.api.Result
のインスタンスが得られる。すぐに実行結果を得たい場合には get()
メソッドを叩くと取得することが出来る。
from some.where.tasks import async_task
# 非同期タスクを実行する
result = async_task(1, 2)
result.get()
# > 0.5
タスクが例外を上げた場合には指定回数までリトライされる。エラーも取得できる。重たいバッチ処理であるとか、外部への通信処理で使用すると便利。
# ZeroDivisionErrorが出てリトライされる
result = async_task(1, 0)
result.get()
# > TaskException: ZeroDivisionError('division by zero')
ログに N retries
の表示が減っていくことでリトライしていることがわかる。
[2019-11-02 15:07:21,932] INFO:huey:Worker-1:Executing task_example.tasks.async_task: 4bfe57b7-77f8-4443-9da9-80cb158e99d4 2 retries
[2019-11-02 15:07:21,935] ERROR:huey:Worker-1:Unhandled exception in task 4bfe57b7-77f8-4443-9da9-80cb158e99d4.
Traceback (most recent call last):
File "/Users/key/Work/myProjects/huey_example/.venv/lib/python3.7/site-packages/huey/api.py", line 345, in _execute
task_value = task.execute()
File "/Users/key/Work/myProjects/huey_example/.venv/lib/python3.7/site-packages/huey/api.py", line 697, in execute
return func(*args, **kwargs)
File "/Users/key/Work/myProjects/huey_example/task_example/tasks.py", line 7, in async_task
return a / b
ZeroDivisionError: division by zero
[2019-11-02 15:07:21,941] INFO:huey:Worker-1:Requeueing 4bfe57b7-77f8-4443-9da9-80cb158e99d4, 1 retries
[2019-11-02 15:07:21,942] INFO:huey:Worker-1:Executing task_example.tasks.async_task: 4bfe57b7-77f8-4443-9da9-80cb158e99d4 1 retries
[2019-11-02 15:07:21,943] ERROR:huey:Worker-1:Unhandled exception in task 4bfe57b7-77f8-4443-9da9-80cb158e99d4.
Traceback (most recent call last):
File "/Users/key/Work/myProjects/huey_example/.venv/lib/python3.7/site-packages/huey/api.py", line 345, in _execute
task_value = task.execute()
File "/Users/key/Work/myProjects/huey_example/.venv/lib/python3.7/site-packages/huey/api.py", line 697, in execute
return func(*args, **kwargs)
File "/Users/key/Work/myProjects/huey_example/task_example/tasks.py", line 7, in async_task
return a / b
ZeroDivisionError: division by zero
[2019-11-02 15:07:21,943] INFO:huey:Worker-1:Requeueing 4bfe57b7-77f8-4443-9da9-80cb158e99d4, 0 retries
[2019-11-02 15:07:21,945] INFO:huey:Worker-1:Executing task_example.tasks.async_task: 4bfe57b7-77f8-4443-9da9-80cb158e99d4
[2019-11-02 15:07:21,945] ERROR:huey:Worker-1:Unhandled exception in task 4bfe57b7-77f8-4443-9da9-80cb158e99d4.
Traceback (most recent call last):
File "/Users/key/Work/myProjects/huey_example/.venv/lib/python3.7/site-packages/huey/api.py", line 345, in _execute
task_value = task.execute()
File "/Users/key/Work/myProjects/huey_example/.venv/lib/python3.7/site-packages/huey/api.py", line 697, in execute
return func(*args, **kwargs)
File "/Users/key/Work/myProjects/huey_example/task_example/tasks.py", line 7, in async_task
return a / b
ZeroDivisionError: division by zero
なお、処理継続不可能な例外は適切にハンドルすることが望ましい(サンプルは不適当です)。
応用
リトライ回数の変更
task()
デコレータに引数を渡すことでリトライ回数、リトライまでの時間などを指定することが出来る。
# リトライ回数を3回(職階実行含めて合計4回)、リトライまでのディレイを30秒に設定する
@task(retries=3, retry_delay=30)
def async_task(a, b):
return a / b
リトライ回数を動的に変更する
TaskWrapper
の retries
プロパティを上書きする。
# リトライ回数を1回に変更する
async_task.retries = 1
async_task(1, 0)
遅延実行
TaskWrapper
での処理を遅延実行させることができる。1つ目の引数に関数に渡すべき値をタプルで、 delay
キーワードに遅延させる秒数を入れる。
# 5秒後に実行
r = async_task.schedule((1, 2), delay=5)
# すぐに実行すると結果は得られない(Noneがかえる)
r.get()
# 結果を待つには `blocking` オプションを使う
r.get(blocking=True)
参考
- https://huey.readthedocs.io/
- https://github.com/coleifer/huey/
- https://github.com/key/huey-example このエントリで確認したサンプルコード
- https://qiita.com/key/items/a0325c397d9a58f1e2fb 拙著Celeryワークフローについて