1
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

[Python] 並列並行処理のメモ

Last updated at Posted at 2021-05-18

参考にした記事

具体例で理解したい時に読む
Pythonの並列・並行処理サンプルコードまとめ

どんなときに使うか悩んだ時に読む
Pythonにおける非同期処理: asyncio逆引きリファレンス

理論を学ぶ

Pythonにはthreading、multiprocessing、asyncioとどれも並列処理に使えそうなパッケージが3つあります。これらの違いをまず押さえておきます。

これらのパッケージの違いは、そのまま「マルチスレッド」、「マルチプロセス」、「ノンブロッキング」の違いに相当します。まず、マルチスレッドとマルチプロセスの違いについて。

プロセスプール

スレッドではなくプロセス単位に分けることで、Global Interpreter Lock (GIL) の制約を受けなくなりマルチコアで動かせるようになります。
ただし、その分スレッドよりも規模が大きいプロセスを使うので、他の制約が増えることも。注意!

import time
import concurrent.futures


def func1():
    while True:
        print("func1")
        time.sleep(1)


def func2():
    while True:
        print("func2")
        time.sleep(1)


if __name__ == "__main__":
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    for i in range(10):
        if i % 2 == 0:
            executor.submit(func1)
        else:
            executor.submit(func2)
    print('loop end')

ワーカーを4にすると4個以上プロセスが起動しないことがわかる
func1
func2
func1
func2
loop end
func1
func2
.
.
.

複数のプロセスに複数のスレッドを建てる

マルチタスクを実現するために参考になる関係図
image.png
出典: https://slidesplayer.net/slide/11222023/

"""
プロセス1にスレッド2つ起動
プロセス2にスレッド2つ起動

目標
① 同じプロセスにあるスレッド間のデータ共有 (スレッド間通信)
② 異なるプロセス間のデータ共有 (プロセス間通信)

"""
import time
import threading
from multiprocessing import Process, Value
from functools import partial

class PS():
    # def __init__(self,ps_name,sl2,shm):
    def __init__(self,ps_name,sl2):
        self.ps_name=ps_name
        self.sl2=sl2
        self.is_quitting=False # ① スレッド間通信
        # self.shm=shm           # ② プロセス間通信
        self.th1=threading.Thread(target=self.fn1)
        self.th2=threading.Thread(target=self.fn2)

        # 各スレッドの起動
        print(f'{self.ps_name}: 1つ目のスレッド 起動')
        self.th1.start()
        print(f'{self.ps_name}: 2つ目のスレッド 起動')
        self.th2.start()
        print(f'現在起動中のスレッド: {threading.active_count()}')

        # 各スレッドの終了を待つ
        self.th1.join()
        print(f'{self.ps_name}: 1つ目のスレッド 停止')
        self.th2.join()
        print(f'{self.ps_name}: 2つ目のスレッド 停止')
        print(f'現在起動中のスレッド: {threading.active_count()}')

        # このプロセスが終了したことを他のプロセスへ知らせる
        # self.shm.value=1  # ② プロセス間通信
        shm.value=1  # ② プロセス間通信

    def fn1(self):
        """
        終了フラグが立つまで1秒間隔で標準出力する
        """
        time.sleep(1)
        while not self.is_quitting:  # ① スレッド間通信
            print(f'{self.ps_name}: fn1: 実行中')
            # if self.shm.value:
            if shm.value:
                print(f'{self.ps_name}: どこかのプロセスが終了している') # ② プロセス間通信
            time.sleep(1)

    def fn2(self):
        """
        スレッド2が起動して数秒後にスレッド1を終了させるためのフラグを立てる
        """
        time.sleep(self.sl2)
        print(f'{self.ps_name}: fn2: 実行中')
        self.is_quitting=True # ① スレッド間通信
        print(f'\n{self.ps_name}: {self.ps_name}のfn1を終了します\n')


# 標準出力でプロセスを識別するために使う
name_ps1='プロセス1'
name_ps2='プロセス2'

# 共有メモリ(shared memory)
"""
メインスレッドで作成された共有メモリは、共有されるオブジェクトとなるので、引数で渡したり返却値に指定したりは不要
"""
shm = Value('i', 0) # 第一引数のiはc言語のint型を表す (詳しくはctypesで検索)

# プロセス1
# process1 = Process(target=PS, args=(name_ps1,2,shm)) # プロセス起動後2秒後にプロセス終了
process1 = Process(target=PS, args=(name_ps1,2)) # プロセス起動後2秒後にプロセス終了


# プロセス2
# process2 = Process(target=PS, args=(name_ps2,7,shm)) # プロセス起動後5秒後にプロセス終了
process2 = Process(target=PS, args=(name_ps2,7)) # プロセス起動後5秒後にプロセス終了

print(f'{name_ps1}: start')
process1.start()
print(f'{name_ps2}: start')
process2.start()

process1.join()
print(f'\n{name_ps1}: end\n')
process2.join()
print(f'\n{name_ps2}: end\n')

プロセス1: start
プロセス2: start
プロセス1: 1つ目のスレッド 起動
プロセス1: 2つ目のスレッド 起動
プロセス2: 1つ目のスレッド 起動
プロセス2: 2つ目のスレッド 起動
プロセス1: fn1: 実行中
プロセス2: fn1: 実行中
プロセス1: fn1: 実行中
プロセス1: fn2: 実行中

プロセス1: プロセス1のfn1を終了します

プロセス2: fn1: 実行中
プロセス1: 1つ目のスレッド 停止
プロセス1: 2つ目のスレッド 停止
プロセス2: fn1: 実行中
プロセス2: どこかのプロセスが終了している

プロセス1: end

プロセス2: fn1: 実行中
プロセス2: どこかのプロセスが終了している
プロセス2: fn1: 実行中
プロセス2: どこかのプロセスが終了している
プロセス2: fn1: 実行中
プロセス2: どこかのプロセスが終了している
プロセス2: fn2: 実行中

プロセス2: プロセス2のfn1を終了します

プロセス2: 1つ目のスレッド 停止
プロセス2: 2つ目のスレッド 停止

プロセス2: end
1
3
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?