2
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

FastAPIとSQLAlchemyを用いたBigQueryの活用方法

Last updated at Posted at 2024-11-26

はじめに

この記事では、PythonのWebフレームワーク「FastAPI」とORMライブラリ「SQLAlchemy」を組み合わせてGoogle BigQueryと接続する方法を解説します。このプロジェクトでは、MVC(Model-View-Controller)+ Serviceアーキテクチャを採用しています。


プロジェクト構成

以下は、プロジェクトのディレクトリ構成の例です。

app/
├── main.py               # アプリケーションのエントリポイント
├── config.py             # 設定管理
├── database.py           # データベース接続の設定
├── controllers/          # コントローラ(FastAPIエンドポイントの定義)
│   └── stock_controller.py
├── models/               # モデル(SQLAlchemyを使用したデータベーススキーマ)
│   └── stock_table.py
├── schemas/              # スキーマ(Pydanticを使用したデータ検証)
│   └── stock_schema.py
├── services/             # ビジネスロジックやデータ操作を管理
│   └── stock_service.py
├── lib/                  # ユーティリティ関数やヘルパー
│   └── utility.py
└── tmp/                  # 一時ファイル保存用ディレクトリ

ポイント

  • controllers:
    • エンドポイント(ルート)を定義。
    • リクエストを受け取り、適切なservicesを呼び出します。
  • models:
    • データベーススキーマを定義(SQLAlchemy)。
  • schemas:
    • リクエストやレスポンスの型を定義(Pydantic)。
  • services:
    • ビジネスロジックやデータベース操作を担当。
  • lib:
    • ユーティリティ関数や共通処理を定義。

GCPへの認証設定

BigQueryを利用するには、GCPの認証が必要です。このプロジェクトでは、環境変数やDockerを通じて認証情報を設定しています。

環境変数の設定

ローカル環境では、以下の環境変数を設定します:

export GOOGLE_APPLICATION_CREDENTIALS="/path/to/your-service-account-key.json"

Dockerでの認証設定

Docker環境では、Dockerfileを通じて認証情報を設定します。

# Dockerfile
ENV GOOGLE_APPLICATION_CREDENTIALS="/code/service-account-key.json"

ポイント:

  • サービスアカウントキーのJSONファイルをプロジェクトディレクトリに配置します。
  • 環境変数GOOGLE_APPLICATION_CREDENTIALSを使用して、認証情報をBigQueryライブラリに認識させます。

詳細解説

1. データベース接続の設定

database.pyでは、SQLAlchemyを使ってBigQueryへの接続を設定します。

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.config import get_settings

settings = get_settings()

# SQLAlchemyエンジンの作成
DATABASE_URL = f"bigquery://{settings.gcp_project_id}"

engine = create_engine(DATABASE_URL, echo=True)

# セッションの作成
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

ポイント:

  • BigQuery接続:
    • bigquery://<project_id>形式で接続します。
  • セッション管理:
    • リクエストごとに独立したセッションを提供します。

2. モデル(データベーススキーマ)

models/stock_table.pyでは、BigQueryのテーブルスキーマをSQLAlchemyで定義します。

from sqlalchemy import Boolean, Column, Date, DateTime, Float, Integer, String
from sqlalchemy.orm import declarative_base
from app.lib.utility import now_jst

Base = declarative_base()

class Stock(Base):
    __tablename__ = "stocks"

    trade_date = Column(Date, primary_key=True)
    ticker = Column(String, primary_key=True)
    open = Column(Float, nullable=False, default=0)
    high = Column(Float, nullable=False, default=0)
    low = Column(Float, nullable=False, default=0)
    close = Column(Float, nullable=False, default=0)
    volume = Column(Integer, nullable=False, default=0)
    adj_close = Column(Float, nullable=False, default=0)
    created_at = Column(DateTime, nullable=False, default=now_jst())
    updated_at = Column(DateTime, nullable=False, default=now_jst(), onupdate=now_jst())
    status = Column(Boolean, nullable=False, default=True)

3. スキーマ(データ検証とレスポンス)

schemas/stock_schema.pyでは、リクエストおよびレスポンスの型を定義します。

from pydantic import BaseModel
from datetime import date, datetime

class StockResponse(BaseModel):
    trade_date: date
    ticker: str
    open: float
    high: float
    low: float
    close: float
    volume: int
    adj_close: float
    created_at: datetime
    updated_at: datetime
    status: bool

4. サービス(ビジネスロジック)

services/stock_service.pyでは、データベース操作やビジネスロジックを実装します。ユーティリティ関数によるモデル変換もここで行います。

from app.models.stock_table import Stock
from app.schemas.stock_schema import StockResponse
from sqlalchemy.orm import Session

def create_stock(db: Session, stock_data: dict):
    stock = Stock(**stock_data)
    db.add(stock)
    db.commit()
    db.refresh(stock)
    return convert_to_response(stock)

def convert_to_response(stock_data: Stock):
    return StockResponse.model_validate(stock_data)

ポイント:

  • create_stock内でconvert_to_responseを呼び出し、データベースモデルをレスポンススキーマに変換。
  • モデル変換処理をサービス層に統一して管理。

5. コントローラ(エンドポイント)

controllers/stock_controller.pyでは、FastAPIエンドポイントを定義します。

from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from app.database import get_db
from app.schemas.stock_schema import StockResponse
from app.services.stock_service import create_stock

router = APIRouter()

@router.post("/stocks", response_model=StockResponse)
def create_stock_endpoint(stock_data: dict, db: Session = Depends(get_db)):
    return create_stock(db, stock_data)

全体のデータフロー

  1. GCP認証:
    • Docker環境やローカル環境で認証情報を設定。
    • BigQueryとの認証を確立。
  2. データベース接続:
    • SQLAlchemyエンジンを作成してセッションを管理。
  3. コントローラ:
    • リクエストを受け取り、サービス層のロジックを呼び出す。
  4. サービス:
    • ビジネスロジックやデータベース操作を実行。
    • モデルをPydanticスキーマに変換。
  5. レスポンス:
    • クライアントにスキーマ形式のデータを返却。

この記事では、FastAPIとSQLAlchemyを使ったBigQuery連携をMVC+Serviceアーキテクチャで設計する方法を詳しく解説しました。これを参考に、スケーラブルで拡張性の高いアプリケーションを構築してください!

2
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?