aiostream とは
aiostreamは、Pythonの asyncio による非同期プログラミングをサポートするストリーム処理ライブラリです。このライブラリは、非同期イテレータを操作するための豊富な関数プログラミング的なオペレータを提供し、複雑な非同期データフローを簡潔かつ効率的に構築することを可能にします。
似たようなプロジェクトとしては、 RxPY などがありますが、それがReactive ExtensionsをPythonに移植したものであるのに対し、aiostreamはPythonの標準ライブラリに近いインターフェイスを提供することを目指しているため、よりPythonらしいコードを書くことができます。
インストール
pip install aiostream
他の依存ライブラリはなく、Python 3.9以上で動作します。
コード例
import asyncio
from aiostream import stream, pipe
async def source_fn():
for i in range(10):
yield i
async def main():
res = await (
stream.iterate(source_fn()) # async generator をもとにストリームを生成
| pipe.map(lambda x: x * 2) # 各要素に2を掛ける
| pipe.filter(lambda x: x % 3 == 0) # 3の倍数のみを通過させる
| pipe.spaceout(1) # 1秒間隔で要素を出力するようにする
| pipe.print() # 標準出力に出力
| pipe.list() # ストリームをリストに変換
)
print(res)
asyncio.run(main())
この例では、0から9までの数値を生成し、各要素を2倍し、3の倍数のみをフィルタリングし、1秒間隔で出力しています。
出力:
debug: 0
debug: 6 (1秒待ち)
debug: 12 (1秒待ち)
debug: 18 (1秒待ち)
[0, 6, 12, 18]
このように、|
演算子を使ってストリームをパイプラインでつなげることで、非同期処理を直感的に記述することができます。
なお、パイプラインの演算子については以下の3点については等価です。
ys = stream.map(xs, f)
⇔
ys = pipe.map(f)(xs)
⇔
ys = xs | pipe.map(f)
各オペレータの説明
生成系オペレータ
ストリームの基点となる Stream[T]
を生成します。
iterate
は同期・非同期のイテラブルからストリームを生成する基本的な演算子です。preserve
はイテレータを明示的にクローズせずに非同期イテラブルから値を生成できます。
async def f():
for i in range(10):
yield i
fs = stream.iterate(f())
print(await fs[0]) # => 0
print(await stream.list(fs)) # => 1
fsp = stream.preserve(f())
print(await fsp[0]) # => 0
print(await stream.list(fsp)) # => [1, 2, 3, 4, 5, 6, 7, 8, 9]
単一の値を扱うものとして、just
やcall
があります。just
は値をそのまま、call
は関数の実行結果をストリームとして生成します。call
は引数がストリームの評価まで遅延評価されます。
def emit(i: int):
print("foo")
return i
st = stream.call(emit, 1)
print("bar")
await st[0] # ← この時点でemitが評価される。bar→fooの順で出力
連続した値の生成にはrepeat
、range
、count
が用意されており、itertoolsと同じようなインターフェイスで利用できます。これに特有なものとしては、interval
パラメータを使用することで時間間隔を制御できます。
await (stream.range(1, 10, interval=1) | pipe.print())
# 1秒ごとに1から10までの数値を出力
特殊な演算子としては、empty
(空のストリーム)、throw
(例外発生)、never
(永続的な待機)があります。
throw
は、明示的にエラーをストリームを流したときのパイプラインの挙動を確認するためにデバッグやテストなどでの利用ができそうです。
トランスフォーム系オペレータ
トランスフォーム系オペレータは非同期ストリームの要素を加工・変形するための強力なツールです。
関数ストリーミングプログライングの基礎となります。
利用頻度が高い map
は、1つまたは複数のストリームの要素に対して同期・非同期関数を適用します。特に非同期関数が渡された場合、以下がポイントになります。
- デフォルトで入出力の順序は保持するように出力されます。これは、
ordered
パラメータで制御できます - コルーチンは同時に実行されますが、
task_limit
引数を使用してその数を制限できます。値が 1 の場合、コルーチンは順番に実行されます
async def long_running_task(n):
await asyncio.sleep(1)
return n*2
await (
stream.range(10)
| pipe.map(long_running_task, task_limit=5)
| pipe.print())
# 出力: 0 2 4 6 8 の1秒後に 10 12 14 16 18 が出力される
async def long_running_task_2(n):
await asyncio.sleep(10-n)
return n*2
await (
stream.range(10)
| pipe.map(long_running_task_2, ordered=__)
| pipe.print())
# ordered=True の場合は、10秒経った後に 0 2 4 6 が一気に出力される (最初のアイテムを待つので)
# ordered=False の場合は、1秒おきに 18 16 14 が1秒おきに出力される
それに加え、enumerate
や zip
などの組み込み関数や itertools.starmap()
に相当する演算子も用意されています。
await (
stream.range(10)
| pipe.enumerate()
| pipe.starmap(lambda i, x: i * x)
| pipe.print()) # => 0 1 4 9 16 25 ...
accumulate
は、後に紹介する reduce
と同様にストリームの要素を累積していきますが、その累積結果を順次出力していきます。その点で一種のトランスフォーム演算子と言えるでしょう。
await (
stream.range(10)
| pipe.accumulate(lambda x, y: x + y)
| pipe.print()) # => 0 1 3 6 10 15 ...
集約系では、複数のストリームをn個ごとに集約する chunk
も利用機会が多いでしょう。
await (
stream.range(10)
| pipe.chunks(3)
| pipe.print()) # => [0, 1, 2] [3, 4, 5] [6, 7, 8] [9]
アクションオペレータ
aciton(func)
はストリームの要素に対して関数を適用し、その結果を無視します。デバックだけでなく、副作用を持つ関数、例えば、ストリーミングでデータを出力したり保存したりする際にも利用できます。
またこれまでにも登場してきましたが、デバッグ用に print
も用意されています。通常の print
と同じように、sep
や end
などの引数を指定できます。template
は、出力する文字列を指定できます。
async def save_to_db(n):
...
print(f"save {n} to db")
await (
stream.iterate(some_stream)
| ...
| pipe.action(save_to_db)
集約オペレータ
reduce
は、ストリームの要素を累積し、最終的に1つの値を返します。reduce
は、initial
パラメータで初期値を指定できます。initial
が指定されていない場合、最初の要素が初期値として使用されます。
print(await (stream.range(10) |
pipe.reduce(lambda x, y: x + y, initial=0))) # => 45
# カウンターの例
print(await (stream.range(10) |
pipe.reduce(lambda x, y: x + 1, initial=0))) # => 10
また、要素を全て集約してリストにする list
は良く使われる演算子です。
print(await (stream.range(10) | pipe.list())) # => [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
結合オペレータ
ストリームは複数のストリームを結合することができます。merge
は複数のストリームを連結し、それぞれのストリームから要素を取り出して返します。chain
は最初のストリームから要素を取り出し、そのストリームが終了したら次のストリームに移ります。
また、chain は +
演算子で代用できます。
async def stream1(n):
for i in range(n):
await asyncio.sleep(1)
yield i
async def main():
await (
stream.iterate(stream1(3))
| pipe.chain(stream1(3))
# または stream.range(3) + stream.range(3)
| pipe.print()) # => 0 1 2 0 1 2
await (
stream.iterate(stream1(3))
| pipe.merge(stream1(3))
| pipe.print()) # => 0 0 1 1 2 2
zip
は複数のストリームを結合し、それぞれのストリームから要素を取り出してタプルにして返します。zip
は最短のストリームが終了するまで要素を取り出します。
一方、ziplatest
は複数のストリームを結合し、それぞれのストリームから最新の要素を取り出してタプルにして返します。まだ要素がないストリームに対しては、default
で指定した値を返します。partial
を False
にすることで、全てのストリームが要素を返すまで待機します。
async def stream1():
yield 1
yield 3
yield 5
async def stream2():
yield 0
yield 2
yield 4
yield 6
st1 = stream.iterate(stream1())
st2 = stream.iterate(stream2())
await (stream.zip(st1, st2) | pipe.print())
# => (1, 0) (3, 2) (5, 4)
st1 = stream.iterate(stream1())
st2 = stream.iterate(stream2())
await (stream.ziplatest(st1, st2) | pipe.print())
# => (1, None), (3, None), (5, None), (5, 0), (5, 2), (5, 4), (5, 6)
フィルタオペレータ
もちろん基本であるストリームをフィルタするオペレータ群も多く用意されています。
- 要素数による制御
-
take(n)
: 最初のn個の要素のみを取得 -
takelast(n)
: 最後のn個の要素のみを取得 -
skip(n)
: 最初のn個の要素をスキップ -
skiplast(n)
: 最後のn個の要素をスキップ
-
- 条件による制御
-
filter(func)
: 条件に合う要素のみを通過させる -
until(func)
: 条件を満たすまで要素を通過(条件を満たした要素も含む) -
takewhile(func)
: 条件を満たす間、要素を通過(条件を満たさなくなった要素は含まない) -
dropwhile(func)
: 条件を満たす間、要素をスキップ
-
- インデックスによる制御
-
getitem(index)
: 特定のインデックスや範囲の要素を取得
-
最後のオペレータは独特で、ストリームの位置指定で切り出しができます。
await (
stream.count(0)
| pipe.getitem(slice(10, 15))
| pipe.list()
) # => [10, 11, 12, 13, 14]
# 註: stream.count(0) は無限ストリームだが、スライスされることで閉じられている。
発展的オペレータ
「ストリームのストリーム」を扱うオペレータも用意されています。いずれのオペレータも「ストリームのストリーム」を「フラットなストリーム」に変換します。
3つのストラテジー戦略があります。外側のストリームを親とし、内側のストリームを子とします。
-
concat
: 親のストリームを順番に展開し、子のストリームを順番に展開します。 -
flatten
: 親のストリームを順番に展開し、子のストリームの要素について到達した順番に展開します。 -
switch
: 展開された親ストリームの最後の子ストリームの要素のみを展開します。最後の親ストリームが終了すると、子ストリームは最後まで展開されます。
いずれも、エラーは伝播されます。
引数 task_limit
は、各ストリームの同時実行総数を制御します。(親ストリームの展開数にも影響します)
async def substream(i):
await asyncio.sleep(0.1)
yield f'A-{i}'
await asyncio.sleep(0.1)
yield f'B-{i}'
await asyncio.sleep(0.1)
yield f'C-{i}'
async def sos(n):
for i in range(3):
yield substream(i)
async def main():
print(await (stream.concat(sos(3)) | pipe.list()))
# ['A-0', 'B-0', 'C-0', 'A-1', 'B-1', 'C-1', 'A-2', 'B-2', 'C-2']
print(await (stream.flatten(sos(3)) | pipe.list()))
# ['A-0', 'A-1', 'A-2', 'B-0', 'B-1', 'B-2', 'C-0', 'C-1', 'C-2']
print(await (stream.switch(sos(3)) | pipe.list()))
# ['A-2', 'B-2', 'C-2']
特に有用なのは flatten
から派生した flatmap
です。これは map
の拡張として「受信した1つのストリームの要素から複数個(あるいは0個も含む)の要素を送出する」処理が行うことができます。
async def substream(i):
yield f'A-{i}'
yield f'B-{i}'
await (
stream.range(3)
| pipe.flatmap(substream)
| pipe.print()) # => A-0 B-0 A-1 B-1 A-2 B-2
)
もちろん concat
や switch
にも concatmap
や switchmap
があります。
時間系オペレータ
spaceout
は、指定された時間間隔で要素を出力するようにします。ストリームの流量を制御するために利用できます。
await (
stream.range(10)
| pipe.spaceout(1)
| pipe.print()) # => 0 1 2 3 4 5 6 7 8 9 (1秒ごとに出力)
timeout
は、指定された時間内に要素が到着しない場合にエラーを発生させます。
async def slow_stream():
await asyncio.sleep(2)
yield 1
await (
stream.iterate(slow_stream())
| pipe.timeout(1)) # => TimeoutError
delay
は、それぞれの値を指定された時間だけ遅延させて要素を出力します。
カスタムオペレータ
上のものに加えて、カスタムオペレータを作成することもできます。
詳細はドキュメントを参照してください。
応用例: スクレイピング + LLMアプリケーション
より実践的な例として、RAG構築のようなものを目的とした、スクレイピングとLLMを組み合わせたモックアプリケーションを考えてみましょう。
簡単な要件として、以下のようなものを考えます。
- 与えられたURLの一覧からHTMLを取得する
- 1秒おきに新規ページを取得する
- 同時に5並列までのリクエストを行う
- HTMLをセクションに分割する
- セクションの文章を自然言語処理やLLMなどで処理する。
- 処理した文章を埋め込みAPIにリクエストしてベクトル化する。
- 埋め込みAPIは5つのセクションを一度に処理できる
- 処理した文章をベクトル化してDBに保存する
最後に処理件数を返すとします。
それぞれの処理は以下のような関数で実装できるとします。
# URLからHTMLを取得する
async def fetch(url: str) -> Page: ...
# HTMLを処理してセクションに分割する
async def split_into_sections(html: Page) -> List[Section]: ...
# LLMなどで分割したセクションを処理する
async def process_section(section: Section) -> ProcessedSection: ...
# セクションを埋め込みしてベクトル化する (APIは5つのセクションを一度に処理することができる)
async def embed_sections(sections: list[ProcessedSection]) -> list[Vector]: ...
# ベクトルをDBに保存する
async def save_vectors(vector: Vector): ...
このとき、aiostreamを使って以下のようなコードを書くことができます。
import asyncio
import functools
def expand_to_stream(fn):
# list[T] から AsyncGenerator[T, None] に変換する
@functools.wraps(fn)
async def f(*args, **kwargs):
for x in await fn(*args, **kwargs):
yield x
return f
async def build(urls: list[str]) -> int:
count = await (
stream.iterate(urls)
| pipe.spaceout(1) # 1秒おきにリクエストを送る
| pipe.map(fetch, task_limit=5) # 同時に5つまでのリクエストを送る
| pipe.map(split_into_sections) # セクションに分割
| pipe.flatmap(expand_to_stream(process_section)) # セクションを処理
| pipe.map(process_section) # セクションを処理
| pipe.chunks(5) # ストリームを5つずつに配列に分割
| pipe.map(expand_to_stream(embed_sections)) # セクションを埋め込みAPIにリクエスト
| pipe.action(save_vectors) # ベクトルをDBに保存
| pipe.reduce(lambda x, y: x + 1, initial=0) # 処理件数をカウント
)
return count
このように、aiostreamを使うことで、一連の非同期処理を直感的に記述することができます。
特に、
点が特に強力であると言えるでしょう。
注意
aiostream は GPL3 ライセンスの元に公開されています。特にライブラリをソフトウェアに組み込んで頒布する場合は注意しましょう。