1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

初心者向け!GCPでParquetファイルをDataflowを用いてBigQueryにインポートする方法

Last updated at Posted at 2024-10-31

本記事は初学者向けのDataflowとCloud Functionsを活用したパイプライン構築に関する内容です。Dataflowを使うことになった経緯や使い方、構築時のポイントやエラーハンドリングについて詳しく解説していきます。


GCSに配置されたParquetファイルを自動でBigQueryにロードするパイプラインの構築 - 初心者向け解説

はじめに:ビジネス背景とパイプライン構築の目的

あるプロジェクトで大量のデータをParquetファイル形式でGoogle Cloud Storage(GCS)に定期的にアップロードし、それをBigQueryに蓄積する必要がありました。毎回手動でデータをインポートするのではなく、パイプライン構築によって自動化したいと考え、Cloud FunctionsとDataflowを用いたパイプラインの構築を進めました。ここではその実装の流れを解説し、Dataflowを初めて使う方向けにナレッジを共有します。

技術選定:DataflowとCloud Functionsを選んだ理由

パイプラインを実装するにあたり、GCSにファイルがアップロードされると自動でトリガーが発生し、データをBigQueryにロードする流れが必要でした。このため、ファイル配置をトリガーに実行できるCloud Functionsと、分散処理が可能でパフォーマンスに優れたDataflowの組み合わせが最適だと判断しました。

  • Cloud Functions:GCS上でのファイル配置イベントをトリガーし、Dataflowジョブを簡単に開始できます。
  • Dataflow:Apache Beamをベースにしたデータ処理基盤で、スケーラブルで柔軟なデータ処理を実現し、処理負荷に応じたスケーリングが可能です。

Dataflowについて:Dataflowの特徴と一般的なユースケース

Google Cloud DataflowはApache Beamを基盤にしたデータ処理基盤であり、以下の特徴を持っています:

  • 分散処理の自動スケーリング:大量データ処理に適しており、処理量に応じてスケールアップ・スケールダウンが自動で行われます。
  • バッチ・ストリーミング処理:リアルタイムのストリーミングデータ処理にも対応しています。
  • さまざまなデータソースに対応:GCSやBigQuery、Pub/SubなどのGoogle Cloudプロダクトとの統合が容易です。

Dataflowは主に、データの前処理やETLパイプライン、リアルタイムデータ分析、ストリーミングデータの変換に利用されることが多いです。ここでは、GCSに配置されたParquetファイルをBigQueryにロードするために利用しています。

Dataflow構築について

実装したパイプラインの概要

Cloud FunctionsがGCSに配置されたParquetファイルを検知し、Dataflowジョブを起動することで、自動的にParquetファイルのデータがBigQueryにロードされるパイプラインを構築しました。

つまずいたポイント:スキーマ設定まわり

Dataflowでは、BigQueryにデータをロードする際にスキーマが必要です。Parquetファイルのスキーマを自動で検出する方法として、SCHEMA_AUTODETECT を利用しようとしましたが、スキーマの並び順が変わってしまう、つまりBQテーブル作成時にカラム順が一致しないという現象が起きることがわかりました。

そこで、以下の工夫を取り入れました:

  1. Parquetファイルからスキーマを取得する関数を定義
    GCSに保存されたParquetファイルを直接読み込み、BigQuery用のスキーマに変換する処理を組み込みました。このように、Apache Arrowを活用してParquetファイルのスキーマを動的に取得し、BigQueryに渡せるように変換することでスキーマ設定の問題を解決しました。

  2. Dataflowオプションの詳細設定
    データ量に応じて動的にスケールできるよう、DataflowのオートスケーリングやSSDディスクの設定を活用し、パフォーマンス向上と費用の最適化を図りました。また、pd-ssd を設定してI/Oを高速化し、データ処理が詰まらないよう工夫しています。

  3. エラーハンドリングとテーブル存在確認
    BigQueryへのデータロード後にテーブルが作成されているか確認するロジックを追加し、テーブルが未作成の場合はエラーログを出力するようにしました。ただし、Dataflowのジョブ完了を待ってからテーブルの存在を確認する必要があるため、適切なタイミングで確認処理を行うよう工夫が求められました。

コード例:パイプライン全体の構造

以下は、今回実装したパイプラインの一部です。Cloud FunctionsがGCSのファイル配置を検知し、Dataflowで処理を行いBigQueryにロードする流れを実現しています。

