はじめに
こんにちは、株式会社unerryでデータエンジニアをしているJanosと申します。
弊社ではIoTセンサーのBeaconによる実社会の位置情報を中心としたサービスを展開しています。私はデータエンジニアとして普段他社とのデータ連携処理を実装することが多く、例えば今回のようにcsvファイルを受領しBigQueryへloadするようなタスクは往々にして発生します。この時連携するデータは多くのケースで非常に大きいサイズとなっています。
issue
今回の問題についてまとめると、まず31日間のサンプルデータをcsv.gz
で受領したのですが
- 1ファイルあたり約5~7MiB
- 1日あたり200ファイル
というような構造になっており1日あたり約10~11Gib、総計1ヶ月全体で約300 GiBというデータ量でした。
これら全てを分析のためBigQueryへloadしていきます。PythonでcsvをBigQueryにloadするにあたりコードをそんなに書く必要はありません。
job_config = bigquery.LoadJobConfig(
schema=[
bigquery.SchemaField("name", "STRING"),
bigquery.SchemaField("post_abbr", "STRING"),
],
skip_leading_rows=1,
source_format=bigquery.SourceFormat.CSV,
)
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.csv"
load_job = client.load_table_from_uri(
uri, table_id, job_config=job_config
)
ところがload_table_from_uri
はcsvのparseに失敗したりcsvのrowにschemaで指定された型と違う値が入っていたりすると全体が失敗してしまいます。失敗すると今回の処理のどこで失敗したか教えてくれるのですが、この先ファイルのどこで失敗するのかは教えてくれません。毎回1つのエラーだけです。(参考:https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.client.Client.html#google.cloud.bigquery.client.Client.load_table_from_uri)
こうなるとload処理は大変なものになってしまいます。全体でどれくらい不正なrowがあるかも分からず、また不正なrowがあったとしてその原因を突き止めて修正なり除去なりをしようとしても一つのファイルが巨大すぎて効率よく対処できません。
回避する方法としてmax_bad_records
として無視する不正なrowの最大数を指定するというものがあります。しかし巨大なファイル群を前にして不正なrow数が予めわかっていることは殆どなく、これでは神頼みのようになってしまいます。
Apache Avro形式とは
今回この問題をcsvからApache Avro形式に変換することで解決しました。Apache Avroはバイナリ形式にシリアライズしたデータとそのデータのスキーマを同一ファイル内にバンドルするファイル形式です。
次はAvroで用いられるスキーマの例を示します。スキーマはJSONで定義することができます:
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
Avro形式を利用することの最大の利点はこの型を指定できるスキーマにありました。csvと違い明確に型を指定することで、BigQueryにloadする際不正なデータが混入することを防ぐことが可能になったのです。
BigQueryとAvro
またAvro形式はBigQueryへデータをloadすること自体に適しています。データを並列に読み取ることができる為、高速にBigQueryへloadすることができます。
CSVからAvroへの変換
ではいよいよ実際にcsvからAvroへ変換する方法を見ていきます。
まずはAvroスキーマの定義からです。
fields = [
{"name": "id",
"type": "string"},
{"name": "name",
"type": ["null", "string"],
"default": None},
{"name": "weight",
"type": ["null", "float"],
"default": None},
{"name": "height",
"type": ["null", "float"],
"default": None},
]
schema = {
"doc": "this is sample data",
"name": "Data",
"namespace": "example",
"type": "record",
"fields": fields,
}
null値を許容するcolumnは["null", "string"]
のようにそのcolumnの型とnullのリストを指定します。
まずはcsvを読み込みたいのですが、これにはpandasを用いることにします。
import pandas as pd
columns = ["id", "name", "weight", "height"]
df = pd.read_csv(buffer, error_bad_lines=False)
df.columns = columns
records = df.to_dict("records")
Avroに書き込むときに [{column -> value_0}, … , {column -> value_n}]
の形のdictのlistが必要になるのでdf.to_dict("records")
でこれを生成しています。
今このrecordsの中には不正なrowが混入している状態です。今回Avro形式を採用した恩恵はschemaに沿ったvalidationを実行することにより浴することができます。validate関数を作りfilterすることでschemaに適合しない不正なrowを除去します。
import fastavro
def validate(x) -> bool:
return fastavro.validate(x, schema, raise_errors=False)
good_records = filter(validate, records)
Avroの書き込みにはより高速と言われているfastavroを用いています。今回は変換したAvroファイルをGCSへアップロードしました。
import fastavro
from google.cloud import storage
buffer = io.BytesIO()
fastavro.writer(buffer, schema=schema,
records=good_records)
buffer.seek(0)
storage_client = storage.Client()
bucket = storage_client.bucket(bucketname)
blob = bucket.blob(target_uri)
try:
blob.upload_from_string(buffer.read())
except Exception as e:
logging.error(e)
以上でcsvからAvroへの変換が出来ました。実際変換したAvroファイルをBigQueryにまとめてloadするとデータ一日あたり数分で終わり、途中でエラーで躓くこともなく無事に読み込むことができました。
日付はどう扱う?
最後にAvroで日付をどう扱うかについて触れておこうと思います。日付を表すDateはAvroではlogical typeというprimitiveまたはcomplexの型に付加的な属性を持つもののひとつとして定義されます。
{"name": "measuring_date",
"type": {
"type": "int",
"logicalType": "date"
}}
AvroではDateの値は1970年1月1日からの経過日数のintとして表現されます。日付を変換する際には一手間必要になるかと思います。