0
0

More than 1 year has passed since last update.

BigQueryやPythonからAvroファイルに出力しようとしてDATETIME型でつまづいた

Last updated at Posted at 2022-12-10

10月からGoogle BigQueryを使い始めました。
データのimport/exportに、Apache Avro形式のファイルを使ってみましたので、その扱いについて記事にします。
Avro形式は、データだけでなくテーブルのスキーマ情報も持つので、テーブル作成のデータソースとしてjsonやcsvより便利です。
また、バイナリファイルなのでjsonやcsvよりも高速に処理できるそうです。

参考URL(感謝します)

AvroファイルでDATETIME型を扱えるか?

BigQueryのドキュメントによれば、


BigQueryのDATETIME型 == Avroのlocal-timestamp-millis型
 または
BigQueryのDATETIME型 == Avroのlocal-timestamp-micros型

だそうです。

また、Avroの仕様には、 Local timestamp (millisecond precision) 型と Local timestamp (microsecond precision) 型が定義されています。

これらを見るとAvroファイルでDATETIME型が使えそうですが、実際に試してみてそれなりにハマったので、私が試した範囲内で整理して記事にします。

また、Python等でAvroファイルを作り、それをデータソースにしてBigQueryでテーブルを作るという作業はしばしば発生します。
そのため、Avroファイルを操作するPythonコードを残しておく意味でも記事にしました。

先に結果表示:AvroファイルDATETIME型への対応状況

本記事執筆時点で、以下の結果となりました。

Pythonパッケージの対応状況

(DATETIME型) read write
fastavro
Apache Python avro package 整数型で読まれる 整数型で渡す必要がある

fastavro、いいですね。

Google BigQueryの対応状況

(DATETIME型) import export
Google BigQuery string型で出力される

先に結果表示:AvroファイルTIMESTAMP型への対応状況

Pythonパッケージの対応状況

(TIMESTAMP型) read write
fastavro
Apache Python avro package

Google BigQueryの対応状況

(TIMESTAMP型) import export
Google BigQuery

TIMESTAMP型への対応は、すべて問題ありませんでした。

結果を先に書いてしまいましたが、以下私が試した手順&Pythonコードを記します。
最初にfastavroを調べます。

fastavro

インストールしたPythonパッケージ:


fastavro==1.7.0

fastavroでAvroファイルをwrite

以下のコードを動かします。

fastavro_writer.py

import traceback
import fastavro
import datetime
from pathlib import Path


def main():
    schema = {
        "type": "record",
        "name": "MyType",
        "fields": [
            {
                "name": "datetime_col",
                "type": {
                    "type": "long",
                    "logicalType": "local-timestamp-millis"
                },
                "doc": "DATETIME type column"
            },
            {
                "name": "timestamp_col",
                "type": {
                    "type": "long",
                    "logicalType": "timestamp-millis"
                },
                "doc": "TIMESTAMP type column"
            }
        ],
        "doc": "Types containing DATETIME and TIMESTAMP"
    }

    out_dir = Path.cwd()
    out_path = out_dir / "by_fastavro.avro"

    # データ作成
    a_datetime = datetime.datetime(2022, 1, 2, 3, 4, 5)
    # a_timestamp = datetime.datetime(2022, 6, 7, 8, 9, 10, tzinfo=datetime.timezone.utc)
    a_timestamp = datetime.datetime(2022, 6, 7, 8, 9, 10)

    # avroファイルwrite
    with open(out_path, "wb") as fo:
        writer = fastavro.write.Writer(fo, schema)
        writer.write({
            "datetime_col": a_datetime,
            "timestamp_col": a_timestamp
        })
        writer.flush()


if __name__ == "__main__":
    try:
        main()
    except Exception:
        print(traceback.format_exc())

Avroファイル「by_fastavro.avro」が生成されました。
次の節で、このAvroファイルが正常に作られているか、中身を確認します。

fastavroでAvroファイルをread

