0
1

More than 1 year has passed since last update.

BigQueryからPostgreSQLにAvroファイル経由でテーブルをコピーする

Last updated at Posted at 2022-12-28

以下の手順でGoogle BigQueryからPostgreSQLにテーブルをコピーします。

  • BigQueryからテーブルのスキーマとデータをAvro形式でexportする
  • 本記事に記載のPythonコードにAvroファイルを渡して、PostgreSQLへのimport用SQL文を出力させる
  • そのSQL文をPostgreSQLで動かしてimportする

その後、BigQueryとPostgreSQLで同じSELECT文を実行して、同じ結果が返ることをざっくり確認します。


※ 場合により、テーブルの完全なコピーにならないこともありますので、ご了承ください。
  ご利用は自己責任でお願いします。

BigQueryからテーブルのスキーマとデータをAvro形式でexportする

BigQueryの「エクスポート」メニューから「GCSにエクスポート」を選択します。

image.png

「GCSのロケーション」を設定し、エクスポート形式は「Avro」を選択して保存ボタンを押します。

image.png

GCSからローカルのマシンにAvroファイルをダウンロードします。

image.png

PostgreSQLへのimport用SQL文を生成する

コマンドライン引数にAvroファイル名を渡して、以下のPythonコードを動かします。
Avroファイル名から拡張子を除いた前半部分が、PostgreSQLのテーブル名として使用されます。
出力されるSQLファイル名は、そのテーブル名に拡張子sqlをつけたものになります。

bq_avro_to_postgres_db.py

import sys
import traceback
import json
from pathlib import Path
import fastavro
from collections import deque
from psycopg2.extensions import adapt


class MyException(Exception):
    pass


class MyReader(fastavro.reader):
    def __init__(self, file):
        super().__init__(file)
        self.meta = self._header["meta"]


def escape(ss):
    return str(adapt(ss.encode("utf-8").decode("latin-1")))


def convert_to_postgres_type(avro_type, default_value, ddl_queue, postgres_record_type_prefix, len_is_serial_of_postgres_record_type):
    postgres_type = None
    default_str = None
    nullable = False

    if default_value is None:
        default_str = "NULL"

    if type(avro_type) is list:
        if "null" in avro_type:
            nullable = True
            avro_type.remove("null")
        assert len(avro_type) == 1
        avro_type = avro_type[0]

    if type(avro_type) is str:
        if avro_type == "string":
            postgres_type = "VARCHAR"
            if default_value is not None:
                default_str = escape(default_value)
        elif avro_type == "bytes":
            postgres_type = "BYTEA"
            if default_value is not None:
                pass
        elif avro_type == "long":
            postgres_type = "BIGINT"
            if default_value is not None:
                default_str = str(default_value)
        elif avro_type == "double":
            postgres_type = "DOUBLE PRECISION"
            if default_value is not None:
                default_str = str(default_value)
        elif avro_type == "boolean":
            postgres_type = "BOOLEAN"
            if default_value is not None:
                if default_value:
                    default_str = "TRUE"
                else:
                    default_str = "FALSE"
    elif type(avro_type) is dict:
        if "sqlType" in avro_type:
            sql_type = avro_type["sqlType"]
            if sql_type == "JSON":
                postgres_type = "JSON"
                if default_value is not None:
                    default_str = escape(default_value)
        elif "logicalType" in avro_type:
            logical_type = avro_type["logicalType"]
            if logical_type == "decimal":
                postgres_type = "NUMERIC"
                if default_value is not None:
                    default_str = str(default_value)
            elif logical_type in ("timestamp-millis", "timestamp-micros"):
                postgres_type = "TIMESTAMP WITH TIME ZONE"
                if default_value is not None:
                    default_str = escape(default_value.isoformat())
            elif logical_type == "date":
                postgres_type = "DATE"
                if default_value is not None:
                    default_str = escape(default_value.isoformat())
            elif logical_type in ("time-millis", "time-micros"):
                postgres_type = "TIME WITH TIME ZONE"
                if default_value is not None:
                    default_str = escape(default_value.isoformat())
            elif logical_type in ("local-timestamp-millis", "local-timestamp-micros"):
                postgres_type = "TIMESTAMP"
                if default_value is not None:
                    default_str = escape(default_value.isoformat())
        else:
            avro_child_type = avro_type["type"]
            if avro_child_type == "array":
                default_value = None
                postgres_items_type, _, _ = convert_to_postgres_type(
                    avro_type["items"], default_value, ddl_queue, postgres_record_type_prefix, len_is_serial_of_postgres_record_type)
                if type(postgres_items_type) is str:
                    postgres_type = {
                        "array": postgres_items_type + "[]",
                        "items": postgres_items_type
                    }
                else:
                    postgres_type = {
                        "array": postgres_items_type["record"] + "[]",
                        "items": postgres_items_type
                    }
            elif avro_child_type == "record":
                len_is_serial_of_postgres_record_type.append(True)
                postgres_record_type = f"{postgres_record_type_prefix}_type_{len(len_is_serial_of_postgres_record_type)}"
                postgres_fields_type = make_sql_create_type(
                    postgres_record_type, avro_type["fields"], ddl_queue, postgres_record_type_prefix, len_is_serial_of_postgres_record_type)
                postgres_type = {
                    "record": postgres_record_type,
                    "fields": postgres_fields_type
                }
    return postgres_type, default_str, nullable


