0
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

【Python】非同期処理でmainスレッドと並列実行できるプラクティスを検討してみた(multiprocessing, asyncio)

Last updated at Posted at 2020-10-15

python非同期処理のプラクティスとして以下の要件を満たすものを検討してみました。

  • main処理と非同期処理を並列実行させたい。
  • main処理内の指定した箇所で非同期処理の実行完了までwaitさせたい。
  • 非同期処理の戻り値をfutureで取得したい。

コードはやや冗長ですが、そのままコピペして試せるようにロガー等も含めた全ソースを記載しています。

ケース1 mainスレッドと非同期スレッドを並列で処理(multiprocessing利用)

multiprocessing1.py
from multiprocessing.pool import ThreadPool
import time
import threading as th
import logging

# ロガーの取得
def get_logger():
    logger = logging.getLogger("multiprocesssing_test")
    logger.setLevel(logging.DEBUG)
    logger.propagate = False

    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    ch_formatter = logging.Formatter('%(asctime)s - %(message)s')
    ch.setFormatter(ch_formatter)
    
    logger.addHandler(ch)
    return logger

logger = get_logger()

def async_func(name, sleep_time):
    # スレッドidを取得
    thread_id = th.get_ident()

    logger.info(f"thread_id:{thread_id} name:{name} async_func開始")
    time.sleep(sleep_time)
    logger.info(f"thread_id:{thread_id} name:{name} async_func終了")

    return f"{thread_id}-{name}"

if __name__ == "__main__":
    # スレッド実行用のスレッドプールを作成
    # processesでスレッドの最大同時
    pool = ThreadPool(processes=1)

    # スレッドidを取得
    thread_id = th.get_ident()

    # 非同期処理を実行する。第一引数に関数オブジェクト、第二引数に引数を指定する。
    logger.info(f"thread_id:{thread_id} mainから非同期処理をcall")
    future = pool.apply_async(async_func, ("スレッド1", 10))

    # 非同期処理と並行してmainスレッドで実行させたい処理
    logger.info(f"thread_id:{thread_id} main非同期処理実行中の処理開始")
    time.sleep(5)
    logger.info(f"thread_id:{thread_id} main非同期処理実行中の処理終了")

    # 非同期処理が終わるのを待って、結果を取得する。
    result = future.get()
    logger.info(f"thread_id:{thread_id} 非同期処理の結果取得:{result}")
    pool.close()

実行結果

2020-10-15 16:43:27,073 - thread_id:18440 mainから非同期処理をcall
2020-10-15 16:43:27,074 - thread_id:18440 main非同期処理実行中の処理開始
2020-10-15 16:43:27,074 - thread_id:18132 name:スレッド1 async_func開始
2020-10-15 16:43:32,074 - thread_id:18440 main非同期処理実行中の処理終了
2020-10-15 16:43:37,075 - thread_id:18132 name:スレッド1 async_func終了
2020-10-15 16:43:37,075 - thread_id:18440 非同期処理の結果取得:18132-スレッド1

ログから16:43:27に「main非同期処理実行中の処理」と「async_func開始」の処理が同時に並列で実行されているのが分かります。

ケース2 mainスレッドと複数の非同期スレッドを並列で処理(multiprocessing利用)

multiprocessing2.py
from multiprocessing.pool import ThreadPool
import time
import threading as th
import logging

# ロガーの取得
def get_logger():
    logger = logging.getLogger("multiprocesssing_test")
    logger.setLevel(logging.DEBUG)
    logger.propagate = False

    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    ch_formatter = logging.Formatter('%(asctime)s - %(message)s')
    ch.setFormatter(ch_formatter)
    
    logger.addHandler(ch)
    return logger

logger = get_logger()

def async_func(name, sleep_time):
    # スレッドidを取得
    thread_id = th.get_ident()

    logger.info(f"thread_id:{thread_id} name:{name} async_func開始")
    time.sleep(sleep_time)
    logger.info(f"thread_id:{thread_id} name:{name} async_func終了")

    return f"{thread_id}-{name}"

if __name__ == "__main__":
    # スレッド実行用のスレッドプールを作成
    # processesでスレッドの最大同時
    pool = ThreadPool(processes=5)

    # スレッドidを取得
    thread_id = th.get_ident()

    # 非同期処理を実行する。第一引数に関数オブジェクト、第二引数に引数を指定する。
    logger.info(f"thread_id:{thread_id} mainから非同期処理をcall")
    futures = []
    for i in range(5):
        future = pool.apply_async(async_func, (f"スレッド{i + 1}", 10)) # Tuple of args for foo
        futures.append(future)

    # 非同期処理と並行してmainスレッドで実行させたい処理
    logger.info(f"thread_id:{thread_id} main非同期処理実行中の処理開始")
    time.sleep(5)
    logger.info(f"thread_id:{thread_id} main非同期処理実行中の処理終了")

    # 非同期処理が終わるのを待って、結果を取得する。
    results = [future.get() for future in futures]
    logger.info(f"thread_id:{thread_id} 非同期処理の結果取得:{results}")
    pool.close()

実行結果

