はじめに
Pythonで記述したLambda関数からAWSのAPIを呼び出す処理で、いまいち処理性能が出ないという課題があったので、並行処理・並列処理を実装して処理の効率が上がるのか検証してみました。
並行・並列処理とは?とうい方は以下の記事に分かりやすく説明されているので参考になさってください。
前提
検証の内容
UUIDのリストを1000件分作成し、一件ずつSQSに登録します。
SQSに登録する処理を以下の三つの方法で実装し、それぞれの処理にかかる時間を比較します。
- forループ
- 並行処理
- 並列処理
「とりあえず動いて結果が比較出来るもの」を最短で作っていますのでコードの汚さには目をつぶっていただけると幸いです。
用意するもの
- Lambda関数
- リソース名:test-lambda
- ランタイム:Python 3.7
- メモリ:512MB
- タイムアウト:3分
- CPU数:2(Lambda側で固定されている)
- 環境変数に以下SQSのキューのURLを指定
- SQSのキュー
- リソース名:test-queue
- IAMロール
- Lambda関数からSQSのキューへアクセスするためのロール
- Lambda関数作成時に作成されるロールにsqs:sendMessageポリシーを追加
さっそく実行してみる
並行処理と並列処理のソースに関しては一つ目のソースとの差分のみ記載します。
まずは普通にループ
まずは普通forループで処理を実装します。
import os
import time
import uuid
import boto3
# 環境変数の取得
QUEUE_URL = os.environ['QUEUE_URL']
# 作成するUUIDのリストの長さ
LIST_LENGTH = 1000
sqs = boto3.client('sqs')
# UUIDのリストを作成するための関数
def create_uuid_list(length):
uuid_list = []
for i in range(length):
uuid_list.append(str(uuid.uuid4()))
return uuid_list
# SQSへメッセージを送信するための関数
def send_message(message):
response = sqs.send_message(
QueueUrl = QUEUE_URL,
DelaySeconds = 0,
MessageBody = (
message
)
)
# ハンドラー関数
def lambda_handler(event, context):
# UUIDのリストを作成する
uuid_list = create_uuid_list(LIST_LENGTH)
# 処理開始時間の取得
time_start = time.perf_counter()
# --------------- ここからの処理を変更する ---------------
# forループを回しUUIDのリストから一件ずつSQSに送信する
for id in uuid_list:
send_message(id)
# --------------- ここまで ---------------
# 処理終了時間の取得
time_end = time.perf_counter()
time_total = time_end- time_start
print(time_total)
並行処理
次に並行処理の実装します。
並行処理を行うためのモジュールはいくつかありますが、今回はその中でもより高速だと言われているconcrrent.futureを使います。
# --------------- ここからの処理を変更する ---------------
# 並行処理を行うためのモジュールをインポート
from concurrent import future
# 並行処理でSQSへメッセージを送信する
future_list = []
with futures.ThreadPoolExecutor(max_workers=4) as executor:
for id in uuid_list:
future = executor.submit(fn=send_message, message=id)
future_list.append(future)
_ = futures.as_completed(fs=future_list)
# --------------- ここまで ---------------
並列処理
最後に並列処理を実装します。
Lambdaではmultiprocessing.Poolが使えないようなので、multiprocessing.Processを使います。ただ、multiprocessing.Processはプロセスが増えるのに比例してファイルディスクリプタを消費するので、ファイルディスクリプタの上限数が1024であるLambda上で用いるのには注意が必要です。
# --------------- ここからの処理を変更する ---------------
# 並列処理に必要なモジュールをインポート
from multiprocessing import Process
# 並列処理でSQSへメッセージを送信する
process_list = []
for id in uuid_list:
process = Process(
target=send_message,
kwargs={'message': id})
process.start()
process_list.append(process)
for process in process_list:
process.join()
# --------------- ここまで ---------------
結果
結果は以下のようになりました。単位は秒です。
回数 | for ループ | 並行処理 | 並列処理 |
---|---|---|---|
1回目 | 13.4 | 3.77 | 19.4 |
2回目 | 14.1 | 3.87 | 19.3 |
3回目 | 11.8 | 3.50 | 20.9 |
平均 | 13.1 | 3.71 | 19.9 |
並行処理がダントツで最速でした。
並列処理はforループで回した場合よりも遅くなるという結果でした。
考察
これまでの内容で得られた結果について私なりに考察したいと思います。
並行処理
今回並行処理したsend_message関数ではsqs.send_message()を呼び出すという処理をしています。sqs.send_message()ではSQSのAPIを呼び出し、そのレスポンスを待っているので、その待ち時間を並行処理にあてることにより、リソースを有効活用でき処理時間の短縮につながったと考えられます。
このことから並行処理は「APIの呼び出し」や「DBへのクエリ」など、他システムの処理待ちが発生するような処理を実行する際に有効であるこ考えられます。
並列処理
今回の処理では並列処理を行った方がforループより遅くなるという結果になりました。
これは、処理自体は並列に走るようになるが、プロセスを増やす処理がオーバヘッドになり、今回のような簡単な処理では並列化のうまみが得られなかったのだと考えられます。
参考にした記事にもこのような記述がありました。
プロセス生成などにある程度オーバーヘッドがあるので、一瞬で終わるようなタスクに対してプロセスを分けたりすると逆に遅くなるケースがある。
まとめ
今回はAPI呼び出しを並行処理、並列処理で実装し処理時間を比較しました。
実行環境がマルチCPUだからといって並列処理が最適なわけではなく、処理の内容によって並行処理、並列処理を検討する必要があることが分かった。
クラウドサービスの多くは実行時間によって課金される従量課金製なので、リソースサイズの最適化だけではなく、アプリの実行時間の最適化も合わせて設計することで、より良いシステムの設計につながると感じました。
参考文献