10月からGoogle BigQueryを使い始めました。
データのimport/exportに、Apache Avro形式のファイルを使ってみましたので、その扱いについて記事にします。
Avro形式は、データだけでなくテーブルのスキーマ情報も持つので、テーブル作成のデータソースとしてjsonやcsvより便利です。
また、バイナリファイルなのでjsonやcsvよりも高速に処理できるそうです。
参考URL(感謝します)
- BigQueryのドキュメント
- Avroの仕様
- Dataclasses Avro Schema Generator / Logical Types
- BigQueryの外部テーブルでAvroを扱った際にTIMESTAMPでハマった話
AvroファイルでDATETIME型を扱えるか?
BigQueryのドキュメントによれば、
BigQueryのDATETIME型 == Avroのlocal-timestamp-millis型
または
BigQueryのDATETIME型 == Avroのlocal-timestamp-micros型
だそうです。
また、Avroの仕様には、 Local timestamp (millisecond precision) 型と Local timestamp (microsecond precision) 型が定義されています。
これらを見るとAvroファイルでDATETIME型が使えそうですが、実際に試してみてそれなりにハマったので、私が試した範囲内で整理して記事にします。
また、Python等でAvroファイルを作り、それをデータソースにしてBigQueryでテーブルを作るという作業はしばしば発生します。
そのため、Avroファイルを操作するPythonコードを残しておく意味でも記事にしました。
先に結果表示:AvroファイルDATETIME型への対応状況
本記事執筆時点で、以下の結果となりました。
Pythonパッケージの対応状況
(DATETIME型) | read | write |
---|---|---|
fastavro | 〇 | 〇 |
Apache Python avro package | 整数型で読まれる | 整数型で渡す必要がある |
fastavro、いいですね。
Google BigQueryの対応状況
(DATETIME型) | import | export |
---|---|---|
Google BigQuery | 〇 | string型で出力される |
先に結果表示:AvroファイルTIMESTAMP型への対応状況
Pythonパッケージの対応状況
(TIMESTAMP型) | read | write |
---|---|---|
fastavro | 〇 | 〇 |
Apache Python avro package | 〇 | 〇 |
Google BigQueryの対応状況
(TIMESTAMP型) | import | export |
---|---|---|
Google BigQuery | 〇 | 〇 |
TIMESTAMP型への対応は、すべて問題ありませんでした。
結果を先に書いてしまいましたが、以下私が試した手順&Pythonコードを記します。
最初にfastavroを調べます。
fastavro
インストールしたPythonパッケージ:
fastavro==1.7.0
fastavroでAvroファイルをwrite
以下のコードを動かします。
import traceback
import fastavro
import datetime
from pathlib import Path
def main():
schema = {
"type": "record",
"name": "MyType",
"fields": [
{
"name": "datetime_col",
"type": {
"type": "long",
"logicalType": "local-timestamp-millis"
},
"doc": "DATETIME type column"
},
{
"name": "timestamp_col",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
},
"doc": "TIMESTAMP type column"
}
],
"doc": "Types containing DATETIME and TIMESTAMP"
}
out_dir = Path.cwd()
out_path = out_dir / "by_fastavro.avro"
# データ作成
a_datetime = datetime.datetime(2022, 1, 2, 3, 4, 5)
# a_timestamp = datetime.datetime(2022, 6, 7, 8, 9, 10, tzinfo=datetime.timezone.utc)
a_timestamp = datetime.datetime(2022, 6, 7, 8, 9, 10)
# avroファイルwrite
with open(out_path, "wb") as fo:
writer = fastavro.write.Writer(fo, schema)
writer.write({
"datetime_col": a_datetime,
"timestamp_col": a_timestamp
})
writer.flush()
if __name__ == "__main__":
try:
main()
except Exception:
print(traceback.format_exc())
Avroファイル「by_fastavro.avro」が生成されました。
次の節で、このAvroファイルが正常に作られているか、中身を確認します。
fastavroでAvroファイルをread
以下のコードを動かして、前の節で生成されたAvroファイル「by_fastavro.avro」をreadします。
import traceback
import json
import pprint
from pathlib import Path
import fastavro
class MyReader(fastavro.reader):
def __init__(self, file):
super().__init__(file)
self.meta = self._header["meta"]
def main():
in_dir = Path.cwd()
in_path = in_dir / "by_fastavro.avro"
# avroファイルread
data = []
with open(in_path, "rb") as fi:
reader = MyReader(fi)
schema = reader.meta["avro.schema"].decode("utf-8")
for rec in reader:
data.append(rec)
print("---- data ----")
pprint.pprint(data)
print("---- schema ----")
print(json.dumps(json.loads(schema), indent=4))
if __name__ == "__main__":
try:
main()
except Exception:
print(traceback.format_exc())
以下のように表示され、write・read共にDATETIME型が正常に処理された様子です。
---- data ----
[{'datetime_col': datetime.datetime(2022, 1, 2, 3, 4, 5),
'timestamp_col': datetime.datetime(2022, 6, 7, 8, 9, 10, tzinfo=datetime.timezone.utc)}]
---- schema ----
{
"type": "record",
"name": "MyType",
"fields": [
{
"name": "datetime_col",
"type": {
"type": "long",
"logicalType": "local-timestamp-millis"
},
"doc": "DATETIME type column"
},
{
"name": "timestamp_col",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
},
"doc": "TIMESTAMP type column"
}
],
"doc": "Types containing DATETIME and TIMESTAMP"
}
fastavroのDATETIME型の対応状況
本記事執筆時点で、fastavroのAvroファイルDATETIME型への対応状況は以下の結果となりました。
read | write | |
---|---|---|
fastavro | 〇 | 〇 |
次に、Apache Python avro packageを調べます。
Apache Python avro package
インストールしたPythonパッケージ:
avro==1.11.1
dataclasses-avroschema==0.34.2
Apache Python avro packageでAvroファイルをwrite:DATETIME型
DATETIME型では、結果的に3パターンのコードを書き、3パターン目でAvroファイルの出力に成功しました。
まずは以下のパターン1のコードを動かします。
参考URL: Dataclasses Avro Schema Generator / Logical Types
import traceback
from pathlib import Path
import datetime
import json
import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
import dataclasses
from dataclasses_avroschema import AvroModel
@dataclasses.dataclass
class MyType(AvroModel):
"Types containing DATETIME"
datetime_col: datetime.datetime
def main():
# スキーマ表示
schema_py = MyType.avro_schema_to_python()
print("---- schema ----")
print(json.dumps(schema_py, indent=4))
out_dir = Path.cwd()
out_path = out_dir / "by_apache_avro_datetime_1.avro"
a_datetime = datetime.datetime(2022, 1, 2, 3, 4, 5)
# avroファイルwrite
schema = MyType.avro_schema()
with open(out_path, "wb") as fo:
with DataFileWriter(fo, DatumWriter(), avro.schema.parse(schema)) as writer:
writer.append({
"datetime_col": a_datetime
})
if __name__ == "__main__":
try:
main()
except Exception:
print(traceback.format_exc())
パターン1の実行結果は以下の通りです。
logicalTypeがtimestamp-millis、つまりTIMESTAMP型になってしまい、データ挿入もコケました。
このコードではダメなようです。
---- schema ----
{
"type": "record",
"name": "MyType",
"fields": [
{
"name": "datetime_col",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
}
],
"doc": "Types containing DATETIME"
}
Traceback (most recent call last):
File "F:\PycharmProjects\sample_project\avro_datetime_writer_1.py", line 40, in <module>
main()
File "F:\PycharmProjects\sample_project\avro_datetime_writer_1.py", line 33, in main
writer.append({
File "C:\Users\User\.virtualenvs\sample_project\lib\site-packages\avro\datafile.py", line 259, in append
self.datum_writer.write(datum, self.buffer_encoder)
File "C:\Users\User\.virtualenvs\sample_project\lib\site-packages\avro\io.py", line 1013, in write
validate(self.writers_schema, datum, raise_on_error=True)
File "C:\Users\User\.virtualenvs\sample_project\lib\site-packages\avro\io.py", line 147, in validate
raise avro.errors.AvroTypeException(current_node.schema, current_node.name, current_node.datum)
avro.errors.AvroTypeException: The datum "2022-01-02 03:04:05" provided for "datetime_col" is not an example of the schema {
"type": "long",
"logicalType": "timestamp-millis"
}
パターン1から改善したパターン2のコード。
import traceback
from pathlib import Path
import datetime
import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
SCHEMA = '''\
{
"type": "record",
"name": "MyType",
"fields": [
{
"name": "datetime_col",
"type": {
"type": "long",
"logicalType": "local-timestamp-millis"
},
"doc": "DATETIME type column"
}
],
"doc": "Types containing DATETIME"
}
'''
def main():
out_dir = Path.cwd()
out_path = out_dir / "by_apache_avro_datetime_2.avro"
a_datetime = datetime.datetime(2022, 1, 2, 3, 4, 5)
# avroファイルwrite
with open(out_path, "wb") as fo:
with DataFileWriter(fo, DatumWriter(), avro.schema.parse(SCHEMA)) as writer:
writer.append({
"datetime_col": a_datetime
})
if __name__ == "__main__":
try:
main()
except Exception:
print(traceback.format_exc())
パターン2の実行結果は以下の通りです。
local-timestamp-millis型、つまりDATETIME型がUnknownとwarningsが表示され、その後データ挿入がコケました。
このコードもダメなようです。
C:\Users\User\.virtualenvs\sample_project\lib\site-packages\avro\schema.py:1099: IgnoredLogicalType: Unknown local-timestamp-millis, using long.
warnings.warn(avro.errors.IgnoredLogicalType(f"Unknown {logical_type}, using {type_}."))
Traceback (most recent call last):
File "F:\PycharmProjects\sample_project\avro_datetime_writer_2.py", line 44, in <module>
main()
File "F:\PycharmProjects\sample_project\avro_datetime_writer_2.py", line 37, in main
writer.append({
File "C:\Users\User\.virtualenvs\sample_project\lib\site-packages\avro\datafile.py", line 259, in append
self.datum_writer.write(datum, self.buffer_encoder)
File "C:\Users\User\.virtualenvs\sample_project\lib\site-packages\avro\io.py", line 1013, in write
validate(self.writers_schema, datum, raise_on_error=True)
File "C:\Users\User\.virtualenvs\sample_project\lib\site-packages\avro\io.py", line 147, in validate
raise avro.errors.AvroTypeException(current_node.schema, current_node.name, current_node.datum)
avro.errors.AvroTypeException: The datum "2022-01-02 03:04:05" provided for "datetime_col" is not an example of the schema {
"type": "long",
"logicalType": "local-timestamp-millis"
}
さらに改善したパターン3のコード。
パターン2ではlogicalTypeが認識されなかったので、DATETIME型から整数型に変換してデータ挿入を試みます。
その整数型変数には、1970年1月1日からの経過時間を入れるのですが、logicalTypeがlocal-timestamp-millis型なので、経過ミリ秒を計算して入れます。
import traceback
from pathlib import Path
import datetime
import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
SCHEMA = '''\
{
"type": "record",
"name": "MyType",
"fields": [
{
"name": "datetime_col",
"type": {
"type": "long",
"logicalType": "local-timestamp-millis"
},
"doc": "DATETIME type column"
}
],
"doc": "Types containing DATETIME"
}
'''
DATETIME_ORIGIN = datetime.datetime(1970, 1, 1, 0, 0)
def main():
out_dir = Path.cwd()
out_path = out_dir / "by_apache_avro_datetime_3.avro"
a_datetime = datetime.datetime(2022, 1, 2, 3, 4, 5)
# avroファイルwrite
with open(out_path, "wb") as fo:
with DataFileWriter(fo, DatumWriter(), avro.schema.parse(SCHEMA)) as writer:
writer.append({
"datetime_col": int((a_datetime - DATETIME_ORIGIN).total_seconds() * 1000)
})
if __name__ == "__main__":
try:
main()
except Exception:
print(traceback.format_exc())
パターン3の実行結果は以下の通りです。
warningsが表示されますが、Avroファイル「by_apache_avro_datetime_3.avro」が生成されました。
C:\Users\User\.virtualenvs\sample_project\lib\site-packages\avro\schema.py:1099: IgnoredLogicalType: Unknown local-timestamp-millis, using long.
warnings.warn(avro.errors.IgnoredLogicalType(f"Unknown {logical_type}, using {type_}."))
以下のコードを実行して、生成されたAvroファイルの中身を見てみます。
# fastavro_reader.pyのin_path変数を変更しただけのコード
import traceback
import json
import pprint
from pathlib import Path
import fastavro
class MyReader(fastavro.reader):
def __init__(self, file):
super().__init__(file)
self.meta = self._header["meta"]
def main():
in_dir = Path.cwd()
in_path = in_dir / "by_apache_avro_datetime_3.avro"
# avroファイルread
data = []
with open(in_path, "rb") as fi:
reader = MyReader(fi)
schema = reader.meta["avro.schema"].decode("utf-8")
for rec in reader:
data.append(rec)
print("---- data ----")
pprint.pprint(data)
print("---- schema ----")
print(json.dumps(json.loads(schema), indent=4))
if __name__ == "__main__":
try:
main()
except Exception:
print(traceback.format_exc())
以下のように表示され、パターン3のコードで正常なAvroファイルが生成されることが確認できました。
---- data ----
[{'datetime_col': datetime.datetime(2022, 1, 2, 3, 4, 5)}]
---- schema ----
{
"type": "record",
"name": "MyType",
"fields": [
{
"type": {
"type": "long",
"logicalType": "local-timestamp-millis"
},
"name": "datetime_col",
"doc": "DATETIME type column"
}
],
"doc": "Types containing DATETIME"
}
Apache Python avro packageでAvroファイルをwrite:TIMESTAMP型
TIMESTAMP型も確認してみます。
DATETIME型のパターン1のコードを以下のように書き換えました。
Apache Python avro packageでは、TIMESTAMP型に挿入するには、Pythonのdatetime型変数にタイムゾーンを設定する必要があるようです。
import traceback
from pathlib import Path
import datetime
import json
import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
import dataclasses
from dataclasses_avroschema import AvroModel
@dataclasses.dataclass
class MyType(AvroModel):
"Types containing TIMESTAMP"
timestamp_col: datetime.datetime
def main():
# スキーマ表示
schema_py = MyType.avro_schema_to_python()
print("---- schema ----")
print(json.dumps(schema_py, indent=4))
out_dir = Path.cwd()
out_path = out_dir / "by_apache_avro_timestamp_1.avro"
a_timestamp = datetime.datetime(2022, 1, 2, 3, 4, 5, tzinfo=datetime.timezone.utc)
# avroファイルwrite
schema = MyType.avro_schema()
with open(out_path, "wb") as fo:
with DataFileWriter(fo, DatumWriter(), avro.schema.parse(schema)) as writer:
writer.append({
"timestamp_col": a_timestamp
})
if __name__ == "__main__":
try:
main()
except Exception:
print(traceback.format_exc())
パターン1を実行すると以下のように表示されて、Avroファイル「by_apache_avro_timestamp_1.avro」が生成されました。
---- schema ----
{
"type": "record",
"name": "MyType",
"fields": [
{
"name": "timestamp_col",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
}
],
"doc": "Types containing TIMESTAMP"
}
以下のコードを実行して、生成されたAvroファイルの中身を見てみます。
# fastavro_reader.pyのin_path変数を変更しただけのコード
import traceback
import json
import pprint
from pathlib import Path
import fastavro
class MyReader(fastavro.reader):
def __init__(self, file):
super().__init__(file)
self.meta = self._header["meta"]
def main():
in_dir = Path.cwd()
in_path = in_dir / "by_apache_avro_timestamp_1.avro"
# avroファイルread
data = []
with open(in_path, "rb") as fi:
reader = MyReader(fi)
schema = reader.meta["avro.schema"].decode("utf-8")
for rec in reader:
data.append(rec)
print("---- data ----")
pprint.pprint(data)
print("---- schema ----")
print(json.dumps(json.loads(schema), indent=4))
if __name__ == "__main__":
try:
main()
except Exception:
print(traceback.format_exc())
以下のように表示され、パターン1のコードだけで正常なAvroファイルが生成されることが確認できました。
---- data ----
[{'timestamp_col': datetime.datetime(2022, 1, 2, 3, 4, 5, tzinfo=datetime.timezone.utc)}]
---- schema ----
{
"type": "record",
"name": "MyType",
"fields": [
{
"type": {
"type": "long",
"logicalType": "timestamp-millis"
},
"name": "timestamp_col"
}
],
"doc": "Types containing TIMESTAMP"
}
Apache Python avro packageでAvroファイルをread
writeの次はreadを調べます。
以下のコードで、fastavro_writer.pyが生成したAvroファイル「by_fastavro.avro」を読んでみます。
import traceback
import json
import pprint
from pathlib import Path
import datetime
from avro.datafile import DataFileReader
from avro.io import DatumReader
DATETIME_ORIGIN = datetime.datetime(1970, 1, 1, 0, 0)
def main():
in_dir = Path.cwd()
in_path = in_dir / "by_fastavro.avro"
# avroファイルread
data = []
with open(in_path, "rb") as fi:
with DataFileReader(open(in_path, "rb"), DatumReader()) as reader:
schema = reader.meta["avro.schema"].decode("utf-8")
for rec in reader:
data.append({
"datetime_col": DATETIME_ORIGIN + datetime.timedelta(milliseconds=rec["datetime_col"]),
"timestamp_col": rec["timestamp_col"]
})
print("---- data ----")
pprint.pprint(data)
print("---- schema ----")
print(json.dumps(json.loads(schema), indent=4))
if __name__ == "__main__":
try:
main()
except Exception:
print(traceback.format_exc())
以下のように表示されました。
DATETIME型でwarningsが出て、整数型として読まれましたが、ソースコードでPythonのdatetime型に変換できています。
C:\Users\User\.virtualenvs\sample_project\lib\site-packages\avro\schema.py:1099: IgnoredLogicalType: Unknown local-timestamp-millis, using long.
warnings.warn(avro.errors.IgnoredLogicalType(f"Unknown {logical_type}, using {type_}."))
---- data ----
[{'datetime_col': datetime.datetime(2022, 1, 2, 3, 4, 5),
'timestamp_col': datetime.datetime(2022, 6, 7, 8, 9, 10, tzinfo=<avro.timezones.UTCTzinfo object at 0x0000021FD350AB90>)}]
---- schema ----
{
"type": "record",
"name": "MyType",
"fields": [
{
"name": "datetime_col",
"type": {
"type": "long",
"logicalType": "local-timestamp-millis"
},
"doc": "DATETIME type column"
},
{
"name": "timestamp_col",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
},
"doc": "TIMESTAMP type column"
}
],
"doc": "Types containing DATETIME and TIMESTAMP"
}
Apache Python avro packageのDATETIME型の対応状況
本記事執筆時点で、Apache Python avro packageのAvroファイルDATETIME型への対応状況は以下の結果となりました。
read | write | |
---|---|---|
Apache Python avro package | 整数型で読まれる | 整数型で渡す必要がある |
次に、Google BigQueryを調べます。
Google BigQuery
Avroファイルをソースにして(import)、BigQueryテーブルを作成
以下のbq loadコマンドを実行して、fastavro_writer.pyが生成したAvroファイル「by_fastavro.avro」からBigQueryテーブルを作成してみます。
参考URL: BigQueryの外部テーブルでAvroを扱った際にTIMESTAMPでハマった話
bq load \
--replace \
--source_format=AVRO \
--use_avro_logical_types \
example_dataset.by_fastavro \
./by_fastavro.avro
以下のように、DATETIME型のカラムが正しく作成されました。
importの次はexportを調べます。
BigQueryテーブルからAvroファイルにexport
以下のように、先ほど作成したBigQueryテーブルから、Avroファイル名「export.avro」にexportしました。
以下のコードで、Avroファイルの中身を調べてみます。
# fastavro_reader.pyのin_path変数を変更しただけのコード
import traceback
import json
import pprint
from pathlib import Path
import fastavro
class MyReader(fastavro.reader):
def __init__(self, file):
super().__init__(file)
self.meta = self._header["meta"]
def main():
in_dir = Path.cwd()
in_path = in_dir / "export.avro"
# avroファイルread
data = []
with open(in_path, "rb") as fi:
reader = MyReader(fi)
schema = reader.meta["avro.schema"].decode("utf-8")
for rec in reader:
data.append(rec)
print("---- data ----")
pprint.pprint(data)
print("---- schema ----")
print(json.dumps(json.loads(schema), indent=4))
if __name__ == "__main__":
try:
main()
except Exception:
print(traceback.format_exc())
以下のように表示され、DATETIME型がstring型になってしまいました。
---- data ----
[{'datetime_col': '2022-01-02T03:04:05',
'timestamp_col': datetime.datetime(2022, 6, 7, 8, 9, 10, tzinfo=datetime.timezone.utc)}]
---- schema ----
{
"type": "record",
"name": "Root",
"fields": [
{
"name": "datetime_col",
"type": "string",
"doc": "DATETIME type column"
},
{
"name": "timestamp_col",
"type": {
"type": "long",
"doc": "TIMESTAMP type column",
"logicalType": "timestamp-micros"
}
}
]
}
BigQueryのDATETIME型の対応状況
本記事執筆時点で、BigQueryのAvroファイルDATETIME型への対応状況は以下の結果となりました。
import | export | |
---|---|---|
Google BigQuery | 〇 | string型で出力される |
まとめ
ややこしいので、私はBigQueryではDATETIME型でなくTIMESTAMP型を使うことにしました。
BigQueryのDATETIME型とTIMESTAMP型の違いは、タイムゾーン情報を持っているのがTIMESTAMP型、持っていないのがDATETIME型です。
またTIMESTAMP型は、BigQuery上ではUTCで持つと決まっているそうです。