以下のコードを動かして、前の節で生成されたAvroファイル「by_fastavro.avro」をreadします。

fastavro_reader.py

import traceback
import json
import pprint
from pathlib import Path
import fastavro


class MyReader(fastavro.reader):
    def __init__(self, file):
        super().__init__(file)
        self.meta = self._header["meta"]


def main():
    in_dir = Path.cwd()
    in_path = in_dir / "by_fastavro.avro"

    # avroファイルread
    data = []
    with open(in_path, "rb") as fi:
        reader = MyReader(fi)
        schema = reader.meta["avro.schema"].decode("utf-8")
        for rec in reader:
            data.append(rec)

    print("---- data ----")
    pprint.pprint(data)
    print("---- schema ----")
    print(json.dumps(json.loads(schema), indent=4))


if __name__ == "__main__":
    try:
        main()
    except Exception:
        print(traceback.format_exc())

以下のように表示され、write・read共にDATETIME型が正常に処理された様子です。


---- data ----
[{'datetime_col': datetime.datetime(2022, 1, 2, 3, 4, 5),
  'timestamp_col': datetime.datetime(2022, 6, 7, 8, 9, 10, tzinfo=datetime.timezone.utc)}]
---- schema ----
{
    "type": "record",
    "name": "MyType",
    "fields": [
        {
            "name": "datetime_col",
            "type": {
                "type": "long",
                "logicalType": "local-timestamp-millis"
            },
            "doc": "DATETIME type column"
        },
        {
            "name": "timestamp_col",
            "type": {
                "type": "long",
                "logicalType": "timestamp-millis"
            },
            "doc": "TIMESTAMP type column"
        }
    ],
    "doc": "Types containing DATETIME and TIMESTAMP"
}

fastavroのDATETIME型の対応状況

本記事執筆時点で、fastavroのAvroファイルDATETIME型への対応状況は以下の結果となりました。

read write
fastavro

次に、Apache Python avro packageを調べます。

Apache Python avro package

インストールしたPythonパッケージ:


avro==1.11.1
dataclasses-avroschema==0.34.2

Apache Python avro packageでAvroファイルをwrite:DATETIME型

DATETIME型では、結果的に3パターンのコードを書き、3パターン目でAvroファイルの出力に成功しました。
まずは以下のパターン1のコードを動かします。
参考URL: Dataclasses Avro Schema Generator / Logical Types

avro_datetime_writer_1.py

import traceback
from pathlib import Path
import datetime
import json
import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
import dataclasses
from dataclasses_avroschema import AvroModel


@dataclasses.dataclass
class MyType(AvroModel):
    "Types containing DATETIME"
    datetime_col: datetime.datetime


def main():
    # スキーマ表示
    schema_py = MyType.avro_schema_to_python()
    print("---- schema ----")
    print(json.dumps(schema_py, indent=4))

    out_dir = Path.cwd()
    out_path = out_dir / "by_apache_avro_datetime_1.avro"

    a_datetime = datetime.datetime(2022, 1, 2, 3, 4, 5)

    # avroファイルwrite
    schema = MyType.avro_schema()
    with open(out_path, "wb") as fo:
        with DataFileWriter(fo, DatumWriter(), avro.schema.parse(schema)) as writer:
            writer.append({
                "datetime_col": a_datetime
            })


if __name__ == "__main__":
    try:
        main()
    except Exception:
        print(traceback.format_exc())

パターン1の実行結果は以下の通りです。
logicalTypeがtimestamp-millis、つまりTIMESTAMP型になってしまい、データ挿入もコケました。
このコードではダメなようです。


