0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AWS S3にファイルをアップロードしてもらう(5GB超〜5TB)

Last updated at Posted at 2025-02-11

環境

Amazon Web Services
MacOS
Python3

AWS S3にファイルをアップロードしてもらう(5GB超〜5TB)

外部の人にS3バケットにファイルをアップロードしてもらう手段として、
Amazon S3 Presigned URL
があります。これは、
「AWS CLIが〜」
とか、
「S3が〜」
とか、
そういうまだるっこしい話は抜きにして、
「URLをお伝えするんで、それに対して curl で PUT してください」
という、エンジニアの9割くらいの人にはささっと対応して貰える感じでファイルを受け取れる、とても便利なモノです。

ただ、ファイルの容量が5GBを超えると、 EntityTooLarge エラーが発生します。

<Error>
    <Code>
        EntityTooLarge
    </Code>
    <Message>
        Your proposed upload exceeds the maximum allowed size
    </Message>
</Error>

巨大サイズのオブジェクトをアップロードするには マルチパートアップロード を使う

アップロードするファイルのサイズが5GBを超えて5TBまでの場合は、マルチパートアップロードを使います。これは、巨大ファイルを最大1,000個に分けてURLを発行し、アップロードを行い、AWS側で合体させてS3に保存するものです。
単一のファイルをアップロードするよりも、パラレルにデータをアップロードできるため、高速化も見込めるらしいです。

(1) AWSのクレデンシャルを設定する

(2) URLを発行する
データ受け入れ側が、APIを利用してURLを発行します。
bucket_name、key_name、content_type、expires_in(URLの有効期限:秒)を指定します。

python generate_presigned_urls.py

generate_presigned_urls.py
#!/usr/bin/env python3
import boto3
import json
from botocore.config import Config


def generate_multipart_urls(bucket_name, key_name, content_type, expires_in):
    """マルチパートアップロードURL生成"""
    s3_client = boto3.client(
        's3',
        region_name='ap-northeast-1',  # 東京リージョンを明示的に指定
        config=Config(
            s3={'addressing_style': 'path'},
            signature_version='s3v4'
        )
    )

    try:
        urls = {}

        # 1. アップロード開始用URL
        response = s3_client.create_multipart_upload(
            Bucket=bucket_name,
            Key=key_name,
            ContentType=content_type
        )
        upload_id = response['UploadId']
        urls['upload_id'] = upload_id  # 実際の UploadId を保存

        urls['initiate'] = s3_client.generate_presigned_url(
            'create_multipart_upload',
            Params={
                'Bucket': bucket_name,
                'Key': key_name,
                'ContentType': content_type
            },
            ExpiresIn=expires_in,
            HttpMethod='POST'
        )

        # 2. パート用URL
        urls['parts'] = {}
        for part_number in range(1, 1001):
            urls['parts'][part_number] = s3_client.generate_presigned_url(
                'upload_part',
                Params={
                    'Bucket': bucket_name,
                    'Key': key_name,
                    'UploadId': upload_id,
                    'PartNumber': part_number
                },
                ExpiresIn=expires_in
            )

        # 3. 完了用URL
        urls['complete'] = s3_client.generate_presigned_url(
            'complete_multipart_upload',
            Params={
                'Bucket': bucket_name,
                'Key': key_name,
                'UploadId': upload_id
            },
            ExpiresIn=expires_in
        )

        # 4. 中止用URL
        urls['abort'] = s3_client.generate_presigned_url(
            'abort_multipart_upload',
            Params={
                'Bucket': bucket_name,
                'Key': key_name,
                'UploadId': upload_id
            },
            ExpiresIn=expires_in
        )

        with open('upload_urls.json', 'w') as f:
            json.dump(urls, f, indent=2)

        print(f"Generated URLs for 1000 parts")
        print(f"Upload ID: {upload_id}")
        print(f"URLs will expire in {expires_in} seconds")
        print(f"URLs have been saved to 'upload_urls.json'")

        return urls

    except Exception as e:
        print(f"Error generating URLs: {e}")
        return None


if __name__ == "__main__":
    config = {
        'bucket_name': 'SAMPLE_BUCKET',
        'key_name': 'sample.MP4',
        'content_type': 'video/mp4',
        'expires_in': 3600
    }

    generate_multipart_urls(**config)

URLは単一ではなく、1,002個のURLが記述されたjsonファイルです。
生成された presigned_urls.json を、オブジェクトをアップロードしてくれる人に渡します。

