9
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

python multiprocessing の使い方(続2) apply_async とcallback

Last updated at Posted at 2020-04-18

目的

その後を調査を続けています。おじさんが欲しい使い方は、ぽちぽちと時々やってくるパケットに対して処理をマルチプロセスで動かす、ということを使いたいでした。お客様パケットを待たせたくないのでマルチプロセスにしたいのです。

本日のこれまでの軌跡:

apply_async を使おうと思った理由

先のPool.map だど、終了するまでblock されるようなので、どうも新しいのがきたときどうしたものかというのが気がかりです。あと、ずっとお客さんがくるかとうかListenもしていたいです。

ブロックせずにPool で動かすのにapply_async があるようなので動かしてみました。

  • multiprocessing.Pool を用意する。(単にコア数を指定して初期化するだけだが。)
  • apply_async() に関数と引数を渡す。このとき正常終了(callback)、エラーで終了(error_callback)の関数をそれぞれ設定し、終了したことを取得できる。
  • apply_async の返り値であるmultiprocessing.pool.AsyncResultから、プロセスが終了したか( ready() )、成功したか( successfule() )を取得できる

実装例

で、下記を書いて動かしてみました。

from multiprocessing import Pool
from time import sleep
from os import getpid, getppid, cpu_count

def my_err_cb(*args):
    print("error callback args={}".format(args))

def my_cb(*args):
    print("callback {}".format(args))

def f(*args, **kwargs):
    print("[{}---{}] args {} kwargs={}".format(getpid(), getppid(), args, kwargs))
    y = args[0]
    print("y=", y)        
    try:
        sleep(2)
        if y > 5:
            raise Exception("Exception From try of f {}".format( getpid()))
        return(y)
    except Exception as e:     
        raise Exception(e, "Exception in except of f {}".format(getpid()))

if __name__ == "__main__":
    print("main pid={} cpu_count={}".format(getpid(), cpu_count()))
    with Pool(3) as p:
        results_pool = []
        for arg in  range(0, 10):
            print(arg)
            result = p.apply_async(f, args=(arg,), kwds={"key":1}, callback=my_cb, error_callback=my_err_cb)
            results_pool.append(result)
        #for r in results_pool: print(r.get())
 
        while True:
            num = [ r.ready() for r in results_pool].count(True)
            print("ready count = {}".format(num))
            for r in results_pool:
                if r.ready():
                    print(r.successful())
            sleep(3)
            if num == len(results_pool):
                break

f の中で強引にException を発生させています。
実行してみると、最初は(y<=5 なら)正常終了します。

main pid=1075 cpu_count=8
ready count = 0
[1083---1075] args (0,) kwargs={'key': 1}
[1084---1075] args (1,) kwargs={'key': 1}
[1085---1075] args (2,) kwargs={'key': 1}
[1083---1075] args (3,) kwargs={'key': 1}
callback args=(0,)
[1084---1075] args (4,) kwargs={'key': 1}
[1085---1075] args (5,) kwargs={'key': 1}
callback args=(1,)

最後は

error callback args=(Exception(Exception('Exception From try of f pid=1085 args=(8,)',), 'Exception in except of f 1085'),) {}
error callback args=(Exception(Exception('Exception From try of f pid=1083 args=(9,)',), 'Exception in except of f 1083'),) {}
ready count = 10
True
True
True
True
True
True
False
False
False
False

となりました。ここから

  • 正常終了したときのcallback の引数は返り値が引数なっているらしい
  • エラーで終了したときのcallback 関数の引数は、raise したexception が引数になる

ようだと思いました。この辺り、常識なのかもしれないですが、文書で確認できませんでした。

なお、自分が動かした経験では apply_async の返り値に対して、

  • getを引数(Timeout)の設定なしで使うと、終了するまで待てる。(blockされる)
  • successful() を聞けるのは ready()がTrueである (つまりプロセスが終了している)ときのみ。さもなくばエラー。
    でした。

参考情報

まだまだ分からんこともあるが。これをクラスにできるかな。今日は調子にのって3回も投稿してしまったぞなもし。
(2020/04/18 23:53)

9
7
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?