【Kaggle】巨大データの取得・前処理・Google Cloud Storage へのアップロード方法
はじめに
最近 Kaggle を始め,現在は 約 300GB の巨大データ を扱うコンペに挑戦しています.
当初は下記の記事を参考にして,Google Colab 上で Kaggle Dataset をマウントして処理していました.
しかし,今のコンペのデータが急に Colab からアクセスできなくなってしまいました.
ローカル環境にダウンロードして扱うのも容量的に厳しく,API 使っても途中で切られてしまう,Kaggle Notebook も Colab も流石に 300GB は置けないという状況に困っていました.
そこで今回選んだのが,Kaggle Notebook 上で前処理したデータを Google Cloud Storage (GCS) にアップロードして再利用する方法です.
何がベストプラクティスかまだ模索中ですが,本記事ではその手順をまとめて共有します.
今回は画像ファイルをサンプルにしていますが,基本なんでも大丈夫です.
必要なもの
- Google Cloud Platform(GCP)のアカウント
- プロジェクトを作成しておく
- Google Cloud Storage(GCS)
- バケットを作成しておく
- (Option)サービスアカウントのキー(JSON)
- おそらく不要
- (Option)アップロード用のプログラム(後述)
一旦,ここでは GCP でのプロジェクトの作成方法・バケットの作成方法は割愛します.
事前準備
-
Secrets 登録
Notebook のタブから『Add-ons』->『Secrets』->『Add Secrets』で下記を参考に GCP のプロジェクト名と GCS のバケット名を記載する.LABEL VALUE project-id GCP のプロジェクト ID gcs-name バケット名 Kaggle Notebook には
Secrets
という機能があり,API キーやプロジェクト ID などの秘密情報を安全に保存できます.
Notebook からkaggle_secrets
を使って呼び出せば,コードに直書きしなくても済むので便利です. -
Google Cloud Services 有効化
Notebook のタブから『Add-ons』->『Google Cloud Services』->『Cloud Storage』を有効化しておく.
アップロード
以降,Kaggle Notebook で実行してください.
-
取得したいコンペのページから Notebook 作成
-
ライブラリインポート
ここは前処理などに必要なものをインポートしてください.
必須なのはfrom google.cloud import storage
・from kaggle_secrets import UserSecretsClient
です.
また,今回マルチプロセスにするので,multiprocessing
もインポートしています.# Cloud Storage from google.cloud import storage from kaggle_secrets import UserSecretsClient from dataclasses import dataclass import json from multiprocessing import Pool, cpu_count import os from pathlib import Path import cv2
-
Secrets 取得
次に,先ほど登録した Secrets を Python から使用できるようにします.project-id
・gcs-name
は登録した LABEL 名に置き換えてください.user_secrets = UserSecretsClient() project_id = user_secrets.get_secret("project-id") gcs_name = user_secrets.get_secret("gcs-name")
-
CFG クラス定義
次に,前処理〜アップロードで使用する情報をCFG
にしておきます.
もし,前処理を変更する場合はこちらに設定をまとめておくと便利です.
今回はリサイズする画像サイズを入れています.@dataclass class CFG: gcs_project = project_id gcs_bucket = gcs_name # Add other settings to prerpocess out_dir = "tmp" image_size = 512
-
前処理・アップロードクラス定義
次にデータの前処理の内容とアップロードの処理を含めたクラスを定義します.
これが今回のキモとなるコードです.
単純にpreprocess()
内でデータに対して前処理を行って,upload_to_gcs()
で指定したバケットにアップしているだけです.class KaggleGCSUploader: def __init__(self, cfg: CFG): self.config = cfg Path(self.config.out_dir).mkdir(exist_ok=True) def preprocess(self, file_path): # Add preprocess for training image = cv2.imread(file_path) image = cv2.resize(image, (self.config.image_size, self.config.image_size)) basename = Path(file_path).name out_path = os.path.join(self.config.out_dir, basename) cv2.imwrite(out_path, image) gcs_path = f"image/{basename}" self.upload_to_gcs(Path(out_path), gcs_path) def upload_to_gcs(self, local_path, gcs_path): storage_client = storage.Client(project=self.config.gcs_project) bucket = storage_client.bucket(self.config.gcs_bucket) """Upload local file to GCS""" if bucket is None: print(f"Warning: GCS not available, skipping upload of {local_path}") return try: blob = bucket.blob(gcs_path) blob.upload_from_filename(local_path) print(f"Uploaded {local_path} to gs://{self.config.gcs_bucket}/{gcs_path}") local_path.unlink(missing_ok=True) except Exception as e: print(f"Error uploading {local_path} to GCS: {e}")
-
マルチプロセス用の helper 関数作成
ここではマルチプロセスで実行する関数を定義しています.
内部的にはargs
を受け取って,前述のKaggleGCSUploader()
を作成し処理を実行しています.
一応,プロセスごとにオブジェクトは作成するようにしています.def process_single_file(args): """helper function for multi-processing""" file_path, config = args try: uploader = KaggleGCSUploader(config) return uploader.preprocess(file_path) except Exception as e: print(f"Error in worker process for {file_path}: {e}") return None
-
コンペ上のデータパス取得と実行
最後にコンペ上のデータパスを取得し,KaggleGCSUploader.preprocess()
に渡すようにします.今回はコンペデータのディレクトリ内のjpg
ファイルのパスを取得するようにしています.
コード内のTARGET_DATA_PATH
はコンペごとに変わる(/kaggle/input/*** のような)ので,参加するコンペを参照ください.config = CFG() image_dir = "TARGET_DATA_PATH" image_path_list = Path(image_dir).glob("**/*jpg") successful_count = 0 with Pool(min(cpu_count(), 4)) as pool: # CPU数を制限 args_list = [(str(image_path), config) for image_path in image_path_list] results = pool.map(process_single_file, args_list) for result in results: if result is not None: successful_count += 1 print(f"\nProcessing completed: {successful_count}/{len(image_path_list)} files successful")
実行結果
GCS のバケット内を確認すると,image/
フォルダの下に前処理後のデータがアップされていることが確認できます.
バックグラウンドで実行
バックグラウンドで実行しておきたい場合は,Notebook から『Save Version』->『Save & Run All』->『Save』で実行されます.
まとめ
- Kaggle Notebook から直接 Google Cloud Storage にデータをアップロード
- Secrets を使用
- 前処理の部分を自由にカスタムすれば,DICOM -> NPY 変換なども同じ要領で実装可能
これでローカル環境に大容量データをダウンロードする必要がなくなり,Colab や GCS を活用した効率的なワークフローが実現できます 🚀
API キーやバケット名などは取り扱いにお気をつけください.