trioのopen_memory_channelを紹介します。
open_memory_channel
open_memory_channel()
はタスク間でオブジェクトを受け渡しするためのチャンネルを開きます。
具体的にはMemorySendChannelとMemoryReceiveChannelのインスタンスを返します。
import trio
async def sender(send_channel):
for i in range(3):
print(f"start sending {i}")
await send_channel.send(i)
await trio.sleep(0) # "end of sending"より先に受信側のメッセージが出るようにするためのsleep(通常は不要)
print(f"end of sending {i}")
async def receiver(receive_channel):
async for value in receive_channel:
print(f"receive {value}")
async def main():
async with trio.open_nursery() as nursery:
send_channel, receive_channel = trio.open_memory_channel(0)
nursery.start_soon(sender, send_channel)
nursery.start_soon(receiver, receive_channel)
trio.run(main)
start sending 0
receive 0
end of sending 0
start sending 1
receive 1
end of sending 1
start sending 2
receive 2
end of sending 2
-
open_memory_channel(値)
に渡す値は最大バッファサイズです。これについては次セクションに書きます。 -
send_channel
はMemorySendChannelです。-
await send_channel.send(オブジェクト)
のようにすることでオブジェクトを送信する事ができます。
-
-
receive_channel
はMemoryReceiveChannelです。- 下記のようにすればsendで送信されるオブジェクトを受信する事ができます(受信するたびにループが回ります)。
async for value in receive_channel: ...
最大バッファサイズ
前述の通りopen_memory_channel(値)
の引数は最大バッファサイズでした。
この数だけawait send_channel.send(オブジェクト)
で送信されたオブジェクトを蓄える事ができます。
もし最大バッファサイズを超えるとawait send_channel.send(オブジェクト)
は受信を待ちます。
例えば上のサンプルで受信側に1秒待機する処理を置いてみます。
import trio
async def sender(send_channel):
for i in range(3):
print(f"start sending {i}")
await send_channel.send(i)
await trio.sleep(0) # "end of sending"より先に受信側のメッセージが出るようにするためのsleep(通常は不要)
print(f"end of sending {i}")
async def receiver(receive_channel):
await trio.sleep(1)
async for value in receive_channel:
print(f"receive {value}")
await trio.sleep(1)
async def main():
async with trio.open_nursery() as nursery:
send_channel, receive_channel = trio.open_memory_channel(0)
nursery.start_soon(sender, send_channel)
nursery.start_soon(receiver, receive_channel)
trio.run(main)
start sending 0 ※ここで1秒止まる
receive 0
end of sending 0
start sending 1 ※ここで1秒止まる
receive 1
end of sending 1
start sending 2 ※ここで1秒止まる
receive 2
end of sending 2
最大バッファサイズが0であるため、受信側が受信するまでawait send_channel.send(i)
で止まっている事が分かります。
最大バッファサイズを1(trio.open_memory_channel(1)
)にすると次の結果になります。
start sending 0
end of sending 0
start sending 1 ※ここで1秒程度止まる
receive 0
end of sending 1
start sending 2 ※ここで1秒程度止まる
receive 1
end of sending 2 ※ここで1秒程度止まる
receive 2
最大バッファサイズが1であるため、受信側が受信しなくてもawait send_channel.send(i)
で止まりません。
ただし2回目のsendで止まっています。
1つ受信が終わると2回目のsendも完了し、3回目のsendで止まります。
最大バッファサイズを無限大(trio.open_memory_channel(math.inf)
)にすると次の結果になります。
start sending 0
end of sending 0
start sending 1
end of sending 1
start sending 2
end of sending 2
receive 0 ※ここで1秒止まる
receive 1 ※ここで1秒止まる
receive 2
受信側が受信しなくてもawait send_channel.send(i)
で止まりません。
ただし、これは通常悪い使い方です。
受信側が遅れるとバッファサイズが膨らんでいき、最終的にはメモリを使い果たすためです。
それ以前にレイテンシ(通信の遅延)も問題になりそうです。
詳しくはtrioのbuffering in channelsを参照ください。