3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

S3への動画アップロードを3分の1にできた話(マルチパートアップロード+並列タスク実行)

Posted at

S3へのファイルのアップロードに時間がかかるのでなんとか短縮できないかと思い試行錯誤した結果、アップロード時間を3分の1にできたお話です。
S3のマルチパートアップロードとpythonのconcurrent.futuresというライブラリを使うことで実現しました。
今回使ったファイルは16.2GBの動画ファイルとなります。

結果

私の環境では16.2GBの動画をアップロードすると下記のような結果になりました。
変数を調整することでさらに速度を上げることができるかもしれませんが一旦下記で満足しました。

  • AWSコンソールからのアップロード 7MB/s
  • マルチパートアップロードのみ 11MB/s(1GB 1:30)
  • マルチパートアップロード+並列タスク実行 25MB/s(16.2GB 10:30)

S3マルチパートアップロード

用途としては下記のような場合に使います。
単一のオブジェクトをパートに分けてアップロードして、全てのパートのアップロードが終わるとそのパートを組み立ててオブジェクトを作るという機能です。下記のようなメリットがあります。

  • スループットの向上
     - パートを並列にアップロードすることでスループットを向上
  • ネットワーク問題からの迅速な回復
     - パートサイズが小さいほど、ネットワークエラーにより失敗したアップロードを再開する際の影響を最小限に抑えることができます。
  • オブジェクトのアップロードの一時停止と再開
    - オブジェクトの作成中でもアップロードを開始できます。

concurrent.futures -- 並列タスク実行

非同期に呼び出しが可能なモジュールです。
ThreadPoolExecutorというサブクラスを使います。
パラメータとして重要なものは"max_workers"です。この値を増やすことで並列処理の数が増えてアップロードの速度が向上します。
あまりに大きくすると帯域幅やリソース制限によってアップロード速度が低下することがあります。
ちなみにドキュメントにも記載されてますが"max_workers"がNoneか指定されない場合はmax_workerの値はマシンのプロセッサの数に 5 を掛けたものになります。

tqdm

こちらのpythonパッケージも使ってます。これは進捗を表示させるものです。
pythonで実行するとS3のマルチパートアップロードの進捗が見えないため導入しました。
下記のような感じでターミナルに表示されます。
スクリーンショット 2023-04-15 11.32.45.png

マルチパートアップロードだけを使った場合のコード

下記のコードを実行しました。

python3.10.2
import boto3
import os
from tqdm import tqdm

aws_access_key_id = 'AWSのアクセスキーを入力してください'
aws_secret_access_key = 'AWSのシークレットアクセスキーを入力してください'

s3 = boto3.client("s3", aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)

file_path = "ローカルのファイルパス"
bucket = "保存先のバケット名"
key = "保存するオブジェクトの名前"

file_size = os.stat(file_path).st_size

create_multipart_upload_response = s3.create_multipart_upload(Bucket=bucket, Key=key)
upload_id = create_multipart_upload_response["UploadId"]

