本記事の内容は筆者個人の見解であり、所属組織・企業の公式見解ではありません。また、本記事の原案は AI を用いて作成していますが、ここで整理した概念や考え方については筆者自身が確認しています。
これまでAPIは各種連携で「使う側」として触ることが多かったのですが、自分でAPIを作って公開するのは今回が初めてでした。本記事はその学習ログとして、やったこと・ハマったことを自分用にまとめたメモです。内容はかなり基本寄りなので、慣れている方は読み飛ばしていただいて大丈夫です。
ゴール
- FastAPIでREST APIを作成(インシデント記録API)
- Postgres + SQLAlchemyでDB永続化
- Render にデプロイ
- Swagger(/docs)でAPI仕様を自動生成
- JWT(Bearer Token)で認証を実装し、/incidents 系を全て保護
-
/auth/registerは共有シークレットヘッダで登録制限(簡易的な防御) - Postmanでテストし、実際にDBに書き込み・読み出しできることを確認
1. 技術スタック
- Python / FastAPI:API本体を実装するための言語(Python)とWebフレームワーク(FastAPI)
- Uvicorn:FastAPIアプリをHTTPサーバーとして起動するASGIサーバー(Render上でもこれで起動)
-
SQLAlchemy(ORM):PythonからDBを扱うためのORM。モデル定義と
select()などでSQLを書かずに操作 - Postgres:永続化先のRDB。RenderのPostgresを利用して保存・取得を確認
-
JWT:
python-jose:JWT(アクセストークン)の発行・検証に使うライブラリ - パスワードハッシュ(passlib):パスワードを平文保存せず、ハッシュ化して安全に保存するためのライブラリ
- デプロイ:Render:GitHub連携でFastAPIをすぐ公開できるPaaS
- APIテスト(Postman):リクエスト送信、環境変数(baseUrl/token/キー)管理、認証付きテストに利用
- リポジトリ(GitHub):ソース管理。Private運用でもRenderは接続してデプロイ可能
2. Renderにデプロイして疎通確認
2.1 RenderでWeb Serviceを作成
-
Renderの New → Web Service を選択
-
GitHubのPrivate repoを接続
-
ビルド/起動(例):
- Start Command:
uvicorn app.main:app --host 0.0.0.0 --port $PORT
- Start Command:
2.2 疎通確認
-
GET /health→{ "status": "ok" } -
GET /→{ "message": "OK. See /docs for API documentation." } -
GET /docs→ Swagger UI
3. インシデント記録API(DB永続化)を実装
3.1 実装したエンドポイント
-
POST /incidents:インシデント作成 -
GET /incidents/{incident_id}:単体取得 -
GET /incidents?limit=&offset=:一覧取得(ページング)
3.2 DB接続とセッション
「DBにつなぐ」= Render側でDBの接続情報(DATABASE_URL)を環境変数として渡し、FastAPI側でSQLAlchemyのセッションを作って使うという流れ。

