LoginSignup
3
1

More than 3 years have passed since last update.

concurrent.futuresのThreadPoolExecutor/ProcessPoolExecutorを試す

Posted at

本記事でやること

100000個のファイルコピーをThreadPoolExecutor/ProcessPoolExecutorで検証.

検証環境

  • MacBook Pro 2017
  • 2.3 GHz デュアルコアIntel Core i5
  • 8 GB 2133 MHz LPDDR3
  • Python 3.7.3

とりあえず100000個のファイルを作る

ランダムなファイル名を持つファイルを./data1配下に作成.

create_random_file.py
import os
import random
import string


CREATE_FILES_NUM = 100000  # how many files to generate
FILE_NAME_NUM = 10  # length of the random file-name stem (before the extension)
SAVE_DIR = 'data1/'  # destination directory for the generated files


def random_file_name(n, extension='.txt'):
    """Return a random file name: *n* alphanumeric characters plus *extension*.

    Args:
        n: number of random characters in the name stem.
        extension: suffix appended verbatim (default '.txt').
    """
    alphabet = string.ascii_letters + string.digits
    # random.choices (Py3.6+) samples with replacement in one call,
    # replacing the manual list-comprehension loop with its unused index.
    return ''.join(random.choices(alphabet, k=n)) + extension


def _create_file():
    """Create CREATE_FILES_NUM empty files with random names under SAVE_DIR.

    NOTE(review): random names can in principle collide, in which case the
    later write overwrites the earlier file and slightly fewer than
    CREATE_FILES_NUM files result; with 62**FILE_NAME_NUM possible names
    this is astronomically unlikely for the default sizes.
    """
    # The original crashed with FileNotFoundError when SAVE_DIR was missing.
    os.makedirs(SAVE_DIR, exist_ok=True)
    for _ in range(CREATE_FILES_NUM):
        file_name = random_file_name(FILE_NAME_NUM)
        # 'w' with no content just touches an empty file.
        with open(os.path.join(SAVE_DIR, file_name), 'w') as f:
            f.write('')

if __name__ == '__main__':
    # Entry point: generate the random test files.
    _create_file()

並列処理を行わないコピーの場合

data1配下に作った100000個のファイルをdata2配下にコピー.
以下のコードの実行に約16.1秒かかった.

naive_copy.py
import os
import time
import shutil


def print_time(func):
    """Decorator that prints *func*'s wall-clock run time after each call.

    Also forwards the wrapped function's return value, which the original
    wrapper silently dropped, and preserves ``__name__``/docstring metadata
    via ``functools.wraps``.
    """
    import functools  # local import keeps this block self-contained

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        t1 = time.time()
        result = func(*args, **kwargs)
        t2 = time.time()
        print(f'{func.__name__}: {t2-t1}')
        return result
    return wrapper


def _get_file_list(dir_name):
    file_list = [os.path.join(dir_name, f) for f in os.listdir(dir_name) if \ 
        os.path.isfile(os.path.join(dir_name, f))]
    return file_list


def _copy_file(file_list, copy_to):
    for f in file_list:
        f_copy = copy_to + f.split('/')[-1]
        shutil.copyfile(f, f_copy)


def main():
    """Copy everything in data1/ into data2/, then reset data2/ empty."""
    file_list = _get_file_list(dir_name='data1')
    # exist_ok avoids FileExistsError when data2/ is left over from a
    # previous run (plain os.mkdir crashed on re-runs).
    os.makedirs('data2/', exist_ok=True)
    _copy_file(file_list, 'data2/')
    # Benchmark hygiene: leave data2/ empty for the next measurement.
    shutil.rmtree('data2/')
    os.mkdir('data2/')


if __name__ == '__main__':
    # Entry point: run the sequential copy benchmark.
    main()

ThreadPoolExecutorを利用した場合

上記同様にコピーを実施. 以下コードを実行. 約15.1秒かかった.

thread_copy.py
import os
import time
import shutil
import numpy as np
from concurrent.futures import ThreadPoolExecutor

# One worker per CPU core reported by the OS.
MAX_WORKERS = os.cpu_count()
print('MAX WORKERS:', MAX_WORKERS)


def print_time(func):
    """Decorator that prints *func*'s wall-clock run time after each call.

    Also forwards the wrapped function's return value, which the original
    wrapper silently dropped, and preserves ``__name__``/docstring metadata
    via ``functools.wraps``.
    """
    import functools  # local import keeps this block self-contained

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        t1 = time.time()
        result = func(*args, **kwargs)
        t2 = time.time()
        print(f'{func.__name__}: {t2-t1}')
        return result
    return wrapper


