はじめに
この記事では、PythonのWebフレームワーク「FastAPI」とORMライブラリ「SQLAlchemy」を組み合わせてGoogle BigQueryと接続する方法を解説します。このプロジェクトでは、MVC(Model-View-Controller)+ Serviceアーキテクチャを採用しています。
プロジェクト構成
以下は、プロジェクトのディレクトリ構成の例です。
app/
├── main.py # アプリケーションのエントリポイント
├── config.py # 設定管理
├── database.py # データベース接続の設定
├── controllers/ # コントローラ(FastAPIエンドポイントの定義)
│ └── stock_controller.py
├── models/ # モデル(SQLAlchemyを使用したデータベーススキーマ)
│ └── stock_table.py
├── schemas/ # スキーマ(Pydanticを使用したデータ検証)
│ └── stock_schema.py
├── services/ # ビジネスロジックやデータ操作を管理
│ └── stock_service.py
├── lib/ # ユーティリティ関数やヘルパー
│ └── utility.py
└── tmp/ # 一時ファイル保存用ディレクトリ
ポイント
-
controllers
:- エンドポイント(ルート)を定義。
- リクエストを受け取り、適切な
services
を呼び出します。
-
models
:- データベーススキーマを定義(SQLAlchemy)。
-
schemas
:- リクエストやレスポンスの型を定義(Pydantic)。
-
services
:- ビジネスロジックやデータベース操作を担当。
-
lib
:- ユーティリティ関数や共通処理を定義。
GCPへの認証設定
BigQueryを利用するには、GCPの認証が必要です。このプロジェクトでは、環境変数やDockerを通じて認証情報を設定しています。
環境変数の設定
ローカル環境では、以下の環境変数を設定します:
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/your-service-account-key.json"
Dockerでの認証設定
Docker環境では、Dockerfile
を通じて認証情報を設定します。
# Dockerfile
ENV GOOGLE_APPLICATION_CREDENTIALS="/code/service-account-key.json"
ポイント:
- サービスアカウントキーのJSONファイルをプロジェクトディレクトリに配置します。
- 環境変数
GOOGLE_APPLICATION_CREDENTIALS
を使用して、認証情報をBigQueryライブラリに認識させます。
詳細解説
1. データベース接続の設定
database.py
では、SQLAlchemyを使ってBigQueryへの接続を設定します。
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.config import get_settings
settings = get_settings()
# SQLAlchemyエンジンの作成
DATABASE_URL = f"bigquery://{settings.gcp_project_id}"
engine = create_engine(DATABASE_URL, echo=True)
# セッションの作成
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
ポイント:
-
BigQuery接続:
-
bigquery://<project_id>
形式で接続します。
-
-
セッション管理:
- リクエストごとに独立したセッションを提供します。
2. モデル(データベーススキーマ)
models/stock_table.py
では、BigQueryのテーブルスキーマをSQLAlchemyで定義します。
from sqlalchemy import Boolean, Column, Date, DateTime, Float, Integer, String
from sqlalchemy.orm import declarative_base
from app.lib.utility import now_jst
Base = declarative_base()
class Stock(Base):
__tablename__ = "stocks"
trade_date = Column(Date, primary_key=True)
ticker = Column(String, primary_key=True)
open = Column(Float, nullable=False, default=0)
high = Column(Float, nullable=False, default=0)
low = Column(Float, nullable=False, default=0)
close = Column(Float, nullable=False, default=0)
volume = Column(Integer, nullable=False, default=0)
adj_close = Column(Float, nullable=False, default=0)
created_at = Column(DateTime, nullable=False, default=now_jst())
updated_at = Column(DateTime, nullable=False, default=now_jst(), onupdate=now_jst())
status = Column(Boolean, nullable=False, default=True)
3. スキーマ(データ検証とレスポンス)
schemas/stock_schema.py
では、リクエストおよびレスポンスの型を定義します。
from pydantic import BaseModel
from datetime import date, datetime
class StockResponse(BaseModel):
trade_date: date
ticker: str
open: float
high: float
low: float
close: float
volume: int
adj_close: float
created_at: datetime
updated_at: datetime
status: bool
4. サービス(ビジネスロジック)
services/stock_service.py
では、データベース操作やビジネスロジックを実装します。ユーティリティ関数によるモデル変換もここで行います。
from app.models.stock_table import Stock
from app.schemas.stock_schema import StockResponse
from sqlalchemy.orm import Session
def create_stock(db: Session, stock_data: dict):
stock = Stock(**stock_data)
db.add(stock)
db.commit()
db.refresh(stock)
return convert_to_response(stock)
def convert_to_response(stock_data: Stock):
return StockResponse.model_validate(stock_data)
ポイント:
-
create_stock
内でconvert_to_response
を呼び出し、データベースモデルをレスポンススキーマに変換。 - モデル変換処理をサービス層に統一して管理。
5. コントローラ(エンドポイント)
controllers/stock_controller.py
では、FastAPIエンドポイントを定義します。
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from app.database import get_db
from app.schemas.stock_schema import StockResponse
from app.services.stock_service import create_stock
router = APIRouter()
@router.post("/stocks", response_model=StockResponse)
def create_stock_endpoint(stock_data: dict, db: Session = Depends(get_db)):
return create_stock(db, stock_data)
全体のデータフロー
-
GCP認証:
- Docker環境やローカル環境で認証情報を設定。
- BigQueryとの認証を確立。
-
データベース接続:
- SQLAlchemyエンジンを作成してセッションを管理。
-
コントローラ:
- リクエストを受け取り、サービス層のロジックを呼び出す。
-
サービス:
- ビジネスロジックやデータベース操作を実行。
- モデルをPydanticスキーマに変換。
-
レスポンス:
- クライアントにスキーマ形式のデータを返却。
この記事では、FastAPIとSQLAlchemyを使ったBigQuery連携をMVC+Serviceアーキテクチャで設計する方法を詳しく解説しました。これを参考に、スケーラブルで拡張性の高いアプリケーションを構築してください!