4. バリデーション(Pydantic)とHTTPステータス
4.1 入力バリデーション
- 例:
titleは 3文字以上 -
{"title":"ab"}を送ると 422 Unprocessable Entity
4.2 ステータスコード
- 作成成功:201 Created
- 取得失敗(存在しないID):404 Not Found
5. JWT認証を実装して「/incidents を全部保護」(インシデント記録APIの保護)
5.1 追加した認証エンドポイント
-
POST /auth/register:ユーザー登録(email + password)
-
POST /auth/login:ログイン → JWT(access_token)発行
5.2 保護対象
-
/incidents系をすべて保護:POST /incidentsGET /incidentsGET /incidents/{incident_id}
| 不正/未指定 | トークン検証 |
|---|---|
| |
6. 追加の安全策:登録(/auth/register)を共有ヘッダで制限
公開URLで誰でもユーザー登録できると荒らされる可能性があるので、/auth/register に簡易的な「登録キー」を追加。
6.1 仕組み
- ヘッダ
X-REG-KEYが必要 - Renderの環境変数
REGISTRATION_KEYと一致したら登録OK - 合わなければ 403 Forbidden
6.2 Postmanでの設定
- Headersに
X-REG-KEY: <秘密キー> - もしくはEnvironment変数
regKeyを作って{{regKey}}を使う
| 登録キーなし | 登録キーあり |
|---|---|
| |
7. Renderで設定した環境変数
-
JWT_SECRET:JWT署名用シークレット(必須) -
JWT_EXPIRE_MINUTES:有効期限(任意、例:60) -
REGISTRATION_KEY:登録制限用キー(/auth/register用)
付録壱:API一覧
Public
GET /GET /healthGET /docs
Auth
-
POST /auth/register(X-REG-KEY必須) POST /auth/login
Protected(Bearer Token必須)
POST /incidentsGET /incidentsGET /incidents/{incident_id}
付録弐:実装メモ
リポジトリ構成
# .
# ├── app/
# │ ├── main.py
# │ ├── auth.py
# │ ├── db.py
# │ └── models.py
# ├── requirements.txt
# └── README.md
main.py
from fastapi import FastAPI, Depends, HTTPException, Query, status
from pydantic import BaseModel, Field
from sqlalchemy import select
from sqlalchemy.orm import Session
from .db import SessionLocal, engine, Base
from .models import Incident, User
from .auth import hash_password, verify_password, create_access_token, get_current_user
import os
from fastapi import FastAPI, Depends, HTTPException, Query, status, Header
app = FastAPI()
Base.metadata.create_all(bind=engine)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# ---------- Schemas ----------
class IncidentCreate(BaseModel):
title: str = Field(min_length=3, max_length=200)
class IncidentOut(BaseModel):
id: str
title: str
class RegisterIn(BaseModel):
email: str = Field(min_length=3, max_length=255)
password: str = Field(min_length=8, max_length=70)
class LoginIn(BaseModel):
email: str
password: str
class TokenOut(BaseModel):
access_token: str
token_type: str = "bearer"
# ---------- Public ----------
@app.get("/health")
def health():
return {"status": "ok"}
@app.get("/")
def root():
return {"message": "OK. See /docs for API documentation."}
# ---------- Auth ----------
@app.post("/auth/register", status_code=status.HTTP_201_CREATED)
def register(
payload: RegisterIn,
db: Session = Depends(get_db),
x_reg_key: str | None = Header(default=None, alias="X-REG-KEY"),
):
expected = os.environ.get("REGISTRATION_KEY")
if not expected:
raise HTTPException(status_code=500, detail="REGISTRATION_KEY is not configured")
if x_reg_key != expected:
raise HTTPException(status_code=403, detail="Registration disabled")
stmt = select(User).where(User.email == payload.email)
existing = db.execute(stmt).scalar_one_or_none()
if existing:
raise HTTPException(status_code=409, detail="Email already registered")
user = User(email=payload.email, password_hash=hash_password(payload.password))
db.add(user)
db.commit()
return {"message": "registered"}
@app.post("/auth/login", response_model=TokenOut)
def login(payload: LoginIn, db: Session = Depends(get_db)):
stmt = select(User).where(User.email == payload.email)
user = db.execute(stmt).scalar_one_or_none()
if not user or not verify_password(payload.password, user.password_hash):
raise HTTPException(status_code=401, detail="Invalid email or password")
token = create_access_token(subject=user.email)
return {"access_token": token, "token_type": "bearer"}
# ---------- Protected Incidents ----------
@app.post("/incidents", response_model=IncidentOut, status_code=status.HTTP_201_CREATED)
def create_incident(
payload: IncidentCreate,
db: Session = Depends(get_db),
_user: User = Depends(get_current_user),
):
incident = Incident(title=payload.title)
db.add(incident)
db.commit()
db.refresh(incident)
return {"id": str(incident.id), "title": incident.title}
@app.get("/incidents/{incident_id}", response_model=IncidentOut)
def get_incident(
incident_id: str,
db: Session = Depends(get_db),
_user: User = Depends(get_current_user),
):
stmt = select(Incident).where(Incident.id == incident_id)
incident = db.execute(stmt).scalar_one_or_none()
if not incident:
raise HTTPException(status_code=404, detail="Incident not found")
return {"id": str(incident.id), "title": incident.title}
@app.get("/incidents", response_model=list[IncidentOut])
def list_incidents(
db: Session = Depends(get_db),
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
_user: User = Depends(get_current_user),
):
stmt = (
select(Incident)
.order_by(Incident.created_at.desc())
.offset(offset)
.limit(limit)
)
incidents = db.execute(stmt).scalars().all()
return [{"id": str(i.id), "title": i.title} for i in incidents]
auth.py
import os
from datetime import datetime, timedelta
from typing import Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from passlib.context import CryptContext
from sqlalchemy import select
from sqlalchemy.orm import Session
from .db import SessionLocal
from .models import User
# --- Config ---
JWT_SECRET = os.environ.get("JWT_SECRET")
if not JWT_SECRET:
raise RuntimeError("JWT_SECRET is not set")
JWT_ALG = "HS256"
JWT_EXPIRE_MINUTES = int(os.environ.get("JWT_EXPIRE_MINUTES", "60"))
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(password: str, hashed: str) -> bool:
return pwd_context.verify(password, hashed)
def create_access_token(subject: str, expires_minutes: Optional[int] = None) -> str:
expire = datetime.utcnow() + timedelta(minutes=expires_minutes or JWT_EXPIRE_MINUTES)
payload = {"sub": subject, "exp": expire}
return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALG)
def get_current_user(db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)) -> User:
cred_exc = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or missing token",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALG])
sub: str | None = payload.get("sub")
if not sub:
raise cred_exc
except JWTError:
raise cred_exc
stmt = select(User).where(User.email == sub)
user = db.execute(stmt).scalar_one_or_none()
if not user:
raise cred_exc
return user
db.py
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase
DATABASE_URL = os.environ.get("DATABASE_URL")
if not DATABASE_URL:
raise RuntimeError("DATABASE_URL is not set")
# Render Postgres URLs are typically postgres://...; SQLAlchemy prefers postgresql://...
if DATABASE_URL.startswith("postgres://"):
DATABASE_URL = DATABASE_URL.replace("postgres://", "postgresql://", 1)
engine = create_engine(DATABASE_URL, pool_pre_ping=True)
SessionLocal = sessionmaker(bind=engine, autocommit=False, autoflush=False)
class Base(DeclarativeBase):
pass
models.py
import uuid
from datetime import datetime
from sqlalchemy import String, DateTime
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column
from .db import Base
class User(Base):
__tablename__ = "users"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False, index=True)
password_hash: Mapped[str] = mapped_column(String(255), nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, default=datetime.utcnow)
class Incident(Base):
__tablename__ = "incidents"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
title: Mapped[str] = mapped_column(String(200), nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, default=datetime.utcnow)
requirements.txt
fastapi
uvicorn[standard]
SQLAlchemy>=2.0
psycopg2-binary
alembic
passlib==1.7.4
bcrypt==4.0.1
python-jose[cryptography]==3.3.0



