はじめに
データレイクとしてS3を選択し、データサイズはそれほど大きくない(データウェアハウスを用意するほどではない)場合、
データの保存場所をS3内で完結させ、S3を直接見る形で分析できないかと考えると思います。
この時、
- S3上のオブジェクト数が多い
と、S3オブジェクトのダウンロードにかなりの時間がかかってしまいます。
「LambdaがS3からGetObjectするのにかかる時間を計測してみた」の記事でまとめられている通り、
S3からデータを落としてくるのに、1kBのデータサイズであっても、0.0数秒は要してしまいます。
仮に、1kBのデータの取得に0.05秒かかったとして、データサイズが10万件あれば、それらを取得するのに、5000秒(≒83分)かかることになります。(待ってられません)
これは、通信がボトルネックになっているため、分析環境のメモリサイズをいくら上げても改善しきれません。
そんな時に、並列処理でダウンロード時間を短縮したというお話です。
具体的にどうしたか
Pythonのasyncioモジュールを利用し、並列処理を行いました。
以下、簡単な説明になります。
- asyncio.get_event_loop()でイベントループを取得
- 並列化したい関数をasyncをつけて定義
- awaitと宣言してから、取得したイベントループのrun_in_executorに、時間がかかる処理を手配する
- asyncio.gatherで手配した処理を並列実行します
なお、asyncio.gatherの返り値は、await loop.run_in_executorで渡した順に返ってきます。
少し雑ですが、以下該当部分のコードになります。
関数定義
# マルチスレッドでboto3のリソースを共有してはいけないため、bucketを受け取らず、bucket_nameを受け取っている
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html#multithreading-and-multiprocessing
async def async_get_objects(bucket_name, object_keys):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, get_objects, bucket_name, object_keys)
def get_objects(bucket_name, object_keys):
bucket = boto3.resource('s3').Bucket(bucket_name)
response_bodies = []
for key in object_keys:
response = bucket.Object(key).get()
response_bodies.append(response['Body'].read())
# ファイルディスクリプタの制限を受けないため
response['Body'].close()
return response_bodies
呼び出し
read_objects = []
# bucket : Boto3のBucketオブジェクト
# objects_divided : 並列数分に分割したS3 object keyのリストのリスト
# 例 [[obj_key1, obj_key2, ...,obj_key100], [obj_key101, ...,obj_key200],...]
for objects_divided_part in objects_divided:
read_objects.append(async_get_objects(bucket.name, objects_divided_part))
response_bodies_list = await asyncio.gather(*read_objects)
おわりに
上記の並列化を行うと、確保されているプロセス数分はS3オブジェクト取得に要する時間を削減できます。
以下、asyncioモジュールを理解する上で参考になった記事です。
https://docs.python.org/ja/3/library/asyncio-task.html#asyncio.gather
asyncioでPythonの非同期処理を書いてみる
Pythonの非同期通信(asyncioモジュール)入門を書きました
Pythonにおける非同期処理: asyncio逆引きリファレンス