2020-10-15 16:47:41,977 - thread_id:13448 mainから非同期処理をcall
2020-10-15 16:47:41,978 - thread_id:13448 main非同期処理実行中の処理開始
2020-10-15 16:47:41,979 - thread_id:23216 name:スレッド1 async_func開始
2020-10-15 16:47:41,979 - thread_id:21744 name:スレッド2 async_func開始
2020-10-15 16:47:41,979 - thread_id:21708 name:スレッド3 async_func開始
2020-10-15 16:47:41,979 - thread_id:21860 name:スレッド4 async_func開始
2020-10-15 16:47:41,979 - thread_id:22100 name:スレッド5 async_func開始
2020-10-15 16:47:46,980 - thread_id:13448 main非同期処理実行中の処理終了
2020-10-15 16:47:51,982 - thread_id:21744 name:スレッド2 async_func終了
2020-10-15 16:47:51,982 - thread_id:23216 name:スレッド1 async_func終了
2020-10-15 16:47:51,983 - thread_id:21708 name:スレッド3 async_func終了
2020-10-15 16:47:51,984 - thread_id:21860 name:スレッド4 async_func終了
2020-10-15 16:47:51,984 - thread_id:22100 name:スレッド5 async_func終了
2020-10-15 16:47:51,986 - thread_id:13448 非同期処理の結果取得:['23216-スレッド1', '21744-スレッド2', '21708-スレッド3', '21860-ス 
レッド4', '22100-スレッド5']

ログから16:47:41に「main非同期処理実行中の処理」と「async_func開始」の5つの処理が同時に並列で実行されているのが分かります。
またThreadPool(processes=3)などにしてプロセス数を少なくすると、まず3つスレッドが実行されて2つが待ち状態になり完了時に新しいスレッドが実行される挙動になります。

ケース3 mainスレッドと複数の非同期スレッドを並列で処理(asyncio利用)

asyncio1.py
import asyncio
import itertools
import time
import profile
import random
import time
import threading as th
import logging

# ロガーの取得
def get_logger():
    logger = logging.getLogger("asyncio_test")
    logger.setLevel(logging.DEBUG)
    logger.propagate = False

    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    ch_formatter = logging.Formatter('%(asctime)s - %(message)s')
    ch.setFormatter(ch_formatter)
    
    logger.addHandler(ch)
    return logger

logger = get_logger()

# task id的なものを取得
# ※asyncioは内部的にジェネレーターを利用しているので
# スレッドIDは同じになり、非同期処理のIDに相当するものの取得方法は以下となる。
_next_id = itertools.count().__next__
def get_task_id():
    return _next_id()

async def async_func(name, sleep_time):
    # タスクidを取得
    task_id = get_task_id()

    logger.info(f"task_id:{task_id} name:{name} async_func開始")
    await asyncio.sleep(sleep_time)
    logger.info(f"task_id:{task_id} name:{name} async_func終了")

    return f"{task_id}-{name}"

async def async_func_caller():
    # タスクidを取得
    task_id = get_task_id()

    # 非同期処理タスクを生成
    # ※この時点ではタスクを生成しているだけで実行されず、
    # loop.run_until_completeを呼び出した際に実行される。
    futures = [asyncio.ensure_future(async_func(f"task{i + 1}", 10)) for i in range(5)]

    # 非同期処理と並行してmainスレッドで実行させたい処理
    logger.info(f"task_id:{task_id} async_func_caller 非同期処理実行中の処理開始")
    await asyncio.sleep(5)
    logger.info(f"task_id:{task_id} async_func_caller 非同期処理実行中の処理終了")

    # 非同期処理が終わるのを待って、結果を取得する。
    results = await asyncio.gather(*futures)

    return results


if __name__ == "__main__":
    # 非同期処理実行用のスレッドプールを作成
    loop = asyncio.get_event_loop()

    logger.info(f"main非同期処理実行中の処理開始")

    # 非同期処理を実行し、終了までwait
    ret = loop.run_until_complete(async_func_caller())
    logger.info(f"main非同期処理実行中の処理終了 結果:{ret}")
    loop.close()

実行結果

2020-10-15 16:49:40,132 - main非同期処理実行中の処理開始
2020-10-15 16:49:40,134 - task_id:0 async_func_caller 非同期処理実行中の処理開始
2020-10-15 16:49:40,134 - task_id:1 name:task1 async_func開始
2020-10-15 16:49:40,135 - task_id:2 name:task2 async_func開始
2020-10-15 16:49:40,135 - task_id:3 name:task3 async_func開始
2020-10-15 16:49:40,136 - task_id:4 name:task4 async_func開始
2020-10-15 16:49:40,136 - task_id:5 name:task5 async_func開始
2020-10-15 16:49:45,138 - task_id:0 async_func_caller 非同期処理実行中の処理終了
2020-10-15 16:49:50,141 - task_id:2 name:task2 async_func終了
2020-10-15 16:49:50,142 - task_id:5 name:task5 async_func終了
2020-10-15 16:49:50,142 - task_id:4 name:task4 async_func終了
2020-10-15 16:49:50,144 - task_id:1 name:task1 async_func終了
2020-10-15 16:49:50,144 - task_id:3 name:task3 async_func終了
2020-10-15 16:49:50,145 - main非同期処理実行中の処理終了 結果:['1-task1', '2-task2', '3-task3', '4-task4', '5-task5']

ログから16:49:40に「main非同期処理実行中の処理」と「async_func開始」の5つの処理が同時に並列で実行されているのが分かります。

非同期処理を実装する際にご参考になれば幸いです。

0
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?