概要
複数のリソース・時間制限に対するRateLimitを柔軟かつ簡単に掛けられるPythonモジュールを公開しました。
背景
OpenAI 関連などで、最近RateLimitの管理が多くなっています。OpenAI だと、リクエストとトークンに個別に分や日単位の制限が掛かります。この様な複雑な制限に簡単に対応できる既存の手法が見つからなかったため、自作しました。
インストール
pipなら
pip install multi-rate-limit
poetryなら
poetry add multi-rate-limit
単純な利用例
import asyncio
import time
from multi_rate_limit import MultiRateLimit, RateLimit, FilePastResourceQueue
async def work(name: str, time_required: float):
print(f'Start {name} at {time.time()}')
await asyncio.sleep(time_required)
print(f'End {name} at {time.time()}')
# レート制限で動作させる処理は、コルーチン(async def)として実装し、
# 必ず2要素をタプルで返してください。
# 最初の要素は、実行後に得られる正確なリソース利用情報で。事前予測を上書きするために使います。
# ここではNoneなので、上書きせず事前予測のままとなります。
# 2つ目の要素は、処理の結果外部で得たい値を指定して下さい。
return None, None
async def main():
# 3つのレート制限、かつ最大3非同期実行させるレート制限を設定しています。
# 最初のリソースは、1秒で最大3、10秒で最大10、2番目のリソースは3秒で最大6で制限を掛けます。
mrl = await MultiRateLimit.create([[RateLimit(3, 1), RateLimit(10, 10)], [RateLimit(6, 3)]],
None, 3)
ticket1 = mrl.reserve([1, 3], work('1', 1))
ticket2 = mrl.reserve([1, 3], work('2', 1))
ticket3 = mrl.reserve([1, 1], work('3', 1))
ticket4 = mrl.reserve([3, 0], work('4', 1))
ticket5 = mrl.reserve([3, 0], work('5', 1))
ticket6 = mrl.reserve([3, 0], work('6', 1))
await asyncio.gather(ticket1.future, ticket2.future, ticket3.future, ticket4.future, ticket5.future, ticket6.future)
await mrl.term()
asyncio.run(main())
結果は以下の様な感じになります。
poetry run python .\main.py
Start 1 at 1702054558.9240694
Start 2 at 1702054558.9240694 <- リソースと同時実行数ともに余裕があれば同時実行します。
End 1 at 1702054559.929741
End 2 at 1702054559.929741
Start 3 at 1702054562.932087 <- 2番目のリソースが RateLimit(6, 3) に掛かり、待たされています。
End 3 at 1702054563.9323323
Start 4 at 1702054564.9366117 <- 1番目のリソースが RateLimit(3, 1) に掛かり、待たされています。
End 4 at 1702054565.9401765
Start 5 at 1702054566.941147 <- 1番目のリソースが RateLimit(3, 1) に掛かり、待たされています。
End 5 at 1702054567.9440823
Start 6 at 1702054569.9346018 <- 1番目のリソースが RateLimit(10, 10) に掛かり、待たされています。
End 6 at 1702054570.9466405
リソース消費量の上書き方法
別途指定しない限り、reserve時に事前指定したリソース割り当て量が、コルーチン実行完了後に消費された事になります。もし、実行後に正確な値に上書きしたい場合、コルーチンの返値や例外で指定できます。
返値で上書きしたい場合は
((use_time, [use_resource1, use_resource2,,,]), return_value_to_user)
上書き不要なら
(None, return_value_to_user)
を返してください。
例外時に上書きしたい場合は
raise ResourceOverwriteError(use_time, [use_resource1, use_resource2,,,], cause_exception)
上書き不要なら、それ以外の例外を単純に投げるだけです。
raise cause_exception
リソース消費情報の再利用方法
上記の単純な例だと、リソース消費情報は全てメモリ上で管理されます。つまり、別途開始したプログラムには引き継げないため、レート制限が正しく行えない可能性があります。
IPastResourceQueueを実装すれば、メモリ以外の外部で「実行済みの」リソース消費情報を管理できます。単純なファイルを用いたリソース消費情報の再利用には、パッケージ内の実装例のFilePastResourceQueueを以下の様に使ってください。python 実行フォルダの「res-log.tsv」を通して再利用できます。
file_name = 'res-log.tsv'
mrl = await MultiRateLimit.create([[RateLimit(3, 1), RateLimit(10, 10)], [RateLimit(6, 3)]],
lambda len_resource, longest_period_in_seconds: FilePastResourceQueue.create(
len_resource, longest_period_in_seconds, file_name),
3)
予約した処理のキャンセル方法
実行前の待機中のみ、以下の様にキャンセルが可能です。
ticket1 = mrl.reserve([1, 3], work('1', 1))
mrl.cancel(ticket1.reserve_number)
リソース消費状況の確認方法
以下の様に、MultiRateLimit.stats()を使ってください。
import asyncio
import time
from multi_rate_limit import MultiRateLimit, MinuteRateLimit, DayRateLimit
from typing import List, Optional
async def work(name: str, time_required: float, overwrite_resources: Optional[List[int]]):
print(f'Start {name} at {time.time()}')
await asyncio.sleep(time_required)
print(f'End {name} at {time.time()}')
# レート制限で動作させる処理は、コルーチン(async def)として実装し、
# 必ず2要素をタプルで返してください。
# 最初の要素は、実行後に得られる正確なリソース利用情報で。事前予測を上書きするために使います。
# 2つ目の要素は、処理の結果外部で得たい値を指定して下さい。
if overwrite_resources is None:
return None, None
else:
return (time.time(), overwrite_resources), None
async def print_stats(mrl: MultiRateLimit):
# 短時間待って、内部処理が安定するのを待ちます。
await asyncio.sleep(0.1)
stats = await mrl.stats()
print(f'Past resource percentage : {stats.past_use_percents()}')
print(f'Past + current resource percentage : {stats.current_use_percents()}')
print(f'Past + current + next resource percentage : {stats.next_use_percents()}')
async def main():
# 3つのレート制限、かつ最大3非同期実行させるレート制限を設定しています。
mrl = await MultiRateLimit.create([[MinuteRateLimit(3, 0.1), DayRateLimit(100)], [MinuteRateLimit(10)]],
None,
3)
ticket1 = mrl.reserve([1, 3], work('1', 1, None))
ticket2 = mrl.reserve([3, 2], work('2', 1, [2, 2]))
mrl.cancel(ticket1.reserve_number, True)
await print_stats(mrl)
mrl.reserve([1, 0], work('3', 1, [0, 1]))
mrl.reserve([0, 2], work('4', 1, None))
mrl.reserve([1, 1], work('5', 1, None))
mrl.reserve([2, 2], work('6', 1, None))
await ticket2.future
await print_stats(mrl)
await mrl.term(True)
asyncio.run(main())
結果は以下の様に、各レート制限ごとに、どれくらい詰まっているかがパーセントで得られます。
1回目のstats()だと、MinuteRateLimit(3, 0.1)、つまり6秒で最大3に実行済み+実行中で100%となりこれ以上余裕がない事が分かります。
2回目stats()だと、同じ制限に引っかかっていて、かつ待機中も合わせて200%となっているので、この時点からreserve()で1番目のリソースを使う処理を登録しても、6秒くらいは回ってこない事がうかがえます。
poetry run python .\main.py
Start 2 at 1702059926.158294
Past resource percentage : [[0.0, 0.0], [0.0]]
Past + current resource percentage : [[100.0, 3.0], [20.0]]
Past + current + next resource percentage : [[100.0, 3.0], [20.0]]
End 2 at 1702059927.1678545
Start 3 at 1702059927.1678545
Start 4 at 1702059927.1678545
Past resource percentage : [[66.66666666666667, 2.0], [20.0]]
Past + current resource percentage : [[100.0, 3.0], [40.0]]
Past + current + next resource percentage : [[200.0, 6.0], [70.0]]
End 4 at 1702059928.1696303
End 3 at 1702059928.1696303