概要
SQL文を書かずにSQLAlchemyでBigQueryのクエリーを実行する方法を書きました!
基本的にSQLを書きたい場合はこちらのBigQuery用のPythonクライアントをみてください。
詳細は公式ドキュメントへ!
BigQueryの基本的な知識こちらへ!
インストール
ライブラリーをインストール
pip install sqlalchemy-bigquery
コネクションを接続
以下のようにコネクションは作れる
from sqlalchemy import MetaData
from sqlalchemy import create_engine
engine = create_engine(
'bigquery://',
credentials_info=credentials,
)
metadata = MetaData()
クレデンシャル情報はJSONで渡せるが、 JSONファイルのパスでも大丈夫
# JSONで渡す
credentials = {
'type': 'xxxx'
'project_id': 'xxxx'
# ホゲホゲ
}
engine = create_engine('bigquery://', credentials_info=credentials)
# パスで渡す
engine = create_engine('bigquery://', credentials_path='/path/to/keyfile.json')
「プロジェクト名」や「データセット名」のデフォルトも設定できる
engine = create_engine('bigquery://project_name/dataset_name', credentials_info=credentials)
こんなエラーが出ないように、Dialectをコネクション開始する前に登録しておいた方がいい。
NoSuchModuleError: Can't load plugin: sqlalchemy.dialects:bigquery
from sqlalchemy.dialects import registry
registry.register('bigquery', 'sqlalchemy_bigquery', 'BigQueryDialect')
クエリー
SQLAlchemyのシンタックスでクエリーを行いたい
まず、Tableを取得する
from sqlalchemy import Table
def get_table(project_name: str, dataset_name: str, table_name: str) -> Table:
table = Table(f'{project_name}.{dataset_name}.{table_name}', metadata, autoload_with=engine)
return table
次に、そのテーブルでクエリーを行う。汎用SELECTメソッドを作る。
from sqlalchemy.engine.row import Row
from sqlalchemy.orm import Session
from sqlalchemy.sql import Selectable
def select(stmt: Selectable) -> list[dict]:
with Session(engine) as session:
results: list[Row] = session.execute(stmt).fetchall()
return [r._asdict() for r in results]
最後に組み合わせる!
from sqlalchemy.sql import select
def select_something() -> list[dict]:
table = get_table('project_name', 'dataset', 'table')
stmt = (
select(
table.c.id.label('ID'),
table.c.name.label('Name'),
table.c.categoryID.label('CategoryID'),
table.c.sortID.label('SortID'),
)
.filter(
table.c.categoryID == 1,
)
.order_by(table.c.id)
)
return select(stmt)
完成!
Summary
orm.py
from sqlalchemy import MetaData
from sqlalchemy import Table
from sqlalchemy import create_engine
from sqlalchemy.dialects import registry
from sqlalchemy.engine.row import Row
from sqlalchemy.orm import Session
from sqlalchemy.sql import Selectable
class SQLAlchemyBigQueryDataSource:
def __init__(self, credentials: dict) -> None:
registry.register('bigquery', 'sqlalchemy_bigquery', 'BigQueryDialect')
self.engine = create_engine(
'bigquery://',
credentials_info=credentials,
)
self.metadata = MetaData()
def get_table(self, project_name: str, dataset_name: str, table_name: str) -> Table:
table = Table(f'{project_name}.{dataset_name}.{table_name}', self.metadata, autoload_with=self.engine)
return table
def select(self, stmt: Selectable) -> list[dict]:
with Session(self.engine) as session:
results: list[Row] = session.execute(stmt).fetchall()
return [r._asdict() for r in results]
table_name.py
from sqlalchemy import Table
from sqlalchemy.sql import select
from orm import SQLAlchemyBigQueryDataSource
class SomethingBigQueryDataSource:
def __init__(self, data_source: SQLAlchemyBigQueryDataSource) -> None:
self.data_source = data_source
def select_something(self) -> list[dict]:
table = get_table('project_name', 'dataset', 'table')
stmt = (
select(
table.c.id.label('ID'),
table.c.name.label('Name'),
table.c.categoryID.label('CategoryID'),
table.c.sortID.label('SortID'),
)
.filter(
table.c.categoryID == 1,
)
.order_by(table.c.id)
)
return select(stmt)