この記事は全部俺 Advent Calendar 2018の5日目の記事です。
以前紹介した構成でfire and forgetをやるよりも圧倒的に簡単に非同期処理が実装できるresponderを使った方法を紹介します。
responder is 何?
公式ドキュメントによると、Responder is a web service framework, written for human beings.
つまり、人間のために書かれたWebサービスフレームワークです。
人間のための、というのはPythonを使ってるとよく目にするフレーズですね。
(作者のkennethreitzさんがよく使うフレーズです。)
筆者の環境
name | version |
---|---|
macOS | 10.14.1 |
Python | 3.7.1 |
Pipenv | 2018.11.26 |
responsder | 1.1.2 |
インストールとバグフィックスとHello, world!
作者が人間のためのPython開発ワークフローpipenvを作った人なので、敬意を表してpipenvを使っていきます。
pipenv install responder --pre
を行い、responderをインストールした後、公式ドキュメントに記載されている以下のソースコードを実行してみます。
(That async declaration is optional.
と記載されているので、greet_world関数のasyncは削除しています。)
import responder
api = responder.API()
@api.route("/{greeting}")
def greet_world(req, resp, *, greeting):
resp.text = f"{greeting}, world!"
if __name__ == '__main__':
api.run()
どうでしょう?
import responder
した時点で、ModuleNotFoundError: No module named 'starlette.lifespan'
というエラーが出ましたね?
実はこれは最新版のresponderのバグです。
これはIssueにも上がっているように、pipenv install starlette==0.8
を行ってstarletteのバージョンを0.8にしてやれば開発します。
2019/1/18追記:修正されました!
※余談ですが、過去にもstarletteのバージョン起因のバグが起きていたようなので、今後もこのエラーが出たら一度starletteのバージョンを変更してみると良いかもしれません。
気を取り直して、もう一度ソースコードを実行し、curl http://127.0.0.1:5042/Hello
を実行、あるいはブラウザでhttp://127.0.0.1:5042/Hello
にアクセスすると、Hello, world!
という文言が帰ってくるはずです。
これでチュートリアルが完了したので、次に非同期処理によるBackground処理を試してみます。
非同期処理(Background処理)
ここにあるソースコードを書いてみます。
全体のソースコードは以下のようになります。
import time
import responder
api = responder.API()
@api.route("/")
def hello(req, resp):
@api.background.task
def sleep(s=10):
time.sleep(s)
print("slept!")
sleep()
resp.content = "processing"
if __name__ == '__main__':
api.run()
そして、このソースコードを実行してcurl http://127.0.0.1:5042
を実行してみると、レスポンスのprocessingという文字列はすぐ返ってくる一方で、標準出力へのslept!は10秒後に出力されるので、非同期実行されていることがわかります!
このままだと結果がわかりにくいので、標準出力にタイムスタンプを出力するようにファイルを書き換えて再度実行してみます。
import time
from datetime import datetime
import responder
api = responder.API()
@api.route("/")
def hello(req, resp):
@api.background.task
def sleep(s=10):
print(f"[sleep] start at {datetime.now()}")
time.sleep(s)
print("slept!")
print(f"[sleep] end at {datetime.now()}")
sleep()
print(f"[hello] start at {datetime.now()}")
resp.content = f"{datetime.now()} processing"
print(f"[hello] end at {datetime.now()}")
if __name__ == '__main__':
api.run()
そして、同様にこのソースコードを実行してcurl http://127.0.0.1:5042
を実行してみると以下のように結果が出力されます。
INFO: ('127.0.0.1', 64969) - "GET / HTTP/1.1" 200
[sleep] start at 2018-12-05 23:01:18.900378
[hello] start at 2018-12-05 23:01:18.900466
[hello] end at 2018-12-05 23:01:18.900490
slept!
[sleep] end at 2018-12-05 23:01:28.905147
上記の結果を見ると、はじめにsleep()が呼ばれ、その処理と同時にhello()の処理が行われていることがわかります。
ちなみに、resp.content
の結果は2018-12-05 23:01:18.900485 processing
となっていました。
sleep()開始→hello開始→helloのレスポンス返却→hello終了→slept!出力→sleep終了という順序になっていることが確認できますね。
ここまでで前回の記事と同様のことがサクッと実装できてしまいました。
ここからは、実際の業務を想定して処理完了後にSlack通知を行ってくれるように拡張することを考えていきます。
処理完了後にSlack通知
まず、Slack側の設定から行っていきます。
- Slackワークスペース内に、通知を行いたいチャンネルを作成しておく
- Slackアプリから、「Apps」の
+
ボタンを押して、Incoming WebHooks
を検索して選択 - ブラウザに飛ぶので、画面左の緑色の「Add Configuration」ボタンを押す
- 「Post to Channel」にて通知を行うチャンネルまたは個人を選択して、「Add Incoming WebHooks integration」ボタンを押す
- 「Webhook URL」に出てくるURLをコピーしておく
次に、python側の処理を行っていきます。
といっても、実質POSTリクエストを飛ばすだけです。
最終的に、以下のような形式になります。
import json
import time
from datetime import datetime
import responder
import requests
api = responder.API()
@api.route("/")
def hello(req, resp):
@api.background.task
def sleep(s=10):
print(f"[sleep] start at {datetime.now()}")
time.sleep(s)
print("slept!")
print(f"[sleep] end at {datetime.now()}")
push_slack(f"[{datetime.now()}] background task is complete!!")
sleep()
print(f"[hello] start at {datetime.now()}")
resp.content = f"{datetime.now()} processing"
print(f"[hello] end at {datetime.now()}")
def push_slack(msg):
requests.post(
# TODO: ここのurlを修正する必要があります
url="[Incoming Webhook URL]",
data=json.dumps({
"text": f"<!here> {msg}",
"username": "responder bot",
"icon_emoji": ":ok:",
}))
if __name__ == '__main__':
api.run()
こうすると、標準出力とレスポンスとして、先ほどと同様の結果が得られます。
また、Slackには以下のように通知が来ていることが確認できました。
今回は実装しませんでしたが、try: ~ except:
と合わせてエラー時には別メッセージで通知したり、開始時にも通知をしたりするとより実用的になると思います。
まとめ
responder、すっごく簡単。
このページに、api.run()
コマンドでThis will spin up a production web server on port 5042, ready for incoming HTTP requests.
と記載されているので、Flaskにあるような開発サーバ限定ではなく、productionサーバとして使用することも可能です。
GraphQLのサポートもあるので、それについても近いうちに触ってまとめてみたいと思います。