2
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

はじめに

こんにちは!本記事では、Pythonにおける並列処理について、特にmultiprocessingconcurrent.futuresモジュールの活用法に焦点を当てて解説します。これらのモジュールを適切に使用することで、プログラムのパフォーマンスを大幅に向上させることができます。

1. 並列処理の基本

並列処理とは、複数のタスクを同時に実行することで、全体の処理時間を短縮する手法です。Pythonでは、GIL(Global Interpreter Lock)の制約により、マルチスレッドでの真の並列処理が難しいため、マルチプロセスを使用することが一般的です。

2. multiprocessingモジュール

multiprocessingモジュールは、プロセスベースの並列処理を実現するためのモジュールです。

2.1 基本的な使用方法

import multiprocessing
import time

def worker(num):
    print(f"Worker {num} started")
    time.sleep(2)
    print(f"Worker {num} finished")

if __name__ == '__main__':
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print("All workers finished")

このコードは5つの独立したプロセスを生成し、各プロセスでworker関数を実行します。

2.2 プロセス間通信

multiprocessingモジュールは、プロセス間でデータを共有するための様々な方法を提供しています。

Queue

from multiprocessing import Process, Queue

def producer(queue):
    for i in range(5):
        queue.put(f"Item {i}")

def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"Consumed {item}")

if __name__ == '__main__':
    q = Queue()
    prod_proc = Process(target=producer, args=(q,))
    cons_proc = Process(target=consumer, args=(q,))

    prod_proc.start()
    cons_proc.start()

    prod_proc.join()
    q.put(None)  # 終了シグナル
    cons_proc.join()

Pipe

from multiprocessing import Process, Pipe

def sender(conn):
    conn.send("Hello from sender")
    conn.close()

def receiver(conn):
    msg = conn.recv()
    print(f"Received: {msg}")
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p1 = Process(target=sender, args=(child_conn,))
    p2 = Process(target=receiver, args=(parent_conn,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

2.3 プロセスプール

multiprocessing.Poolを使用すると、タスクを複数のプロセスに効率的に分配できます。

from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3, 4, 5]))

3. concurrent.futuresモジュール

concurrent.futuresモジュールは、非同期実行のための高レベルインターフェースを提供します。

3.1 ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor
import time

def task(n):
    time.sleep(1)
    return n * n

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=5) as executor:
        results = executor.map(task, range(10))
    
    for result in results:
        print(result)

ProcessPoolExecutormultiprocessing.Poolと似ていますが、より使いやすいインターフェースを提供します。

3.2 非同期実行とコールバック

from concurrent.futures import ProcessPoolExecutor
import time

def task(n):
    time.sleep(n)
    return f"Task {n} completed"

def callback(future):
    print(future.result())

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=3) as executor:
        for i in range(5):
            future = executor.submit(task, i)
            future.add_done_callback(callback)

    print("All tasks submitted")

submitメソッドを使用すると、タスクを非同期で実行し、完了時にコールバック関数を呼び出すことができます。

4. multiprocessingとconcurrent.futuresの比較

特徴 multiprocessing concurrent.futures
低レベル制御 -
使いやすさ -
プロセス間通信 -
非同期実行 -
コールバック -
プロセスプール

multiprocessingは低レベルの制御が必要な場合や、複雑なプロセス間通信が必要な場合に適しています。一方、concurrent.futuresは使いやすさと非同期実行が重要な場合に適しています。

5. 並列処理のベストプラクティス

  1. タスクの粒度: タスクが小さすぎると、プロセス生成のオーバーヘッドが大きくなります。適切な粒度のタスクを設計しましょう。

  2. プロセス数の最適化: 通常、CPUコア数と同じかそれよりやや多い程度のプロセス数が最適です。

  3. リソースの共有: 共有リソースへのアクセスには注意が必要です。ロックやセマフォを適切に使用しましょう。

  4. デッドロックの回避: 複数のプロセスが互いにリソースを待ち合う状況を避けましょう。

  5. エラーハンドリング: 各プロセスで発生する例外を適切に処理しましょう。

  6. メモリ使用量の管理: 大量のデータを扱う場合は、メモリ使用量に注意しましょう。

6. 実践的な例:画像処理の並列化

画像処理は並列化の恩恵を受けやすいタスクの一つです。以下に、複数の画像をグレースケールに変換する並列処理の例を示します。

import os
from concurrent.futures import ProcessPoolExecutor
from PIL import Image

def convert_to_grayscale(filename):
    with Image.open(filename) as img:
        grayscale_img = img.convert('L')
        output_filename = f"grayscale_{os.path.basename(filename)}"
        grayscale_img.save(output_filename)
    return output_filename

def process_images(image_dir):
    image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir) if f.endswith('.jpg')]
    
    with ProcessPoolExecutor() as executor:
        results = executor.map(convert_to_grayscale, image_files)
    
    for result in results:
        print(f"Processed: {result}")

if __name__ == '__main__':
    image_directory = "path/to/your/image/directory"
    process_images(image_directory)

この例では、ProcessPoolExecutorを使用して複数の画像を同時にグレースケールに変換しています。

7. パフォーマンス分析とプロファイリング

並列処理を実装した後は、実際にパフォーマンスが向上しているか確認することが重要です。PythonのcProfileモジュールやサードパーティのツール(例:line_profiler)を使用して、プログラムのボトルネックを特定し、最適化することができます。

import cProfile
import pstats

def run_with_profiling(func, *args, **kwargs):
    profiler = cProfile.Profile()
    profiler.enable()
    func(*args, **kwargs)
    profiler.disable()
    stats = pstats.Stats(profiler).sort_stats('cumulative')
    stats.print_stats()

# 使用例
if __name__ == '__main__':
    run_with_profiling(process_images, "path/to/your/image/directory")

まとめ

Pythonのmultiprocessingconcurrent.futuresモジュールは、並列処理を実現するための強力なツールです。multiprocessingは低レベルの制御が必要な場合に適しており、concurrent.futuresはより高レベルで使いやすいインターフェースを提供します。

並列処理を効果的に活用することで、CPUバウンドなタスクのパフォーマンスを大幅に向上させることができます。ただし、適切なタスクの粒度、プロセス数の最適化、リソース管理などに注意を払う必要があります。

実際のプロジェクトでは、タスクの性質や要件に応じて適切なモジュールと手法を選択し、必要に応じてプロファイリングを行いながら最適化を進めることが重要です。

並列処理は強力なツールですが、常に使用すべきというわけではありません。タスクの性質や規模、オーバーヘッドなどを考慮し、適切な場面で活用することで、Pythonプログラムの性能を最大限に引き出すことができるでしょう。

ハッピーな並列プログラミングを!

2
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?