Amazon S3 に巨大ファイルを置きたいときに MultiPartUpload を使いたくなることが多々あるが、Python + boto3 からそのまま扱おうとするとややこしいので自分が使いやすいようなクラスを実装した。
実装
class AmazonS3MultipartUploader:
"""Amazon S3 に巨大なオブジェクトに少しずつ書き込んでいくためのクラス"""
def __init__(self, s3, *, part_size_threshold: int = 5 * 1024 * 1024, **kwargs):
self.s3 = s3
self.mpu = self.s3.create_multipart_upload(**kwargs)
self.kwargs = {
"Bucket": self.mpu["Bucket"],
"Key": self.mpu["Key"],
"UploadId": self.mpu["UploadId"],
}
self.threshold = part_size_threshold
self.parts = []
self._init_part()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
if exc_value:
self.abort()
raise exc_value
self.complete()
def _init_part(self):
self.buf = b""
def _upload_part(self):
part_number = len(self.parts) + 1
response = self.s3.upload_part(PartNumber=part_number, Body=self.buf, **self.kwargs)
self.parts.append(
{
"PartNumber": part_number,
"ETag": response["ETag"],
}
)
def complete(self):
if len(self.buf) > 0:
self._upload_part()
if len(self.parts) == 0:
self.abort()
return
self.s3.complete_multipart_upload(MultipartUpload={"Parts": self.parts}, **self.kwargs)
def abort(self):
self.s3.abort_multipart_upload(**self.kwargs)
def write(self, b: bytes):
self.buf += b
if len(self.buf) > self.threshold:
self._upload_part()
self._init_part()
使用例
以下は、 s3://foo/parts/
内にある大量の .csv.gz
ファイル (ヘッダ行なし) をすべて結合して s3://foo/concatenated.csv.gz
に保存したい、という場合のサンプルコード。
Linux コマンドでいえば cat parts/* > concatenated.csv.gz
で実現できるような操作を、Amazon S3 オブジェクトに対してやりたい、というイメージ。データ量が少なければ全ファイル読み込んで結合して PutObject すればよいが、メモリに乗り切らない場合は MultiPartUpload を用いて読み込みながらアップロードしていく必要がある。
import boto3
s3 = boto3.client("s3")
bucket = "foo"
with AmazonS3MultipartUploader(s3, Bucket=bucket, Key="concatenated.csv.gz") as mpu:
for obj in list_all_objects(s3, Bucket=bucket, Prefix="parts/"):
b = s3.get_object(Bucket=bucket, Key=obj["Key"])["Body"].read()
mpu.write(b)
ここで使用している list_all_objects
ジェネレータについては以下の記事を参照。
コンテキストマネージャとして実装して with
文で使えるようにすることで、使う側で complete や abort などの操作を不要にした。
あとは mpu.write
に bytes データを放り込んでいけばいいだけなのだが、 bytes 型でさえあればいいのでかなり自由になんでもできる。(例えば BytesIO
と組み合わせて Pandas DataFrame を .to_csv()
したものを放り込んだり。)