37
38

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

[Django] データ更新をメールやチャットサービスで通知する [Celery]

Last updated at Posted at 2018-08-03

この記事について

Djangoの初心者向け記事です。データの更新に合わせて、更新内容をメールやチャットサービスに投稿する方法をまとめました。Celeryを使った非同期処理については後半に書きました。

WebAPIについて

チャットサービスへの投稿には各サービスのWebAPIを利用します。
WebAPIの利用方法は各サービスで説明しています。

ここではSlackの「Incoming Webhook」を使った投稿を例に使います。
事前にエンドポイントの作成が必要なので、上記リンクを見て作成してください。

非同期処理を使わない場合

最初に非同期処理を使わない簡単な方法の説明です。

修正対象ファイル

データの更新の仕組みはすでに存在するものとして、通知機能の修正分のみ記述します。

proj
  ├ proj
  │  └ settings.py         # メール送信サーバ(SMTP)とSlackエンドポイントの設定
  └ app
     ├ tasks.py            # 通知処理の実体
     ├ views.py           # クラスベースビューのform_validで通知処理を呼び出す
     └ templates
         └ app
             └ massege.txt # 通知メッセージのテンプレート

パッケージ追加

  • WebAPI通信用にrequestsが必要になるのでインストールします。
pip install requests

参考:Requests の使い方 (Python Library)

コード内容

  • proj/settings.py

メール送信サーバ(SMTP)のとSlackエンドポイントの設定
この方法でGmailのSTMPを使って送信する場合は、Gmail上で「安全性の低いアプリの許可」を有効にすること。
参考:https://support.google.com/accounts/answer/6010255?hl=ja

なお、Gmailに限っては専用のパッケージがあるのでそっちを使うべきかも。
参考:https://developers.google.com/gmail/api/quickstart/python

proj/settings.py
EMAIL_USE_TLS = True
EMAIL_HOST = 'smtp.gmail.com'
EMAIL_HOST_USER = 'your_account@gmail.com'
EMAIL_HOST_PASSWORD = 'PassW@rd'
EMAIL_PORT = 587
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'

# デバッグ用:以下の設定だとコンソールに出力される
# EMAIL_BACKEND = 'django.core.mail.backends.console.EmailBackend'

SLACK_WEBHOOK_ENDPOINT = 'https://your_channel_endpoint'
  • app/tasks.py

メール送信とSlack投稿処理の内容
参考:naritoブログ:Djangoでメールを送信

app/tasks.py

import json

import requests
from django.core.mail import send_mail
from django.template.loader import get_template

import proj.settings as settings


# メール送信
def send_email(message):
    subject = 'データ更新'
    from_address = 'from@gmail.com'
    to_address = ['to@gmail.com']
    send_mail(subject, message, from_address, to_address)


# Slack投稿(Incoming Webhook 使用)
def send_slack_message(message):
    requests.post(settings.SLACK_WEBHOOK_ENDPOINT,
                  data=json.dumps({
                      'text': message,  # 投稿するテキスト
                      'username': u'me',  # 投稿のユーザー名
                      'icon_emoji': u':ghost:',  # 投稿のプロフィール画像に入れる絵文字
                      'link_names': 1,  # メンションを有効にする
                  }))

# メールの本文を作成する
def get_message(item, action):
    template = get_template('app/message.txt')
    context = {
        "item": item, "action": action,
    }
    message = template.render(context)
    return message


# 通知処理
def send_notification(item, action):
    message = get_message(item, action)
    send_slack_message(message)
    send_email(message)

  • app/views.py

ビュー内でtasksで定義した通知処理の呼び出しをします。

app/views.py

class MyCreateView(CreateView):

    model = Item
    form_class = ItemForm
    success_url = reverse_lazy('index')

    def form_valid(self, form):

        item = form.save(commit=False)
        item.save()
        send_notification(item, '登録')

        return HttpResponseRedirect(self.success_url)


class MyUpdateView(UpdateView):

    model = Item
    form_class = ItemForm
    success_url = reverse_lazy('index')

    def form_valid(self, form):

        item = form.save(commit=False)
        saved_item = item.save()
        send_notification(saved_item, '更新')

        return HttpResponseRedirect(self.success_url)

  • app/templates/app/massege.txt

