1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Google Cloud Storage(GCS)上でgzipファイルを解凍する

Posted at

背景と課題

Cloud Functionsを使って、GCS(Google Cloud Storage)上に保存された .csv.gz ファイル(2.9GB)をBigQueryにインポートしようとしたところ、以下のような問題が発生しました。

  • CSVファイルのロードより、gzipファイルのロード時間が長い
  • 解凍後のCSVサイズが約27GBと巨大
  • 一時ファイル方式(/tmp)やインメモリ解凍では、RAM不足(32GB必要)で処理失敗
  • gsutil には .gz ファイルの解凍機能がない
  • ローカルで展開して再アップロードするのは非現実的
# ★★★ START: GZIP解凍用関数追加 ★★★
def decompress_gzip_in_gcs(storage_client, bucket_name, file_name):
    """
    GCS上のgzipファイル(例: 『a/b.csv.gz』)をその場で展開し 『a/b.csv』 として保存し、元のgzファイルを削除します。
    ファイルが.gzで終わらない場合、元のfile_nameをそのまま返します。
    成功時は展開後のファイル名 (str) を返し、失敗時は None を返します。
    """
    
    # 1. ファイルが .gz 形式かどうかを確認する
    if not file_name.endswith('.gz'):
        print(f"Skipping decompression: {file_name} is not a gzip file.")
        return file_name # GZIPではなく、元のファイル名を返す

    print(f"Decompressing {file_name} in place...")
    
    # /tmp/ディレクトリ内の一時ファイルのパス設定
    # GCSパス内の『/』を『_』に置換し、/tmp/内に不要なディレクトリが作成されるのを防止
    temp_gz_path = f"/tmp/{file_name.replace('/', '_')}"
    temp_decompressed_path = temp_gz_path[:-3] # .gzを削除

    try:
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(file_name)

        if not blob.exists():
            print(f"Error: File not found: {file_name}")
            return None # ファイルが存在しない

        # 2. 解凍後のGCSターゲットファイルのパス(例: 「path/to/file.csv」)
        decompressed_file_name = file_name[:-3] 

        # 3. .gzファイルを/tmp/にダウンロードする
        blob.download_to_filename(temp_gz_path)
        print(f"Downloaded to {temp_gz_path}")

        # 4. /tmp/ で解凍する
        with gzip.open(temp_gz_path, 'rb') as f_in:
            with open(temp_decompressed_path, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
        print(f"Decompressed to {temp_decompressed_path}")

        # 5. 解凍したファイル(CSV)を元のパスでGCSにアップロードする
        new_blob = bucket.blob(decompressed_file_name)
        new_blob.upload_from_filename(temp_decompressed_path)
        print(f"Uploaded decompressed file to gs://{bucket_name}/{decompressed_file_name}")

        # 6. GCSの元の.gzファイルを削除する
        blob.delete()
        print(f"Deleted original file: gs://{bucket_name}/{file_name}")

        # 7. 解凍後のファイル名に戻る
        return decompressed_file_name

    except Exception as e:
        print(f"Error during decompression for {file_name}: {e}")
        return None # 解凍失敗しました
        
    finally:
        # 8. 成功・失敗にかかわらず、/tmp/ディレクトリ内の一時ファイルを削除する
        if os.path.exists(temp_gz_path):
            os.remove(temp_gz_path)
        if os.path.exists(temp_decompressed_path):
            os.remove(temp_decompressed_path)
# ★★★ END: GZIP解凍用関数追加 ★★★

解決策:Stream方式で処理

Cloud Run Functionで CPU 1000m, RAM 1GBで、ストリーム処理で「ダウンロード → 解凍 → アップロード」を一括実行することで、RAM不足を回避しました。

# ★★★ START: GZIP解凍用関数(Streaming版) ★★★
def decompress_gzip_in_gcs(storage_client, bucket_name, file_name):
    """
    (ストリーミング処理バージョン)
    GCS上のgzipファイル(例: 'a/b.csv.gz')を解凍して 'a/b.csv' として保存し、元のgzファイルを削除します。
    この方法はストリーミングを使用するため、大容量の /tmp/ (メモリ) を必要としません。
    """
    
    # 1. ファイルが .gz で終わるかチェック
    if not file_name.endswith('.gz'):
        print(f"Skipping decompression: {file_name} is not a gzip file.")
        return file_name # GZIPではないので、元のファイル名をそのまま返す

    print(f"Decompressing {file_name} via streaming...")
    
    try:
        bucket = storage_client.bucket(bucket_name)
        source_blob = bucket.blob(file_name) # (入力元: data.csv.gz)

        if not source_blob.exists():
            print(f"Error: File not found: {file_name}")
            return None # ファイルが存在しない

        # 2. 解凍後の出力先blobを定義
        decompressed_file_name = file_name[:-3] # (出力先: data.csv)
        dest_blob = bucket.blob(decompressed_file_name)

        # 3. ストリーミング解凍とアップロード
        # source_blob.open('rb'):GCSからのダウンロードストリームを開く
        # gzip.GzipFile(...):そのストリームをラップし、読み取り時に自動解凍する
        # dest_blob.upload_from_file(...):解凍ストリームから読み取り、GCSに直接アップロードする
        
        with source_blob.open('rb') as f_in:
            with gzip.GzipFile(fileobj=f_in, mode='rb') as gzip_stream:
                # upload_from_fileが自動的にチャンク(分割)処理を行う
                dest_blob.upload_from_file(gzip_stream, content_type='text/csv')

        print(f"Streaming decompression complete: {decompressed_file_name}")

        # 4. GCS上の元の.gzファイルを削除
        source_blob.delete()
        print(f"Deleted original file: gs://{bucket_name}/{file_name}")

        # 5. 解凍後のファイル名を返す
        return decompressed_file_name

    except Exception as e:
        print(f"Error during streaming decompression for {file_name}: {e}")
        # エラー発生時、中途半端に作成されたCSVファイルを削除する試み
        try:
            if 'dest_blob' in locals() and dest_blob.exists():
                dest_blob.delete()
                print(f"Cleaned up incomplete file: {decompressed_file_name}")
        except Exception as ce:
            print(f"Error during cleanup: {ce}")
        return None # 失敗
# ★★★ END: GZIP解凍用関数追加 ★★★

この方法では、ファイル全体をメモリに保持せず、少しずつ処理するため、メモリ消費を最小限に抑えられます。

📊 ベンチマーク結果

gzipサイズ 解凍後サイズ 実行時間 そのうち解凍時間 そのうちロード時間 BigQueryスロットサイズ
2.9GB 27.1GB 約14分 約12分 約1分半 100
2.9GB 27.1GB 約13分 約12分 約1分弱 600
442KB 4.1MB 約20秒 約1秒 約19秒 100
442KB 4.1MB 約15秒 約1秒 約14秒 600

メリット

  • RAM不足を回避
  • ローカル不要、すべてGCP上で完結
  • 処理時間も安定して高速

まとめ

巨大な .csv.gz ファイルを扱う際、従来の一時ファイル方式ではメモリ不足に陥る可能性があります。今回のように Stream方式 を採用することで、GCP上で完結しつつ、効率的な処理が可能になります。

BigQueryへのロード前処理に悩んでいる方の参考になれば幸いです。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?