(3) ファイルをアップロードする
URLを受け取った側はAPIを利用してオブジェクトのアップロードを行います。
受け取った presigned_urls.jsonupload_file.py は同じ階層にある方が良いでしょう。

upload_file.py
#!/usr/bin/env python3
import os
import json
import time
import requests
import xml.etree.ElementTree as ET
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime


class ProgressMonitor:
    def __init__(self, total_size):
        self._total = float(total_size)
        self._current = 0
        self._start_time = time.time()

    def update(self, size):
        self._current += size
        now = time.time()
        duration = now - self._start_time
        mb_transferred = self._current / (1024 ** 2)
        total_mb = self._total / (1024 ** 2)
        percentage = (self._current / self._total) * 100
        speed = (self._current / duration) / (1024 ** 2) if duration > 0 else 0

        print(f"{datetime.now().strftime('%H:%M:%S')} - "
              f"{mb_transferred:.2f}MB / {total_mb:.2f}MB "
              f"({percentage:.1f}%) "
              f"Speed: {speed:.1f}MB/s")


def upload_file(file_path, urls_file='upload_urls.json'):
    if not os.path.exists(file_path):
        print(f"File not found: {file_path}")
        return False

    with open(urls_file) as f:
        urls = json.load(f)

    upload_id = urls.get('upload_id')
    if not upload_id:
        print("Upload ID not found in JSON")
        return False

    file_size = os.path.getsize(file_path)
    monitor = ProgressMonitor(file_size)
    chunk_size = 8 * 1024 * 1024  # 8MB

    try:
        completed_parts = []

        def upload_part(args):
            part_number, url = args

            # print(f"Uploading part {part_number}")
            # print(f"URL: {url}")

            with open(file_path, 'rb') as f:
                f.seek((part_number - 1) * chunk_size)
                data = f.read(chunk_size)
                if not data:
                    return None

                response = requests.put(url, data=data)
                if response.status_code != 200:
                    print(f"Part {part_number} failed with status {response.status_code}")
                    print(f"Response headers: {response.headers}")
                    print(f"Response body: {response.text}")
                    raise Exception(f"Failed to upload part {part_number}")

                monitor.update(len(data))
                return {
                    'PartNumber': part_number,
                    'ETag': response.headers['ETag']
                }

        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = []
            for part_number, url in urls['parts'].items():
                futures.append(
                    executor.submit(upload_part, (int(part_number), url))
                )

            for future in futures:
                result = future.result()
                if result:
                    completed_parts.append(result)

        if not completed_parts:
            raise Exception("No parts were uploaded successfully")

        complete_url = urls['complete']
        complete_xml = '<CompleteMultipartUpload>'
        for part in sorted(completed_parts, key=lambda x: x['PartNumber']):
            complete_xml += f'''
                <Part>
                    <PartNumber>{part['PartNumber']}</PartNumber>
                    <ETag>{part['ETag']}</ETag>
                </Part>'''
        complete_xml += '</CompleteMultipartUpload>'

        response = requests.post(complete_url, data=complete_xml)
        if response.status_code != 200:
            print(f"Failed to complete upload. Status code: {response.status_code}")
            print(f"Response: {response.text}")
            raise Exception("Failed to complete upload")

        print("\nUpload completed successfully!")
        return True

    except Exception as e:
        print(f"\nUpload failed: {e}")
        if upload_id:
            try:
                abort_url = urls['abort']
                requests.delete(abort_url)
                print("Upload aborted")
            except Exception as abort_error:
                print(f"Failed to abort upload: {abort_error}")
        return False


if __name__ == "__main__":
    import sys

    if len(sys.argv) != 2:
        print("Usage: python upload_file.py <file_path>")
        sys.exit(1)

    upload_file(sys.argv[1])

python upload_file.py {アップロードするファイル名}

% python upload_file.py ScreenRecording_02-06-2025_22-08-25_1.MP4
16:33:37 - 8.00MB / 6820.65MB (0.1%) Speed: 5.4MB/s
…(略)
16:35:26 - 6804.65MB / 6820.65MB (99.8%) Speed: 61.5MB/s
16:35:26 - 6812.65MB / 6820.65MB (99.9%) Speed: 61.5MB/s
16:35:26 - 6820.65MB / 6820.65MB (100.0%) Speed: 61.5MB/s

Upload completed successfully!

2分弱で6.8GBのファイルをアップロードできました。

Upload completed successfully! と表示されたら、ファイルを受け取る側にS3にオブジェクトがあるか、確認してもらいましょう。

S3バケットにオブジェクトができているか、見てみる

image.png

🎉

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?