メールの本文です。テンプレートの仕組みを使います。HTMLと同様にテンプレートタグでロジックも組めます。

app/templates/app/massege.txt
データベースが更新されました。

処理内容:{{ action }}
作成時間:{{ item.created_at }}
作成者:{{ item.created_by }}

http://localhost:8000/detail/{{ item.pk }}

修正後、データ更新を試してください。問題がなければ通知処理が実行されます。

非同期処理を使う場合

非同期処理を使う目的

先のコードを試してみればわかりますが、外部サービスの応答速度はそれほど速くなく、早くて1秒、GmailのSMTPなどは3~4秒かかります。その時間ユーザーが操作不能になるのでアプリケーションの利便性が下がってしまいます。

それを防ぐため、時間のかかる処理を別プロセスに依頼して、ユーザーの操作不能時間を無くすのが非同期処理です。

非同期処理に必要なもの

Djangoで非同期処理を実現するには Celery というPythonのタスクキューサービスを使います。タスクキューサービスとは時間のかかるジョブを他のプロセスやマシンで分散実行するためのアプリケーションです。

CeleryはDjangoとは別にサービス(デーモン)として起動する必要がありますが、起動設定とジョブ定義はDjangoのプロジェクト内で一緒に管理できます。

Celeryの利用には、ジョブ内容を一時保管するためのメッセージキューというアプリケーションが必要です。RabbitMQRedisAmazon SQSが指定されているので、開発・本番環境にいずれかを準備する必要があります。

Celeryを利用したデータ更新通知のイメージ

image.png

タスクキューの役割定義

  • ジョブを登録するプロセスをClientと呼びます。今回の例ではWebアプリのプロセスのことです。
  • Clientからジョブの要求を受けてタスクキューに登録するサービスをBrokerと呼びます。Brokerは**メッセージキュー**というソフトウェアで構築されます。
  • Brokerのタスクキューからジョブを取得して実際に実行するプロセスをWorkerと呼びます。

参考資料

Django+Celeryについて解説した記事は多々ありますが、以下のサイトのものがもっとも簡潔かつ明快であると思いました(Celery公式サイトよりも!)。

英語ですが、まずはここを一読されるのが近道だと思います。Google翻訳で十分理解できます。

注意事項

  • 残念ながらCeleryはVersion4以降でWindows非対応となりました。ただし今のところ起動コマンドを若干修正することで問題なく動作しますし、タスクスケジューラを使ってデーモン化し本番環境で動かすこともできます。

参考:http://docs.celeryproject.org/en/latest/faq.html#does-celery-support-windows

  • Celeryの過去バージョンではDjango ORMをブローカーに使うことができましたが、最新版ではできなくなっています。それに合わせパッケージ「django-celery」も不要となりました。

開発手順

1.メッセージキューの準備

まず開発環境にメッセージキューをインストールします。Amazon SQSは開発で使えないので(使ってもいいけど)、選択肢はRabbitMQかRedisになります。

MacOS、Linuxで開発する場合はどちらでもOKです。Redisの方がメジャーなサービスのようです。ただしWindowsに関してはRedisの最新版の提供が2016年をもって終了しています。そのためRabbitMQ一択となります。

これ以降はRabbitMQでの説明となります。
RabbitMQ の構築方法については以下の記事にわかりやすくまとまっているのでこの通りに進めればOKです。

2.コード作成

修正対象ファイル

データの更新の仕組みはすでに存在するものとして、通知機能の修正分のみ記述します。

proj
  ├ proj
  │  ├ __init__.py         # Django起動時にcelery.pyのインスタンスが生成されるようにする。
  │  ├ celery.py           # celery.pyの起動ファイル
  │  └ settings.py         # メール送信サーバ(SMTP)の設定
  └ app
     ├ tasks.py            # 通知処理の実体
     ├ models.py           # save, delete メソッドに通知処理を追加
     └ templates
         └ app
             └ massege.txt # 通知メッセージのテンプレート
コード

Djangoの起動時にCeleryのインスタンスを作成します。これが無いとshared_taskデコレータが機能しません。

  • proj/__init__.py
proj/__init__.py

from .celery import app as celery_app

__all__ = ['celery_app']
  • proj/celery.py