def make_sql_create_type(name, avro_fields, ddl_queue, postgres_record_type_prefix, len_is_serial_of_postgres_record_type):
    ddl_stmt = ""
    postgres_fields_type = []
    for field in avro_fields:
        default_value = None
        postgres_type, _, _ = convert_to_postgres_type(
            field["type"], default_value, ddl_queue, postgres_record_type_prefix, len_is_serial_of_postgres_record_type)
        postgres_fields_type.append(postgres_type)
        if len(ddl_stmt) == 0:
            ddl_stmt = f"CREATE TYPE {name} AS (\n    "
        else:
            ddl_stmt += "\n  , "
        ddl_stmt += field["name"] + ' '
        if type(postgres_type) is str:
            ddl_stmt += postgres_type
        elif "record" in postgres_type:
            ddl_stmt += postgres_type["record"]
        else:
            ddl_stmt += postgres_type["array"]
    if 0 < len(ddl_stmt):
        ddl_stmt += "\n);\n"
        ddl_queue.append(ddl_stmt)
    return postgres_fields_type


def make_sql_create_table(name, avro_fields, ddl_queue, postgres_record_type_prefix):
    ddl_stmt = ""
    postgres_type_list = []
    len_is_serial_of_postgres_record_type = []
    for field in avro_fields:
        if "default" in field:
            default_value = field["default"]
        else:
            default_value = None
        postgres_type, default_str, nullable = convert_to_postgres_type(
            field["type"], default_value, ddl_queue, postgres_record_type_prefix, len_is_serial_of_postgres_record_type)
        postgres_type_list.append(postgres_type)
        if len(ddl_stmt) == 0:
            ddl_stmt = f"CREATE TABLE {name} (\n    "
        else:
            ddl_stmt += "\n  , "
        ddl_stmt += field["name"] + ' '
        if type(postgres_type) is str:
            ddl_stmt += postgres_type
        elif "record" in postgres_type:
            ddl_stmt += postgres_type["record"]
        else:
            ddl_stmt += postgres_type["array"]
        if not nullable:
            ddl_stmt += " NOT NULL"
        if "default" in field:
            ddl_stmt += f" DEFAULT {default_str}"
    if 0 < len(ddl_stmt):
        ddl_stmt += "\n);\n"
        ddl_queue.append(ddl_stmt)
    return postgres_type_list


def convert_to_postgres_value(avro_type, postgres_type, avro_value, null_if_convert_error):
    if avro_value is None:
        return "NULL"

    try:
        value_str = ""

        if type(avro_type) is list:
            if "null" in avro_type:
                avro_type.remove("null")
            assert len(avro_type) == 1
            avro_type = avro_type[0]

        if type(avro_type) is str:
            if avro_type == "string":
                value_str = escape(avro_value)
            elif avro_type == "long":
                value_str = str(avro_value)
            elif avro_type == "double":
                value_str = str(avro_value)
            elif avro_type == "boolean":
                if avro_value:
                    value_str = "TRUE"
                else:
                    value_str = "FALSE"
        elif type(avro_type) is dict:
            if "sqlType" in avro_type:
                sql_type = avro_type["sqlType"]
                if sql_type == "JSON":
                    value_str = escape(avro_value)
            elif "logicalType" in avro_type:
                logical_type = avro_type["logicalType"]
                if logical_type == "decimal":
                    value_str = str(avro_value)
                elif logical_type in ("timestamp-millis", "timestamp-micros"):
                    value_str = escape(avro_value.isoformat())
                elif logical_type == "date":
                    value_str = escape(avro_value.isoformat())
                elif logical_type in ("time-millis", "time-micros"):
                    value_str = escape(avro_value.isoformat())
                elif logical_type in ("local-timestamp-millis", "local-timestamp-micros"):
                    value_str = escape(avro_value.isoformat())
            else:
                avro_child_type = avro_type["type"]
                if avro_child_type == "array":
                    avro_items_type = avro_type["items"]
                    postgres_items_type = postgres_type["items"]
                    postgres_value_list = []
                    for val in avro_value:
                        postgres_value_list.append(convert_to_postgres_value(
                            avro_items_type, postgres_items_type, val, null_if_convert_error))
                    value_str = f'ARRAY[{",".join(postgres_value_list)}]::{postgres_type["array"]}'
                elif avro_child_type == "record":
                    avro_fields = avro_type["fields"]
                    postgres_fields_type = postgres_type["fields"]
                    postgres_value_list = []
                    for avro_field, postgres_field_type in zip(avro_fields, postgres_fields_type):
                        postgres_value_list.append(convert_to_postgres_value(
                            avro_field["type"], postgres_field_type, avro_value[avro_field["name"]], null_if_convert_error))
                    value_str = f'ROW({",".join(postgres_value_list)})'

        if len(value_str) == 0:
            raise MyException(f"Convert Error: type={avro_type}, value={avro_value}")
    except Exception as e:
        if null_if_convert_error:
            return "NULL"
        else:
            raise e
    else:
        return value_str


