LoginSignup
3
2

More than 3 years have passed since last update.

huey

Last updated at Posted at 2019-11-02

hueyとは?

Pythonで利用できる軽量タスクキュー。

よく使われているタスクキューとして celery があるが、複雑なワークフローやバックエンドが必要なければHueyのほうがシンプルで使いやすいと感じる。celeryの謎挙動(プロセスは生きているもののタスクは処理されないなど)に遭遇した人には特に。

特徴

  • 軽量、シンプル
  • Redis, SQLite, インメモリのバックエンド
  • 定期実行
  • 遅延実行
  • 自動リトライ
  • Djangoとの統合(このケースで書いています)

タスク

タスクの作成

関数を task デコレータでくるむだけで非同期タスクとなる。この関数は TaskWrapper クラスとなり、リトライの設定などを同時に保持する。 some_app/tasks.py に書くことでhueyのワーカープロセスが自動的に発見してくれる。 crontab による定期実行もできる。

from huey import crontab
from huey.contrib.djhuey import task, periodic_task

# 非同期タスク
@task()
def async_task(a, b):
    return a / b

# 定期実行タスク。5分に1回実行
@periodic_task(crontab(minute="*/5"))
def heartbeat():
    print("alive")

ワーカープロセスの実行

Djangoインテグレーションを使っている場合、 run_huey というマネジメントコマンドが追加されるので実行する。

python manage.py run_huey

登録されているタスクが表示される。

[2019-11-02 14:52:28,774] INFO:huey.consumer:MainThread:Huey consumer started with 1 thread, PID 65988 at 2019-11-02 14:52:28.774934
[2019-11-02 14:52:28,775] INFO:huey.consumer:MainThread:Scheduler runs every 1 second(s).
[2019-11-02 14:52:28,775] INFO:huey.consumer:MainThread:Periodic tasks are enabled.
[2019-11-02 14:52:28,775] INFO:huey.consumer:MainThread:The following commands are available:
+ task_example.tasks.async_task
+ task_example.tasks.heartbeat

タスクの実行

タスクを実行すると huey.api.Result のインスタンスが得られる。すぐに実行結果を得たい場合には get() メソッドを叩くと取得することが出来る。

from some.where.tasks import async_task

# 非同期タスクを実行する
result = async_task(1, 2)
result.get()
# > 0.5

タスクが例外を上げた場合には指定回数までリトライされる。エラーも取得できる。重たいバッチ処理であるとか、外部への通信処理で使用すると便利。

# ZeroDivisionErrorが出てリトライされる
result = async_task(1, 0)
result.get()
# > TaskException: ZeroDivisionError('division by zero')

ログに N retries の表示が減っていくことでリトライしていることがわかる。

[2019-11-02 15:07:21,932] INFO:huey:Worker-1:Executing task_example.tasks.async_task: 4bfe57b7-77f8-4443-9da9-80cb158e99d4 2 retries
[2019-11-02 15:07:21,935] ERROR:huey:Worker-1:Unhandled exception in task 4bfe57b7-77f8-4443-9da9-80cb158e99d4.
Traceback (most recent call last):
  File "/Users/key/Work/myProjects/huey_example/.venv/lib/python3.7/site-packages/huey/api.py", line 345, in _execute
    task_value = task.execute()
  File "/Users/key/Work/myProjects/huey_example/.venv/lib/python3.7/site-packages/huey/api.py", line 697, in execute
    return func(*args, **kwargs)
  File "/Users/key/Work/myProjects/huey_example/task_example/tasks.py", line 7, in async_task
    return a / b
ZeroDivisionError: division by zero
[2019-11-02 15:07:21,941] INFO:huey:Worker-1:Requeueing 4bfe57b7-77f8-4443-9da9-80cb158e99d4, 1 retries
[2019-11-02 15:07:21,942] INFO:huey:Worker-1:Executing task_example.tasks.async_task: 4bfe57b7-77f8-4443-9da9-80cb158e99d4 1 retries
[2019-11-02 15:07:21,943] ERROR:huey:Worker-1:Unhandled exception in task 4bfe57b7-77f8-4443-9da9-80cb158e99d4.
Traceback (most recent call last):
  File "/Users/key/Work/myProjects/huey_example/.venv/lib/python3.7/site-packages/huey/api.py", line 345, in _execute
    task_value = task.execute()
  File "/Users/key/Work/myProjects/huey_example/.venv/lib/python3.7/site-packages/huey/api.py", line 697, in execute
    return func(*args, **kwargs)
  File "/Users/key/Work/myProjects/huey_example/task_example/tasks.py", line 7, in async_task
    return a / b
ZeroDivisionError: division by zero
[2019-11-02 15:07:21,943] INFO:huey:Worker-1:Requeueing 4bfe57b7-77f8-4443-9da9-80cb158e99d4, 0 retries
[2019-11-02 15:07:21,945] INFO:huey:Worker-1:Executing task_example.tasks.async_task: 4bfe57b7-77f8-4443-9da9-80cb158e99d4
[2019-11-02 15:07:21,945] ERROR:huey:Worker-1:Unhandled exception in task 4bfe57b7-77f8-4443-9da9-80cb158e99d4.
Traceback (most recent call last):
  File "/Users/key/Work/myProjects/huey_example/.venv/lib/python3.7/site-packages/huey/api.py", line 345, in _execute
    task_value = task.execute()
  File "/Users/key/Work/myProjects/huey_example/.venv/lib/python3.7/site-packages/huey/api.py", line 697, in execute
    return func(*args, **kwargs)
  File "/Users/key/Work/myProjects/huey_example/task_example/tasks.py", line 7, in async_task
    return a / b
ZeroDivisionError: division by zero

なお、処理継続不可能な例外は適切にハンドルすることが望ましい(サンプルは不適当です)。

応用

リトライ回数の変更

task() デコレータに引数を渡すことでリトライ回数、リトライまでの時間などを指定することが出来る。

# リトライ回数を3回(職階実行含めて合計4回)、リトライまでのディレイを30秒に設定する
@task(retries=3, retry_delay=30)
def async_task(a, b):
    return a / b

リトライ回数を動的に変更する

TaskWrapperretries プロパティを上書きする。

# リトライ回数を1回に変更する
async_task.retries = 1
async_task(1, 0)

遅延実行

TaskWrapper での処理を遅延実行させることができる。1つ目の引数に関数に渡すべき値をタプルで、 delay キーワードに遅延させる秒数を入れる。

# 5秒後に実行
r = async_task.schedule((1, 2), delay=5)
# すぐに実行すると結果は得られない(Noneがかえる)
r.get()
# 結果を待つには `blocking` オプションを使う
r.get(blocking=True)

参考

3
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2