以前、multiprocessingモジュールのpool周りではまってしまった。
その際に、concurrent.futuresのProcessPoolExecutorとの動作比較などを行うために作成した雛形が便利そうだったので、残しておく。
雛形と使い方
GNU Makeで `make mulutiprocess_test` を実行すれば、動作比較できる。
$ tree
.
├── Dockerfile
├── Makefile
├── concurrent_test.py
├── multiprocess_starmap_async_test.py
└── multiprocess_starmap_test.py
Makefile
# Build the python37 image, then run the three comparison scripts one after another.
# Each run bind-mounts the current directory onto /usr/src inside the container.
# The leading "-" makes make ignore a non-zero exit status, so the remaining
# scripts still run after one of them ends with an exception.
mulutiprocess_test:
docker build . -t python37
- docker run --rm -it -v `pwd`:/usr/src python37 concurrent_test.py
- docker run --rm -it -v `pwd`:/usr/src python37 multiprocess_starmap_test.py
- docker run --rm -it -v `pwd`:/usr/src python37 multiprocess_starmap_async_test.py
Dockerfile
# Base image providing Python 3.7.
FROM python:3.7-stretch
# Working directory; the Makefile bind-mounts the host directory at /usr/src.
WORKDIR /usr/src/
# The container always runs the Python interpreter; arguments given to
# `docker run` after the image name are passed to it (here: the script name).
ENTRYPOINT ["/usr/local/bin/python3"]
# Default argument used when `docker run` is given no script name.
CMD ["default.py"]
concurrent_test.py
import time
from concurrent.futures.process import ProcessPoolExecutor
from logging import getLogger, basicConfig, DEBUG
# Configure logging at DEBUG level so the logger.debug() calls below are emitted.
basicConfig(level=DEBUG)
# Module-level logger named after this module.
logger = getLogger(__name__)
def task(param1, param2):
    """Log ``param1``, wait three seconds, log ``param2``, then report success.

    Args:
        param1: Value logged at DEBUG level before the pause.
        param2: Value logged at DEBUG level after the pause.

    Returns:
        Always ``True``.
    """
    pause_seconds = 3
    logger.debug(param1)
    time.sleep(pause_seconds)
    logger.debug(param2)
    return True
def error_task(param1, param2):
    """Behave like a three-second logging job, then fail on purpose.

    Args:
        param1: Value logged at DEBUG level before the pause.
        param2: Value logged at DEBUG level after the pause.

    Raises:
        Exception: Always, once both values have been logged.
    """
    pause_seconds = 3
    logger.debug(param1)
    time.sleep(pause_seconds)
    logger.debug(param2)
    raise Exception('例外だよ!')
def wrapper(args):
    """Call ``task`` with the items of ``args`` as positional arguments.

    The ``executor.map`` call in this script passes each list as a single
    argument, so it is unpacked here.

    Args:
        args: Iterable holding the positional arguments for ``task``.

    Returns:
        Whatever ``task`` returns.
    """
    outcome = task(*args)
    return outcome
def error_wrapper(args):
    """Call ``error_task`` with the items of ``args`` as positional arguments.

    Args:
        args: Iterable holding the positional arguments for ``error_task``.

    Returns:
        Whatever ``error_task`` returns; any exception it raises propagates.
    """
    outcome = error_task(*args)
    return outcome
if __name__ == '__main__':
    # First pool: both jobs succeed, and each result is logged as it is yielded.
    with ProcessPoolExecutor(max_workers=2) as ok_pool:
        ok_results = ok_pool.map(wrapper, [['1', '2'], ['3', '4']])
        for outcome in ok_results:
            logger.debug(outcome)

    # Second pool: the jobs raise, and the exception surfaces here while
    # iterating over the results.
    with ProcessPoolExecutor(max_workers=2) as failing_pool:
        failing_results = failing_pool.map(error_wrapper, [['5', '6'], ['7', '8']])
        for outcome in failing_results:
            logger.debug(outcome)
multiprocess_starmap_test.py
import multiprocessing as mp
import time
from logging import getLogger, basicConfig, DEBUG
# Configure logging at DEBUG level so the logger.debug() calls below are emitted.
basicConfig(level=DEBUG)
# Module-level logger named after this module.
logger = getLogger(__name__)
def task(param1, param2):
    """Log ``param1``, wait three seconds, log ``param2``, then report success.

    Args:
        param1: Value logged at DEBUG level before the pause.
        param2: Value logged at DEBUG level after the pause.

    Returns:
        Always ``True``.
    """
    pause_seconds = 3
    logger.debug(param1)
    time.sleep(pause_seconds)
    logger.debug(param2)
    return True
def error_task(param1, param2):
    """Behave like a three-second logging job, then fail on purpose.

    Args:
        param1: Value logged at DEBUG level before the pause.
        param2: Value logged at DEBUG level after the pause.

    Raises:
        Exception: Always, once both values have been logged.
    """
    pause_seconds = 3
    logger.debug(param1)
    time.sleep(pause_seconds)
    logger.debug(param2)
    raise Exception('例外だよ!')
if __name__ == '__main__':
    # First pool: starmap blocks until both jobs finish and returns their results.
    with mp.Pool(2) as ok_pool:
        outcomes = ok_pool.starmap(task, [['1', '2'], ['3', '4']])
        logger.debug(outcomes)

    # Second pool: the jobs raise, so the starmap call itself raises and the
    # logging line below it is never reached.
    with mp.Pool(2) as failing_pool:
        outcomes = failing_pool.starmap(error_task, [['5', '6'], ['7', '8']])
        logger.debug(outcomes)