def make_sql_insert(tablename, schema, postgres_type_list, rec, null_if_convert_error=False):
    insert_into = []
    insert_values = []
    fields = schema["fields"]
    for field, postgres_type in zip(fields, postgres_type_list):
        field_name = field["name"]
        insert_into.append(field_name)
        insert_values.append(convert_to_postgres_value(field["type"], postgres_type, rec[field_name], null_if_convert_error))
    insert_into_str = "\n  , ".join(insert_into)
    insert_values_str = "\n  , ".join(insert_values)
    insert_stmt = f"INSERT INTO {tablename} (\n    {insert_into_str}\n)\nVALUES (\n    {insert_values_str}\n);\n"
    return insert_stmt


def main():
    in_path = Path(sys.argv[1])
    out_dir = Path.cwd()    tablename = in_path.stem.lower()
    postgres_record_type_prefix = tablename
    out_path = out_dir / f"{tablename}.sql"

    with open(in_path, "rb") as fi, open(out_path, "wt", encoding="utf-8") as fo:
        reader = MyReader(fi)
        schema = json.loads(reader.meta["avro.schema"].decode("utf-8"))

        # トランザクション開始
        fo.write("BEGIN;\n")

        # DDL
        ddl_queue = deque()
        postgres_type_list = make_sql_create_table(tablename, schema["fields"], ddl_queue, postgres_record_type_prefix)
        for ddl_stmt in ddl_queue:
            fo.write(ddl_stmt)

        # INSERT文
        num = 0
        for rec in reader:
            num += 1
            insert_stmt = make_sql_insert(tablename, schema, postgres_type_list, rec, null_if_convert_error=False)
            fo.write(insert_stmt)
        print(f"{num}")

        # トランザクション終了
        fo.write("COMMIT;\n")


if __name__ == "__main__":
    try:
        main()
    except Exception:
        print(traceback.format_exc())

このPythonコードの起動例です。コマンドライン引数で渡せるAvroファイルは1つのみです。


python bq_avro_to_postgres_db.py events_20210131.avro

以下のように、DDLの後にINSERT文が入ったSQLファイルが出力されます。

events_20210131.sql

