この記事ですることを3行で
- Pythonの標準ライブラリでできる並列実行を、あらためて総当たりで速度比較しよう
- ウォーターフォールチャートで、それぞれの並列処理の処理時間の特徴を可視化しよう
- boto3の実行をモデルケースにして、どの並列処理が一番早いのかを調べよう
この記事の結論を先に
-
Python 3.12から本格的に使えるようになったサブインタープリターは、CPUで実行する処理について言えば、従来のサブプロセスよりも高速
-
boto3の実行は、サブインタープリターよりも署名付きURLの非同期実行のほうが速い
→ S3からの10ファイルの取得であれば、実行時間を90%削減できます
→ Bedrockの3回実行であれば、実行時間を60%削減できます
今回使ったソースコードはこちらに置いています。
お手持ちの環境で再実行できるようにしていますので、気になる方はぜひ。
どうしてこの記事を書くのか
2020年の10月に、「Faster Cpython」の計画が立ち上がりました。
The overall aim is to speed up CPython by a factor of (approximately) five.
(最終的な狙いは、CPythonを5倍高速にすることです)
3.10以降のPythonではこの成果が取り入れられています。3.12ではこれまで使い物にならなかった並列化処理(サブインタープリター)も使えるようになりました。次のバージョンである3.13では、GILを撤廃するモードも追加されます。
これまでのPythonの並列化の常識は変わりつつあるので、あらためて調べておきたく思いました。
この記事で扱う実行環境
- Python 3.12.1
※比較のために、一部でPython 3.8.3を使います
この記事で扱う並列化の対象処理
- CPUを使った単純な計算(フィボナッチ数の計算)
- Sleep処理
- S3からのファイル取得
- LLM(Bedrock Claude Haiku)の実行
同時実行数は10、LLMの同時実行数は3にします。
この記事で扱う実行方法
この記事では、以下の7通りの処理を比較します
※処理する対象の関数名はprocess、非同期はasync_processとしています
※7通りのいずれも、標準のpythonで使える実行方法です
※並列と並行を含みますが、この記事では並列で表記します。
1. for文でそのまま書く(グラフでの表記: SINGLE)
for _ in range(10):
process() # 実行する処理
2. asyncioで非同期実行する(グラフでの表記: ASYNC)
import asyncio
async def execute():
task_list = [
asyncio.create_task(async_process())
for _ in range(10)
]
for task in task_list:
await task
def start():
# スレッドの実行
asyncio.run(execute())
3. multiprocessingでプロセスを分割する(グラフでの表記: SUBPROCESS)
import multiprocessing
proc_list = []
for _ in range(10):
proc = multiprocessing.Process(target=process)
proc.start()
proc_list.append(proc)
for p in proc_list:
p.join()
4. threading.Threadでスレッドを立てる(グラフでの表記: THREADS)
import threading
threads = []
for _ in range(10):
thread = threading.Thread(target=process)
thread.start()
threads.append(thread)
for t in threads:
t.join()
5. SubInterpreterを直接実行する(グラフでの表記: SUBINTERPRETER)
import _xxsubinterpreters as interpreters # type: ignore
from inspect import getsource
from textwrap import dedent
# 関数を文字列に変換、インデントを除去する
process_str = dedent(getsource(process))
for _ in range(10):
intp_id = interpreters.create()
interpreters.run_string(intp_id, process_str)
interpreters.destroy(intp_id)
6. SubInterpreterをthreading内で実行する(グラフでの表記: SUBINTERPRETER_THREADS)
import _xxsubinterpreters as interpreters # type: ignore
from inspect import getsource
from textwrap import dedent
def call_subinterpreter():
# 関数を文字列に変換、インデントを除去する
process_str = dedent(getsource(process))
intp_id = interpreters.create()
interpreters.run_string(intp_id, process_str)
interpreters.destroy(intp_id)
# threadingを使って実行する
threads = []
for _ in range(10):
thread = threading.Thread(target=call_subinterpreter)
thread.start()
threads.append(thread)
for t in threads:
t.join()
7. SubInterpreterをsubprocess内で実行する(グラフでの表記: SUBINTERPRETER_SUBPROC)
import _xxsubinterpreters as interpreters # type: ignore
from inspect import getsource
from textwrap import dedent
def call_subinterpreter():
# 関数を文字列に変換、インデントを除去する
process_str = dedent(getsource(process))
intp_id = interpreters.create()
interpreters.run_string(intp_id, process_str)
interpreters.destroy(intp_id)
# subprocessを使って実行する
proc_list = []
for _ in range(10):
proc = multiprocessing.Process(target=call_subinterpreter)
proc.start()
proc_list.append(proc)
for p in proc_list:
p.join()
比較していく
CPUで実行する計算処理を比較する
簡単なフィボナッチ数の計算をします。関数の再帰を繰り返して、CPUだけで処理をする関数です。
def fib_r(seq):
if seq <= 1:
return seq
return fib_r(seq - 1) + fib_r(seq - 2)
res = fib_r(process_count)
この計算を、先に挙げた7通りの並列実行で比較します。
Python 3.8の場合
Python 3.8で実行すると、以下のようになりました。
for文をそのまま書いた場合(SINGLE)でかかる時間を1としたときに、それぞれの処理がどれだけ時間がかかるのかをグラフにしています。
Pythonの3.8では、サブプロセス以外では実行時間の短縮が見られません。
サブインタープリターをサブプロセス上で実行させたケースも数字は良いのですが、3.8の時点では、致命的な不具合(例:関数の引数として渡した文字列の中身が壊れる)があるので、実用性はありません。
Python 3.12の場合
全く同じ処理をPython 3.12で実行して比較してみます。
最も実行時間の短い処理は、サブインタープリターのスレッド実行になりました。
他の処理の半分以下の時間で済んでいます。
サブプロセスの処理が遅いのは、Pythonの3.12にとって、渡した処理が短すぎたせいです。もう少し長い時間(90秒ほど)かかる処理を渡して再実行します。
処理の時間が長くなると、サブプロセスも悪くない数字になりました。
サブインタープリターは処理時間に関わらず、半分以下の実行時間で安定しています。
詳しく見る
それぞれの実行時間をウォーターフォールチャートで見ていきます。
おそらくブラウザの検証モードで馴染みがあるチャートだと思います。
この図です。処理が移り変わっていくときに、それぞれの処理の開始時間と終了時間を棒グラフで表したものです。今回はmatplotlibのBoxplotを変形して作っています。
import matplotlib.pyplot as plt
# ウォーターフォールチャートのデータを用意する(開始秒、終了秒のタプル)
span_data = [
(0.0, 1.0),
(1.0, 2.0),
(2.0, 3.0),
]
# ボックスプロットを描画する
plt.boxplot(
# 2と3の間が箱、それ以外はひげになるため、箱だけを書く
# データは若いものが上になるように逆順にする
x=[[s[0], s[0], s[1], s[1]] for s in span_data.__reversed__()],
labels=[f"{i}" for i in range(len(span_data), 0, -1)],
# デザインを設定する
vert=False, # 横向きにする
patch_artist=True, # 色を設定
boxprops=dict( # 箱の色を設定
facecolor="#0972d3",
color="#0972d3",
),
medianprops=dict(color="black", linewidth=0), # 箱の上の線は消す
)
# 描画する
plt.show()
for文を直接回す(Single)
for文を回したときのウォーターフォールチャートは下のようになります。
処理が終わると次の処理に移る、それを10回繰り返すため、実行時間は階段状に並びます。
非同期実行する(Async)
Asyncの場合、awaitのない処理は通常のfor文と同じように処理されます。
ごくわずかですが、前の処理が完了する前に次の処理が始まります。
サブインタープリターを直接実行する(Subinterpreter)
サブインタープリターを直接実行した場合も、通常のfor文と同じように処理されます。
処理の先頭でわずかに準備の時間がかかるため、その分だけ遅くなっています。
サブプロセスを実行する(Subprocess)
サブプロセスは、実行前の準備に大きく時間がかかります。
処理そのものは完全な並列ですから、グラフは階段状ではなく縦に並びます。
処理が短すぎると実行前の準備時間の分をペイできないのですが、十分に時間のかかる処理を渡してやると、下のようなきれいな並列処理になります。
スレッド実行する(Threads)
スレッド実行は、サブプロセスと違って実行前の準備時間がありません。
ただ、スレッドが他のスレッドを止めてしまうため、まばらな待ち時間が発生します。
スレッド実行に時間のかかる処理を渡すと、処理が切り替わるわけではなく、全体が引きずられて遅延します。サブプロセスであれば45秒で終わる処理なのですが、どのスレッドも2倍の時間がかかっています。
どうしてこんな挙動をするのかというと、PythonのスレッドにはGILと呼ばれるロックがあって、同時に2つ以上のスレッドを処理しないようになっているためです。
GILについて、詳しくはこちらをご参考ください。
サブインタープリターでスレッド実行する
やや古いPythonでは、マルチプロセスだけはGILの制限にかからないので、並列実行の速度的なメリットを受けるにはマルチプロセスで実装する必要がある、という事情がありました。
3.12から、本格的にサブインタープリターが使えるようになったことで、この事情が変わりました。
サブインタープリターには以下の特徴があります。
- スレッド上で実行することが可能
- GILの制限を受けない
サブプロセスとの違いは、ウォーターフォールチャートで見ると分かりやすいです。
スレッドで実行するため、サブプロセスにあった準備時間がありません。
GILの遅延がないため、サブプロセス並みの実行時間で処理が完了します。
時間のかかる処理を渡してみます。
とても綺麗な並列実行です。
まさに、スレッドとサブプロセスのいいとこどりの動きをしていることが分かります。
サブインタープリターをサブプロセス実行する
一応、サブインタープリターをサブプロセス上で実行することもできます。
※実装が難しくなるだけで、何もメリットはありません。
サブプロセスのデメリット(準備に時間がかかる)を受けていて、処理時間そのものもサブプロセス並みです。ウォーターフォールチャートで見ると、サブプロセスの上でサブインタープリターを実行しない理由が分かりやすいと思います。
データの件数を増やしても、動きはサブプロセスと同じです。
Sleep処理を比較する
スリープ処理をする場合の時間を比較します
※サブインタープリター内ではSleepできないので、それを除いた処理を比較します。
sleep(1.0)
Asyncでは使う関数が変わります。
await asyncio.sleep(1.0)
1秒待機する処理を10件実行しました。
かかった時間は以下の通りです。
CPUを利用する並列処理であれば、サブプロセスは強力でした。サブプロセスに事前準備で遅くなるデメリットがあっても、それを補うだけの速度がありました。
Sleepでは事情が変わります。非同期実行のAsyncとスレッド実行は、CPUを使わない処理であれば、どちらもきれいに10分の1の時間で完了します。
S3のファイル取得を比較する
S3のファイル取得処理を比較します。
※サブインタープリター内ではboto3は動かないので、ひとまず除いた処理を比較します。
bucket = boto3.resource("s3").Bucket(BUCKET_NAME)
content = bucket.Object(FILES[index]).get()["Body"].read()
ちなみに、boto3の公式ドキュメントにある通り、boto3のリソースとセッションはスレッドセーフではないため、並列処理で使いまわすことができません。各スレッドで作るようにします。
Similar to Resource objects, Session objects are not thread safe and should not be shared across threads and processes. It’s recommended to create a new Session object for each thread or process:(Resourceオブジェクトと同様に、Sessionオブジェクトはスレッドセーフではないため、スレッドやプロセス間で共有すべきではありません。Sessionスレッドまたはプロセスごとに新しいオブジェクトを作成することをお勧めします。)
S3にある10件の画像ファイルを並列で取得した結果がこちらです。
boto3はawaitできないため、Asyncの非同期実行は遅くなります。
スレッド実行は30%ほど実行時間を短縮できています。
ですが、ちょっと物足りない数字です。マルチスレッドに対応していないboto3を無理やり動かすのではなくて、どうにかしてサブインタープリターでboto3を実行して、もっと大きく実行時間を削減したい…
しばらく考えて、ふと思いつきました。
これ、boto3から署名付きURLを取れば、Asyncで非同期実行も可能で、サブインタープリター内で実行できるんじゃね?
テイク2:署名付きURLでS3からファイルを取る
署名付きURLを並列実行の外側で発行します。
s3 = boto3.client("s3")
my_config = Config(region_name="ap-northeast-1", signature_version="s3v4")
s3 = boto3.client("s3", config=my_config)
presigned_url = s3.generate_presigned_url(
ClientMethod="get_object",
Params={
"Bucket": BUCKET_NAME,
"Key": FILES[index],
},
ExpiresIn=3600,
)
発行した署名付きURLを使って、並列処理の中でファイルを取得します。
import urllib3
response = urllib3.PoolManager().request(method="get", url=presigned_url)
res = response.data
Asyncについては、aiohttpを使って実行します。
※URL encoded=Trueを指定しないとboto3の署名が無効にされるので、encoded=Trueを指定してリクエストを投げます。
import aiohttp
from aiohttp.client import URL
async with aiohttp.ClientSession() as session:
async with session.request(
method="get",
url=URL(presigned_url, encoded=True),
) as response:
res = await response.read()
先ほどと同じように、署名付きURLを挟んで、10件の画像ファイルをS3から取得します。
直列実行で署名付きURLを取得、そのままファイルを取得する場合と比べて、およそ70%ほど実行時間が短縮されています。S3からのファイル取得をサブインタープリター化することはできました。
では、boto3で直接取得する場合に比べると、どのくらい実行時間は短縮されているのでしょうか。比較してみます。
だいたい90%ほど実行時間が短縮できています。
Bedrockの実行を並列化する
S3の署名付きURLの発行は簡単ですが、ほかのAPIでも同じようなことができます。
Bedrock のClaude 3 Haikuの実行を並列化してみます。
以下のようなクラスを作って、Boto3のリクエストをhttpクライアントで実行できるようにします。
import boto3
from botocore.auth import SigV4Auth
from botocore.credentials import create_credential_resolver
from botocore.awsrequest import prepare_request_dict, create_request_object
from botocore.model import OperationModel
class Boto3LowLevelClient:
_service_name: str
_credential_scope: str
_boto3_session = None
_client = None
def __init__(
self,
service_name: str,
credential_scope: str = None,
region_name: str = None,
profile_name: str = None,
) -> None:
# サービス名を確保する
# 権限スコープは基本的にサービス名に同じ、異なるなら個別に設定する
self._service_name = service_name
if credential_scope is None:
self._credential_scope = service_name
else:
self._credential_scope = credential_scope
# boto3のセッションを確保する
self._boto3_session = boto3.Session(
region_name=region_name, profile_name=profile_name
)
# boto3のクライアントを作成する
self._client = self._boto3_session.client(service_name)
def create_request_parameter(self, method_name: str, method_parameter: dict):
"""
boto3への要求情報を作成する
"""
# クライアントから関数の定義情報を参照する
operation_model: OperationModel = self._client._service_model.operation_model(
method_name
)
request_dict = self._client._serializer.serialize_to_request(
method_parameter, operation_model
)
# 今の環境に設定されている認証情報を取得する
resolver = create_credential_resolver(self._boto3_session._session)
credentials = resolver.load_credentials()
# リージョン名を取得する
region_name = self._boto3_session.region_name
# エンドポイントのurlを取得する
endpoint_url = f"https://{self._service_name}.{region_name}.amazonaws.com"
# 要求情報にコンテキストとエンドポイントを書き込む
prepare_request_dict(
request_dict,
endpoint_url,
{
"client_region": region_name,
"client_config": self._client.meta.config,
"has_streaming_input": operation_model.has_streaming_input,
"auth_type": operation_model.auth_type,
},
user_agent=self._client._user_agent_creator.to_string(),
)
# リクエストに対して署名する
request = create_request_object(request_dict)
SigV4Auth(
credentials=credentials,
service_name=self._credential_scope,
region_name=region_name,
).add_auth(request)
# ヘッダを辞書型に詰め直す
headers = {}
for kv in request.headers.__dict__["_headers"]:
headers[kv[0]] = kv[1]
return {
"method": request.method,
"url": request.url,
"body": request.body.decode("utf-8"),
"headers": headers,
}
実行するときはboto3と同じです。
import urllib3
import json
# 署名をboto3から抜き取る
sts = Boto3LowLevelClient("sts")
# stsのget_caller_identityを作成する
get_caller_identity = sts.create_request_parameter("GetCallerIdentity", {})
# urllib3やaiohttpで実行する
# ここからはboto3ではないので、並列化やサブインタープリター内での実行ができる
res = urllib3.PoolManager().request(**get_caller_identity)
print(res.data.decode("utf-8"))
import urllib3
import json
# Bedrockは署名スコープがbedrockになるので、そこだけ指定する
bedrock_runtime = Boto3LowLevelClient(
"bedrock-runtime", credential_scope="bedrock", region_name="us-east-1"
)
# BedrockRuntimeのinvoke_modelを作成する
invoke_model = bedrock_runtime.create_request_parameter(
"InvokeModel",
{
"body": json.dumps(
{
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 100,
"messages": [
{
"role": "user",
"content": "こんにちは、Claude",
}
],
}
),
"modelId": "anthropic.claude-3-haiku-20240307-v1:0",
"accept": "application/json",
"contentType": "application/json",
},
)
# urllib3やaiohttpで実行する
# ここからはboto3ではないので、並列化やサブインタープリター内での実行ができる
res = urllib3.PoolManager().request(**invoke_model)
print(res.data.decode("utf-8"))
aiohttpで実行するときは下のように書きます
import aiohttp
from aiohttp.client import URL
async with aiohttp.ClientSession() as session:
async with session.request(
method=invoke_model["method"],
url=URL(invoke_model["url"], encoded=True), # encoded=Trueを指定
data=invoke_model["body"], # 引数の変数名が違うので、dataにする
headers=invoke_model["headers"],
) as response:
res = await response.text()
この方法で、boto3のclient経由で実行できるメソッドならどれでも並列化できます
詳しいソースはこちらです
署名を並列処理の外側で作成して、3回のBedrockへのリクエストを実行しました。
非同期のAsyncやスレッド実行で、3分の1に近い実行時間まで削ることができています。Bedrockは実行時間の揺れが大きいので、ややばらついた部分はありますが、挙動はS3の並列実行と同じだと考えて良さそうです。
詳しく見る
for文でBedrockを3回実行する
for文で実行したときのウォーターフォールチャートです。
Haikuは1秒ほどでレスポンスを返しています。
Asyncを使って、非同期で実行する
aiohttpを使って、非同期でリクエストを投げたときの挙動は以下の通りです
わずかに実行開始までのあいだのラグがあるのですが、Haikuを完全な並列で実行しています。joinではなくraceの形で実行していれば、1.6秒ほどでLLMの結果を受け取ることができそうです。
スレッド実行を使って、非同期でBedrockを実行する
スレッド実行ではawaitの必要はないので、aiohttpではなく、urllib3を使って実行します。
スレッド実行のウォーターフォールチャートは以下のようになります。
LLMに3回のリクエストを投げて結果を受け取るまでの時間は2秒を切っています。
サブインタープリターをスレッド実行する
サブインタープリターのスレッド実行もurllib3で実行します。
ウォーターフォールチャートは以下のようになります。
こちらもスレッドに近い結果です。
LLMではSQL文やJSONの作成で意図に沿わない形式の結果を返すことがあるのですが、並列で同時に複数回のリクエストを送って、使える結果を選ぶようにすれば、リトライのもたつきを感じることなく実装することができます。
まとめ
あらためて、Pythonの3.12の並列処理を総あたりで調べてみました。
- Python 3.12から本格的に使えるようになったサブインタープリターは、CPUで実行する計算処理について言えば、従来のサブプロセスよりも高速
- boto3の実行は、新しいサブインタープリターに頼るよりも署名付きURLを噛ませるほうが早い
あらためて、今回使ったソースコードはこちらに置いています。
お手持ちの環境で再実行できるようにしています。