参考にした記事
具体例で理解したい時に読む
→ Pythonの並列・並行処理サンプルコードまとめ
どんなときに使うか悩んだ時に読む
→ Pythonにおける非同期処理: asyncio逆引きリファレンス
理論を学ぶ
Pythonにはthreading、multiprocessing、asyncioとどれも並列処理に使えそうなパッケージが3つあります。これらの違いをまず押さえておきます。
これらのパッケージの違いは、そのまま「マルチスレッド」、「マルチプロセス」、「ノンブロッキング」の違いに相当します。まず、マルチスレッドとマルチプロセスの違いについて。
プロセスプール
スレッドではなくプロセス単位に分けることで、Global Interpreter Lock (GIL) の制約を受けなくなりマルチコアで動かせるようになります。
ただし、その分スレッドよりも規模が大きいプロセスを使うので、他の制約が増えることも。注意!
import time
import concurrent.futures
def func1():
while True:
print("func1")
time.sleep(1)
def func2():
while True:
print("func2")
time.sleep(1)
if __name__ == "__main__":
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
for i in range(10):
if i % 2 == 0:
executor.submit(func1)
else:
executor.submit(func2)
print('loop end')
ワーカーを4にすると4個以上プロセスが起動しないことがわかる
func1
func2
func1
func2
loop end
func1
func2
.
.
.
複数のプロセスに複数のスレッドを建てる
マルチタスクを実現するために参考になる関係図
出典: https://slidesplayer.net/slide/11222023/
"""
プロセス1にスレッド2つ起動
プロセス2にスレッド2つ起動
目標
① 同じプロセスにあるスレッド間のデータ共有 (スレッド間通信)
② 異なるプロセス間のデータ共有 (プロセス間通信)
"""
import time
import threading
from multiprocessing import Process, Value
from functools import partial
class PS():
# def __init__(self,ps_name,sl2,shm):
def __init__(self,ps_name,sl2):
self.ps_name=ps_name
self.sl2=sl2
self.is_quitting=False # ① スレッド間通信
# self.shm=shm # ② プロセス間通信
self.th1=threading.Thread(target=self.fn1)
self.th2=threading.Thread(target=self.fn2)
# 各スレッドの起動
print(f'{self.ps_name}: 1つ目のスレッド 起動')
self.th1.start()
print(f'{self.ps_name}: 2つ目のスレッド 起動')
self.th2.start()
print(f'現在起動中のスレッド: {threading.active_count()}')
# 各スレッドの終了を待つ
self.th1.join()
print(f'{self.ps_name}: 1つ目のスレッド 停止')
self.th2.join()
print(f'{self.ps_name}: 2つ目のスレッド 停止')
print(f'現在起動中のスレッド: {threading.active_count()}')
# このプロセスが終了したことを他のプロセスへ知らせる
# self.shm.value=1 # ② プロセス間通信
shm.value=1 # ② プロセス間通信
def fn1(self):
"""
終了フラグが立つまで1秒間隔で標準出力する
"""
time.sleep(1)
while not self.is_quitting: # ① スレッド間通信
print(f'{self.ps_name}: fn1: 実行中')
# if self.shm.value:
if shm.value:
print(f'{self.ps_name}: どこかのプロセスが終了している') # ② プロセス間通信
time.sleep(1)
def fn2(self):
"""
スレッド2が起動して数秒後にスレッド1を終了させるためのフラグを立てる
"""
time.sleep(self.sl2)
print(f'{self.ps_name}: fn2: 実行中')
self.is_quitting=True # ① スレッド間通信
print(f'\n{self.ps_name}: {self.ps_name}のfn1を終了します\n')
# 標準出力でプロセスを識別するために使う
name_ps1='プロセス1'
name_ps2='プロセス2'
# 共有メモリ(shared memory)
"""
メインスレッドで作成された共有メモリは、共有されるオブジェクトとなるので、引数で渡したり返却値に指定したりは不要
"""
shm = Value('i', 0) # 第一引数のiはc言語のint型を表す (詳しくはctypesで検索)
# プロセス1
# process1 = Process(target=PS, args=(name_ps1,2,shm)) # プロセス起動後2秒後にプロセス終了
process1 = Process(target=PS, args=(name_ps1,2)) # プロセス起動後2秒後にプロセス終了
# プロセス2
# process2 = Process(target=PS, args=(name_ps2,7,shm)) # プロセス起動後5秒後にプロセス終了
process2 = Process(target=PS, args=(name_ps2,7)) # プロセス起動後5秒後にプロセス終了
print(f'{name_ps1}: start')
process1.start()
print(f'{name_ps2}: start')
process2.start()
process1.join()
print(f'\n{name_ps1}: end\n')
process2.join()
print(f'\n{name_ps2}: end\n')
プロセス1: start
プロセス2: start
プロセス1: 1つ目のスレッド 起動
プロセス1: 2つ目のスレッド 起動
プロセス2: 1つ目のスレッド 起動
プロセス2: 2つ目のスレッド 起動
プロセス1: fn1: 実行中
プロセス2: fn1: 実行中
プロセス1: fn1: 実行中
プロセス1: fn2: 実行中
プロセス1: プロセス1のfn1を終了します
プロセス2: fn1: 実行中
プロセス1: 1つ目のスレッド 停止
プロセス1: 2つ目のスレッド 停止
プロセス2: fn1: 実行中
プロセス2: どこかのプロセスが終了している
プロセス1: end
プロセス2: fn1: 実行中
プロセス2: どこかのプロセスが終了している
プロセス2: fn1: 実行中
プロセス2: どこかのプロセスが終了している
プロセス2: fn1: 実行中
プロセス2: どこかのプロセスが終了している
プロセス2: fn2: 実行中
プロセス2: プロセス2のfn1を終了します
プロセス2: 1つ目のスレッド 停止
プロセス2: 2つ目のスレッド 停止
プロセス2: end