LoginSignup
0
2

More than 3 years have passed since last update.

python multiprocessing の使い方(続3) Poolをメンバに持つclass で apply_async

Last updated at Posted at 2020-04-19

目的

昨日からお勉強を続けていますが、ようやく使いたい形にまで近づけた気がするので、ここでまたメモしておきます。

  • 非同期にくる入力に対して、処理を個別のプロセスでブロックすることなく動かしたい
  • 終了をきちんととらえたい

ということを実現したかったです。昨日もメモを残しましたが、ここで一段落した感じです。

仕様と実装方針

仕様としては

  • 処理の入力を、パラメータを引数とするPush のメソドで呼ぶ。このメソドはブロックしない。
  • 内部では、仕掛中、終了の処理の記録を持っている
  • 終了を伝える必要があれば、それも実装できる

ということで、

  • class を一つ実装する。処理だけは個別の関数とする。(外のファイルにイメージ)
  • multiprocessing.Pool を利用する。これ以外に、apply_asyncのresult を管理する。
  • apply_async するときに設定するcallback関数もクラスのメソドにして、クラスのメンバであるqueue にアクセスできるよにする。
  • (これは良いかわからないが)内部では、job_id を用いて管理を行う。投入された処理の要求にunique な数字になるように実装する。

実装

こんな感じに書いてみた。

from multiprocessing import Pool
from time import sleep
from os import getpid, getppid, cpu_count
from datetime import datetime, time, date, timedelta
import sys, json

def _data_handler(o):
    """ json.dump で default=_data_handler として使う """
    if isinstance(o, (datetime, date) ):
        return o.strftime('%Y/%m/%d %H:%M:%S')
    elif hasattr(o, "isoformat"):
        return o.isoformat()
    else:
        return str(o)


class JobError(Exception):
    def __init__(self, job_id:int, pid:int, msg:str, error):
        self.job_id = job_id
        self.pid = pid
        self.msg = msg
        self.error = error


def f(*args, **kwargs):
    """ 指定された時間だけsleepする """
    try:
        print("[{}---{}] f(args {} kwargs={})".format(getpid(), getppid(), args, kwargs))
        t = kwargs["params"]["sleep_time"]
        if t == 0:
            raise Exception("Exception!! sleep time = 0")
        sleep(t)
        return {"f_answer": 0.0, "pid": getpid(), "job_id": kwargs["job_id"]}
    except Exception as e:
        raise JobError(kwargs["job_id"], getpid(), "Exception in except in f", e)


class JobController(object):

    def __init__(self, num_cpu:int=0):
        """
        使用するコアの数を指定します。0ならばOSが管理するコア
        """
        print("main pid={} cpu_count={}".format(getpid(), cpu_count()))
        num_cpu = cpu_count()
        self._pool = Pool(num_cpu)
        self._working_jobs = {} # job_id をkeyとする。しかかり中
        self._closed_jobs = {} # job_id を keyとする。終了
        self._new_job_id = 0 # 次に投入されたjob に使用する job_id

    def __del__(self):
        pass # ゾンビを残さないために本来は何かするべき(まだ分かっていない).


    def my_cb(self, *args):
        """ 正常終了したJob の結果をバッファに移動させる """
        print("callback args={} jobid={}".format(args, args[0]["job_id"]) )
        try:
            jobid = args[0]["job_id"]
            self._closed_jobs[jobid] = self._working_jobs.pop(jobid, {})
            self._closed_jobs[jobid]["end_time"] = datetime.now()
            self._closed_jobs[jobid]["successful"] = True
            self._closed_jobs[jobid]["return"] = args
        except:
            pass
        if len(self._closed_jobs) == 0:
            self._pool.join()


    def my_err_cb(self, args):
        """ 異常終了したJob の結果をバッファに移動させる。args は JobError  """
        print("error callback args={} job_id={}".format(args, args.job_id) )
        try:
            jobid = args.job_id
            self._closed_jobs[jobid] = self._working_jobs.pop(jobid, {})
            self._closed_jobs[jobid]["end_time"] = datetime.now()
            self._closed_jobs[jobid]["successful"] = False
            self._closed_jobs[jobid]["return"] = args
        except:
            pass
        if len(self._closed_jobs) == 0:
            self._pool.join()


    def PushJob(self, params:dict):
        """ Job を投下します。 引数はここでは辞書型のデータで与えます. """
        print("PushJob ", getpid(), getppid())
        res = self._pool.apply_async(f, args=(1,), kwds={"params":params, "job_id":self._new_job_id},
            callback=self.my_cb, error_callback=self.my_err_cb)
        self._working_jobs[self._new_job_id] = {"start_time": datetime.now(), "async_res": res}
        self._new_job_id += 1


    def GetCurrentWorkingJobCount(self):
        """ 仕掛中(投入したけれど終了していない)Jobの数 """
        return len(self._working_jobs)        


    def GetCurrentClosedJobCount(self):
        """ 終了したJobの数 """
        return len(self._closed_jobs)        


if __name__ == "__main__":
    try: 
        print("main pid = {} ppid={}".format(getpid(), getppid()))
        job_controller = JobController(0)
        # 0.5秒間隔でJob を投入します.
        for i in range(10):
            params = {"values": random.randn(3), "sleep_time": i % 7}
            job_controller.PushJob(params)
            sleep(0.5)
        # しかかり中のJobがなくなるまで、様子を出力します。
        while True:
            print("working_jobs {}:", job_controller.GetCurrentWorkingJobCount())
            print(json.dumps(job_controller._working_jobs, indent=2, default=_data_handler))
            print("closed_jobs {}:", job_controller.GetCurrentClosedJobCount())
            print(json.dumps(job_controller._closed_jobs, indent=2, default=_data_handler))
            if job_controller.GetCurrentWorkingJobCount() == 0:                
                break
            sleep(3)

    except:
       pass

結果

とりあえず想定通り動く。正常終了、異常終了ともにself._closd_jobs に積まれていき、最後は _working_jobs が0 になる。開始終了時刻も正しく記録されていた。

  • 情報の受け渡しとプロセスの投入については問題なさそう

という感じであった。

課題として、以下ががある。

  • 作業中のプロセスを途中で終了させる機能は未実装。
  • Pool はクリアのような作業をせずに、ずっと使い続けて大丈夫なのか?が気になる。
  • 途中でPoolで使用するコア数を変えられるのか。
  • Kubernetes で使ったときどうなるのかなぁ。

が、とりあえず仕事は乗り切れるかなぁ。。。とりあえず満足して、終了します。
(2020/04/19, 18:17)

追記

  • 実はThread を継承した自分の実装したクラスで使おうとしたら、
NotImplementedError: pool objects cannot be passed between processes or pickled

のようなエラーが出た。解決したのですが、自分は独自の型のデータ(GCP PubSub のメッセージ)を直接 **kwargs (辞書型のデータ)に入れていたためのようでした。Thread が関係するのかわかりません。
(2020/04/20)

  • apply_async した関数の中で再度Pool apply_async をしようとしたら AssertionError: daemonic processes are not allowed to have children と言われてしまいました。これへの対応が課題です。涙 (2020/05/13)
0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2