2018年は私にとって衝撃でした。よくGUI FrameworkのKivyを使って遊んでいたのですがgeneratorを用いる事でcallback関数だらけの醜いcodeを驚くほど読みやすくできると知ったからです。そして色々と試している内にそれまで私にとって得体の知れない魔法であったasync/awaitによる非同期処理が理解できるようになり、ちょっとした非同期処理libraryを作れるようにもなりました。この記事では今更ですが
- generatorとそこから生まれたnative coroutineの素晴らしさに私が気付くまでの過程と
- それを用いてKivyとtkinterでasync/awaitを実現する過程を
書き綴りたいと思います。(文章量を減らすため、以後はgeneratorはgenと、coroutineはcoroと略す。)
genの秘めたる力
値を生み出す物としてのgen
多くの入門書ではgenは何らかの値を生み出す物として紹介されていると思います。
def fibonacci():
a, b = 0, 1
while True:
yield a
a, b = b, b+a
for i in fibonacci():
print(i, end=' ')
import time; time.sleep(.1)
0 1 1 2 3 5 8 13 21 34 55 89 144 233 377 610 987 1597 2584 4181 ...
"generator"という名前である事を考えると元々はそういう目的だったのかもしれません。実際2018年までは私もそういう物としか見ていなかったし、上のようにfor-in
で値を取り出す場合がほとんどだった私にとってその概念から抜け出すのは難しかったと思います。
for-inを使わずに値を取り出す
入門書によってはsend()
を使ってgenから値を取り出す例も載せています。
gen = fibonacci()
print(gen.send(None))
print(gen.send(None))
print('ちょっと休憩')
import time;time.sleep(1)
print('休憩おわり')
print(gen.send(None))
print(gen.send(None))
0
1
ちょっと休憩
休憩おわり
1
2
このほうがgenの"秘めたる力"が垣間見えていると思いますが私はこの段階でもそれに気付きませんでした。でも次の例はどうでしょうか?
何も生まないgen
def sub_task():
print('sub: 処理1')
yield
print('sub: 処理2')
yield
print('sub: 処理3')
def main_task():
gen = sub_task()
try:
print('main: 処理1')
gen.send(None)
print('main: 処理2')
gen.send(None)
print('main: 処理3')
gen.send(None)
except StopIteration:
pass
main_task()
main: 処理1
sub: 処理1
main: 処理2
sub: 処理2
main: 処理3
sub: 処理3
gen.send()
とyield
を切り替え地点として両taskが少しずつ進んでいるのが分かります。これは並行処理そのものじゃないか!
秘めたる力
これがgenの"秘めたる力"でした。genはつまりは
- 一時停止可能で...(
yield
した所で停まり、gen.send()
により動き出す) - 停止地点で利用者側と通信が可能な...(gen側からは
yield
で、利用者側からはgen.send()
で値を送れる)
関数のような物で、その停止可能という特徴ゆえ並行処理をmulti threadingに頼らずに行えるのでした。
genへの期待
この事がcallback関数だらけの醜いcodeに苦しめられていた私に希望を抱かせました。というのも例えばKivyで次のような事をしたい時
def some_task():
print('処理1')
buttonが押されるまで待つ
print('処理2')
1秒待つ
print('処理3')
実際のcodeは
from kivy.clock import Clock
def some_task(button):
print('処理1')
def callback2(button):
button.unbind(on_press=callback2)
print('処理2')
Clock.schedule_once(callback3, 1)
def callback3(__):
print('処理3')
button.bind(on_press=callback2)
と目も当てられない醜さになります。何かを待つ必要がある
=> 何かが起きるまで処理を停める必要がある
=> 続く処理は別の関数に切り分ける必要がある
なわけです。でも先ほど出てきたsub_task()
を思い出して欲しいのですが
def sub_task():
print('sub: 処理1')
yield
print('sub: 処理2')
yield
print('sub: 処理3')
同じように途中で停まっていたにも拘らずcallback関数はどこにも現れていません。だから私はgenを使えばKivyでもcallback関数を無くせるんじゃないのかと考えるようになりました。
callback関数を無くせ(Kivy編)
これからその方法を考えるのですが、先に言っておきたいのが現在のKivyのmaster branchでは既にasyncioやtrioを用いた本格的な非同期programmingができるようになっているのでここでやる事は車輪の再発明だという事です。ただ当時はそうでは無かったのと単純にgenへの関心が強かったので自前で何とかする道を選びました。
一定時間停止
一旦buttonの事は置いておいてgenから数値が送られてきたらその分の時間だけgenを停める機能を実現しようと考えました。理由はたまたま見たBeeWareの動画でそのような事をやっていて格好良いなと思ったからです。
def some_task():
print('処理1')
yield 2 # 2秒待つ
print('処理2')
yield 1 # 1秒待つ
print('処理3')
上記のようなgenを期待通りに動かす方法を考えます。私は今の所
- Kivyで一定時間後に関数を呼んで欲しいときには
Clock.schedule_once()
を用いる - genを再開するには
gen.send()
を呼ぶ必要がある
事を知っています。じゃあyieldで停まっているgenの.send()
をClock.schedule_once()
に渡したcallback関数から呼んであげると良いのでは...
from kivy.clock import Clock
from kivy.app import App
from kivy.uix.widget import Widget
def start_gen(gen):
def step_gen(dt):
try:
Clock.schedule_once(step_gen, gen.send(None)) # C
except StopIteration:
pass
step_gen(None) # B
def some_task():
print('処理1')
yield 1 # D
print('処理2')
yield 2 # E
print('処理3')
class SampleApp(App):
def build(self):
return Widget()
def on_start(self):
start_gen(some_task()) # A
if __name__ == '__main__':
SampleApp().run()
正解でした。このcodeは以下のように動作します。
- アプリ開始時にgenが作られそれが
start_gen()
に渡される (A行) -
start_gen()
は直ちにstep_gen()
を呼ぶ (B行) -
step_gen()
はgen.send()
を呼ぶのでgenが動き出す (C行) - genは最初のyield式まで進んだ所で停まり1を送る (D行)
-
gen.send(None)
の評価結果が1になるのでstep_gen()
は自分自身が1秒後に再び呼ばれるように予約する事になる (C行) - これ以上する事が無いのでkivyのevent loopに処理が戻る
- 一秒後に
step_gen()
が呼ばれgen.send()
が呼ばれるのでgenは前回停まった位置から動き出す。 (C行) - genは二つ目のyield式まで進んだ所で停まり2を送る (E行)
- (以下省略)
たった7行の関数(start_gen
)を用意しただけでcallback関数を用いずに時間待機できるようになったことは衝撃でした。やる気を出した私はどんどんこれを改良していく事になります。
callback関数に渡された値を利用する
Clock.schedule_once()
に渡したcallback関数には実際に経過した時間が渡されます。せっかくなのでそれをsome_task()
側が受け取れるようにしました。やり方はstart_gen()
内のgen.send(None)
の部分をgen.send(dt)
に変えるだけです。これでsome_task()
側は以下のように実際の経過時間を得られるようになりました(code全体)。
def some_task():
print('処理1')
s = yield 1
print(f"1秒の停止を求めたところ、実際には{s:.03f}秒停止した")
print('処理2')
s = yield 2
print(f"2秒の停止を求めたところ、実際には{s:.03f}秒停止した")
print('処理3')
処理1
1秒の停止を求めたところ、実際には1.089秒停止した
処理2
2秒の停止を求めたところ、実際には2.003秒停止した
処理3
eventの待機
次はeventの待機で、gen側が以下のように書ければ理想的です。
def some_task(button):
print('処理1')
yield event(button, 'on_press') # buttonが押されるまで待つ
print('処理2')
eventの場合は結びつけたcallback関数を解く作業が要るためちょっと複雑にはなりましたが要領は同じで、event用のcallback関数からgenを再開する関数を呼ぶ事で実現できました。
def start_gen(gen):
def step_gen(*args, **kwargs):
try:
gen.send((args, kwargs, ))(step_gen)
except StopIteration:
pass
try:
gen.send(None)(step_gen)
except StopIteration:
pass
def event(ed, name):
bind_id = None
step_gen = None
def bind(step_gen_):
nonlocal bind_id, step_gen
bind_id = ed.fbind(name, callback) # callback関数を結びつける
assert bind_id > 0 # bindingに成功したか確認
step_gen = step_gen_
def callback(*args, **kwargs):
ed.unbind_uid(name, bind_id) # callback関数を解く
step_gen(*args, **kwargs) # genを再開
return bind
時間停止の時と大きく違うのはkivyに関わる処理を全てevent()
内に隠せたことです。おかげでstart_gen()
は一切kivyに依存しておらず、genから送られて来たcallableにstep_gen
を渡すだけという単純な処理で済んでいます。
汎用化
上の設計はとても良い気がするので時間停止もevent待機に倣ってkivyに関わる処理をstart_gen()
から除いて別の関数内に隠しました。
def sleep(duration):
return lambda step_gen: Clock.schedule_once(step_gen, duration)
これでsleep()
とevent()
を混ぜて使えるようになりました。
def some_task(button):
yield event(button, 'on_press') # buttonが押されるまで待つ
button.text = 'Pressed'
yield sleep(1) # 1秒待つ
button.text = 'Bye'
genがどのように再開するかはgenが送るcallableに完全に委ねられているので例えば以下のような物を送れば
def sleep_forever(step_gen):
return None
def some_task():
yield sleep_forever # 永遠に待つ
再開させない事も可能になっています。
threadを待機
さらに汎用性を確かめるためにKivyとは関係無いものも扱ってみました、threadです。
def thread(func, *args, **kwargs):
from threading import Thread
return_value = None
is_finished = False
def wrapper(*args, **kwargs):
nonlocal return_value, is_finished
return_value = func(*args, **kwargs)
is_finished = True
Thread(target=wrapper, args=args, kwargs=kwargs).start()
while not is_finished:
yield sleep(3)
return return_value
threadが終了しているかどうか定期的に見回るという鈍臭いやり方になってしまいましたが、これで渡された関数を別のthread上で実行してその終了をgen側が待てるようになりました。
def 重たい処理():
import time
for i in range(5):
time.sleep(1)
print(i)
class SampleApp(App):
def on_start(self):
start_gen(self.some_task())
def some_task(self):
button = self.root
button.text = 'start heavy task'
yield event(button, 'on_press') # buttonが押されるまで待つ
button.text = 'running...'
yield from thread(重たい処理) # 別thread上で '重たい処理' を実行し、その終了を待つ
button.text = 'done'
yieldとyield from、どっち?
ここで順調なようですがいくつか問題も見えてきました。一つは待つ対象が何であるかによってyieldとyield fromを使い分けないといけない事です。(sleep()
とevent()
がyield、thread()
がyield from)。しかもこれは実装に依存していて、もし仮にthreading.Thread
がthread終了をcallback関数で伝えてくれる仕組みを持っていたならthread()
もyieldで待てるように実装できていました。このように使い方が異なるのは良くない事なのでどちらかに統一する事にしました。
どちらに統一するかですが選択肢はyield from
しかないと思います。何故ならyieldで待てるものをyield fromで待てるようにするのは簡単ですがその逆はできるとは限らないからです。例えば
def some_gen():
yield 1
の1
は
def some_gen():
yield from one()
def one():
yield 1
とする事でyield from
で待てますが
def some_gen():
yield from another_gen()
def another_gen():
yield 1
yield 4
のanother_gen()
は多分yield another_gen()
で待てるようにはできないです。
yield fromへ統一
というわけでsleep()
とevent()
をyield fromで待てるように書き直しました。
def sleep(duration):
return (yield lambda step_coro: Clock.schedule_once(step_gen, duration))
def event(ed, name):
# 略
return (yield bind)
これで利用者側はyieldとyield fromを使い分けずに済むようになりました。
# 常にyield from
def some_task():
yield from sleep(2)
yield from event(button, 'on_press')
yield from thread(heavy_task)
callback関数に渡された引数の利用に関する問題
もう一つの問題がこれで、gen側は元々は確か以下のようにしてcallback関数に渡された引数を利用していたのでした。
def some_task():
s = yield 1
print(f"1秒の停止を求めたところ、実際には{s:.03f}秒停止した")
それが今どうなっているのかというと実は
def some_task():
args, kwargs = yield from sleep(1)
s = args[0]
print(f"1秒の停止を求めたところ、実際には{s:.03f}秒停止した")
となっていて、必要な値を取り出すまでが面倒くさくなっています。これはstep_gen()
があらゆる引数を受け取れるように仮引数をdef step_gen(*args, **kwargs):
としたせいです。でも幸い今しがたyield fromへの統一を行ったおかげでこのような処理はsleep()
側で済ませられます。
def sleep(duration):
args, kwargs = yield lambda step_coro: Clock.schedule_once(step_coro, duration)
return args[0]
これで利用者側は
def some_task():
s = yield from sleep(1)
print(f"1秒の停止を求めたところ、実際には{s:.03f}秒停止した")
で済むようになりました。
genの歴史とコルーチン
以上のようにgenがyield式で [自身がその後どのように再開するかを記した物] を送る事でcallback関数に依らない並行処理が可能と分かりました。(「並行処理?」と思う人も居るかもしれませんがKivyを実行している以上はKivyのevent loopという処理が既にあるので、それとsome_task()
の並行処理になっています)。genはyield式に達した時のみ停まり、逆にそうでない時は停まりません。プログラミング業界ではこのような並行処理の事を"協調的マルチタスク"と呼び、このようなgenを"コルーチン"と呼ぶようです。ここで言っているのプログラミング一般用語のコルーチンであり、Python言語のcoroとの混同を避けるために片仮名で表すことにします。なおcoroもコルーチンです。
genは最初からコルーチンとして使えたわけではないようです。yieldが文ではなく式になってgen.send()
が実装されたのは(つまり利用者側から値を送れるようになったのは)PEP342ですし、yield from
が実装されたのはPEP380です。なのでgenの潜在能力に気付いた人々によって徐々にコルーチン化していったんじゃないかと思います。そしてPEP492ではコルーチンを作るための専用の構文が加わりました。
async def async_fn():
await something
async def
で定義された関数はasync関数と呼び、戻り値は必ず"native coro"というコルーチンになります(PEP525でasync genが登場してからはそうとは限らない。async def
で始まる関数はasync関数かasync gen関数のどちらかであり、それぞれnative coroとasync genを返す)。対してコルーチンとして用いられているgenの方は"gen-based coro"と呼ばれているようなので、これから両者を区別をしない時はcoro、する時はそのそれぞれの呼び名を用いる事にします。
疑問は何故genが既にコルーチンとしての能力を有しているのに別のコルーチンが要ったのかです。後で分かった事なのですがnative coroとgen-based coroの内部動作は瓜二つです。なぜ似たものを別の構文で作れるようにしたのでしょうか?PEP492には理由として
- genをコルーチンとして使っている状況だとそれが(冒頭の
fibonacci
のような)普通のgen関数なのかそれともコルーチン目的なのかが一目では分からず混乱を招く。 - 関数の中身に
yield
があって初めてgen関数になるという構文が不親切。新しい構文では関数がasync def
で始まっていれば中身を見ずともコルーチン(native coro)を返す事が分かる。 - 文法上
yield
が許される場所でしか非同期呼び出しを行えないという制約が辛い。(async for
やasync with
相当の機能の実現が難しい)。
のような事が書かれていました。読むに専用構文の導入は必要な事だったようですが当時の私がこれらを理解していたわけではなく、新しい物を用意したからには何らかの理由があるんだろうぐらいの気持ちで手を伸ばしました。
async/await構文への対応
まずsleep()
とevent()
のようなfromの付かないyieldを含むgen関数は@types.coroutine
を付けました。これはこれらのようなreturn文を含むgen関数をasync関数として書き直す方法が分からなかったからです。(async関数がyield式を持ち、なおかつreturn文で値を返しているとasync gen関数と見做されてしまい構文errorになる: SyntaxError: 'return' with value in async generator
)
import types
@types.coroutine
def sleep(duration):
# 略
@types.coroutine
def event(ed, name):
# 略
対してthread()
やsome_task()
は
-
yield from
をawait
に -
def
をasync def
に
置き換えることで純async関数として書き直す事ができました。
async def thread(func, *args, **kwargs):
# 略
while not is_finished:
await sleep(3)
# 略
class SampleApp(App):
async def some_task(self):
# 略
await event(button, 'on_press')
button.text = 'running...'
await thread(heavy_task)
button.text = 'done'
最後に識別子に含まれるgen
という文字列をcoro
に置き換えて完成。
後は
- 完了を待ちたければ
await another_task()
、 - 待たずに並行させたければ
start_coro(another_task())
という風に使ってあげるだけです。
callback関数を無くせ(tkinter編)
続いてtkinterでも同じ事を試みたところ、要領はkivyと全く同じ(callback関数としてgen/coroを再開する関数を渡す)だったので滞りなくいきました。
時間待機(sleep)
Kivyの時と違うのは、Kivyの場合はkivy.clock.Clock
という唯一のobjectを使うのに対しtkinterでは各widgetの.after()
というmethodを使うことです。なので停止したい時間の他にどのwidgetの.after()
を呼ぶのかを指定する必要があります。
@types.coroutine
def sleep(widget, duration):
yield lambda step_coro: widget.after(duration, step_coro)
ということはsleep()
を内部で使っているthread()
にもwidgetを渡す必要があります。
async def thread(func, *, watcher):
# 略
while not is_finished:
await sleep(watcher, 3000) # 3000ms停止
return return_value
eventの待機
次はeventです。実装の前に実はtkinterのunbind()
には不具合があるようなのでlink先の情報を頼りに以下のように修正しました。
def _new_unbind(self, sequence, funcid=None):
if not funcid:
self.tk.call('bind', self._w, sequence, '')
return
func_callbacks = self.tk.call('bind', self._w, sequence, None).split('\n')
new_callbacks = [l for l in func_callbacks if l[6:6 + len(funcid)] != funcid]
self.tk.call('bind', self._w, sequence, '\n'.join(new_callbacks))
self.deletecommand(funcid)
def patch_unbind():
from tkinter import Misc
Misc.unbind = _new_unbind
patch_unbind()
を呼ぶことによって修正後のunbind()
に置き換えられます。問題はいつ呼ぶかですが、一応tkinter自体を改変してるわけですから勝手には行わない方が良い気がします。なので利用者側に明示的に呼んでもらう事にしました。そしてevent()
の実装は
@types.coroutine
def event(widget, name):
bind_id = None
step_coro = None
def bind(step_coro_):
nonlocal bind_id, step_coro
bind_id = widget.bind(name, callback, '+')
step_coro = step_coro_
def callback(*args, **kwargs):
widget.unbind(name, bind_id)
step_coro(*args, **kwargs)
return (yield bind)[0][0]
となりました。
使い方
# あらかじめasynctkinterをinstallしておく
# pip install asynctkinter
from tkinter import Tk, Label
import asynctkinter as at
at.patch_unbind() # unbind()のbugを修正
def heavy_task():
import time
for i in range(5):
time.sleep(1)
print('heavy task:', i)
root = Tk()
label = Label(root, text='Hello', font=('', 60))
label.pack()
async def some_task(label):
label['text'] = 'start heavy task'
event = await at.event(label, '<Button>') # labelが押されるのを待つ
print(event.x, event.y)
label['text'] = 'running...'
await at.thread(heavy_task, watcher=label) # 別thread上でheavy_task()を実行し、その終了を待つ
label['text'] = 'done'
await at.sleep(label, 2000) # 2秒待つ
label['text'] = 'close the window'
at.start(some_task(label))
root.mainloop()
おわりに
genを並行処理に用いる考え方は10年程前には既にあったようで知っている人も多かったかも知れませんが私にとっては1~2年前の新鮮な知識であり衝撃だったのでこうやって記事にするに至りました。おそらくですがKivyやtkinterのようにlibrary側がevent loopを実装している場合は今回のようなやり方で、pygameのように利用者側にevent loopの実装を委ねている場合はevent loopをasyncio
やtrio
上のいちtaskとして実装してしまえば基本なんにでもasync/awaitを導入できると思います。さようなら醜いcallback関数。
link集
- asynckivy ... 今回作った物の最新版
- asynctkinter ... 今回作った物の最新版
- So you want to be a Python expert? ... 私がgenの偉大さに気付くきっかけとなった動画。genやcoro以外にも素晴らしい物ばかり
- Fluent Python ... 私が知る日本語文献の中では一番gen/coroに関して詳しく書かれている。使っているpythonのversionが低いのが玉に瑕。現在(2023/02/05)は第二版が出ており その和訳が待ち遠しい。