7
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

[並列処理] 大量なS3オブジェクトのダウンロード時間を短縮する

Posted at

はじめに

データレイクとしてS3を選択し、データサイズはそれほど大きくない(データウェアハウスを用意するほどではない)場合、
データの保存場所をS3内で完結させ、S3を直接見る形で分析できないかと考えると思います。

この時、

  • S3上のオブジェクト数が多い

と、S3オブジェクトのダウンロードにかなりの時間がかかってしまいます。

「LambdaがS3からGetObjectするのにかかる時間を計測してみた」の記事でまとめられている通り、
S3からデータを落としてくるのに、1kBのデータサイズであっても、0.0数秒は要してしまいます。

仮に、1kBのデータの取得に0.05秒かかったとして、データサイズが10万件あれば、それらを取得するのに、5000秒(≒83分)かかることになります。(待ってられません)

これは、通信がボトルネックになっているため、分析環境のメモリサイズをいくら上げても改善しきれません。
そんな時に、並列処理でダウンロード時間を短縮したというお話です。

具体的にどうしたか

Pythonのasyncioモジュールを利用し、並列処理を行いました。

以下、簡単な説明になります。

  • asyncio.get_event_loop()でイベントループを取得
  • 並列化したい関数をasyncをつけて定義
  • awaitと宣言してから、取得したイベントループのrun_in_executorに、時間がかかる処理を手配する
  • asyncio.gatherで手配した処理を並列実行します

なお、asyncio.gatherの返り値は、await loop.run_in_executorで渡した順に返ってきます。

少し雑ですが、以下該当部分のコードになります。

関数定義

# マルチスレッドでboto3のリソースを共有してはいけないため、bucketを受け取らず、bucket_nameを受け取っている
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html#multithreading-and-multiprocessing
async def async_get_objects(bucket_name, object_keys):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, get_objects, bucket_name, object_keys)

def get_objects(bucket_name, object_keys):
    bucket = boto3.resource('s3').Bucket(bucket_name)
    response_bodies = [] 
    for key in object_keys:
        response = bucket.Object(key).get()
        response_bodies.append(response['Body'].read())
        # ファイルディスクリプタの制限を受けないため
        response['Body'].close()        
    return response_bodies

呼び出し

read_objects = []
# bucket : Boto3のBucketオブジェクト
# objects_divided : 並列数分に分割したS3 object keyのリストのリスト
# 例 [[obj_key1, obj_key2, ...,obj_key100], [obj_key101, ...,obj_key200],...] 
for objects_divided_part in objects_divided:
    read_objects.append(async_get_objects(bucket.name, objects_divided_part))
        
response_bodies_list = await asyncio.gather(*read_objects)

おわりに

上記の並列化を行うと、確保されているプロセス数分はS3オブジェクト取得に要する時間を削減できます。
以下、asyncioモジュールを理解する上で参考になった記事です。

https://docs.python.org/ja/3/library/asyncio-task.html#asyncio.gather
asyncioでPythonの非同期処理を書いてみる
Pythonの非同期通信(asyncioモジュール)入門を書きました
Pythonにおける非同期処理: asyncio逆引きリファレンス

7
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?