part_size = 50 * 1024 * 1024
parts_count = -(-file_size // part_size)

parts = []
with open(file_path, "rb") as f:
    # tqdmを使ってプログレスバーを表示
    with tqdm(total=file_size, unit="B", unit_scale=True, desc=f"Uploading {file_path}") as pbar:
        for i in range(parts_count):
            part_number = i + 1
            part_data = f.read(part_size)

            upload_part_response = s3.upload_part(
                Bucket=bucket,
                Key=key,
                UploadId=upload_id,
                PartNumber=part_number,
                Body=part_data,
            )

            parts.append(
                {"ETag": upload_part_response["ETag"], "PartNumber": part_number}
            )

            # プログレスバーを更新
            pbar.update(len(part_data))

s3.complete_multipart_upload(
    Bucket=bucket,
    Key=key,
    UploadId=upload_id,
    MultipartUpload={"Parts": parts},
)

print(f"File '{file_path}' uploaded to '{bucket}/{key}' as a multipart upload.")

ここでのポイントは"part_size"です。
上記のコードですとアップロードするファイルを50MB毎のパートに分割してアップロードすることを意味しています。
この値"50"という数字を変更することで一つのパートのファイルサイズが増えて、アップロードするパート数が減るためアップロード時間が短縮されます。
ただし、大きくしすぎるとネットワークエラーが出た際に再試行に時間がかかるので注意が必要です。
僕の環境では40〜50MBぐらいが一番早そうだったので45MBにしてます。ご自分の環境で試してみてください

余談ですがaws_access_key_idとaws_secret_access_keyをコードに記載していますがこちらは非推奨ですので本番環境では控えてください。
ルートユーザーのアクセスキーの発行もAWSのベストプラクティスに沿うと非推奨となります。
アクセスキーの発行方法は下記を参照ください。

実行結果

コードの結果は下記です。
最初にも記載した通り1:30で約1GBとなるので約11MB/sとなります。
スクリーンショット 2023-04-15 11.49.39.png

マルチパートアップロード+並列タスク実行を使った場合のコード

下記のコードを実行しました。

python3.10.2
import boto3
import os
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed

aws_access_key_id = 'AWSのアクセスキーを入力してください'
aws_secret_access_key = 'AWSのシークレットアクセスキーを入力してください'

s3 = boto3.client("s3", aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)

file_path = "ローカルにあるアップロードしたいのファイルパス"
bucket = "保存先のバケット名"
key = "保存するオブジェクトの名前"

file_size = os.stat(file_path).st_size

create_multipart_upload_response = s3.create_multipart_upload(Bucket=bucket, Key=key)
upload_id = create_multipart_upload_response["UploadId"]

part_size = 50 * 1024 * 1024
parts_count = -(-file_size // part_size)

parts = [None] * parts_count


def upload_part(part_number, part_data):
    upload_part_response = s3.upload_part(
        Bucket=bucket,
        Key=key,
        UploadId=upload_id,
        PartNumber=part_number,
        Body=part_data,
    )

    return {"ETag": upload_part_response["ETag"], "PartNumber": part_number}


with open(file_path, "rb") as f:
    with tqdm(total=file_size, unit="B", unit_scale=True, desc=f"Uploading {file_path}") as pbar:
        with ThreadPoolExecutor(max_workers=8) as executor:
            futures = []

            for i in range(parts_count):
                part_number = i + 1
                part_data = f.read(part_size)
                futures.append(executor.submit(upload_part, part_number, part_data))

            for future in as_completed(futures):
                part_info = future.result()
                parts[part_info["PartNumber"] - 1] = part_info

                # プログレスバーを更新
                pbar.update(part_size)

s3.complete_multipart_upload(
    Bucket=bucket,
    Key=key,
    UploadId=upload_id,
    MultipartUpload={"Parts": parts},
)

print(f"File '{file_path}' uploaded to '{bucket}/{key}' as a multipart upload.")

ここでのポイントはThreadPoolExecutor内の"max_workers"の値です。
この値を増やすことで並列処理の数を調整することでアップロードの速度を向上させます。
ただし上でも書きましたがあまりに大きくすると帯域幅やリソース制限によってアップロード速度が低下することがあります。
私の環境では8と設定しています。

実行結果

コードの結果は下記です。
最初にも記載した通り10:32で16.2GBとなるので約25MB/sとなります。
スクリーンショット 2023-04-15 11.59.16.png

感想

パラメータを調整することでもっとアップロード時間が早くなるのではないかと思います。
ファイルサイズによっても適切なパラメータの設定が異なるので試行錯誤が必要だと思います。
AWS CLIで実行できれば実行中でもPCが重くならないのではと考えてます。(並列実行が可能なのかは不明)

最後までご覧いただきありがとうございました。

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?