LoginSignup
3
2

More than 1 year has passed since last update.

Python で Amazon S3 の MultiPartUpload を扱う

Posted at

Amazon S3 に巨大ファイルを置きたいときに MultiPartUpload を使いたくなることが多々あるが、Python + boto3 からそのまま扱おうとするとややこしいので自分が使いやすいようなクラスを実装した。

実装

class AmazonS3MultipartUploader:
    """Amazon S3 に巨大なオブジェクトに少しずつ書き込んでいくためのクラス"""

    def __init__(self, s3, *, part_size_threshold: int = 5 * 1024 * 1024, **kwargs):
        self.s3 = s3
        self.mpu = self.s3.create_multipart_upload(**kwargs)
        self.kwargs = {
            "Bucket": self.mpu["Bucket"],
            "Key": self.mpu["Key"],
            "UploadId": self.mpu["UploadId"],
        }
        self.threshold = part_size_threshold
        self.parts = []
        self._init_part()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_value:
            self.abort()
            raise exc_value
        self.complete()

    def _init_part(self):
        self.buf = b""

    def _upload_part(self):
        part_number = len(self.parts) + 1
        response = self.s3.upload_part(PartNumber=part_number, Body=self.buf, **self.kwargs)
        self.parts.append(
            {
                "PartNumber": part_number,
                "ETag": response["ETag"],
            }
        )

    def complete(self):
        if len(self.buf) > 0:
            self._upload_part()
        if len(self.parts) == 0:
            self.abort()
            return
        self.s3.complete_multipart_upload(MultipartUpload={"Parts": self.parts}, **self.kwargs)

    def abort(self):
        self.s3.abort_multipart_upload(**self.kwargs)

    def write(self, b: bytes):
        self.buf += b
        if len(self.buf) > self.threshold:
            self._upload_part()
            self._init_part()

使用例

以下は、 s3://foo/parts/ 内にある大量の .csv.gz ファイル (ヘッダ行なし) をすべて結合して s3://foo/concatenated.csv.gz に保存したい、という場合のサンプルコード。

Linux コマンドでいえば cat parts/* > concatenated.csv.gz で実現できるような操作を、Amazon S3 オブジェクトに対してやりたい、というイメージ。データ量が少なければ全ファイル読み込んで結合して PutObject すればよいが、メモリに乗り切らない場合は MultiPartUpload を用いて読み込みながらアップロードしていく必要がある。

import boto3

s3 = boto3.client("s3")
bucket = "foo"

with AmazonS3MultipartUploader(s3, Bucket=bucket, Key="concatenated.csv.gz") as mpu:
    for obj in list_all_objects(s3, Bucket=bucket, Prefix="parts/"):
        b = s3.get_object(Bucket=bucket, Key=obj["Key"])["Body"].read()
        mpu.write(b)

ここで使用している list_all_objects ジェネレータについては以下の記事を参照。

コンテキストマネージャとして実装して with 文で使えるようにすることで、使う側で complete や abort などの操作を不要にした。

あとは mpu.write に bytes データを放り込んでいけばいいだけなのだが、 bytes 型でさえあればいいのでかなり自由になんでもできる。(例えば BytesIO と組み合わせて Pandas DataFrame を .to_csv() したものを放り込んだり。)

3
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2