本記事は初学者向けのDataflowとCloud Functionsを活用したパイプライン構築に関する内容です。Dataflowを使うことになった経緯や使い方、構築時のポイントやエラーハンドリングについて詳しく解説していきます。
GCSに配置されたParquetファイルを自動でBigQueryにロードするパイプラインの構築 - 初心者向け解説
はじめに:ビジネス背景とパイプライン構築の目的
あるプロジェクトで大量のデータをParquetファイル形式でGoogle Cloud Storage(GCS)に定期的にアップロードし、それをBigQueryに蓄積する必要がありました。毎回手動でデータをインポートするのではなく、パイプライン構築によって自動化したいと考え、Cloud FunctionsとDataflowを用いたパイプラインの構築を進めました。ここではその実装の流れを解説し、Dataflowを初めて使う方向けにナレッジを共有します。
技術選定:DataflowとCloud Functionsを選んだ理由
パイプラインを実装するにあたり、GCSにファイルがアップロードされると自動でトリガーが発生し、データをBigQueryにロードする流れが必要でした。このため、ファイル配置をトリガーに実行できるCloud Functionsと、分散処理が可能でパフォーマンスに優れたDataflowの組み合わせが最適だと判断しました。
- Cloud Functions:GCS上でのファイル配置イベントをトリガーし、Dataflowジョブを簡単に開始できます。
- Dataflow:Apache Beamをベースにしたデータ処理基盤で、スケーラブルで柔軟なデータ処理を実現し、処理負荷に応じたスケーリングが可能です。
Dataflowについて:Dataflowの特徴と一般的なユースケース
Google Cloud DataflowはApache Beamを基盤にしたデータ処理基盤であり、以下の特徴を持っています:
- 分散処理の自動スケーリング:大量データ処理に適しており、処理量に応じてスケールアップ・スケールダウンが自動で行われます。
- バッチ・ストリーミング処理:リアルタイムのストリーミングデータ処理にも対応しています。
- さまざまなデータソースに対応:GCSやBigQuery、Pub/SubなどのGoogle Cloudプロダクトとの統合が容易です。
Dataflowは主に、データの前処理やETLパイプライン、リアルタイムデータ分析、ストリーミングデータの変換に利用されることが多いです。ここでは、GCSに配置されたParquetファイルをBigQueryにロードするために利用しています。
Dataflow構築について
実装したパイプラインの概要
Cloud FunctionsがGCSに配置されたParquetファイルを検知し、Dataflowジョブを起動することで、自動的にParquetファイルのデータがBigQueryにロードされるパイプラインを構築しました。
つまずいたポイント:スキーマ設定まわり
Dataflowでは、BigQueryにデータをロードする際にスキーマが必要です。Parquetファイルのスキーマを自動で検出する方法として、SCHEMA_AUTODETECT
を利用しようとしましたが、スキーマの並び順が変わってしまう、つまりBQテーブル作成時にカラム順が一致しないという現象が起きることがわかりました。
そこで、以下の工夫を取り入れました:
-
Parquetファイルからスキーマを取得する関数を定義
GCSに保存されたParquetファイルを直接読み込み、BigQuery用のスキーマに変換する処理を組み込みました。このように、Apache Arrowを活用してParquetファイルのスキーマを動的に取得し、BigQueryに渡せるように変換することでスキーマ設定の問題を解決しました。 -
Dataflowオプションの詳細設定
データ量に応じて動的にスケールできるよう、DataflowのオートスケーリングやSSDディスクの設定を活用し、パフォーマンス向上と費用の最適化を図りました。また、pd-ssd
を設定してI/Oを高速化し、データ処理が詰まらないよう工夫しています。 -
エラーハンドリングとテーブル存在確認
BigQueryへのデータロード後にテーブルが作成されているか確認するロジックを追加し、テーブルが未作成の場合はエラーログを出力するようにしました。ただし、Dataflowのジョブ完了を待ってからテーブルの存在を確認する必要があるため、適切なタイミングで確認処理を行うよう工夫が求められました。
コード例:パイプライン全体の構造
以下は、今回実装したパイプラインの一部です。Cloud FunctionsがGCSのファイル配置を検知し、Dataflowで処理を行いBigQueryにロードする流れを実現しています。
import os
import re
import datetime
import apache_beam as beam
import pyarrow.parquet as pq
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, WorkerOptions
from google.cloud import storage, bigquery
# 定数定義(必須設定)
PROJECT_ID = 'project-id' # Google CloudプロジェクトID
REGION = 'asia-northeast1' # Dataflowジョブの実行リージョン
TEMP_LOCATION = 'gs://path/to/temp' # 一時データの保存先
SUBNETWORK_NAME = 'subnetwork-name' # サブネットワークの設定
ZONE = 'zone-name' # ディスク設定のためのゾーン
def extract_table_name(file_path):
"""
Parquetファイルのパスからテーブル名を抽出
"""
return re.sub(r'\.parquet$', '', os.path.basename(file_path)) # 拡張子を削除しファイル名をテーブル名として返す
def get_parquet_schema_from_gcs(gcs_path):
"""
GCS上のParquetファイルから直接スキーマを取得し、BigQuery用に変換
"""
# GCSからファイルをダウンロードせずに直接バッファで読み込む
storage_client = storage.Client()
bucket_name, object_name = gcs_path.replace("gs://", "").split("/", 1)
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(object_name)
# Parquetファイルをストリームとして読み込み、スキーマを取得
with blob.open("rb") as file_stream:
parquet_file = pq.ParquetFile(file_stream)
# ArrowスキーマからBigQueryスキーマに変換
return [{"name": field.name, "type": 'STRING' if field.type == 'utf8' else 'INTEGER'} for field in parquet_file.schema_arrow]
def run_pipeline(parquet_files):
"""
Dataflowパイプラインを実行し、ParquetファイルをBigQueryにロード
"""
options = PipelineOptions()
# Google Cloud関連のオプション設定
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = PROJECT_ID
google_cloud_options.temp_location = TEMP_LOCATION # 一時ファイルの保存先を設定
# Dataflowランナーの指定
standard_options = options.view_as(StandardOptions)
standard_options.runner = 'DataflowRunner'
# ワーカーの設定(スケーリングと性能最適化)
worker_options = options.view_as(WorkerOptions)
worker_options.machine_type = 'n2-highmem-16' # 大容量メモリのインスタンスタイプ
worker_options.disk_type = f'compute.googleapis.com/projects/{PROJECT_ID}/zones/{ZONE}/diskTypes/pd-ssd'
worker_options.use_public_ips = False # 内部IPのみでの実行を指定
# パイプラインの実行
with beam.Pipeline(options=options) as p:
for file_path in parquet_files:
# 各Parquetファイルごとにテーブルを指定してBigQueryに書き込み
output_table = f"{PROJECT_ID}:dataset.{extract_table_name(file_path)}"
schema = get_parquet_schema_from_gcs(file_path) # ParquetスキーマをBigQuery用に変換
try:
# ReadFromParquetでParquetファイルを読み込み、BigQueryに書き込む
(p
| f'Read Parquet: {file_path}' >> beam.io.ReadFromParquet(file_path)
| f'Write to BigQuery: {output_table}' >> beam.io.WriteToBigQuery(
output_table, schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
except Exception as e:
# エラー発生時の処理
print(f"Error: {e}")
def trigger_dataflow(event, context):
"""
Cloud Functionのトリガー関数。GCSにアップロードされたParquetファイルを検知してDataflowジョブを開始
"""
bucket_name = event['bucket']
file_name = event['name']
# Parquetファイルのみ処理
if file_name.endswith('.parquet'):
gcs_file_path = f'gs://{bucket_name}/{file_name}'
run_pipeline([gcs_file_path]) # 複数ファイルを扱えるようリストで渡す
終わりに
今回のパイプライン構築では、Cloud FunctionsでのトリガーからDataflowの処理までを一貫して実装し、ParquetファイルをBigQueryにロードするための一連の流れを学びました。スキーマの自動取得やSSDの設定、エラーハンドリングなど、初心者でも参考になる工夫を盛り込み、今後のデータパイプライン構築に役立てる内容となっています。
特にDataflowの詳細設定や、Cloud Functionsと連携する際のポイントなど、初めてDataflowを使う方にとって、少しでもお役立てできておりましたら幸いです。