BEGIN;
CREATE TYPE events_20210131_type_2 AS (
    string_value VARCHAR
  , int_value BIGINT
  , float_value DOUBLE PRECISION
  , double_value DOUBLE PRECISION
);
CREATE TYPE events_20210131_type_1 AS (
    key VARCHAR
  , value events_20210131_type_2
);
CREATE TYPE events_20210131_type_3 AS (
    analytics_storage BIGINT
  , ads_storage BIGINT
  , uses_transient_token VARCHAR
);
CREATE TYPE events_20210131_type_5 AS (
    string_value BIGINT
  , int_value BIGINT
  , float_value BIGINT
  , double_value BIGINT
  , set_timestamp_micros BIGINT
);
CREATE TYPE events_20210131_type_4 AS (
    key BIGINT
  , value events_20210131_type_5
);
CREATE TYPE events_20210131_type_6 AS (
    revenue DOUBLE PRECISION
  , currency VARCHAR
);
CREATE TYPE events_20210131_type_8 AS (
    browser VARCHAR
  , browser_version VARCHAR
);
CREATE TYPE events_20210131_type_7 AS (
    category VARCHAR
  , mobile_brand_name VARCHAR
  , mobile_model_name VARCHAR
  , mobile_marketing_name VARCHAR
  , mobile_os_hardware_model BIGINT
  , operating_system VARCHAR
  , operating_system_version VARCHAR
  , vendor_id BIGINT
  , advertising_id BIGINT
  , language VARCHAR
  , is_limited_ad_tracking VARCHAR
  , time_zone_offset_seconds BIGINT
  , web_info events_20210131_type_8
);
CREATE TYPE events_20210131_type_9 AS (
    continent VARCHAR
  , sub_continent VARCHAR
  , country VARCHAR
  , region VARCHAR
  , city VARCHAR
  , metro VARCHAR
);
CREATE TYPE events_20210131_type_10 AS (
    id VARCHAR
  , version VARCHAR
  , install_store VARCHAR
  , firebase_app_id VARCHAR
  , install_source VARCHAR
);
CREATE TYPE events_20210131_type_11 AS (
    medium VARCHAR
  , name VARCHAR
  , source VARCHAR
);
CREATE TYPE events_20210131_type_12 AS (
    hostname VARCHAR
);
CREATE TYPE events_20210131_type_13 AS (
    total_item_quantity BIGINT
  , purchase_revenue_in_usd DOUBLE PRECISION
  , purchase_revenue DOUBLE PRECISION
  , refund_value_in_usd DOUBLE PRECISION
  , refund_value DOUBLE PRECISION
  , shipping_value_in_usd DOUBLE PRECISION
  , shipping_value DOUBLE PRECISION
  , tax_value_in_usd DOUBLE PRECISION
  , tax_value DOUBLE PRECISION
  , unique_items BIGINT
  , transaction_id VARCHAR
);
CREATE TYPE events_20210131_type_14 AS (
    item_id VARCHAR
  , item_name VARCHAR
  , item_brand VARCHAR
  , item_variant VARCHAR
  , item_category VARCHAR
  , item_category2 VARCHAR
  , item_category3 VARCHAR
  , item_category4 VARCHAR
  , item_category5 VARCHAR
  , price_in_usd DOUBLE PRECISION
  , price DOUBLE PRECISION
  , quantity BIGINT
  , item_revenue_in_usd DOUBLE PRECISION
  , item_revenue DOUBLE PRECISION
  , item_refund_in_usd DOUBLE PRECISION
  , item_refund DOUBLE PRECISION
  , coupon VARCHAR
  , affiliation VARCHAR
  , location_id VARCHAR
  , item_list_id VARCHAR
  , item_list_name VARCHAR
  , item_list_index VARCHAR
  , promotion_id VARCHAR
  , promotion_name VARCHAR
  , creative_name VARCHAR
  , creative_slot VARCHAR
);
CREATE TABLE events_20210131 (
    event_date VARCHAR DEFAULT NULL
  , event_timestamp BIGINT DEFAULT NULL
  , event_name VARCHAR DEFAULT NULL
  , event_params events_20210131_type_1[] NOT NULL
  , event_previous_timestamp BIGINT DEFAULT NULL
  , event_value_in_usd DOUBLE PRECISION DEFAULT NULL
  , event_bundle_sequence_id BIGINT DEFAULT NULL
  , event_server_timestamp_offset BIGINT DEFAULT NULL
  , user_id VARCHAR DEFAULT NULL
  , user_pseudo_id VARCHAR DEFAULT NULL
  , privacy_info events_20210131_type_3 DEFAULT NULL
  , user_properties events_20210131_type_4[] NOT NULL
  , user_first_touch_timestamp BIGINT DEFAULT NULL
  , user_ltv events_20210131_type_6 DEFAULT NULL
  , device events_20210131_type_7 DEFAULT NULL
  , geo events_20210131_type_9 DEFAULT NULL
  , app_info events_20210131_type_10 DEFAULT NULL
  , traffic_source events_20210131_type_11 DEFAULT NULL
  , stream_id BIGINT DEFAULT NULL
  , platform VARCHAR DEFAULT NULL
  , event_dimensions events_20210131_type_12 DEFAULT NULL
  , ecommerce events_20210131_type_13 DEFAULT NULL
  , items events_20210131_type_14[] NOT NULL
);
INSERT INTO events_20210131 (
    event_date
  , event_timestamp
  , event_name
  , event_params
  , event_previous_timestamp
  , event_value_in_usd
  , event_bundle_sequence_id
  , event_server_timestamp_offset
  , user_id
  , user_pseudo_id
  , privacy_info
  , user_properties
  , user_first_touch_timestamp
  , user_ltv
  , device
  , geo
  , app_info
  , traffic_source
  , stream_id
  , platform
  , event_dimensions
  , ecommerce
  , items
)
VALUES (
    '20210131'
  , 1612069510766593
  , 'page_view'
  , ARRAY[ROW('gclid',ROW(NULL,NULL,NULL,NULL)),ROW('gclsrc',ROW(NULL,NULL,NULL,NULL)),ROW('debug_mode',ROW(NULL,1,NULL,NULL)),ROW('ga_session_number',ROW(NULL,1,NULL,NULL)),ROW('all_data',ROW(NULL,NULL,NULL,NULL)),ROW('page_location',ROW('https://shop.googlemerchandisestore.com/',NULL,NULL,NULL)),ROW('entrances',ROW(NULL,1,NULL,NULL)),ROW('session_engaged',ROW('0',NULL,NULL,NULL)),ROW('ga_session_id',ROW(NULL,661084800,NULL,NULL)),ROW('clean_event',ROW('gtm.js',NULL,NULL,NULL)),ROW('engaged_session_event',ROW(NULL,1,NULL,NULL)),ROW('page_title',ROW('Home',NULL,NULL,NULL))]::events_20210131_type_1[]
  , NULL
  , NULL
  , 6595101026
  , NULL
  , NULL
  , '1026454.4271112504'
  , ROW(NULL,NULL,'No')
  , ARRAY[]::events_20210131_type_4[]
  , 1612069510766593
  , ROW(0.0,'USD')
  , ROW('mobile','Apple','iPhone','<Other>',NULL,'Web','<Other>',NULL,NULL,'en-us','No',NULL,ROW('Safari','13.1'))
  , ROW('Americas','Northern America','United States','California','San Carlos','(not set)')
  , NULL
  , ROW('organic','(organic)','google')
  , 2100450278
  , 'WEB'
  , NULL
  , ROW(NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL)
  , ARRAY[]::events_20210131_type_14[]
);