multiprocess_starmap_async_test.py
import multiprocessing as mp
import time
from logging import getLogger, basicConfig, DEBUG
# Configure logging at DEBUG level so the logger.debug() calls below are emitted.
basicConfig(level=DEBUG)
# Module-level logger named after this module.
logger = getLogger(__name__)
def task(param1, param2):
    """Log ``param1``, wait three seconds, log ``param2``, then report success.

    Args:
        param1: Value logged at DEBUG level before the pause.
        param2: Value logged at DEBUG level after the pause.

    Returns:
        Always ``True``.
    """
    pause_seconds = 3
    logger.debug(param1)
    time.sleep(pause_seconds)
    logger.debug(param2)
    return True
def error_task(param1, param2):
    """Behave like a three-second logging job, then fail on purpose.

    Args:
        param1: Value logged at DEBUG level before the pause.
        param2: Value logged at DEBUG level after the pause.

    Raises:
        Exception: Always, once both values have been logged.
    """
    pause_seconds = 3
    logger.debug(param1)
    time.sleep(pause_seconds)
    logger.debug(param2)
    raise Exception('例外だよ!')
if __name__ == '__main__':
    # First pool: submit both jobs asynchronously, then wait for the results.
    # get() is called inside the with-block, while the pool is still alive.
    with mp.Pool(2) as ok_pool:
        pending = ok_pool.starmap_async(task, [['1', '2'], ['3', '4']])
        logger.debug(pending.get())

    # Second pool: the jobs raise; the exception is re-raised by get().
    with mp.Pool(2) as failing_pool:
        pending = failing_pool.starmap_async(error_task, [['5', '6'], ['7', '8']])
        logger.debug(pending.get())
動作はこうなる
docker run --rm -it -v `pwd`:/usr/src python37 concurrent_test.py
DEBUG:__main__:1
DEBUG:__main__:3
DEBUG:__main__:2
DEBUG:__main__:4
DEBUG:__main__:True
DEBUG:__main__:True
DEBUG:__main__:7
DEBUG:__main__:5
DEBUG:__main__:8
DEBUG:__main__:6
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/local/lib/python3.7/concurrent/futures/process.py", line 232, in _process_worker
r = call_item.fn(*call_item.args, **call_item.kwargs)
File "/usr/local/lib/python3.7/concurrent/futures/process.py", line 191, in _process_chunk
return [fn(*args) for args in chunk]
File "/usr/local/lib/python3.7/concurrent/futures/process.py", line 191, in <listcomp>
return [fn(*args) for args in chunk]
File "concurrent_test.py", line 28, in error_wrapper
return error_task(*args)
File "concurrent_test.py", line 20, in error_task
raise Exception('例外だよ!')
Exception: 例外だよ!
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "concurrent_test.py", line 39, in <module>
for result in executor.map(error_wrapper, [['5', '6'], ['7', '8']]):
File "/usr/local/lib/python3.7/concurrent/futures/process.py", line 476, in _chain_from_iterable_of_lists
for element in iterable:
File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 586, in result_iterator
yield fs.pop().result()
File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 432, in result
return self.__get_result()
File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
raise self._exception
Exception: 例外だよ!
make: [mulutiprocess_test] Error 1 (ignored)
docker run --rm -it -v `pwd`:/usr/src python37 multiprocess_starmap_test.py
DEBUG:__main__:1
DEBUG:__main__:3
DEBUG:__main__:2
DEBUG:__main__:4
DEBUG:__main__:[True, True]
DEBUG:__main__:5
DEBUG:__main__:7
DEBUG:__main__:6
DEBUG:__main__:8
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 121, in worker
result = (True, func(*args, **kwds))
File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 47, in starmapstar
return list(itertools.starmap(args[0], args[1]))
File "multiprocess_starmap_test.py", line 20, in error_task
raise Exception('例外だよ!')
Exception: 例外だよ!
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "multiprocess_starmap_test.py", line 29, in <module>
result = pool.starmap(error_task, [['5', '6'], ['7', '8']])
File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 276, in starmap
return self._map_async(func, iterable, starmapstar, chunksize).get()
File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 657, in get
raise self._value
Exception: 例外だよ!
make: [mulutiprocess_test] Error 1 (ignored)
docker run --rm -it -v `pwd`:/usr/src python37 multiprocess_starmap_async_test.py
DEBUG:__main__:1
DEBUG:__main__:3
DEBUG:__main__:2
DEBUG:__main__:4
DEBUG:__main__:[True, True]
DEBUG:__main__:5
DEBUG:__main__:7
DEBUG:__main__:8
DEBUG:__main__:6
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 121, in worker
result = (True, func(*args, **kwds))
File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 47, in starmapstar
return list(itertools.starmap(args[0], args[1]))
File "multiprocess_starmap_async_test.py", line 20, in error_task
raise Exception('例外だよ!')
Exception: 例外だよ!
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "multiprocess_starmap_async_test.py", line 30, in <module>
logger.debug(result.get())
File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 657, in get
raise self._value
Exception: 例外だよ!
make: [mulutiprocess_test] Error 1 (ignored)
これだけだと特に違いはないですね。