LoginSignup
0
0

More than 1 year has passed since last update.

trioによる並行処理⑦(open_memory_channel)

Posted at

目次

trioopen_memory_channelを紹介します。

open_memory_channel

open_memory_channel()はタスク間でオブジェクトを受け渡しするためのチャンネルを開きます。
具体的にはMemorySendChannelMemoryReceiveChannelのインスタンスを返します。

サンプル
import trio

async def sender(send_channel):
    for i in range(3):
        print(f"start sending {i}")
        await send_channel.send(i)
        await trio.sleep(0)  # "end of sending"より先に受信側のメッセージが出るようにするためのsleep(通常は不要)
        print(f"end of sending {i}")

async def receiver(receive_channel):
    async for value in receive_channel:
        print(f"receive {value}")

async def main():
    async with trio.open_nursery() as nursery:
        send_channel, receive_channel = trio.open_memory_channel(0)
        nursery.start_soon(sender, send_channel)
        nursery.start_soon(receiver, receive_channel)

trio.run(main)
実行結果
start sending 0
receive 0
end of sending 0
start sending 1
receive 1
end of sending 1
start sending 2
receive 2
end of sending 2
  • open_memory_channel(値)に渡す値は最大バッファサイズです。これについては次セクションに書きます。
  • send_channelMemorySendChannelです。
    • await send_channel.send(オブジェクト)のようにすることでオブジェクトを送信する事ができます。
  • receive_channelMemoryReceiveChannelです。
    • 下記のようにすればsendで送信されるオブジェクトを受信する事ができます(受信するたびにループが回ります)。
    async for value in receive_channel:
       ...
    

最大バッファサイズ

前述の通りopen_memory_channel(値)の引数は最大バッファサイズでした。
この数だけawait send_channel.send(オブジェクト)で送信されたオブジェクトを蓄える事ができます。
もし最大バッファサイズを超えるとawait send_channel.send(オブジェクト)は受信を待ちます。
例えば上のサンプルで受信側に1秒待機する処理を置いてみます。

サンプル
import trio

async def sender(send_channel):
    for i in range(3):
        print(f"start sending {i}")
        await send_channel.send(i)
        await trio.sleep(0)  # "end of sending"より先に受信側のメッセージが出るようにするためのsleep(通常は不要)
        print(f"end of sending {i}")

async def receiver(receive_channel):
    await trio.sleep(1)
    async for value in receive_channel:
        print(f"receive {value}")
        await trio.sleep(1)

async def main():
    async with trio.open_nursery() as nursery:
        send_channel, receive_channel = trio.open_memory_channel(0)
        nursery.start_soon(sender, send_channel)
        nursery.start_soon(receiver, receive_channel)

trio.run(main)
実行結果
start sending 0  ※ここで1秒止まる
receive 0
end of sending 0
start sending 1  ※ここで1秒止まる
receive 1
end of sending 1
start sending 2  ※ここで1秒止まる
receive 2
end of sending 2

最大バッファサイズが0であるため、受信側が受信するまでawait send_channel.send(i)で止まっている事が分かります。

最大バッファサイズを1(trio.open_memory_channel(1))にすると次の結果になります。

実行結果
start sending 0
end of sending 0
start sending 1  ※ここで1秒程度止まる
receive 0
end of sending 1
start sending 2  ※ここで1秒程度止まる
receive 1
end of sending 2  ※ここで1秒程度止まる
receive 2

最大バッファサイズが1であるため、受信側が受信しなくてもawait send_channel.send(i)で止まりません。
ただし2回目のsendで止まっています。
1つ受信が終わると2回目のsendも完了し、3回目のsendで止まります。

最大バッファサイズを無限大(trio.open_memory_channel(math.inf))にすると次の結果になります。

実行結果
start sending 0
end of sending 0
start sending 1
end of sending 1
start sending 2
end of sending 2
receive 0  ※ここで1秒止まる
receive 1  ※ここで1秒止まる
receive 2

受信側が受信しなくてもawait send_channel.send(i)で止まりません。
ただし、これは通常悪い使い方です。
受信側が遅れるとバッファサイズが膨らんでいき、最終的にはメモリを使い果たすためです。
それ以前にレイテンシ(通信の遅延)も問題になりそうです。
詳しくはtrioのbuffering in channelsを参照ください。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0