本記事でやること
100000個のファイルコピーをThreadPoolExecutor/ProcessPoolExecutorで検証.
検証環境
- MacBook Pro 2017
- 2.3 GHz デュアルコアIntel Core i5
- 8 GB 2133 MHz LPDDR3
- Python 3.7.3
とりあえず100000個のファイルを作る
ランダムなファイル名を持つファイルを./data1配下に作成.
create_random_file.py
import os
import random
import string
CREATE_FILES_NUM = 100000
FILE_NAME_NUM = 10
SAVE_DIR = 'data1/'
def random_file_name(n, extension='.txt'):
    """Return a random file name: n alphanumeric characters plus extension."""
    alphabet = string.ascii_letters + string.digits
    stem = ''.join(random.choices(alphabet, k=n))
    return stem + extension
def _create_file():
    """Create CREATE_FILES_NUM empty files with random names under SAVE_DIR.

    NOTE: random names may collide, so slightly fewer than CREATE_FILES_NUM
    distinct files can result; colliding names simply overwrite each other.
    """
    # Original crashed with FileNotFoundError when SAVE_DIR did not exist.
    os.makedirs(SAVE_DIR, exist_ok=True)
    for _ in range(CREATE_FILES_NUM):
        file_name = random_file_name(FILE_NAME_NUM)
        with open(os.path.join(SAVE_DIR, file_name), 'w') as f:
            f.write('')
# Script entry point: generate the random-named empty files.
if __name__ == '__main__':
    _create_file()
並列処理を行わないコピーの場合
data1配下に作った100000個のファイルをdata2配下にコピー.
以下のコードの実行に約16.1秒かかった.
naive_copy.py
import os
import time
import shutil
def print_time(func):
    """Decorator: print func's wall-clock runtime, then pass through its result."""
    def wrapper(*args, **kwargs):
        # perf_counter is the recommended monotonic clock for interval timing.
        start = time.perf_counter()
        result = func(*args, **kwargs)  # original discarded this return value
        elapsed = time.perf_counter() - start
        print(f'{func.__name__}: {elapsed}')
        return result
    return wrapper
def _get_file_list(dir_name):
    """Return paths of the regular files directly inside dir_name."""
    candidates = (os.path.join(dir_name, entry) for entry in os.listdir(dir_name))
    return [path for path in candidates if os.path.isfile(path)]
def _copy_file(file_list, copy_to):
    """Copy each file in file_list into the copy_to directory.

    Uses os.path.basename/os.path.join instead of splitting on '/' so the
    code also works with OS-specific path separators (e.g. '\\' on Windows).
    """
    for src in file_list:
        dst = os.path.join(copy_to, os.path.basename(src))
        shutil.copyfile(src, dst)
@print_time  # print_time was defined but never applied, yet the article reports a measured time
def main():
    """Copy everything from data1 into data2 sequentially, then reset data2."""
    file_list = _get_file_list(dir_name='data1')
    # os.mkdir crashed with FileExistsError when data2 was left over from a run.
    os.makedirs('data2/', exist_ok=True)
    _copy_file(file_list, 'data2/')
    shutil.rmtree('data2/')
    os.mkdir('data2/')
# Script entry point: run the sequential copy benchmark.
if __name__ == '__main__':
    main()
ThreadPoolExecutorを利用した場合
上記同様にコピーを実施. 以下コードを実行. 約15.1秒かかった.
thread_copy.py
import os
import time
import shutil
import numpy as np
from concurrent.futures import ThreadPoolExecutor
# Size the pool to the machine's logical CPU count.
MAX_WORKERS = os.cpu_count()
print('MAX WORKERS:', MAX_WORKERS)
def print_time(func):
    """Decorator: print func's wall-clock runtime, then pass through its result."""
    def wrapper(*args, **kwargs):
        # perf_counter is the recommended monotonic clock for interval timing.
        start = time.perf_counter()
        result = func(*args, **kwargs)  # original discarded this return value
        elapsed = time.perf_counter() - start
        print(f'{func.__name__}: {elapsed}')
        return result
    return wrapper
