以前、Google DriveからGoogle Cloud Storageにファイルをコピーする方法1を紹介しました。当該記事はNode.js向けの内容だったのですが、Python向けに対応できたのでその方法を紹介いたします。
この手法のメリット・デメリット
メリット
- データ送受信のチャンクサイズにもよるものの、メモリ使用量を抑えることができる
- コード上でのファイル生成が不要
デメリット
- ダウンロードとアップロードを交互に行うので、エラーハンドリング・デバッグが若干煩雑になる
- 途中で失敗してもGoogle Cloud Storage上にファイルができるので、後から削除が必要になる
前提
- コードの実況環境は Google Cloud Function(第一世代)/Pythonとする
事前準備
-
GCPプロジェクトの「APIとライブラリ」でGoogle Drive APIとCloud Storageが使えるように設定しておく
-
ローカル開発時、GCPの認証を通せるように設定しておく(
gcloud auth application-default login
やサービスアカウントキーなど) -
依存ライブラリのインストール。以下ライブラリが必要。
- google-api-python-client: Drive APIを呼び出すために使う。検証済みバージョンは2.52.0
- google-cloud-storage: Google Cloud Storageアクセスに使用。検証済みバージョンは2.4.0
コード
以下、実装したコードです。本コードのポイントはいくつかあり、
- Drive APIによるファイルダウンロードを、io.BytesIOクラスを継承したクラスのreadメソッド上で実装する。
- 同メソッド内でダウンロードしつつ、tell()およびread()を使ってダウンロードしたバイト列を取得・returnする
- 内部でio.BytesIOクラスのインスタンスを持っておき、データの授受は当インスタンスで行う
- ChunkedDriveFileStreamないしupload_by_streamにおいて、chunk_sizeは同じバイトサイズを指定する
- 読み書きのサイズが整合性とれず、正しくデータをチャンクコピーできないため
となります。
import io
from googleapiclient.http import MediaIoBaseDownload
class ChunkedDriveFileStream(io.BytesIO):
def __init__(
self, drive_service, # googleapiclient.discovery.build('drive', 'v3', credentials=...)で得られたDrive APIアクセス用サービス
file_id: str,
chunk_size: int
):
super().__init__()
request = drive_service.files().get_media(fileId=file_id)
self._fd = io.BytesIO()
self._downloader = MediaIoBaseDownload(
self._fd, request, chunksize=chunk_size)
self._done = False
self.chunk_size = chunk_size
def tell(self) -> int:
return self._fd.tell()
def flush(self) -> None:
self._fd.flush()
def close(self) -> None:
self._fd.close()
def read(self, *args, **kwargs) -> bytes:
"""Google Driveからchunk_size[Byte]だけデータをダウンロードの上、得られたデータをGoogle Cloud Storageアップロード用に返却する
"""
if self._done:
return b"" # 処理完了時は空のバイト列を渡して完了通知する
pos = self._fd.tell()
progress, done = self._downloader.next_chunk()
if done:
self._done = done
after_pos = self._fd.tell()
read_bytes = after_pos - pos # next_chunkで読みだしたバイト数を取得
self._fh.seek(pos)
return self._fd.read(read_bytes)
Google Cloud Storageにアップロードする処理
以下、一例です。ポイントは2点で、
- bucket.blob呼び出し時にchunk_sizeを指定する
- blob.upload_from_fileにChunkedDriveFileStreamクラスのインスタンスを渡す
となります。
import io
from google.cloud import storage
class CloudStorage:
def __init__(self, project_id: str):
self.project_id = project_id
def upload_by_stream(chunk_stream: ChunkedDriveFileStream, file_name: str):
client = storage.Client(self.project_id)
bucket = storage.Bucket(
client, name="your-bucket-name") # バケット名は適宜指定する
blob = bucket.blob(
file_name,
chunk_size=chunk_stream.chunk_size)
# 注: GoogleCloudErrorなどについて、必要な例外処理を行うようにしてください
blob.upload_from_file(byte_stream)
以上により、chunk_sizeごとにデータをダウンロード・アップロードすることができます!