2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Mars Flag Advent Calendar 2023

Day 18

pythonでAWSの処理にasync/multiprocessingを使ってみた。

Last updated at Posted at 2023-12-17

はじめに

Mars Flag Advent Calendar 2023 18日目担当の小川です。よろしくお願いします。
この記事は、以前から個人的に興味を持っていた物事を、Advent Calenderという機会において調査した記事となっていることをご了承ください。

本記事の目的

効率よく大量のデータを取得・処理をしたいというときには、並立/並行処理をして、うまくリソースを活用して処理時間を短くしたいと考えたところ、Pythoではasync, multi threading, multiprocessingの手法を使うことも検討すると思います。
参考にした記事には指針はあげられていますが、今回は、上記のような状態として、S3上に大量のファイルが存在し、それを加工するような時には、それぞれの手法がどのような傾向があるのかを調べてみたいと思います。

比較条件

比較する処理

 今回の比較では、asyncとmulti processingの2つについて比較を行いました。

条件

試験内容は以下となります。

  • AWSへはSwitchRoleしてアカウント切り替えて対象環境を利用
  • 指定したファイル内のS3上のファイルリスト(100件)について、すべて行を読み込み、キューに投入
  • async/multi processingにて、作業用のworkerを起動
  • async/multi processingのworkerは、キューから1つづつデータ取り出し、head_object(存在チェック)/get_object(ファイル取得)を実施
  • キューから取り出すデータがなくなったら、workerは終了
  • すべてのworkerが終了したらプロセス終了
  • プロセスの開始~プロセスの終了までを測定(timeコマンド使用)

試験対象のパラメータ

試験対象のパラメータは以下となります

  • workerの数
  • 実行環境のCPU数(1,2,4,8) ※ Oracle VirtualBox上の設定で実施

環境

Windows11ホストにOracle VirtualBoxを起動し、Linux(Ubuntu22.04LTS)上で起動しています。

CPU: Core i7-13700H ※Oracle Virtul Boxの設定で、使用するCPUを1,2,4と変えています。
Memory: 10GB

試験対象コード

async によるコード

asyncでは、AWSアクセスにasyncbotocoreを利用し、キューに Async Queueを利用しました。

import asyncio

import botocore.exceptions
from aiobotocore.session import AioSession, get_session
from types_aiobotocore_s3 import S3Client
from types_aiobotocore_sts import STSClient


ACCESS_KEY="XXXXX"
SECRET_ACCESS_KEY="YYYYY"
REGION="ZZZZZ"
SESSION_NAME="session"
ASSUMED_ROLE"XXXXX"
BUCKER_NAME="XXXXX"

async def worker(bucket: str, queue: asyncio.Queue, number: int):

    session = get_session()

    # STSを利用しAWSアカウント切り替えつつ、S3クライアント作成
    async with session.create_client(
        'sts',
        aws_access_key_id=ACCESS_KEY,
        aws_secret_access_key=SECRET_ACCESS_KEY,
        region_name=REGION,
    ) as sts_client:
        sts_client: STSClient

        # AWSアカウントの切り替え
        response = await sts_client.assume_role(
            RoleArn=ASSUMED_ROLE,
            RoleSessionName=SESSION_NAME,
        )
        async with session.create_client(
            's3',
            aws_access_key_id=response["Credentials"]["AccessKeyId"],
            aws_secret_access_key=response["Credentials"]["SecretAccessKey"],
            aws_session_token=response["Credentials"]["SessionToken"],
            region_name=REGION,
        ) as s3_client:
            s3_client: S3Client

            while True:
                # 処理対象取得
                if queue.empty():
                    break
                item: str = await queue.get()

                # file存在チェック
                is_file_exist = False
                try:
                    response = await s3_client.head_object(Bucket=bucket, Key=item)
                    is_file_exist = True
                    print(f"{number} {webp_item} found.")
                except botocore.exceptions.ClientError as e:
                    if e.response["Error"]["Code"] == "404":
                        print(f"{number} {webp_item} not found.")
                    else:
                        # それ以外の例外は異常発生とする
                        print(e)
                except Exception as e:
                    print(e)

                if not is_file_exist:
                    queue.task_done()
                    continue

                # fileダウンロード
                try:
                    response = await s3_client.get_object(Bucket=bucket, Key=item)
                    body = await response.get("Body").read()
                    print(f"{number} {item} download success.")
                except Exception:
                    print(f"{number} {item} download failed.")

                queue.task_done()

            return


async def main():

    # ファイルの内容を読み取り、キューに詰め込む
    file_path = "list.txt"

    with open(file_path) as f:
        lines = f.readlines()

    lines = [line.rstrip('\n') for line in lines]

    queue = asyncio.Queue(maxsize=len(lines))
    for line in lines:
        queue.put_nowait(line)

    # workerの起動・処理実施
    tasks = []
    for i in range(8):
        task = asyncio.create_task(go_worker(bucket=BUCKER_NAME, queue=queue, number=i))
        tasks.append(task)

    # キューの中身のデータ処理が全て完了するまで待機
    await queue.join()


