目的
その後を調査を続けています。おじさんが欲しい使い方は、ぽちぽちと時々やってくるパケットに対して処理をマルチプロセスで動かす、ということを使いたいでした。お客様パケットを待たせたくないのでマルチプロセスにしたいのです。
本日のこれまでの軌跡:
apply_async を使おうと思った理由
先のPool.map だど、終了するまでblock されるようなので、どうも新しいのがきたときどうしたものかというのが気がかりです。あと、ずっとお客さんがくるかとうかListenもしていたいです。
ブロックせずにPool で動かすのにapply_async があるようなので動かしてみました。
-
multiprocessing.Pool
を用意する。(単にコア数を指定して初期化するだけだが。) -
apply_async()
に関数と引数を渡す。このとき正常終了(callback)、エラーで終了(error_callback)の関数をそれぞれ設定し、終了したことを取得できる。 - apply_async の返り値である
multiprocessing.pool.AsyncResult
から、プロセスが終了したか(ready()
)、成功したか(successfule()
)を取得できる
実装例
で、下記を書いて動かしてみました。
from multiprocessing import Pool
from time import sleep
from os import getpid, getppid, cpu_count
def my_err_cb(*args):
print("error callback args={}".format(args))
def my_cb(*args):
print("callback {}".format(args))
def f(*args, **kwargs):
print("[{}---{}] args {} kwargs={}".format(getpid(), getppid(), args, kwargs))
y = args[0]
print("y=", y)
try:
sleep(2)
if y > 5:
raise Exception("Exception From try of f {}".format( getpid()))
return(y)
except Exception as e:
raise Exception(e, "Exception in except of f {}".format(getpid()))
if __name__ == "__main__":
print("main pid={} cpu_count={}".format(getpid(), cpu_count()))
with Pool(3) as p:
results_pool = []
for arg in range(0, 10):
print(arg)
result = p.apply_async(f, args=(arg,), kwds={"key":1}, callback=my_cb, error_callback=my_err_cb)
results_pool.append(result)
#for r in results_pool: print(r.get())
while True:
num = [ r.ready() for r in results_pool].count(True)
print("ready count = {}".format(num))
for r in results_pool:
if r.ready():
print(r.successful())
sleep(3)
if num == len(results_pool):
break
f の中で強引にException を発生させています。
実行してみると、最初は(y<=5 なら)正常終了します。
main pid=1075 cpu_count=8
ready count = 0
[1083---1075] args (0,) kwargs={'key': 1}
[1084---1075] args (1,) kwargs={'key': 1}
[1085---1075] args (2,) kwargs={'key': 1}
[1083---1075] args (3,) kwargs={'key': 1}
callback args=(0,)
[1084---1075] args (4,) kwargs={'key': 1}
[1085---1075] args (5,) kwargs={'key': 1}
callback args=(1,)
最後は
error callback args=(Exception(Exception('Exception From try of f pid=1085 args=(8,)',), 'Exception in except of f 1085'),) {}
error callback args=(Exception(Exception('Exception From try of f pid=1083 args=(9,)',), 'Exception in except of f 1083'),) {}
ready count = 10
True
True
True
True
True
True
False
False
False
False
となりました。ここから
- 正常終了したときのcallback の引数は返り値が引数なっているらしい
- エラーで終了したときのcallback 関数の引数は、raise したexception が引数になる
ようだと思いました。この辺り、常識なのかもしれないですが、文書で確認できませんでした。
なお、自分が動かした経験では apply_async
の返り値に対して、
- getを引数(Timeout)の設定なしで使うと、終了するまで待てる。(blockされる)
- successful() を聞けるのは ready()がTrueである (つまりプロセスが終了している)ときのみ。さもなくばエラー。
でした。
参考情報
- なんといってもドキュメント multiprocessing --- プロセスベースの並列処理
まだまだ分からんこともあるが。これをクラスにできるかな。今日は調子にのって3回も投稿してしまったぞなもし。
(2020/04/18 23:53)