(中略)

COMMIT;

生成されたSQL文をPostgreSQLで動かしてimportする

SQL文の内容を確認し、必要に応じてSQL文や上記のPythonコードを変更すると良いでしょう。
その後、SQL文をPostgreSQLで実行してimportします。

以下の画像は、importされたテーブルをDataGripで確認しているところです。

image.png

BigQueryとPostgreSQLで同じSELECT文を実行して、同じ結果が返ることをざっくり確認する

本記事でPostgreSQLにimportしたのは、GA4のeコマースサンプルデータセットです。
UNNEST関数を使ったクエリーを2つ試してみます。
FROM句のテーブル名だけ、BigQuery用とPostgreSQL用で修正する必要があります。

クエリーその1


WITH
    foo AS (
        SELECT
            event_date,
            event_timestamp,
            event_name,
            event_params
        FROM
            public.events_20210131
        WHERE
            event_timestamp IN (1612125113370459, 1612125041889438)
    )
SELECT
    event_date,
    event_timestamp,
    event_name,
    event_param.key,
    (event_param.value).string_value,
    (event_param.value).int_value,
    (event_param.value).float_value,
    (event_param.value).double_value
FROM
    foo
CROSS JOIN
    UNNEST(event_params) AS event_param
ORDER BY
    event_timestamp,
    event_name,
    key

BigQueryの結果:

image.png

PostgreSQLの結果:

image.png

両方とも21件、同じデータが返りました。

クエリーその2


SELECT
    event_date,
    event_timestamp,
    event_name,
    (SELECT
            (value).int_value
        FROM
            UNNEST(event_params)
        WHERE
            key = 'ga_session_id'
        ) AS ga_session_id,
    (SELECT
            (value).string_value
        FROM
            UNNEST(event_params)
        WHERE
            key = 'page_title'
        ) AS page_title
FROM
    public.events_20210131
WHERE
    event_timestamp IN (1612125113370459, 1612125041889438)
ORDER BY
    event_timestamp,
    event_name

BigQueryの結果:

image.png

PostgreSQLの結果:

image.png

両方とも2件、同じデータが返りました。

2つのクエリーで、BigQueryとPostgreSQLが同じ結果を返すことを、ざっくり確認できました。

続編(予告)

本記事のPythonコードは、GA4のテーブルコピー専用ではなく、汎用を目指したものです。
次回はGA4のテーブル専用のコードを書いて、PostgreSQL側でレンジパーティションを使用することを考えています。

追記(2022-12-31)

予告した記事を投稿しました。
GA4のテーブルをBigQueryからPostgreSQLにAvroファイル経由でコピーする

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1