app.config_from_objectにより、Celeryの設定をDjangoの設定ファイルより取得します。app.autodiscover_tasks()により、各アプリケーションフォルダにあるtasks.pyをジョブ定義ファイルとして認識します。

proj/celery.py

import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
  • proj/settings.py

「非同期処理を使わない場合」のメールサーバの設定に加えて、ブローカーのURLを指定します。Redisを使う場合はredis://localhostとなります。

proj/settings.py
CELERY_BROKER_URL = 'amqp://localhost'
  • app/tasks.py

一応全体を載せますが、非同期処理を使わない場合のものとほとんど変わりません。違いは非同期タスクにshared_taskデコレータが付いたこと、非同期タスクの呼び出しにdelayメソッドを使っていることだけです。

ちなみに非同期タスクはクラスオブジェクトを引数に指定することはできないので注意してください。

app/tasks.py
import json

import requests
from celery import shared_task
from django.core.mail import send_mail
from django.template.loader import get_template

import proj.settings as settings


# メール送信
@shared_task
def send_email(message):
    subject = 'データ更新'
    from_address = 'from@gmail.com'
    to_address = ['to@gmail.com']
    send_mail(subject, message, from_address, to_address)


# Slack投稿(Incoming Webhook)
@shared_task
def send_slack_message(message):
    requests.post(settings.SLACK_WEBHOOK_ENDPOINT,
                  data=json.dumps({
                      'text': message,    # 投稿するテキスト
                      'username': u'me',  # 投稿のユーザー名
                      'icon_emoji': u':ghost:',  # 投稿のプロフィール画像に入れる絵文字
                      'link_names': 1,  # メンションを有効にする
                  }))


# メールの本文を作成する
def get_message(item, action):
    template = get_template('app/message.txt')
    context = {
        "item": item, "action": action,
    }
    message = template.render(context)
    return message


# 通知処理
def send_notification(item, action):
    message = get_message(item, action)
    send_slack_message.delay(message)
    send_email.delay(message)


views.pyとmassege.txtについては「非同期処理を使わない場合」と一緒なので省略します。

3.Celeryの起動

CeleryはDjangoのプロセスとは別に起動させます。

  • 開発環境ではコンソール上で起動します。
  • 本番環境ではサービス(デーモン)化させます。
開発環境

Djangoを起動(manage.py)したあと、別のコンソールで以下のコマンドを実行します。

Linux・macOS
celery -A proj worker -l info
Windows
celery -A proj worker -l info -P eventlet

Windowsでは事前にpip install eventletでeventletパッケージを追加してください。WindowsでLinux、macOSと同じコマンドを実行すると、ジョブの実行時に以下のエラーが出ます。上記コマンドはその回避策です。

Windows
ValueError: not enough values to unpack (expected 3, got 0)

参考:[github] ValueError: not enough values to unpack (expected 3, got 0)

celeryコマンドがceleryインスタンスを探し出す仕組みについては以下の記事にまとまっています。
参考:https://qiita.com/kwi/items/bd289aeff0fa5881e797
上の元ネタ:https://media.readthedocs.org/pdf/celery/latest/celery.pdf

本番環境
Herokuの場合

HerokuではRabbitMQ(RabbitMG-bigwig)とRedisはアドオンとして提供されています。どちらも無料コースがあるので活用しましょう。RabbitMQの場合だと環境変数RABBITMQ_BIGWIG_URLにブローカーのURLが格納されます。os.environやdjango-environを使って取得してください。

ProcfileにCeleryの起動コマンドを指定します。workerプロセスとして実行します。

Procfile
web: gunicorn proj.wsgi --log-file -
worker: celery worker -A proj -l INFO

デプロイ直後はworkerプロセスが起動していません。以下のコマンドで登録したプロセスを起動します。

heroku ps:scale worker=1

参考:Django-Celery-Redisのherokuへのデプロイがうまくできたときのメモ

Celery対応パッケージ

Django+Celery環境をより使いやすくするためのパッケージが出ています。

  • django-celery-results
  • ジョブの実行結果をDjangoのデータベースに保存
  • django-celery-email
  • Celeryを使ったメールの送信が簡単になる。
  • django-slack
  • Slackのライブラリ。バックエンドにCeleryを使うことができる。
37
38
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
37
38

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?