目的
昨日からお勉強を続けていますが、ようやく使いたい形にまで近づけた気がするので、ここでまたメモしておきます。
- 非同期にくる入力に対して、処理を個別のプロセスでブロックすることなく動かしたい
- 終了をきちんととらえたい
ということを実現したかったです。昨日もメモを残しましたが、ここで一段落した感じです。
- python multiprocessing の使い方
- python multiprocessing の使い方(続) Pool編
- python multiprocessing の使い方(続3) Poolをメンバに持つclass で apply_async
仕様と実装方針
仕様としては
- 処理の入力を、パラメータを引数とするPush のメソドで呼ぶ。このメソドはブロックしない。
- 内部では、仕掛中、終了の処理の記録を持っている
- 終了を伝える必要があれば、それも実装できる
ということで、
- class を一つ実装する。処理だけは個別の関数とする。(外のファイルにイメージ)
-
multiprocessing.Pool
を利用する。これ以外に、apply_async
のresult を管理する。 - apply_async するときに設定するcallback関数もクラスのメソドにして、クラスのメンバであるqueue にアクセスできるよにする。
- (これは良いかわからないが)内部では、job_id を用いて管理を行う。投入された処理の要求にunique な数字になるように実装する。
実装
こんな感じに書いてみた。
from multiprocessing import Pool
from time import sleep
from os import getpid, getppid, cpu_count
from datetime import datetime, time, date, timedelta
import sys, json
def _data_handler(o):
""" json.dump で default=_data_handler として使う """
if isinstance(o, (datetime, date) ):
return o.strftime('%Y/%m/%d %H:%M:%S')
elif hasattr(o, "isoformat"):
return o.isoformat()
else:
return str(o)
class JobError(Exception):
def __init__(self, job_id:int, pid:int, msg:str, error):
self.job_id = job_id
self.pid = pid
self.msg = msg
self.error = error
def f(*args, **kwargs):
""" 指定された時間だけsleepする """
try:
print("[{}---{}] f(args {} kwargs={})".format(getpid(), getppid(), args, kwargs))
t = kwargs["params"]["sleep_time"]
if t == 0:
raise Exception("Exception!! sleep time = 0")
sleep(t)
return {"f_answer": 0.0, "pid": getpid(), "job_id": kwargs["job_id"]}
except Exception as e:
raise JobError(kwargs["job_id"], getpid(), "Exception in except in f", e)
class JobController(object):
def __init__(self, num_cpu:int=0):
"""
使用するコアの数を指定します。0ならばOSが管理するコア
"""
print("main pid={} cpu_count={}".format(getpid(), cpu_count()))
num_cpu = cpu_count()
self._pool = Pool(num_cpu)
self._working_jobs = {} # job_id をkeyとする。しかかり中
self._closed_jobs = {} # job_id を keyとする。終了
self._new_job_id = 0 # 次に投入されたjob に使用する job_id
def __del__(self):
pass # ゾンビを残さないために本来は何かするべき(まだ分かっていない).
def my_cb(self, *args):
""" 正常終了したJob の結果をバッファに移動させる """
print("callback args={} jobid={}".format(args, args[0]["job_id"]) )
try:
jobid = args[0]["job_id"]
self._closed_jobs[jobid] = self._working_jobs.pop(jobid, {})
self._closed_jobs[jobid]["end_time"] = datetime.now()
self._closed_jobs[jobid]["successful"] = True
self._closed_jobs[jobid]["return"] = args
except:
pass
if len(self._closed_jobs) == 0:
self._pool.join()
def my_err_cb(self, args):
""" 異常終了したJob の結果をバッファに移動させる。args は JobError """
print("error callback args={} job_id={}".format(args, args.job_id) )
try:
jobid = args.job_id
self._closed_jobs[jobid] = self._working_jobs.pop(jobid, {})
self._closed_jobs[jobid]["end_time"] = datetime.now()
self._closed_jobs[jobid]["successful"] = False
self._closed_jobs[jobid]["return"] = args
except:
pass
if len(self._closed_jobs) == 0:
self._pool.join()
def PushJob(self, params:dict):
""" Job を投下します。 引数はここでは辞書型のデータで与えます. """
print("PushJob ", getpid(), getppid())
res = self._pool.apply_async(f, args=(1,), kwds={"params":params, "job_id":self._new_job_id},
callback=self.my_cb, error_callback=self.my_err_cb)
self._working_jobs[self._new_job_id] = {"start_time": datetime.now(), "async_res": res}
self._new_job_id += 1
def GetCurrentWorkingJobCount(self):
""" 仕掛中(投入したけれど終了していない)Jobの数 """
return len(self._working_jobs)
def GetCurrentClosedJobCount(self):
""" 終了したJobの数 """
return len(self._closed_jobs)
if __name__ == "__main__":
try:
print("main pid = {} ppid={}".format(getpid(), getppid()))
job_controller = JobController(0)
# 0.5秒間隔でJob を投入します.
for i in range(10):
params = {"values": random.randn(3), "sleep_time": i % 7}
job_controller.PushJob(params)
sleep(0.5)
# しかかり中のJobがなくなるまで、様子を出力します。
while True:
print("working_jobs {}:", job_controller.GetCurrentWorkingJobCount())
print(json.dumps(job_controller._working_jobs, indent=2, default=_data_handler))
print("closed_jobs {}:", job_controller.GetCurrentClosedJobCount())
print(json.dumps(job_controller._closed_jobs, indent=2, default=_data_handler))
if job_controller.GetCurrentWorkingJobCount() == 0:
break
sleep(3)
except:
pass
結果
とりあえず想定通り動く。正常終了、異常終了ともにself._closd_jobs に積まれていき、最後は _working_jobs が0 になる。開始終了時刻も正しく記録されていた。
- 情報の受け渡しとプロセスの投入については問題なさそう
という感じであった。
課題として、以下ががある。
- 作業中のプロセスを途中で終了させる機能は未実装。
- Pool はクリアのような作業をせずに、ずっと使い続けて大丈夫なのか?が気になる。
- 途中でPoolで使用するコア数を変えられるのか。
- Kubernetes で使ったときどうなるのかなぁ。
が、とりあえず仕事は乗り切れるかなぁ。。。とりあえず満足して、終了します。
(2020/04/19, 18:17)
追記
- 実はThread を継承した自分の実装したクラスで使おうとしたら、
NotImplementedError: pool objects cannot be passed between processes or pickled
のようなエラーが出た。解決したのですが、自分は独自の型のデータ(GCP PubSub のメッセージ)を直接 **kwargs (辞書型のデータ)に入れていたためのようでした。Thread が関係するのかわかりません。
(2020/04/20)
- apply_async した関数の中で再度Pool apply_async をしようとしたら
AssertionError: daemonic processes are not allowed to have children
と言われてしまいました。これへの対応が課題です。涙 (2020/05/13)