背景と課題
Cloud Functionsを使って、GCS(Google Cloud Storage)上に保存された .csv.gz ファイル(2.9GB)をBigQueryにインポートしようとしたところ、以下のような問題が発生しました。
- CSVファイルのロードより、gzipファイルのロード時間が長い
- 解凍後のCSVサイズが約27GBと巨大
- 一時ファイル方式(
/tmp)やインメモリ解凍では、RAM不足(32GB必要)で処理失敗 -
gsutilには.gzファイルの解凍機能がない - ローカルで展開して再アップロードするのは非現実的
# ★★★ START: GZIP解凍用関数追加 ★★★
def decompress_gzip_in_gcs(storage_client, bucket_name, file_name):
"""
GCS上のgzipファイル(例: 『a/b.csv.gz』)をその場で展開し 『a/b.csv』 として保存し、元のgzファイルを削除します。
ファイルが.gzで終わらない場合、元のfile_nameをそのまま返します。
成功時は展開後のファイル名 (str) を返し、失敗時は None を返します。
"""
# 1. ファイルが .gz 形式かどうかを確認する
if not file_name.endswith('.gz'):
print(f"Skipping decompression: {file_name} is not a gzip file.")
return file_name # GZIPではなく、元のファイル名を返す
print(f"Decompressing {file_name} in place...")
# /tmp/ディレクトリ内の一時ファイルのパス設定
# GCSパス内の『/』を『_』に置換し、/tmp/内に不要なディレクトリが作成されるのを防止
temp_gz_path = f"/tmp/{file_name.replace('/', '_')}"
temp_decompressed_path = temp_gz_path[:-3] # .gzを削除
try:
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_name)
if not blob.exists():
print(f"Error: File not found: {file_name}")
return None # ファイルが存在しない
# 2. 解凍後のGCSターゲットファイルのパス(例: 「path/to/file.csv」)
decompressed_file_name = file_name[:-3]
# 3. .gzファイルを/tmp/にダウンロードする
blob.download_to_filename(temp_gz_path)
print(f"Downloaded to {temp_gz_path}")
# 4. /tmp/ で解凍する
with gzip.open(temp_gz_path, 'rb') as f_in:
with open(temp_decompressed_path, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
print(f"Decompressed to {temp_decompressed_path}")
# 5. 解凍したファイル(CSV)を元のパスでGCSにアップロードする
new_blob = bucket.blob(decompressed_file_name)
new_blob.upload_from_filename(temp_decompressed_path)
print(f"Uploaded decompressed file to gs://{bucket_name}/{decompressed_file_name}")
# 6. GCSの元の.gzファイルを削除する
blob.delete()
print(f"Deleted original file: gs://{bucket_name}/{file_name}")
# 7. 解凍後のファイル名に戻る
return decompressed_file_name
except Exception as e:
print(f"Error during decompression for {file_name}: {e}")
return None # 解凍失敗しました
finally:
# 8. 成功・失敗にかかわらず、/tmp/ディレクトリ内の一時ファイルを削除する
if os.path.exists(temp_gz_path):
os.remove(temp_gz_path)
if os.path.exists(temp_decompressed_path):
os.remove(temp_decompressed_path)
# ★★★ END: GZIP解凍用関数追加 ★★★
解決策:Stream方式で処理
Cloud Run Functionで CPU 1000m, RAM 1GBで、ストリーム処理で「ダウンロード → 解凍 → アップロード」を一括実行することで、RAM不足を回避しました。
# ★★★ START: GZIP解凍用関数(Streaming版) ★★★
def decompress_gzip_in_gcs(storage_client, bucket_name, file_name):
"""
(ストリーミング処理バージョン)
GCS上のgzipファイル(例: 'a/b.csv.gz')を解凍して 'a/b.csv' として保存し、元のgzファイルを削除します。
この方法はストリーミングを使用するため、大容量の /tmp/ (メモリ) を必要としません。
"""
# 1. ファイルが .gz で終わるかチェック
if not file_name.endswith('.gz'):
print(f"Skipping decompression: {file_name} is not a gzip file.")
return file_name # GZIPではないので、元のファイル名をそのまま返す
print(f"Decompressing {file_name} via streaming...")
try:
bucket = storage_client.bucket(bucket_name)
source_blob = bucket.blob(file_name) # (入力元: data.csv.gz)
if not source_blob.exists():
print(f"Error: File not found: {file_name}")
return None # ファイルが存在しない
# 2. 解凍後の出力先blobを定義
decompressed_file_name = file_name[:-3] # (出力先: data.csv)
dest_blob = bucket.blob(decompressed_file_name)
# 3. ストリーミング解凍とアップロード
# source_blob.open('rb'):GCSからのダウンロードストリームを開く
# gzip.GzipFile(...):そのストリームをラップし、読み取り時に自動解凍する
# dest_blob.upload_from_file(...):解凍ストリームから読み取り、GCSに直接アップロードする
with source_blob.open('rb') as f_in:
with gzip.GzipFile(fileobj=f_in, mode='rb') as gzip_stream:
# upload_from_fileが自動的にチャンク(分割)処理を行う
dest_blob.upload_from_file(gzip_stream, content_type='text/csv')
print(f"Streaming decompression complete: {decompressed_file_name}")
# 4. GCS上の元の.gzファイルを削除
source_blob.delete()
print(f"Deleted original file: gs://{bucket_name}/{file_name}")
# 5. 解凍後のファイル名を返す
return decompressed_file_name
except Exception as e:
print(f"Error during streaming decompression for {file_name}: {e}")
# エラー発生時、中途半端に作成されたCSVファイルを削除する試み
try:
if 'dest_blob' in locals() and dest_blob.exists():
dest_blob.delete()
print(f"Cleaned up incomplete file: {decompressed_file_name}")
except Exception as ce:
print(f"Error during cleanup: {ce}")
return None # 失敗
# ★★★ END: GZIP解凍用関数追加 ★★★
この方法では、ファイル全体をメモリに保持せず、少しずつ処理するため、メモリ消費を最小限に抑えられます。
📊 ベンチマーク結果
| gzipサイズ | 解凍後サイズ | 実行時間 | そのうち解凍時間 | そのうちロード時間 | BigQueryスロットサイズ |
|---|---|---|---|---|---|
| 2.9GB | 27.1GB | 約14分 | 約12分 | 約1分半 | 100 |
| 2.9GB | 27.1GB | 約13分 | 約12分 | 約1分弱 | 600 |
| 442KB | 4.1MB | 約20秒 | 約1秒 | 約19秒 | 100 |
| 442KB | 4.1MB | 約15秒 | 約1秒 | 約14秒 | 600 |
メリット
- RAM不足を回避
- ローカル不要、すべてGCP上で完結
- 処理時間も安定して高速
まとめ
巨大な .csv.gz ファイルを扱う際、従来の一時ファイル方式ではメモリ不足に陥る可能性があります。今回のように Stream方式 を採用することで、GCP上で完結しつつ、効率的な処理が可能になります。
BigQueryへのロード前処理に悩んでいる方の参考になれば幸いです。