def _get_file_list(dir_name):
    """Return paths of the regular files directly inside dir_name."""
    candidates = (os.path.join(dir_name, entry) for entry in os.listdir(dir_name))
    return [path for path in candidates if os.path.isfile(path)]
def _copy_file(file_list, copy_to):
    """Copy each file in file_list into the copy_to directory.

    Uses os.path.basename/os.path.join instead of splitting on '/' so the
    code also works with OS-specific path separators (e.g. '\\' on Windows).
    """
    for src in file_list:
        dst = os.path.join(copy_to, os.path.basename(src))
        shutil.copyfile(src, dst)
@print_time
def _copy_file_concurrent_thread(file_list, copy_to):
    """Split file_list into MAX_WORKERS chunks and copy them on a thread pool."""
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        split_file_list = np.array_split(np.array(file_list), MAX_WORKERS)
        copy_to_list = [copy_to] * MAX_WORKERS
        # Executor.map is lazy: the returned iterator must be consumed, or
        # exceptions raised in workers are silently dropped.
        list(executor.map(_copy_file, split_file_list, copy_to_list))
def main():
    """Copy data1 -> data2 using the thread pool, then reset data2."""
    file_list = _get_file_list(dir_name='data1')
    # Original assumed data2 already existed (it is only recreated AFTER the
    # copy), so a first run crashed inside shutil.copyfile. Create it up front.
    os.makedirs('data2/', exist_ok=True)
    _copy_file_concurrent_thread(file_list, 'data2/')
    shutil.rmtree('data2/')
    os.mkdir('data2/')
# Script entry point: run the thread-pool copy benchmark.
if __name__ == '__main__':
    main()
ProcessPoolExecutorを利用した場合
以下コードを実行. 約9.9秒かかった.
process_copy.py
import os
import time
import shutil
import numpy as np
from concurrent.futures import ProcessPoolExecutor
# Size the pool to the machine's logical CPU count.
MAX_WORKERS = os.cpu_count()
print('MAX WORKERS:', MAX_WORKERS)
def print_time(func):
    """Decorator: print func's wall-clock runtime, then pass through its result."""
    def wrapper(*args, **kwargs):
        # perf_counter is the recommended monotonic clock for interval timing.
        start = time.perf_counter()
        result = func(*args, **kwargs)  # original discarded this return value
        elapsed = time.perf_counter() - start
        print(f'{func.__name__}: {elapsed}')
        return result
    return wrapper
def _get_file_list(dir_name):
    """Return paths of the regular files directly inside dir_name."""
    candidates = (os.path.join(dir_name, entry) for entry in os.listdir(dir_name))
    return [path for path in candidates if os.path.isfile(path)]
def _copy_file(file_list, copy_to):
    """Copy each file in file_list into the copy_to directory.

    Uses os.path.basename/os.path.join instead of splitting on '/' so the
    code also works with OS-specific path separators (e.g. '\\' on Windows).
    """
    for src in file_list:
        dst = os.path.join(copy_to, os.path.basename(src))
        shutil.copyfile(src, dst)
@print_time
def _copy_file_concurrent_process(file_list, copy_to):
    """Split file_list into MAX_WORKERS chunks and copy them on a process pool."""
    with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
        split_file_list = np.array_split(np.array(file_list), MAX_WORKERS)
        copy_to_list = [copy_to] * MAX_WORKERS
        # Executor.map is lazy: the returned iterator must be consumed, or
        # exceptions raised in worker processes are silently dropped.
        list(executor.map(_copy_file, split_file_list, copy_to_list))
def main():
    """Copy data1 -> data2 using the process pool, then reset data2."""
    file_list = _get_file_list(dir_name='data1')
    # Original assumed data2 already existed (it is only recreated AFTER the
    # copy), so a first run crashed inside shutil.copyfile. Create it up front.
    os.makedirs('data2/', exist_ok=True)
    _copy_file_concurrent_process(file_list, 'data2/')
    shutil.rmtree('data2/')
    os.mkdir('data2/')
# Script entry point: run the process-pool copy benchmark.
if __name__ == '__main__':
    main()
結論
ローカルの大量ファイルコピーにはProcessPoolExecutorがはやいよ.