はじめに
Celeryは、Pythonのタスクをqueueで処理するためのフレームワークです。
Celery の Broker として、Redisを利用することができるのですが、Azureの Azure Redis Cache
を使うにあたり、SSLを利用する部分でハマったのでここで利用方法を残しておきます。
環境
- Python: 3.5.2
- Celery: 4.0.0
- Redis: Azure Redis Cache を利用。Standardプランのため、Virtual Network内に配置することができず、Global IPに紐付いたHostに対してアクセスするので、セキュリティのためSSLの利用が必要となる。Premium プランだと、Private Subnet の中に配置することが可能。Premium Azure Redis Cache の Virtual Network のサポートを構成する方法
想定としては、WebアプリケーションのFlaskで利用していますが、他の環境でも同じだと思います。
SSLを使うには
What’s new in Celery 4.0 (latentcall)によると、Celeryはバージョン4.0から、RedisでもSSL接続を利用できるとのことです。
broker_use_sslを設定することで利用できるとのことですが、Redisの場合は、この情報を鵜呑みにして設定しても動かないです。
Error while reading from socket: (104, 'Connection reset by peer')
というエラーに悩まされることになります。このエラーは、Celeryの内部でRedisとの接続に、redis.connection.SSLConnection
ではなく、Redisの redis.connection.Connection
を使っていることによるエラーです。
Fix Redis SSL support このプルリクを参考に設定しないと、SSLを利用することができません。
from celery import Celery
from redis.connection import SSLConnection
from .config import broker_use_ssl
def make_celery(app):
celery = Celery(app.import_name, backend=app.config['CELERY_BACKEND'],
broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
celery.conf.update(broker_use_ssl=broker_use_ssl)
# URL: https://github.com/celery/kombu/pull/634
if celery.conf.broker_use_ssl:
celery.backend.connparams.update(celery.conf.broker_use_ssl) # <- ここ
celery.backend.connparams['connection_class'] = SSLConnection # <- ここ
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
make_celery
の引数で渡ってくるapp
は、Flaskのアプリケーションコンテキストです。
broker_use_ssl
を設定した後に、celery.backend.connparams['connection_class'] = SSLConnection
を設定する必要があります。
また、ドキュメント上では、broker_use_ssl
には、True
か、ディクショナリを設定できるとありますが、 True
を設定しても動きません。 ディクショナリを設定する必要があります。ディクショナリは、Rubyとは違い、空だと、False
判定されてしまうので、
broker_use_ssl = {'ssl_cert_reqs': ssl.CERT_NONE}
としておくといいかと思います。
パスワード
Celeryでは、パスワードの設定を
CELERY_BROKER_URL = "redis://:password@hostname:port/db_number"
の形式で設定しなければなりません。HTTPとかの知識が無駄にあったので、あれ?パスワードをURIに含めていいの?と思ってしまいましたが、内部実装を見ると、URIは、内部でちゃんとパースされてました。
最後に
Celeryの内部実装が意外と複雑で読みにくかったです。
あと、Azureのドキュメントには、Python で Azure Redis Cache を使用する方法
一部の Redis クライアントは SSL をサポートしていないため、既定では、 新しい Azure Redis Cache インスタンスに対して非 SSL ポートは無効になっています。 この記事の執筆時には、 redis-py クライアントが SSL をサポートしていません。
という記述がありますが、redis-py
は、コミットログを見る限り、2014年の時点でSSLをサポートしています。古い記述は更新していってほしいものです。割と惑わされます。