28
25

More than 1 year has passed since last update.

FastAPI OAuth2パスワード認証

Last updated at Posted at 2021-11-18

==>【続編を書きました 2021 11/25】FastAPI OAuth2 クライアント - Qiita
==>【続編を書きました 2021 12/15】FastAPIでつくるVue SPAアプリの雛形 - Qiita

FastAPIで安全なログインなどを行うときは、OAuth2パスワード認証が使われることが多いようです。あまり難しいことは考えないでFastAPIの提供してくれる機能を使えば、他のフレームワークに比べれば比較的簡単に実現可能です。ドキュメントも充実していて、その丁寧な説明には感謝しかありません。

【公式サイト】Dependencies - First Steps
【公式サイト】Security - First Steps

今回はドキュメントの内容をコンパクトにまとめる形で説明しました。

1. OAuth2パスワード認証の概略

1-1.OAuth2パスワード認証の登場人物

OAuth2パスワード認証の登場人物は4人でそれぞれ以下の役割を担っています。

登場人物 役割
1. ユーザ クライアント(ブラウザ)上でユーザ名/パスワードを入力して認可サーバに認可させます。その後ブラウザをクリックし別セッションとしてAPIサーバにリクエストを送ります。
2. クライアント 認可サーバにusername/passwordを送信しtokenを取得する。tokenを添えてAPIサーバにリクエストを送信し必要なリソースをgetする。
3. 認可サーバ クライアントから送られてきたusername/passwordを検証し、正しければtokenを生成しreturnする。
4. APIサーバ クライアントから送られたtokenを検証し、正しいものであればリクエストの要求するリソースをreturnする。 }

OAuth2は、バックエンドやAPIがユーザーを認証するサーバーから独立したものとして設計されていました。しかし、この場合、同じFastAPIアプリケーションがAPIと認証を処理します。つまりFastAPIアプリケーションが同時に認可サーバとAPIサーバの両方の役割を果たします。

1-2. OAuth2のパスワードフロー

OAuth2 (3) (1) (2).png

tokenについて

上のダイアグラムの(※2)で生成されるtokenは以下の性質を持ちます。

  • tokenは単なる文字列だが、このユーザを認証できる情報を持っている。
    • 通常、tokenには有効期限がセットされている。
      • それでユーザは時間の経過によっては再ログインが必要な時もある。
      • またtokenが盗まれたとしてもリスクは低い。永久的なキーが盗まれた場合とは異なる。

シーケンス図はこちらを利用

2. FastAPIとDependency

【公式サイト】Dependencies - First Steps
FastAPIは強力で精密なDependency Injectionシステムを備えています。

以下はDependencyを使わない普通のコードです。

depend0.py
from typing import Optional
from fastapi import Depends, FastAPI

app = FastAPI()

@app.get("/items/")
async def read_items(q: Optional[str] = None, skip: int = 0, limit: int = 100):
    return {"q":q, "skip":skip, "limit":limit}

@app.get("/users/")
async def read_users(q: Optional[str] = None, skip: int = 0, limit: int = 100):
    return {"q":q, "skip":skip, "limit":limit}

実行すると以下のような結果になります。
image.png

2-1. 関数Dependency

read_items()やread_users()のようなデコレータの下の関数は path operation function と呼ばれています。path operation functionの引数が同じパターンなので、Dependencyを使って、以下のようにまとめることができます。

depend1.py
from typing import Optional
from fastapi import Depends, FastAPI

app = FastAPI()

async def common_parameters(q: Optional[str] = None, skip: int = 0, limit: int = 100):
    return {"q": q, "skip": skip, "limit": limit}

@app.get("/items/")
async def read_items(commons: dict = Depends(common_parameters)):
    return commons

@app.get("/users/")
async def read_users(commons: dict = Depends(common_parameters)):
    return commons

実行結果はdepend0.pyと同じです。
path operation functionの引数の中にDepends()関数を使いますが、これは以前に紹介したQuery()関数やBody()関数と同じようなものです。
FastAPIで作るWebアプリ - validation / FastAPIで作るWebアプリ - Body validation

Depends()関数がBody()関数やQuery()関数と違う点は以下の通りです:

  • Depends()関数は引数を一個しかとりません。
  • この引数は関数のようなCallableでなければなりません
  • このcallableは、 path operation functionsと同じ引数の並びを持ちます。

この3番目の言っていることは、depend0.pyのpath operation functionの引数の並びと、depend1.pyのcommon_parameters()の引数の並びが同じだということです。

2-2. クラスDependency

FastAPIでは、Python クラスは callableなのでdependencyとして使うことができます。つまりDepends()関数の引数として渡すことができます。

depend2.py
from typing import Optional
from fastapi import Depends, FastAPI

app = FastAPI()

fake_items_db = [{"item_name": "Foo"}, {"item_name": "Bar"}, {"item_name": "Baz"}]

class CommonQueryParams:
    def __init__(self, q: Optional[str] = None, skip: int = 0, limit: int = 100):
        self.q = q
        self.skip = skip
        self.limit = limit

@app.get("/items/")
async def read_items(commons: CommonQueryParams = Depends(CommonQueryParams)):
    response = {}
    if commons.q:
        response.update({"q": commons.q})
    items = fake_items_db[commons.skip : commons.skip + commons.limit]
    response.update({"items": items})
    return response

init メッソドの引数の並びがこれまでと同じであることに注目してください。FastAPIは関数Dependencyの場合と同じように引数の並びをチェックします。

また以下の書き方は冗長なので

commons: CommonQueryParams = Depends(CommonQueryParams)

以下のように省略できます

commons: CommonQueryParams = Depends()

3. SimpleなOAuth2フロー

以下はJWT tokens や 安全な password hashing を使わないSimpleなフローを実現したものです。安全なフローのプログラムは「OAuth2 with Password (and hashing), Bearer with JWT tokens」を参照してください。

oauth2_simple.py
from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}

app = FastAPI()

def fake_hash_password(password: str):
    return "fakehashed" + password

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None

class UserInDB(User):
    hashed_password: str

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

def fake_decode_token(token):
    # This doesn't provide any security at all
    # Check the next version
    user = get_user(fake_users_db, token)
    return user

async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user

async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)
    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    user = UserInDB(**user_dict)
    hashed_password = fake_hash_password(form_data.password)
    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    return {"access_token": user.username, "token_type": "bearer"}

@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_use

3-1. Pydantic Userモデル

UserモデルはPydantic であることに注目してください。
Pydantic Data Model入門

3-2.get_current_active_user サブDependency

DependencyはサブDependencyを持つことができます。そのネストはいくらでも深くすることができます。今回はget_current_active_user -> get_current_user -> oauth2_scheme とサブDependencyがネストしています。

3-3. OAuth2PasswordBearerとOAuth2PasswordRequestForm

クラス OAuth2PasswordBearerOAuth2PasswordRequestForm のインスタンスはDependencyとして使われます。

OAuth2PasswordBearerのDependencyは、RequestのAuthorization headerを探しに行き、Headerの値が「Bearer + token」の形であることを確認します。そしてそのtokenをクライアントにstr値として返します。このインスタンスは引数tokenUrl="token"をとっていることに注意してください。
もしAuthorization headerがなかったり、あったとしてもBearer tokenを持っていなかった場合は、401ステータスエラー(UNAUTHORIZED)を直接返します。ダイアグラム(※3)

OAuth2PasswordRequestForm のDependencyは「username/password」のform bodyを処理します。ログインの処理ですね。ダイアグラム(※2)

今回は以上です

28
25
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
28
25