---- schema ----
{
    "type": "record",
    "name": "MyType",
    "fields": [
        {
            "name": "datetime_col",
            "type": {
                "type": "long",
                "logicalType": "timestamp-millis"
            }
        }
    ],
    "doc": "Types containing DATETIME"
}
Traceback (most recent call last):
  File "F:\PycharmProjects\sample_project\avro_datetime_writer_1.py", line 40, in <module>
    main()
  File "F:\PycharmProjects\sample_project\avro_datetime_writer_1.py", line 33, in main
    writer.append({
  File "C:\Users\User\.virtualenvs\sample_project\lib\site-packages\avro\datafile.py", line 259, in append
    self.datum_writer.write(datum, self.buffer_encoder)
  File "C:\Users\User\.virtualenvs\sample_project\lib\site-packages\avro\io.py", line 1013, in write
    validate(self.writers_schema, datum, raise_on_error=True)
  File "C:\Users\User\.virtualenvs\sample_project\lib\site-packages\avro\io.py", line 147, in validate
    raise avro.errors.AvroTypeException(current_node.schema, current_node.name, current_node.datum)
avro.errors.AvroTypeException: The datum "2022-01-02 03:04:05" provided for "datetime_col" is not an example of the schema {
  "type": "long",
  "logicalType": "timestamp-millis"
}

パターン1から改善したパターン2のコード。

avro_datetime_writer_2.py

import traceback
from pathlib import Path
import datetime
import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter


SCHEMA = '''\
{
    "type": "record",
    "name": "MyType",
    "fields": [
        {
            "name": "datetime_col",
            "type": {
                "type": "long",
                "logicalType": "local-timestamp-millis"
            },
            "doc": "DATETIME type column"
        }
    ],
    "doc": "Types containing DATETIME"
}
'''


def main():
    out_dir = Path.cwd()
    out_path = out_dir / "by_apache_avro_datetime_2.avro"

    a_datetime = datetime.datetime(2022, 1, 2, 3, 4, 5)

    # avroファイルwrite
    with open(out_path, "wb") as fo:
        with DataFileWriter(fo, DatumWriter(), avro.schema.parse(SCHEMA)) as writer:
            writer.append({
                "datetime_col": a_datetime
            })


if __name__ == "__main__":
    try:
        main()
    except Exception:
        print(traceback.format_exc())

パターン2の実行結果は以下の通りです。
local-timestamp-millis型、つまりDATETIME型がUnknownとwarningsが表示され、その後データ挿入がコケました。
このコードもダメなようです。


C:\Users\User\.virtualenvs\sample_project\lib\site-packages\avro\schema.py:1099: IgnoredLogicalType: Unknown local-timestamp-millis, using long.
  warnings.warn(avro.errors.IgnoredLogicalType(f"Unknown {logical_type}, using {type_}."))
Traceback (most recent call last):
  File "F:\PycharmProjects\sample_project\avro_datetime_writer_2.py", line 44, in <module>
    main()
  File "F:\PycharmProjects\sample_project\avro_datetime_writer_2.py", line 37, in main
    writer.append({
  File "C:\Users\User\.virtualenvs\sample_project\lib\site-packages\avro\datafile.py", line 259, in append
    self.datum_writer.write(datum, self.buffer_encoder)
  File "C:\Users\User\.virtualenvs\sample_project\lib\site-packages\avro\io.py", line 1013, in write
    validate(self.writers_schema, datum, raise_on_error=True)
  File "C:\Users\User\.virtualenvs\sample_project\lib\site-packages\avro\io.py", line 147, in validate
    raise avro.errors.AvroTypeException(current_node.schema, current_node.name, current_node.datum)
avro.errors.AvroTypeException: The datum "2022-01-02 03:04:05" provided for "datetime_col" is not an example of the schema {
  "type": "long",
  "logicalType": "local-timestamp-millis"
}

さらに改善したパターン3のコード。
パターン2ではlogicalTypeが認識されなかったので、DATETIME型から整数型に変換してデータ挿入を試みます。
その整数型変数には、1970年1月1日からの経過時間を入れるのですが、logicalTypeがlocal-timestamp-millis型なので、経過ミリ秒を計算して入れます。

avro_datetime_writer_3.py

import traceback
from pathlib import Path
import datetime
import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter


SCHEMA = '''\
{
    "type": "record",
    "name": "MyType",
    "fields": [
        {
            "name": "datetime_col",
            "type": {
                "type": "long",
                "logicalType": "local-timestamp-millis"
            },
            "doc": "DATETIME type column"
        }
    ],
    "doc": "Types containing DATETIME"
}
'''

DATETIME_ORIGIN = datetime.datetime(1970, 1, 1, 0, 0)


def main():
    out_dir = Path.cwd()
    out_path = out_dir / "by_apache_avro_datetime_3.avro"

    a_datetime = datetime.datetime(2022, 1, 2, 3, 4, 5)

    # avroファイルwrite
    with open(out_path, "wb") as fo:
        with DataFileWriter(fo, DatumWriter(), avro.schema.parse(SCHEMA)) as writer:
            writer.append({
                "datetime_col": int((a_datetime - DATETIME_ORIGIN).total_seconds() * 1000)
            })


if __name__ == "__main__":
    try:
        main()
    except Exception:
        print(traceback.format_exc())

パターン3の実行結果は以下の通りです。
warningsが表示されますが、Avroファイル「by_apache_avro_datetime_3.avro」が生成されました。


C:\Users\User\.virtualenvs\sample_project\lib\site-packages\avro\schema.py:1099: IgnoredLogicalType: Unknown local-timestamp-millis, using long.
  warnings.warn(avro.errors.IgnoredLogicalType(f"Unknown {logical_type}, using {type_}."))

以下のコードを実行して、生成されたAvroファイルの中身を見てみます。


# fastavro_reader.pyのin_path変数を変更しただけのコード

import traceback
import json
import pprint
from pathlib import Path
import fastavro


class MyReader(fastavro.reader):
    def __init__(self, file):
        super().__init__(file)
        self.meta = self._header["meta"]


def main():
    in_dir = Path.cwd()
    in_path = in_dir / "by_apache_avro_datetime_3.avro"

    # avroファイルread
    data = []
    with open(in_path, "rb") as fi:
        reader = MyReader(fi)
        schema = reader.meta["avro.schema"].decode("utf-8")
        for rec in reader:
            data.append(rec)

    print("---- data ----")
    pprint.pprint(data)
    print("---- schema ----")
    print(json.dumps(json.loads(schema), indent=4))


if __name__ == "__main__":
    try:
        main()
    except Exception:
        print(traceback.format_exc())

以下のように表示され、パターン3のコードで正常なAvroファイルが生成されることが確認できました。


---- data ----
[{'datetime_col': datetime.datetime(2022, 1, 2, 3, 4, 5)}]
---- schema ----
{
    "type": "record",
    "name": "MyType",
    "fields": [
        {
            "type": {
                "type": "long",
                "logicalType": "local-timestamp-millis"
            },
            "name": "datetime_col",
            "doc": "DATETIME type column"
        }
    ],
    "doc": "Types containing DATETIME"
}

Apache Python avro packageでAvroファイルをwrite:TIMESTAMP型

TIMESTAMP型も確認してみます。
DATETIME型のパターン1のコードを以下のように書き換えました。
Apache Python avro packageでは、TIMESTAMP型に挿入するには、Pythonのdatetime型変数にタイムゾーンを設定する必要があるようです。

avro_timestamp_writer_1.py

import traceback
from pathlib import Path
import datetime
import json
import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
import dataclasses
from dataclasses_avroschema import AvroModel


@dataclasses.dataclass
class MyType(AvroModel):
    "Types containing TIMESTAMP"
    timestamp_col: datetime.datetime


def main():
    # スキーマ表示
    schema_py = MyType.avro_schema_to_python()
    print("---- schema ----")
    print(json.dumps(schema_py, indent=4))

    out_dir = Path.cwd()
    out_path = out_dir / "by_apache_avro_timestamp_1.avro"

    a_timestamp = datetime.datetime(2022, 1, 2, 3, 4, 5, tzinfo=datetime.timezone.utc)

    # avroファイルwrite
    schema = MyType.avro_schema()
    with open(out_path, "wb") as fo:
        with DataFileWriter(fo, DatumWriter(), avro.schema.parse(schema)) as writer:
            writer.append({
                "timestamp_col": a_timestamp
            })


if __name__ == "__main__":
    try:
        main()
    except Exception:
        print(traceback.format_exc())

パターン1を実行すると以下のように表示されて、Avroファイル「by_apache_avro_timestamp_1.avro」が生成されました。


---- schema ----
{
    "type": "record",
    "name": "MyType",
    "fields": [
        {
            "name": "timestamp_col",
            "type": {
                "type": "long",
                "logicalType": "timestamp-millis"
            }
        }
    ],
    "doc": "Types containing TIMESTAMP"
}

以下のコードを実行して、生成されたAvroファイルの中身を見てみます。


# fastavro_reader.pyのin_path変数を変更しただけのコード

import traceback
import json
import pprint
from pathlib import Path
import fastavro


class MyReader(fastavro.reader):
    def __init__(self, file):
        super().__init__(file)
        self.meta = self._header["meta"]


def main():
    in_dir = Path.cwd()
    in_path = in_dir / "by_apache_avro_timestamp_1.avro"

    # avroファイルread
    data = []
    with open(in_path, "rb") as fi:
        reader = MyReader(fi)
        schema = reader.meta["avro.schema"].decode("utf-8")
        for rec in reader:
            data.append(rec)

    print("---- data ----")
    pprint.pprint(data)
    print("---- schema ----")
    print(json.dumps(json.loads(schema), indent=4))


if __name__ == "__main__":
    try:
        main()
    except Exception:
        print(traceback.format_exc())

以下のように表示され、パターン1のコードだけで正常なAvroファイルが生成されることが確認できました。


---- data ----
[{'timestamp_col': datetime.datetime(2022, 1, 2, 3, 4, 5, tzinfo=datetime.timezone.utc)}]
---- schema ----
{
    "type": "record",
    "name": "MyType",
    "fields": [
        {
            "type": {
                "type": "long",
                "logicalType": "timestamp-millis"
            },
            "name": "timestamp_col"
        }
    ],
    "doc": "Types containing TIMESTAMP"
}

Apache Python avro packageでAvroファイルをread

writeの次はreadを調べます。
以下のコードで、fastavro_writer.pyが生成したAvroファイル「by_fastavro.avro」を読んでみます。

avro_reader.py

import traceback
import json
import pprint
from pathlib import Path
import datetime
from avro.datafile import DataFileReader
from avro.io import DatumReader


DATETIME_ORIGIN = datetime.datetime(1970, 1, 1, 0, 0)


def main():
    in_dir = Path.cwd()
    in_path = in_dir / "by_fastavro.avro"

    # avroファイルread
    data = []
    with open(in_path, "rb") as fi:
        with DataFileReader(open(in_path, "rb"), DatumReader()) as reader:
            schema = reader.meta["avro.schema"].decode("utf-8")
            for rec in reader:
                data.append({
                    "datetime_col": DATETIME_ORIGIN + datetime.timedelta(milliseconds=rec["datetime_col"]),
                    "timestamp_col": rec["timestamp_col"]
                })

    print("---- data ----")
    pprint.pprint(data)
    print("---- schema ----")
    print(json.dumps(json.loads(schema), indent=4))


if __name__ == "__main__":
    try:
        main()
    except Exception:
        print(traceback.format_exc())

以下のように表示されました。
DATETIME型でwarningsが出て、整数型として読まれましたが、ソースコードでPythonのdatetime型に変換できています。


C:\Users\User\.virtualenvs\sample_project\lib\site-packages\avro\schema.py:1099: IgnoredLogicalType: Unknown local-timestamp-millis, using long.
  warnings.warn(avro.errors.IgnoredLogicalType(f"Unknown {logical_type}, using {type_}."))
---- data ----
[{'datetime_col': datetime.datetime(2022, 1, 2, 3, 4, 5),
  'timestamp_col': datetime.datetime(2022, 6, 7, 8, 9, 10, tzinfo=<avro.timezones.UTCTzinfo object at 0x0000021FD350AB90>)}]
---- schema ----
{
    "type": "record",
    "name": "MyType",
    "fields": [
        {
            "name": "datetime_col",
            "type": {
                "type": "long",
                "logicalType": "local-timestamp-millis"
            },
            "doc": "DATETIME type column"
        },
        {
            "name": "timestamp_col",
            "type": {
                "type": "long",
                "logicalType": "timestamp-millis"
            },
            "doc": "TIMESTAMP type column"
        }
    ],
    "doc": "Types containing DATETIME and TIMESTAMP"
}

Apache Python avro packageのDATETIME型の対応状況

本記事執筆時点で、Apache Python avro packageのAvroファイルDATETIME型への対応状況は以下の結果となりました。

read write
Apache Python avro package 整数型で読まれる 整数型で渡す必要がある

次に、Google BigQueryを調べます。

Google BigQuery

Avroファイルをソースにして(import)、BigQueryテーブルを作成

以下のbq loadコマンドを実行して、fastavro_writer.pyが生成したAvroファイル「by_fastavro.avro」からBigQueryテーブルを作成してみます。
参考URL: BigQueryの外部テーブルでAvroを扱った際にTIMESTAMPでハマった話


bq load \
    --replace \
    --source_format=AVRO \
    --use_avro_logical_types \
    example_dataset.by_fastavro \
    ./by_fastavro.avro

以下のように、DATETIME型のカラムが正しく作成されました。

image.png

image.png

importの次はexportを調べます。

BigQueryテーブルからAvroファイルにexport

以下のように、先ほど作成したBigQueryテーブルから、Avroファイル名「export.avro」にexportしました。

image.png

以下のコードで、Avroファイルの中身を調べてみます。


# fastavro_reader.pyのin_path変数を変更しただけのコード

import traceback
import json
import pprint
from pathlib import Path
import fastavro


class MyReader(fastavro.reader):
    def __init__(self, file):
        super().__init__(file)
        self.meta = self._header["meta"]


def main():
    in_dir = Path.cwd()
    in_path = in_dir / "export.avro"

    # avroファイルread
    data = []
    with open(in_path, "rb") as fi:
        reader = MyReader(fi)
        schema = reader.meta["avro.schema"].decode("utf-8")
        for rec in reader:
            data.append(rec)

    print("---- data ----")
    pprint.pprint(data)
    print("---- schema ----")
    print(json.dumps(json.loads(schema), indent=4))


if __name__ == "__main__":
    try:
        main()
    except Exception:
        print(traceback.format_exc())

以下のように表示され、DATETIME型がstring型になってしまいました。


---- data ----
[{'datetime_col': '2022-01-02T03:04:05',
  'timestamp_col': datetime.datetime(2022, 6, 7, 8, 9, 10, tzinfo=datetime.timezone.utc)}]
---- schema ----
{
    "type": "record",
    "name": "Root",
    "fields": [
        {
            "name": "datetime_col",
            "type": "string",
            "doc": "DATETIME type column"
        },
        {
            "name": "timestamp_col",
            "type": {
                "type": "long",
                "doc": "TIMESTAMP type column",
                "logicalType": "timestamp-micros"
            }
        }
    ]
}

BigQueryのDATETIME型の対応状況

本記事執筆時点で、BigQueryのAvroファイルDATETIME型への対応状況は以下の結果となりました。

import export
Google BigQuery string型で出力される

まとめ

ややこしいので、私はBigQueryではDATETIME型でなくTIMESTAMP型を使うことにしました。
BigQueryのDATETIME型とTIMESTAMP型の違いは、タイムゾーン情報を持っているのがTIMESTAMP型、持っていないのがDATETIME型です。
またTIMESTAMP型は、BigQuery上ではUTCで持つと決まっているそうです。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0