Livesense Advent Calendar 2022 12/13の記事です。
TL;TR
以下の方法を紹介します
- sqlalchemyでinspectを使ってテーブル定義を取得して転送時に利用する設定を生成する
- pandasでデータベースからデータを読みながらparquetを書く
- gsutil/bqコマンドでBQへロードし、SQLでテーブルコメントを付与する
はじめに
分析などの用途でBigQueryを用いるとき、各種サービスのデータベース上のデータをBQにロードする、という状況が生まれます。このためのツールやサービスとして、AirbyteやCloud Data Fusion、trocco、embulkなどがありますが、まずはスモールにpythonと各種コマンドを使って作るとどうなるのか、というのを考えて試してみました。
データは、1日に1回程度、毎回全件削除して投入し直す(洗い替える)という想定のシステムです。実行環境やワークフローエンジンのことはここでは考えません。
実行環境
転送元DBは、MySQLまたはPostgreSQLを想定しています。動作確認は、MySQLで行っています。
本文中で紹介するコードの実行に必要なrequirements.txt
は以下のとおりです。
invoke~=1.7.0
PyMySQL~=1.0.0
pandas~=1.3.0
psycopg2-binary~=2.9.0
pyarrow~=9.0.0
python-dotenv~=0.21.0
sqlalchemy~=1.4.0
実行時には以下のような.env
ファイルを用意して項目を埋めてください。
# mysql+pymysql または postgresql+psycopg2
DRIVERNAME="mysql+pymysql"
HOST=
PORT=
USERNAME=
PASSWORD=
DATABASE=
PROJECT_ID=
DATASET_ID=
GCS_BUCKET=
実装
今回作ったコードの実装を先に記載しておきます。詳細はあとで解説していきます。全体で百数十行程度です。タスクランナーとして、invokeを利用しています。
import json
import os
from pathlib import Path
import pandas as pd
from dotenv import load_dotenv
from invoke import task
from pyarrow import Table
from pyarrow.parquet import ParquetWriter
from sqlalchemy import create_engine, inspect, types
from sqlalchemy.dialects import mysql
from sqlalchemy.engine.url import URL
load_dotenv()
PROJECT_ID = os.environ["PROJECT_ID"]
DATASET_ID = os.environ["DATASET_ID"]
GCS_BUCKET = os.environ["GCS_BUCKET"]
DRIVERNAME = os.environ["DRIVERNAME"]
DB_URL = URL.create(
drivername=DRIVERNAME,
host=os.environ["HOST"],
port=os.environ["PORT"],
database=os.environ["DATABASE"],
username=os.environ["USERNAME"],
password=os.environ["PASSWORD"],
query=os.environ.get("ARGS"),
)
@task
def workflow(ctx, schema, table_name):
makefiles(ctx, schema=schema, table_name=table_name)
extract(ctx, schema=schema, table_name=table_name)
gsutil_cp(ctx)
bq_load(ctx, schema=schema, table_name=table_name)
bq_query(ctx)
@task
def makefiles(ctx, schema, table_name):
# テーブルコメント付与のためのDDL
engine = create_engine(DB_URL)
inspector = inspect(engine)
Path("table_comment.sql").write_text(_generate_table_comment_ddl(inspector, table_name, schema))
# bq loadで使うスキーマファイルの作成
Path("schema.json").write_text(_generate_table_schema(inspector, table_name, schema))
def _generate_table_comment_ddl(inspector, table_name: str, schema: str):
table_comment = inspector.get_table_comment(table_name=table_name)["text"]
return f"""ALTER TABLE IF EXISTS `{DATASET_ID}.{schema}_{table_name}` SET OPTIONS (
description = "{table_comment}"
);
"""
def _generate_table_schema(inspector, table_name: str, schema: str):
data = []
for column in inspector.get_columns(table_name=table_name, schema=schema):
data.append(
{
# https://cloud.google.com/bigquery/docs/schemas#specifying_a_json_schema_file
"name": column["name"],
"type": _conv_type(column["type"]),
"mode": "NULLABLE" if column["nullable"] else "Required",
"description": column["comment"] or "",
}
)
return json.dumps(data, indent=4, ensure_ascii=False)
TYPE_CONVERT_TABLE = {
# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
mysql.TINYINT: "INT64",
types.Boolean: "BOOL",
types.Date: "DATE",
types.DateTime: "DATETIME",
types.Enum: "STRING",
types.Float: "FLOAT64",
types.INTEGER: "INT64",
types.String: "STRING",
types.Text: "STRING",
types.Time: "TIME",
# 必要に応じて追加
# https://docs.sqlalchemy.org/en/14/core/type_basics.html#generic-camelcase-types
# https://docs.sqlalchemy.org/en/14/dialects/mysql.html
# https://docs.sqlalchemy.org/en/14/dialects/postgresql.html
}
def _conv_type(typeinstance):
for candidate, bqtype in TYPE_CONVERT_TABLE.items():
if isinstance(typeinstance, candidate):
return bqtype
raise ValueError(f"{typeinstance}")
@task
def extract(ctx, schema, table_name, outpath="output.parquet", chunksize=1000000):
engine = create_engine(DB_URL)
with engine.connect().execution_options(stream_results=True) as conn:
for count, chunk_df in enumerate(
pd.read_sql_query(
f"SELECT * FROM {schema}.{table_name}",
conn,
chunksize=chunksize,
)
):
chunk_df = chunk_df.convert_dtypes()
table = Table.from_pandas(chunk_df)
if count == 0:
schema = table.schema
pq_writer = ParquetWriter(outpath, schema, compression="gzip")
pq_writer.write_table(table)
if pq_writer:
pq_writer.close()
@task
def gsutil_cp(ctx, src="output.parquet", dst=GCS_BUCKET + "/" + "output.parquet"):
command = f"gsutil -m cp {src} {dst}"
ctx.run(command, echo=True)
@task
def bq_load(
ctx, schema, table_name, gcspath=GCS_BUCKET + "/" + "output.parquet", schemapath="schema.json"
):
command = (
f"bq --project_id {PROJECT_ID} load --source_format PARQUET"
f" --replace {DATASET_ID}.{schema}_{table_name} {gcspath} {schemapath}"
)
ctx.run(command, echo=True)
@task
def bq_query(ctx, sqlpath="table_comment.sql"):
command = f"bq --project_id {PROJECT_ID} --location asia-northeast1 query --nouse_legacy_sql --batch < {sqlpath}"
ctx.run(command, echo=True)
実行の流れ
workflow
関数に主要なタスクを定義しています。以下のようなコマンドで実行することができます。
invoke workflow --schema={同期元テーブルのスキーマ名} --table-name={同期元テーブル名}
以下の処理が順次実行されます。
- 設定ファイルなどの作成(makefiles)
- データの取得(extract)
- gcsアップロード(gsutil_cp)
- BQロード(bq_load)
- SQL実行によるテーブルコメント付与(bq_query)
設定ファイルなどの作成
ここでは、スキーマのjsonファイルや、後に利用するSQLファイルを生成します。BQでは、データ転送時、データから自動的に型を推定する機能があります。これを使えば、テーブル定義を考えず、手軽にデータの転送が可能ですが、推論結果はデータに依存します。また、この方法では、テーブルやカラムのコメント、必須カラムであるか否かなど、分析に有益な情報が欠落します。
BQには優秀なWebUIが備わり、テーブル定義が簡単に確認できるので、これらの情報はできるだけ保持して、データ転送を行いたいと考えました。サービスのDBからテーブル定義を取り出して、BQ用のスキーマ情報をjsonで生成します。テーブルのコメントはSQLで付与することにします。
テーブル定義の読み込み
テーブル定義の読み込みには sqlalchemyのinspectを使います。これを使うと、DBごとの差異もsqlalchemyが吸収してくれて、テーブル定義などを容易に扱うことができます。
engine = create_engine(DB_URL)
inspector = inspect(engine)
スキーマjsonファイルの作成
inspectを使って取得した情報からBQのスキーマjsonファイルを作成します。
カラムの情報は inspector.get_columns
で取得できるので、これを _generate_table_schema
のところで加工しています。今回は、型、mode(必須か否か)、コメントだけを定義していますが、デフォルト値なども与えることができると思います。同期元と同期先の型の変換はコード中で定義しています。
テーブルコメントの生成
_generate_table_comment_ddl
のところで、inspector.get_table_comment
でコメントを取得し、BQのテーブルにコメントを付与するSQLを作成しています。このSQLはデータロード後に実行します。テーブルコメントの付与はAPI等でも可能ですが、ロード後にSQLを実行するフローは、ELTの処理など、ちょっとしたデータ変換へも応用できると考えてSQLにしておきます。
データの取得
extract
タスクで同期元DBからデータを取得します。やり方は様々ありますが、今回はpandasを使いたいと思います。NULLなのか空文字なのかといった情報もわかりやすく保存しておきたいので、parquetを利用します。pandasでそのままデータを取得すると、テーブルのデータ量によってはメモリが不足し得るので、parquetは「読みながら書く」というような実装になっています。以下を参考にしています。
実装中で、データフレームに対し、convert_dtypes
を使っていますが、取得するデータとchunksize
によっては、chunkごとにカラムの型が異なって推定されてしまい、エラーになるかもしれません。その場合はchunksize
を調整したり、read_sql_query
に対してdtype
を与える必要があります。今回は省略していますが、ここでも、inspector.get_columns
の結果が利用できると思います。
その後の処理
gsutil_cp
やbq_load
、bq_query
のタスクを使って、作成したparquetをgsutilコマンドでGCSにアップロードしたあと、bq load コマンドでGCSからBQへロードします。最後に、bq queryコマンドを使って、テーブルコメントを付与しています。python APIでも同様の事ができるので、そちらを利用するのも良いかもしれません。
おわりに
あるテーブルのデータを、スキーマの情報とともにBQに転送するような実装を紹介しました。少し実装を改変すれば、DB上の全テーブルを同期するプログラムにもできると思います。今回のプログラムはコンパクトな一方、型変換や同期テーブル・カラムの管理が大変ですし、ワークフローの実行管理も必要です。CDC(Change Data Caption)への拡張は難しいです。
データ同期は考慮すべきことが多いですが、まずは小さなプログラムで実装したあと、データ活用が浸透し、徐々に課題が明らかになった後に、様々なELT/ETLのツールやサービスを検討するのが良いのではないか、と思っています。