Advent Calendar8日目は昨日に引き続き黒枝です。本日は昨日の記事の後編です。
前編では非同期処理の基本的な背景を確認しました。本日の記事では、具体例を見ることで理解を深めていきましょう。
また、2つ目の具体例では実践的な例として、強力なライブラリであるpybottersをサンプルとして紹介させていただいています。pybottersはasync/await
を取り入れて、特にソケット通信周りをすっきりとロジックに組み込む事ができる優れたライブラリです。
pybotters製作者のまちゅけんさんによる非同期処理についての解説はこちら
おさらい
まずは昨日のおさらいとして、非同期処理の大事な部分を確認しておきます。
- asyncはイベントループに制御を返す宣言である。
- asyncは非同期処理(主にawaitを伴って定義された関数)の呼び出しに付与したときのみ働く。
それでは、具体例を見ていきましょう。
複数のループ
次のような3つのループを作成し、単純に呼び出します。記法がやや前回と異なっていますが、Nodejsではこのように変数として関数を定義することができます。Pythonで言うところのLambdaのようなものと思っていただくと良いかと思います。
const sleep = async (ms) => new Promise(r => setTimeout(r, ms))
const counts = Array(5)
const runLoopOne = async () => {
for (const c of counts) {
console.log('loop 1')
}
}
const runLoopTwo = async () => {
for (const c of counts) {
console.log('loop 2')
await sleep(1)
}
}
const runLoopThree = async () => {
for await (const c of counts) {
console.log('loop 3')
}
}
const runLoops = async () => {
runLoopOne()
runLoopTwo()
runLoopThree()
}
runLoops()
3つのループはasyncをつけて定義された関数で、内部でループ処理を実行します。どのループもawaitをつけずに呼び出しています。
1つ目のループは内部でasyncのついた関数を呼び出さず、ループを実行しています。
2つ目は内部でsleepを呼び出しています。
3つ目はforの呼び出しがfor awaitとなっています。
どのような結果になるか、まずは想像してみてください。どのループが適切にイベントループに制御を返すことが出来るでしょうか。
結果はこうなります。
なぜこのような結果になったのか、一つずつ確認していきましょう。
まずは1が全て処理されています。これは一度もイベントループに制御を返すことなく、関数が終了しています。
次に、一度だけ2が呼び出され、それから3のループが最後まで呼び出されています。そして最後に残った2が全て処理されています。
何が起きたのでしょうか。一見ではわかりづらかったかもしれませんが、実は2つ目のみが適切にイベントループに制御を返したがために、1と3が先に処理を終えてしまった、というだけです。
1つ目はasyncを使って定義されているだけの通常のループです。このように定義しても、非同期として処理しなければ普通の同期的な処理となります。
2は一般的な非同期処理を伴うループです。内部でループの終わりに短いsleepを行いますが、それが非同期関数かつawaitで呼び出されているため、その都度制御がイベントループに返されます。これは前回のhello関数とbye関数と同じ仕組みということになります。
ではなぜ3のfor awaitはawaitを使っているにも関わらずイベントループに制御を返さないのでしょうか。各ループの終わりに制御を返しても不思議ではないようにも見えます。
これは、awaitを使ってイベントループに制御を返すのは、あくまで非同期処理を呼び出したときのみである、という部分が働いています。for await (const c of counts)
のかっこ内は通常の処理であるため、このawaitは何もしません。結果的に1つ目のループと同じということです。
まとめると、awaitはあるスコープ内で、非同期処理として定義されている(主にasyncを使って定義する)処理を呼び出した場合、そのスコープ内では一見同期的に動作しますが、実際はawaitが付与された処理が終わるまではイベントループがその他の処理を進めている、ということになります。
ちなみにそれなら一体どういうときにfor awaitは役に立つのか、というもっともな疑問が湧いて来た方もおられるかと思います。これは例えば、ジェネレータという特殊なイテレータが非同期で機能しており、そのリターンを毎回待ちたいときなどに利用できます。
本稿では深く論じませんが、詳しく知りたい方はこちらなどを参照してください。
Pybottersのstore.wait
この章では、pybottersのサンプルでしばしば登場するawait store.wait()
の動作を見ることで、ボットで利用する際の具体的な活用方法を確認したいと思います。
この機能は、ソケットでデータが取得されてstoreという独自のレコード用のオブジェクトが更新されるのを待つ処理です。要するにawait store.wait()
することで、ソケットからデータが流れ込むまで制御をイベントループに預けます。
例1
async def main():
async with pybotters.Client(apis=apis) as client:
store = pybotters.BybitDataStore()
wstask = await client.ws_connect(
...
)
await store.wait()
while True:
# 以下メインロジック
orders = store.order.find() # 実際のデータはfindを呼ぶだけでいつでも取得できる。
...
await asincio.sleep(60) # ループを60秒待機。
上記は簡略化したコードとなりますが、最初にソケットのデータを保持するstoreを準備し、それからws_connectでソケットをつないでいます。
そしてロジックの中心を記述するwhileの直前でstore.wait
を呼んでいます。
なおソケットから取得したデータの前処理などは全て非同期で処理しており、我々はfindを呼び出すだけでソケット周りを意識することなくデータを参照できます。
ループは処理の最後でsleepを呼び出すことで、60秒待機してから次のループを再開します。
このstoreというのは、非同期で受信したソケットのデータを格納しておく自前のオブジェクトでした。今回はメインループの直前でawaitしていますね。
これはメインループが始まる前に、ソケットから最初のデータが流れ込むまで待つことを意図していったんイベントループに制御を返しています。つまり、ソケット周りの処理はそのまま動き続けて、データが入ってきたタイミングでメインループを開始します。
すなわちこれは一種の初期化処理ということになります。
例2
async def main():
async with pybotters.Client(apis=apis) as client:
store = pybotters.BybitDataStore()
wstask = await client.ws_connect(
...
)
while True:
await store.wait()
# 以下メインロジック
...
次の例に参りましょう。今回は、先程とは異なり、メインループの内部でawait store.wait()
を呼び出しています。
また、今回はasincio.sleepでループを最後に呼び出していません。これはいわゆるイベント駆動型のスタイルで、ソケットがデータを流し込むたびに起動するコードとなります。
ここも非同期で待機することをイベントループに宣言しているため、他の処理の邪魔をすることなく、ソケットがDataStoreの更新をしてくれるのを待つことになります。もしソケットが毎秒データを送ってくるなら毎秒作動しますし、1分、2分、10秒、など間隔がまちまちならそれに合わせて作動するということになります。
イベント駆動型というのはなんらかの処理をきっかけに反応するモデルで、非同期処理と相性の良い構造です。
例3
async def main():
async with pybotters.Client(apis=apis) as client:
store = pybotters.BybitDataStore()
wstask = await client.ws_connect(
...
)
while True:
await store.wait()
# 以下メインロジック
...
await asincio.sleep(1) # 1秒間待機。
何らかの高頻度で動作するボットをイメージしてください。このボットは1秒間隔でデータを精査し、何らかの処理を行います。
今回はメインループの内部でawait store.wait
を呼び出し、最後に1秒間待機しています...
...
このコードは誤りです。
何らかの目的があってstore.waitとsleepを併用しているのでない限り、おそらくこの実装はバグを生むでしょう。しかしながら、実際に動かしていてもしばらく意図しない動きに気づかないかもしれません。
先程の2つの例からわかるように、store.waitによりソケットのデータを受信するまでこのコードは待機します。一方で、最後に1秒間の待機は必ず行うため、ソケット受信までの時間+sleepの1秒間ずつこのループは待機することになります。そのため、本来の意図とは異なる間隔でループしてしまうのですが、仮にソケットの受信が概ね安定して1秒以下で流れ込んでいるような場合はバグに気づくのが遅れてしまうでしょう。
しかしasync/awaitとループの基本的な仕組みを理解していれば、こういった非同期処理を適切に使いこなせることと思います。
おわりに
PythonとNodejsで利用することのできる非同期処理について、その振る舞いを確認しました。非同期処理は扱いを間違えると不要な待機を増やしてしまいます。しかしシングルスレッドで複数の処理を捌くことで効率的にリソースを使えるため、有効に扱えば実用的なアプリケーションをコストを抑えて実装することができるのではないかと思います。
このような企画を催してくださったHohetoさん、そしてここまで読んでくださったみなさま、ありがとうございました。それではまたどこかで。
Happy Botter Life!