def _get_file_list(dir_name):
    file_list = [os.path.join(dir_name, f) for f in os.listdir(dir_name) if \
        os.path.isfile(os.path.join(dir_name, f))]
    return file_list


def _copy_file(file_list, copy_to):
    for f in file_list:
        f_copy = copy_to + f.split('/')[-1]
        shutil.copyfile(f, f_copy)


@print_time
def _copy_file_concurrent_thread(file_list, copy_to):
    """Copy *file_list* into *copy_to* with a pool of MAX_WORKERS threads.

    The list is pre-split into one contiguous chunk per worker, so each
    thread makes a single _copy_file call over its share of the files.
    """
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # Ceil-divide so every file lands in some chunk even when the list
        # does not divide evenly; plain slicing replaces np.array_split,
        # which was the only reason numpy was imported.
        chunk_size = max(1, -(-len(file_list) // MAX_WORKERS))
        chunks = [file_list[i:i + chunk_size]
                  for i in range(0, len(file_list), chunk_size)]
        futures = [executor.submit(_copy_file, chunk, copy_to) for chunk in chunks]
        # Wait and re-raise: the original never consumed executor.map's lazy
        # iterator, so any worker exception was silently discarded.
        for future in futures:
            future.result()


def main():
    """Copy data1/ into data2/ with the thread pool, then reset data2/ empty."""
    file_list = _get_file_list(dir_name='data1')
    # Ensure the destination exists: the original assumed data2/ was left
    # behind by a prior run and crashed when this script ran standalone.
    os.makedirs('data2/', exist_ok=True)
    _copy_file_concurrent_thread(file_list, 'data2/')
    # Benchmark hygiene: leave data2/ empty for the next measurement.
    shutil.rmtree('data2/')
    os.mkdir('data2/')


if __name__ == '__main__':
    # Entry point: run the thread-pool copy benchmark.
    main()

ProcessPoolExecutorを利用した場合

以下コードを実行. 約9.9秒かかった.

process_copy.py
import os
import time
import shutil
import numpy as np
from concurrent.futures import ProcessPoolExecutor

# One worker process per CPU core reported by the OS.
MAX_WORKERS = os.cpu_count()
print('MAX WORKERS:', MAX_WORKERS)


def print_time(func):
    """Decorator that prints *func*'s wall-clock run time after each call.

    Also forwards the wrapped function's return value, which the original
    wrapper silently dropped, and preserves ``__name__``/docstring metadata
    via ``functools.wraps``.
    """
    import functools  # local import keeps this block self-contained

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        t1 = time.time()
        result = func(*args, **kwargs)
        t2 = time.time()
        print(f'{func.__name__}: {t2-t1}')
        return result
    return wrapper


def _get_file_list(dir_name):
    file_list = [os.path.join(dir_name, f) for f in os.listdir(dir_name) if \
        os.path.isfile(os.path.join(dir_name, f))]
    return file_list


def _copy_file(file_list, copy_to):
    for f in file_list:
        f_copy = copy_to + f.split('/')[-1]
        shutil.copyfile(f, f_copy)


@print_time
def _copy_file_concurrent_process(file_list, copy_to):
    """Copy *file_list* into *copy_to* with a pool of MAX_WORKERS processes.

    The list is pre-split into one contiguous chunk per worker, so each
    process makes a single _copy_file call over its share of the files.
    """
    with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # Ceil-divide so every file lands in some chunk even when the list
        # does not divide evenly; plain slicing replaces np.array_split,
        # which was the only reason numpy was imported.
        chunk_size = max(1, -(-len(file_list) // MAX_WORKERS))
        chunks = [file_list[i:i + chunk_size]
                  for i in range(0, len(file_list), chunk_size)]
        futures = [executor.submit(_copy_file, chunk, copy_to) for chunk in chunks]
        # Wait and re-raise: the original never consumed executor.map's lazy
        # iterator, so any worker exception was silently discarded.
        for future in futures:
            future.result()


def main():
    """Copy data1/ into data2/ with the process pool, then reset data2/ empty."""
    file_list = _get_file_list(dir_name='data1')
    # Ensure the destination exists: the original assumed data2/ was left
    # behind by a prior run and crashed when this script ran standalone.
    os.makedirs('data2/', exist_ok=True)
    _copy_file_concurrent_process(file_list, 'data2/')
    # Benchmark hygiene: leave data2/ empty for the next measurement.
    shutil.rmtree('data2/')
    os.mkdir('data2/')


if __name__ == '__main__':
    # Entry point: run the process-pool copy benchmark.
    main()

結論

ローカルの大量ファイルコピーにはProcessPoolExecutorがはやいよ.

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1