if __name__ == '__main__':
    asyncio.run(main())

multiprocessing によるコード

multiprocessingでは、AWSアクセスにboto3を利用し、キューに multiprocessing.Queueを利用しました。

import multiprocessing

import boto3
import botocore.exceptions
import click
from boto3.session import Session
from mypy_boto3_s3.client import S3Client
from mypy_boto3_sts.client import STSClient

ACCESS_KEY = "XXXXX"
SECRET_ACCESS_KEY = "YYYYY"
REGION = "ZZZZZ"
SESSION_NAME = "session"
ASSUMED_ROLE = "XXXXX"
BUCKER_NAME = "XXXXX"


def get_aws_session() -> Session:
    sts_client: STSClient = boto3.client(
        "sts",
        aws_access_key_id=ACCESS_KEY,
        aws_secret_access_key=SECRET_ACCESS_KEY,
    )

    # AWSアカウントの切り替え
    response = sts_client.assume_role(
        RoleArn=ASSUMED_ROLE,
        RoleSessionName=SESSION_NAME,
    )

    return Session(
        aws_access_key_id=response["Credentials"]["AccessKeyId"],
        aws_secret_access_key=response["Credentials"]["SecretAccessKey"],
        aws_session_token=response["Credentials"]["SessionToken"],
        region_name=REGION,
    )


def worker_queue(worker_index: int, bucket: str, queue: multiprocessing.Queue):
    session = get_aws_session()
    s3client: S3Client = session.client("s3")
    timeout = 1
    while True:
        is_file_exist = False
        if queue.empty():
            break

        try:
            item = queue.get(timeout=timeout)
            head_response = s3client.head_object(Bucket=bucket, Key=item)
            is_file_exist = True
            print(f"{number} {item} found.")
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "404":
                print(f"{worker_index} item not found. item={item}")
            else:
                # それ以外の例外は異常発生とする
                print(e)
        except Exception as e:
            print(e)

        if not is_file_exist:
            continue

        try:
            response = s3client.get_object(Bucket=bucket, Key=item)
            body = response.get("Body").read()
            print(f"{number} {item} download success.")
        except Exception as e:
            print(f"{number} {item} download failed.")
            print(e)

    return


def create_worker(
    bucket: str,
    queue: multiprocessing.Queue,
    worker_index: int,
) -> multiprocessing.Process:
    return multiprocessing.Process(
        target=worker_queue, args=(worker_index, bucket, queue)
    )


@click.command(help="test.")
@click.option(
    "--worker",
    "worker_num",
    help="worker_num",
    required=False,
    type=int,
    default=None,
)
def main(worker_num: int):
    file_path = "list.txt"

    with open(file_path) as f:
        lines = f.readlines()

    lines = [line.rstrip("\n") for line in lines]

    queue = multiprocessing.Queue(maxsize=len(lines))

    for item in lines:
        queue.put(item)

    processes = []
    for i in range(worker_num):
        process = create_worker(bucket=BUCKER_NAME, queue=queue, worker_index=i)
        processes.append(process)
        process.start()

    for check_process in processes:
        check_process: multiprocessing.Process
        check_process.join()


if __name__ == "__main__":
    main()

試験結果

async

image.png

横軸にCPU数、縦軸にworker数としてあります。

worker数 1 2 4 8 16
CPU=1 10.892 7.517 8.095 11.522 17.124
CPU=2 9.708 6.514 6.245 8.501 13.632
CPU=4 10.235 6.607 6.723 8.893 14.247

傾向

  • CPU数を変えても、同じような傾向にあることから、CPU数のグラフへの影響はあまり大きくはないように見える。
  • worker数が少ないうち処理完了までの時間は短くなっているが、多くなるとかえって長くなる。

multiprocessing

image.png

横軸にCPU数、縦軸にworker数としてあります。

worker数 1 2 4 8 16
CPU=1 11.982 8.122 9.312 12.799 21.784
CPU=2 9.357 6.020 5.134 5.912 9.237
CPU=4 11.430 6.107 4.618 4.990 7.255

傾向

  • CPU数を変えることの影響が大きいように見えます(CPU数の2倍程度まで処理時間が短くなっている)

まとめ

  • 実際にコードを動かし、結果を比較したところ、CPUを増やし、その分プロセスを動かすのはわかりやすく処理能力が向上していた。
  • asyncの結果は、worker数の影響が大きいため、このケースでは、小数のworker数で使う方がよさげに見えた
  • asyncは稼働したいるプロセス上でおこなわれていることから、multiprocessingを組み合わせれば、さらに性能をあげられるかも。

おわりに

Advend Calender初参加ということで、気になっていたということをお題にしてみました。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?