以下の手順でGoogle BigQueryからPostgreSQLにテーブルをコピーします。
- BigQueryからテーブルのスキーマとデータをAvro形式でexportする
- 本記事に記載のPythonコードにAvroファイルを渡して、PostgreSQLへのimport用SQL文を出力させる
- そのSQL文をPostgreSQLで動かしてimportする
その後、BigQueryとPostgreSQLで同じSELECT文を実行して、同じ結果が返ることをざっくり確認します。
※ 場合により、テーブルの完全なコピーにならないこともありますので、ご了承ください。
ご利用は自己責任でお願いします。
BigQueryからテーブルのスキーマとデータをAvro形式でexportする
BigQueryの「エクスポート」メニューから「GCSにエクスポート」を選択します。
「GCSのロケーション」を設定し、エクスポート形式は「Avro」を選択して保存ボタンを押します。
GCSからローカルのマシンにAvroファイルをダウンロードします。
PostgreSQLへのimport用SQL文を生成する
コマンドライン引数にAvroファイル名を渡して、以下のPythonコードを動かします。
Avroファイル名から拡張子を除いた前半部分が、PostgreSQLのテーブル名として使用されます。
出力されるSQLファイル名は、そのテーブル名に拡張子sqlをつけたものになります。
import sys
import traceback
import json
from pathlib import Path
import fastavro
from collections import deque
from psycopg2.extensions import adapt
class MyException(Exception):
pass
class MyReader(fastavro.reader):
def __init__(self, file):
super().__init__(file)
self.meta = self._header["meta"]
def escape(ss):
return str(adapt(ss.encode("utf-8").decode("latin-1")))
def convert_to_postgres_type(avro_type, default_value, ddl_queue, postgres_record_type_prefix, len_is_serial_of_postgres_record_type):
postgres_type = None
default_str = None
nullable = False
if default_value is None:
default_str = "NULL"
if type(avro_type) is list:
if "null" in avro_type:
nullable = True
avro_type.remove("null")
assert len(avro_type) == 1
avro_type = avro_type[0]
if type(avro_type) is str:
if avro_type == "string":
postgres_type = "VARCHAR"
if default_value is not None:
default_str = escape(default_value)
elif avro_type == "bytes":
postgres_type = "BYTEA"
if default_value is not None:
pass
elif avro_type == "long":
postgres_type = "BIGINT"
if default_value is not None:
default_str = str(default_value)
elif avro_type == "double":
postgres_type = "DOUBLE PRECISION"
if default_value is not None:
default_str = str(default_value)
elif avro_type == "boolean":
postgres_type = "BOOLEAN"
if default_value is not None:
if default_value:
default_str = "TRUE"
else:
default_str = "FALSE"
elif type(avro_type) is dict:
if "sqlType" in avro_type:
sql_type = avro_type["sqlType"]
if sql_type == "JSON":
postgres_type = "JSON"
if default_value is not None:
default_str = escape(default_value)
elif "logicalType" in avro_type:
logical_type = avro_type["logicalType"]
if logical_type == "decimal":
postgres_type = "NUMERIC"
if default_value is not None:
default_str = str(default_value)
elif logical_type in ("timestamp-millis", "timestamp-micros"):
postgres_type = "TIMESTAMP WITH TIME ZONE"
if default_value is not None:
default_str = escape(default_value.isoformat())
elif logical_type == "date":
postgres_type = "DATE"
if default_value is not None:
default_str = escape(default_value.isoformat())
elif logical_type in ("time-millis", "time-micros"):
postgres_type = "TIME WITH TIME ZONE"
if default_value is not None:
default_str = escape(default_value.isoformat())
elif logical_type in ("local-timestamp-millis", "local-timestamp-micros"):
postgres_type = "TIMESTAMP"
if default_value is not None:
default_str = escape(default_value.isoformat())
else:
avro_child_type = avro_type["type"]
if avro_child_type == "array":
default_value = None
postgres_items_type, _, _ = convert_to_postgres_type(
avro_type["items"], default_value, ddl_queue, postgres_record_type_prefix, len_is_serial_of_postgres_record_type)
if type(postgres_items_type) is str:
postgres_type = {
"array": postgres_items_type + "[]",
"items": postgres_items_type
}
else:
postgres_type = {
"array": postgres_items_type["record"] + "[]",
"items": postgres_items_type
}
elif avro_child_type == "record":
len_is_serial_of_postgres_record_type.append(True)
postgres_record_type = f"{postgres_record_type_prefix}_type_{len(len_is_serial_of_postgres_record_type)}"
postgres_fields_type = make_sql_create_type(
postgres_record_type, avro_type["fields"], ddl_queue, postgres_record_type_prefix, len_is_serial_of_postgres_record_type)
postgres_type = {
"record": postgres_record_type,
"fields": postgres_fields_type
}
return postgres_type, default_str, nullable
def make_sql_create_type(name, avro_fields, ddl_queue, postgres_record_type_prefix, len_is_serial_of_postgres_record_type):
ddl_stmt = ""
postgres_fields_type = []
for field in avro_fields:
default_value = None
postgres_type, _, _ = convert_to_postgres_type(
field["type"], default_value, ddl_queue, postgres_record_type_prefix, len_is_serial_of_postgres_record_type)
postgres_fields_type.append(postgres_type)
if len(ddl_stmt) == 0:
ddl_stmt = f"CREATE TYPE {name} AS (\n "
else:
ddl_stmt += "\n , "
ddl_stmt += field["name"] + ' '
if type(postgres_type) is str:
ddl_stmt += postgres_type
elif "record" in postgres_type:
ddl_stmt += postgres_type["record"]
else:
ddl_stmt += postgres_type["array"]
if 0 < len(ddl_stmt):
ddl_stmt += "\n);\n"
ddl_queue.append(ddl_stmt)
return postgres_fields_type
def make_sql_create_table(name, avro_fields, ddl_queue, postgres_record_type_prefix):
ddl_stmt = ""
postgres_type_list = []
len_is_serial_of_postgres_record_type = []
for field in avro_fields:
if "default" in field:
default_value = field["default"]
else:
default_value = None
postgres_type, default_str, nullable = convert_to_postgres_type(
field["type"], default_value, ddl_queue, postgres_record_type_prefix, len_is_serial_of_postgres_record_type)
postgres_type_list.append(postgres_type)
if len(ddl_stmt) == 0:
ddl_stmt = f"CREATE TABLE {name} (\n "
else:
ddl_stmt += "\n , "
ddl_stmt += field["name"] + ' '
if type(postgres_type) is str:
ddl_stmt += postgres_type
elif "record" in postgres_type:
ddl_stmt += postgres_type["record"]
else:
ddl_stmt += postgres_type["array"]
if not nullable:
ddl_stmt += " NOT NULL"
if "default" in field:
ddl_stmt += f" DEFAULT {default_str}"
if 0 < len(ddl_stmt):
ddl_stmt += "\n);\n"
ddl_queue.append(ddl_stmt)
return postgres_type_list
def convert_to_postgres_value(avro_type, postgres_type, avro_value, null_if_convert_error):
if avro_value is None:
return "NULL"
try:
value_str = ""
if type(avro_type) is list:
if "null" in avro_type:
avro_type.remove("null")
assert len(avro_type) == 1
avro_type = avro_type[0]
if type(avro_type) is str:
if avro_type == "string":
value_str = escape(avro_value)
elif avro_type == "long":
value_str = str(avro_value)
elif avro_type == "double":
value_str = str(avro_value)
elif avro_type == "boolean":
if avro_value:
value_str = "TRUE"
else:
value_str = "FALSE"
elif type(avro_type) is dict:
if "sqlType" in avro_type:
sql_type = avro_type["sqlType"]
if sql_type == "JSON":
value_str = escape(avro_value)
elif "logicalType" in avro_type:
logical_type = avro_type["logicalType"]
if logical_type == "decimal":
value_str = str(avro_value)
elif logical_type in ("timestamp-millis", "timestamp-micros"):
value_str = escape(avro_value.isoformat())
elif logical_type == "date":
value_str = escape(avro_value.isoformat())
elif logical_type in ("time-millis", "time-micros"):
value_str = escape(avro_value.isoformat())
elif logical_type in ("local-timestamp-millis", "local-timestamp-micros"):
value_str = escape(avro_value.isoformat())
else:
avro_child_type = avro_type["type"]
if avro_child_type == "array":
avro_items_type = avro_type["items"]
postgres_items_type = postgres_type["items"]
postgres_value_list = []
for val in avro_value:
postgres_value_list.append(convert_to_postgres_value(
avro_items_type, postgres_items_type, val, null_if_convert_error))
value_str = f'ARRAY[{",".join(postgres_value_list)}]::{postgres_type["array"]}'
elif avro_child_type == "record":
avro_fields = avro_type["fields"]
postgres_fields_type = postgres_type["fields"]
postgres_value_list = []
for avro_field, postgres_field_type in zip(avro_fields, postgres_fields_type):
postgres_value_list.append(convert_to_postgres_value(
avro_field["type"], postgres_field_type, avro_value[avro_field["name"]], null_if_convert_error))
value_str = f'ROW({",".join(postgres_value_list)})'
if len(value_str) == 0:
raise MyException(f"Convert Error: type={avro_type}, value={avro_value}")
except Exception as e:
if null_if_convert_error:
return "NULL"
else:
raise e
else:
return value_str
def make_sql_insert(tablename, schema, postgres_type_list, rec, null_if_convert_error=False):
insert_into = []
insert_values = []
fields = schema["fields"]
for field, postgres_type in zip(fields, postgres_type_list):
field_name = field["name"]
insert_into.append(field_name)
insert_values.append(convert_to_postgres_value(field["type"], postgres_type, rec[field_name], null_if_convert_error))
insert_into_str = "\n , ".join(insert_into)
insert_values_str = "\n , ".join(insert_values)
insert_stmt = f"INSERT INTO {tablename} (\n {insert_into_str}\n)\nVALUES (\n {insert_values_str}\n);\n"
return insert_stmt
def main():
in_path = Path(sys.argv[1])
out_dir = Path.cwd() tablename = in_path.stem.lower()
postgres_record_type_prefix = tablename
out_path = out_dir / f"{tablename}.sql"
with open(in_path, "rb") as fi, open(out_path, "wt", encoding="utf-8") as fo:
reader = MyReader(fi)
schema = json.loads(reader.meta["avro.schema"].decode("utf-8"))
# トランザクション開始
fo.write("BEGIN;\n")
# DDL
ddl_queue = deque()
postgres_type_list = make_sql_create_table(tablename, schema["fields"], ddl_queue, postgres_record_type_prefix)
for ddl_stmt in ddl_queue:
fo.write(ddl_stmt)
# INSERT文
num = 0
for rec in reader:
num += 1
insert_stmt = make_sql_insert(tablename, schema, postgres_type_list, rec, null_if_convert_error=False)
fo.write(insert_stmt)
print(f"{num} 行")
# トランザクション終了
fo.write("COMMIT;\n")
if __name__ == "__main__":
try:
main()
except Exception:
print(traceback.format_exc())
このPythonコードの起動例です。コマンドライン引数で渡せるAvroファイルは1つのみです。
python bq_avro_to_postgres_db.py events_20210131.avro
以下のように、DDLの後にINSERT文が入ったSQLファイルが出力されます。
BEGIN;
CREATE TYPE events_20210131_type_2 AS (
string_value VARCHAR
, int_value BIGINT
, float_value DOUBLE PRECISION
, double_value DOUBLE PRECISION
);
CREATE TYPE events_20210131_type_1 AS (
key VARCHAR
, value events_20210131_type_2
);
CREATE TYPE events_20210131_type_3 AS (
analytics_storage BIGINT
, ads_storage BIGINT
, uses_transient_token VARCHAR
);
CREATE TYPE events_20210131_type_5 AS (
string_value BIGINT
, int_value BIGINT
, float_value BIGINT
, double_value BIGINT
, set_timestamp_micros BIGINT
);
CREATE TYPE events_20210131_type_4 AS (
key BIGINT
, value events_20210131_type_5
);
CREATE TYPE events_20210131_type_6 AS (
revenue DOUBLE PRECISION
, currency VARCHAR
);
CREATE TYPE events_20210131_type_8 AS (
browser VARCHAR
, browser_version VARCHAR
);
CREATE TYPE events_20210131_type_7 AS (
category VARCHAR
, mobile_brand_name VARCHAR
, mobile_model_name VARCHAR
, mobile_marketing_name VARCHAR
, mobile_os_hardware_model BIGINT
, operating_system VARCHAR
, operating_system_version VARCHAR
, vendor_id BIGINT
, advertising_id BIGINT
, language VARCHAR
, is_limited_ad_tracking VARCHAR
, time_zone_offset_seconds BIGINT
, web_info events_20210131_type_8
);
CREATE TYPE events_20210131_type_9 AS (
continent VARCHAR
, sub_continent VARCHAR
, country VARCHAR
, region VARCHAR
, city VARCHAR
, metro VARCHAR
);
CREATE TYPE events_20210131_type_10 AS (
id VARCHAR
, version VARCHAR
, install_store VARCHAR
, firebase_app_id VARCHAR
, install_source VARCHAR
);
CREATE TYPE events_20210131_type_11 AS (
medium VARCHAR
, name VARCHAR
, source VARCHAR
);
CREATE TYPE events_20210131_type_12 AS (
hostname VARCHAR
);
CREATE TYPE events_20210131_type_13 AS (
total_item_quantity BIGINT
, purchase_revenue_in_usd DOUBLE PRECISION
, purchase_revenue DOUBLE PRECISION
, refund_value_in_usd DOUBLE PRECISION
, refund_value DOUBLE PRECISION
, shipping_value_in_usd DOUBLE PRECISION
, shipping_value DOUBLE PRECISION
, tax_value_in_usd DOUBLE PRECISION
, tax_value DOUBLE PRECISION
, unique_items BIGINT
, transaction_id VARCHAR
);
CREATE TYPE events_20210131_type_14 AS (
item_id VARCHAR
, item_name VARCHAR
, item_brand VARCHAR
, item_variant VARCHAR
, item_category VARCHAR
, item_category2 VARCHAR
, item_category3 VARCHAR
, item_category4 VARCHAR
, item_category5 VARCHAR
, price_in_usd DOUBLE PRECISION
, price DOUBLE PRECISION
, quantity BIGINT
, item_revenue_in_usd DOUBLE PRECISION
, item_revenue DOUBLE PRECISION
, item_refund_in_usd DOUBLE PRECISION
, item_refund DOUBLE PRECISION
, coupon VARCHAR
, affiliation VARCHAR
, location_id VARCHAR
, item_list_id VARCHAR
, item_list_name VARCHAR
, item_list_index VARCHAR
, promotion_id VARCHAR
, promotion_name VARCHAR
, creative_name VARCHAR
, creative_slot VARCHAR
);
CREATE TABLE events_20210131 (
event_date VARCHAR DEFAULT NULL
, event_timestamp BIGINT DEFAULT NULL
, event_name VARCHAR DEFAULT NULL
, event_params events_20210131_type_1[] NOT NULL
, event_previous_timestamp BIGINT DEFAULT NULL
, event_value_in_usd DOUBLE PRECISION DEFAULT NULL
, event_bundle_sequence_id BIGINT DEFAULT NULL
, event_server_timestamp_offset BIGINT DEFAULT NULL
, user_id VARCHAR DEFAULT NULL
, user_pseudo_id VARCHAR DEFAULT NULL
, privacy_info events_20210131_type_3 DEFAULT NULL
, user_properties events_20210131_type_4[] NOT NULL
, user_first_touch_timestamp BIGINT DEFAULT NULL
, user_ltv events_20210131_type_6 DEFAULT NULL
, device events_20210131_type_7 DEFAULT NULL
, geo events_20210131_type_9 DEFAULT NULL
, app_info events_20210131_type_10 DEFAULT NULL
, traffic_source events_20210131_type_11 DEFAULT NULL
, stream_id BIGINT DEFAULT NULL
, platform VARCHAR DEFAULT NULL
, event_dimensions events_20210131_type_12 DEFAULT NULL
, ecommerce events_20210131_type_13 DEFAULT NULL
, items events_20210131_type_14[] NOT NULL
);
INSERT INTO events_20210131 (
event_date
, event_timestamp
, event_name
, event_params
, event_previous_timestamp
, event_value_in_usd
, event_bundle_sequence_id
, event_server_timestamp_offset
, user_id
, user_pseudo_id
, privacy_info
, user_properties
, user_first_touch_timestamp
, user_ltv
, device
, geo
, app_info
, traffic_source
, stream_id
, platform
, event_dimensions
, ecommerce
, items
)
VALUES (
'20210131'
, 1612069510766593
, 'page_view'
, ARRAY[ROW('gclid',ROW(NULL,NULL,NULL,NULL)),ROW('gclsrc',ROW(NULL,NULL,NULL,NULL)),ROW('debug_mode',ROW(NULL,1,NULL,NULL)),ROW('ga_session_number',ROW(NULL,1,NULL,NULL)),ROW('all_data',ROW(NULL,NULL,NULL,NULL)),ROW('page_location',ROW('https://shop.googlemerchandisestore.com/',NULL,NULL,NULL)),ROW('entrances',ROW(NULL,1,NULL,NULL)),ROW('session_engaged',ROW('0',NULL,NULL,NULL)),ROW('ga_session_id',ROW(NULL,661084800,NULL,NULL)),ROW('clean_event',ROW('gtm.js',NULL,NULL,NULL)),ROW('engaged_session_event',ROW(NULL,1,NULL,NULL)),ROW('page_title',ROW('Home',NULL,NULL,NULL))]::events_20210131_type_1[]
, NULL
, NULL
, 6595101026
, NULL
, NULL
, '1026454.4271112504'
, ROW(NULL,NULL,'No')
, ARRAY[]::events_20210131_type_4[]
, 1612069510766593
, ROW(0.0,'USD')
, ROW('mobile','Apple','iPhone','<Other>',NULL,'Web','<Other>',NULL,NULL,'en-us','No',NULL,ROW('Safari','13.1'))
, ROW('Americas','Northern America','United States','California','San Carlos','(not set)')
, NULL
, ROW('organic','(organic)','google')
, 2100450278
, 'WEB'
, NULL
, ROW(NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL)
, ARRAY[]::events_20210131_type_14[]
);
(中略)
COMMIT;
生成されたSQL文をPostgreSQLで動かしてimportする
SQL文の内容を確認し、必要に応じてSQL文や上記のPythonコードを変更すると良いでしょう。
その後、SQL文をPostgreSQLで実行してimportします。
以下の画像は、importされたテーブルをDataGripで確認しているところです。
BigQueryとPostgreSQLで同じSELECT文を実行して、同じ結果が返ることをざっくり確認する
本記事でPostgreSQLにimportしたのは、GA4のeコマースサンプルデータセットです。
UNNEST関数を使ったクエリーを2つ試してみます。
FROM句のテーブル名だけ、BigQuery用とPostgreSQL用で修正する必要があります。
クエリーその1
WITH
foo AS (
SELECT
event_date,
event_timestamp,
event_name,
event_params
FROM
public.events_20210131
WHERE
event_timestamp IN (1612125113370459, 1612125041889438)
)
SELECT
event_date,
event_timestamp,
event_name,
event_param.key,
(event_param.value).string_value,
(event_param.value).int_value,
(event_param.value).float_value,
(event_param.value).double_value
FROM
foo
CROSS JOIN
UNNEST(event_params) AS event_param
ORDER BY
event_timestamp,
event_name,
key
BigQueryの結果:
PostgreSQLの結果:
両方とも21件、同じデータが返りました。
クエリーその2
SELECT
event_date,
event_timestamp,
event_name,
(SELECT
(value).int_value
FROM
UNNEST(event_params)
WHERE
key = 'ga_session_id'
) AS ga_session_id,
(SELECT
(value).string_value
FROM
UNNEST(event_params)
WHERE
key = 'page_title'
) AS page_title
FROM
public.events_20210131
WHERE
event_timestamp IN (1612125113370459, 1612125041889438)
ORDER BY
event_timestamp,
event_name
BigQueryの結果:
PostgreSQLの結果:
両方とも2件、同じデータが返りました。
2つのクエリーで、BigQueryとPostgreSQLが同じ結果を返すことを、ざっくり確認できました。
続編(予告)
本記事のPythonコードは、GA4のテーブルコピー専用ではなく、汎用を目指したものです。
次回はGA4のテーブル専用のコードを書いて、PostgreSQL側でレンジパーティションを使用することを考えています。
追記(2022-12-31)
予告した記事を投稿しました。
GA4のテーブルをBigQueryからPostgreSQLにAvroファイル経由でコピーする