import os
import re
import datetime
import apache_beam as beam
import pyarrow.parquet as pq
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, WorkerOptions
from google.cloud import storage, bigquery

# 定数定義(必須設定)
PROJECT_ID = 'project-id'  # Google CloudプロジェクトID
REGION = 'asia-northeast1'  # Dataflowジョブの実行リージョン
TEMP_LOCATION = 'gs://path/to/temp'  # 一時データの保存先
SUBNETWORK_NAME = 'subnetwork-name'  # サブネットワークの設定
ZONE = 'zone-name'  # ディスク設定のためのゾーン

def extract_table_name(file_path):
    """
    Parquetファイルのパスからテーブル名を抽出
    """
    return re.sub(r'\.parquet$', '', os.path.basename(file_path))  # 拡張子を削除しファイル名をテーブル名として返す

def get_parquet_schema_from_gcs(gcs_path):
    """
    GCS上のParquetファイルから直接スキーマを取得し、BigQuery用に変換
    """
    # GCSからファイルをダウンロードせずに直接バッファで読み込む
    storage_client = storage.Client()
    bucket_name, object_name = gcs_path.replace("gs://", "").split("/", 1)
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(object_name)
    
    # Parquetファイルをストリームとして読み込み、スキーマを取得
    with blob.open("rb") as file_stream:
        parquet_file = pq.ParquetFile(file_stream)
        # ArrowスキーマからBigQueryスキーマに変換
        return [{"name": field.name, "type": 'STRING' if field.type == 'utf8' else 'INTEGER'} for field in parquet_file.schema_arrow]

def run_pipeline(parquet_files):
    """
    Dataflowパイプラインを実行し、ParquetファイルをBigQueryにロード
    """
    options = PipelineOptions()

    # Google Cloud関連のオプション設定
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = PROJECT_ID
    google_cloud_options.temp_location = TEMP_LOCATION  # 一時ファイルの保存先を設定

    # Dataflowランナーの指定
    standard_options = options.view_as(StandardOptions)
    standard_options.runner = 'DataflowRunner'

    # ワーカーの設定(スケーリングと性能最適化)
    worker_options = options.view_as(WorkerOptions)
    worker_options.machine_type = 'n2-highmem-16'  # 大容量メモリのインスタンスタイプ
    worker_options.disk_type = f'compute.googleapis.com/projects/{PROJECT_ID}/zones/{ZONE}/diskTypes/pd-ssd'
    worker_options.use_public_ips = False  # 内部IPのみでの実行を指定

    # パイプラインの実行
    with beam.Pipeline(options=options) as p:
        for file_path in parquet_files:
            # 各Parquetファイルごとにテーブルを指定してBigQueryに書き込み
            output_table = f"{PROJECT_ID}:dataset.{extract_table_name(file_path)}"
            schema = get_parquet_schema_from_gcs(file_path)  # ParquetスキーマをBigQuery用に変換
            
            try:
                # ReadFromParquetでParquetファイルを読み込み、BigQueryに書き込む
                (p
                 | f'Read Parquet: {file_path}' >> beam.io.ReadFromParquet(file_path)
                 | f'Write to BigQuery: {output_table}' >> beam.io.WriteToBigQuery(
                        output_table, schema=schema,
                        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
            except Exception as e:
                # エラー発生時の処理
                print(f"Error: {e}")

def trigger_dataflow(event, context):
    """
    Cloud Functionのトリガー関数。GCSにアップロードされたParquetファイルを検知してDataflowジョブを開始
    """
    bucket_name = event['bucket']
    file_name = event['name']
    
    # Parquetファイルのみ処理
    if file_name.endswith('.parquet'):
        gcs_file_path = f'gs://{bucket_name}/{file_name}'
        run_pipeline([gcs_file_path])  # 複数ファイルを扱えるようリストで渡す

終わりに

今回のパイプライン構築では、Cloud FunctionsでのトリガーからDataflowの処理までを一貫して実装し、ParquetファイルをBigQueryにロードするための一連の流れを学びました。スキーマの自動取得やSSDの設定、エラーハンドリングなど、初心者でも参考になる工夫を盛り込み、今後のデータパイプライン構築に役立てる内容となっています。

特にDataflowの詳細設定や、Cloud Functionsと連携する際のポイントなど、初めてDataflowを使う方にとって、少しでもお役立てできておりましたら幸いです。


1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?