LoginSignup
33
23

More than 3 years have passed since last update.

非同期サーバーのFastAPIでバックグラウンドで重い処理を書く

Posted at

前書き

少し前にFastAPIという非同期処理で実装されたPythonのフレームワークが登場しました。GoやNodeに匹敵する速さだぞという謳い文句からちょっと気になっていたので試してみました。

1年以上前にPythonでThreadを使うflaskサンプルを作ってみたという記事も書いたので、比較しやすいかと思い同じような実装にしました。

FastAPIはまだまだ日本語のドキュメントが少ないので、何かの参考の一助となれば幸いです。(公式サイトのドキュメントは充実してるので、今回の内容レベルはほとんど書いてあります。)

ちなみに、PydanticによるAPIの型付け(validationやAPIdocument)が簡単にできるのは結構良いのではないでしょうか。

本編

実行環境

macOS Mojave(10.14.3)

環境構築

Pipenvで環境を準備します。

$ mkdir fastapi_project
$ cd fastapi_project
$ pip install pipenv
$ pipenv --python 3.7
$ pipenv install

現在はこの状態です。

$ ls
Pipfile Pipfile.lock

必要なパッケージをインストール

$ pipenv install uvicorn fastapi pydantic

サーバーを書く

server.py
from datetime import datetime
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
from time import sleep
from typing import Dict


app = FastAPI()


class Job(BaseModel):
    job_id: int
    is_cancelled: bool = False

    def __call__(self):
        jobs[self.job_id] = self
        try:
            for _ in range(1000):
                print(f'{datetime.now():%H:%M:%S}')
                sleep(1)

                if self.is_cancelled:
                    del jobs[self.job_id]
                    break

        finally:
            print('時間のかかる処理が終わりました')


jobs: Dict[int, Job] = {}


@app.post('/{job_id}/', status_code=202)
async def start(job_id: int, background_tasks: BackgroundTasks):
    t = Job(job_id=job_id)
    background_tasks.add_task(t)
    return {"message": "時間のかかる処理を受け付けました"}


@app.delete('/{job_id}/', status_code=202)
async def stop(job_id: int):
    t = jobs.get(job_id)
    if t is None:
        raise HTTPException(400, detail="job is not exists.")

    t.is_cancelled = True
    return {"message": f"{job_id}の中止処理を受け付けました"}


@app.get('/{job_id}/', status_code=200)
async def status(job_id: int):
    if job_id in jobs:
        return {"message": f"{job_id}は実行中です"}
    else:
        return {"message": f"{job_id}は実行していません"}


@app.get('/health_check/', status_code=200)
async def health_check():
    return {"status": "ok"}


起動します

FastAPIの起動
$ pipenv shell
$ uvicorn server:app --reload

前回同様リクエストを送ってみると、クライアント側には即座にレスポンスが返り、時間のかかる処理はバックグラウンドで処理できることが確認できました。今回はクラスを使いましたが、functionであれば良いのでThreadよりも汎用的に使えそうな気がします。

リクエストを送ってみる
$ curl -X POST http://localhost:8000/2/
{"message":"時間のかかる処理を受け付けました"}
$ curl -X GET http://localhost:8000/2/
{"message":"2は実行中です"}
$ curl -X DELETE http://localhost:8000/2/
{"message":"2の中止処理を受け付けました"}
FastAPI側の結果
INFO:uvicorn:('127.0.0.1', 50184) - "POST /2/ HTTP/1.1" 202
00:39:28
00:39:29
00:39:30
00:39:31
INFO:uvicorn:('127.0.0.1', 50186) - "GET /2/ HTTP/1.1" 200
00:39:32
00:39:33
00:39:34
00:39:35
00:39:36
00:39:37
00:39:38
INFO:uvicorn:('127.0.0.1', 50188) - "DELETE /2/ HTTP/1.1" 202
時間のかかる処理が終わりました

コードの雰囲気についてはFlaskとも似てますね。基本的にはJsonを返すようで、responseの方にPydanticのModelも指定できるようです(今回で言えばJob)。HTMLなどを返したいときはstarletteを使って直接Responseすればいけます。(その場合Validationなどは当然効きませんが)

あとがき

途中Dictのkeyの型がintstrでマッチしなくて停止処理がうまく動かずdebugに若干苦戦しました。下記を追記すれば、IDEでdebugもできます。
参考:https://fastapi.tiangolo.com/tutorial/debugging/

server.py
import uvicorn

# 省略 

if __name__ == '__main__':
    uvicorn(app, host='0.0.0.0', port=8000)

BackgroundTaskの非同期で処理してるjobは、サーバーを停止しても止まらないのが、地味に面倒でした。

33
23
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
33
23