はじめに
Pythonの並列処理、threading vs multiprocessing vs asyncio、どれを使えばいい?
この記事では、実測比較とフローチャートで最適な選択肢を示します。
たった3行で9倍速くなった実装例も公開。
結論
処理のボトルネックが、CPUバウンドなのかI/Oバウンドなのかで、最適な処理方式やライブラリが決まってきます。
-
CPUバウンド (CPU-bound) :
プログラムの実行速度が CPU の計算能力によって制限されている状態。- 例: 数値計算、機械学習、画像変換、正規表現による複雑なテキスト処理
-
I/Oバウンド (I/O-bound) :
プログラムの実行速度がディスク読み書きやネットワーク通信などの入出力待ち時間によって制限されている状態。- 例: APIリクエスト、スクレイピング、DBクエリ、ファイル読み書き
【30秒診断】あなたの処理に最適なライブラリは?
以下のチェックリストで判定してください。
✅ チェック1:タスクマネージャーでCPU使用率を確認
- 100%近い → CPUバウンド(計算処理)
- 低い (10~30%) → I/Oバウンド(待機処理)
✅ チェック2:同時処理数は?
- 10~100個 → ThreadPoolExecutor
- 1000個以上 → asyncio
✅ チェック3:既存コードは?
- 同期的 (
requestsなど) → ThreadPoolExecutor - 非同期 (
async/await) → asyncio
💡 迷ったら? → まずThreadPoolExecutorを試す(最も簡単)
※詳細な選定フローはこちらを参照ください。
用語
並列処理と並行処理
並列処理と並行処理は、どちらも複数のタスクを実行する方式のことを指します。
複数のタスクを処理するために用意する処理装置が複数か単一かで区別されます。
- 並列処理 (Parallel processing): 複数のタスクに対して、複数の処理装置を用いて、同時に処理することを指します。
- 並行処理 (Concurrent processing): 複数のタスクに対して、単一の処理装置を共有し、同時に処理することを指します。
例えば、「塗り絵を完成させる」といったタスクを考えるとします。
- 並列処理: 複数の人で、手分けして同時に塗る。 (ハードウェアリソースを複数使う)
- 並行処理: 一人が複数のペンを持ち替えながら、隙間時間なく塗る。 (待ち時間を有効活用する)
プロセスとスレッド
プロセスとスレッドは、どちらもプログラムの実行単位を指しています。
プログラムをどのような単位で扱うかによって区別されます。
プロセス (Process) とは、コンピュータ上で実行されている「プログラムの実行単位」のことを指します。より正確に言うなら、プログラムをメインメモリ上に展開して処理を実行できる状態 (インスタンス化) したもののことです。
プロセスは、プログラムの実行に必要な様々な情報を管理しています。
これらの情報をリソース (Resource) といいます。
リソースとして以下のようなものがあります。
-
コードセグメント (Code Segment)
実行可能なバイナリコードのこと -
データセグメント (Data Segment)
グローバル変数や静的変数を格納する領域のこと -
ヒープ (Heap)
動的メモリの割り当て領域のこと -
スタック (Stack)
ローカル変数や関数パラメータ、関数の呼び出しのこと -
レジスタ (Registers)
CPU内部の高速な一時データ格納領域のこと
マルチプロセス (Multi process) とは、複数のプロセスを同時に、または並行して実行することを指します。
スレッド (Thread) とは、プロセス内の処理実行単位のことを指します。
プロセスの持つスレッドが単一か複数かによって、以下の二種類に分けられます。
-
シングルスレッド (Single thread)
単一のスレッドを持つプロセスを指します。 -
マルチスレッド (Multi thread)
複数のスレッドを持つプロセスを指します。
マルチスレッドでは、複数のスレッドを同時に動作させることができます。
プロセス内のスレッドは、各々がレジスタとスタックを有していますが、コードセグメント、データセグメント、ヒープについては、プロセス内で共有されているリソースになります。
そのため、同じプロセス内の異なるスレッド間であっても、共有されたメモリ領域を利用することで効率よく通信することが可能です。
ただし、このようなスレッド間通信は、複数スレッドを同時に動作させる場合に、意図しない不具合を起こす可能性があります。
【重要】Pythonにおけるプロセスとスレッドの違い
Pythonで開発する上で最も意識すべき違いは「メモリ空間の独立性」と「起動コスト」です。
- プロセス: メモリ空間が別々。データのやり取りには「通信 (直列化/Pickle) 」が必要でコストが高い。起動も遅い。
- スレッド: メモリ空間を共有。データのやり取りは高速だが、競合 (レースコンディション) に注意が必要。起動は速い。
GIL (Global Interpreter Lock)
※Pythonの並列処理を理解する上で最も重要な概念です。
GIL (Global Interpreter Lock) とは、Pythonインタプリタ (CPython) が一度に1つのスレッドしかバイトコードを実行できないようにする排他ロックの仕組みです。
この制約により、Pythonのマルチスレッドは、純粋なCPU計算 (CPUバウンドな処理) を並列化しても高速化しません (むしろロックのオーバーヘッドで遅くなることさえあります) 。
- CPUバウンドを高速化したい場合 → マルチプロセス (GILの影響を受けない別プロセスを立てる)
- I/Oバウンドを高速化したい場合 → マルチスレッド / asyncio (I/O待ちの間はGILが解放されるため有効)
同期処理と非同期処理
同期処理と非同期処理は、どちらも複数のタスクを実行する方式のことを指します。
複数のタスクをどのような実行順序で処理していくかによって区別されます。
-
同期処理 (Synchronous processing) とは、タスクを1つずつ順番に実行していく方式です。
- メリット: プログラム通りの順序でタスクが実行され、タスクが終了するまでは次のタスクに移行しないため、処理全体を把握しやすいです。
- デメリット: タスクが多ければ多いほど処理終了までに時間がかかってしまいます。
-
非同期処理 (Asynchronous processing) とは、あるタスクが実行している間に、その処理を止めることなく別のタスクを実行できる方式を指します。
- メリット: 複数タスクを同時に処理することができるため、たとえタスクが多い場合でも比較的早く処理が終わります。
- デメリット: どのタスクがいつのタイミングで処理されるかを追いにくくなってしまうため、バグが発生すると修正に時間がかかりやすくなってしまいます。
技術選定基準
選定フローチャート
詳細な選定基準
結論としては、標準ライブラリの concurrent.futures モジュール (ProcessPoolExecutor / ThreadPoolExecutor) を第一選択肢として検討してください。
CPUバウンド: マルチプロセス編
-
前提
PythonのGIL (Global Interpreter Lock) を回避するため、プロセスを分ける必要があります。
| ライブラリ | 位置づけ | 選定基準 | 難点 |
|---|---|---|---|
concurrent.futures.ProcessPoolExecutor |
最適解・モダンで標準 | ・単純に関数に引数を渡して、並列実行して、結果を受け取りたい。 ・インターフェースを ThreadPoolExecutor と統一したい。・ Future オブジェクトで非同期的に状態管理したい。 |
・細かなプロセス間通信 (Queue, Pipe) を制御できない。・共有メモリを細かく制御できない。 ・ Context Manager非対応の古いPython環境は非対応。 |
multiprocessing.Pool |
レガシーだが強力 | ・map, starmap, imap など、リスト処理系APIがどうしても便利。・タスク投入順序と結果取得順序を厳密に制御したい。 ・プロセスの初期化処理 ( initializer) を手軽に書きたい。 |
・APIが少し古い。 ・例外処理が futures に比べて直感的でない場合がある。 |
multiprocessing.Process |
低レイヤー | ・計算処理ではなく、**「常駐する別の働き手」**を作りたい (例:ログ監視プロセス) 。 ・プロセス間で複雑なシグナル交換や同期が必要。 |
・単に「速くしたい」だけの計算処理にはコードが複雑すぎる。 ・プロセスの管理・終了処理を自前で書く必要がある。 |
I/Oバウンド: 非同期・マルチスレッド編
-
前提
CPUは暇しているため、通信などの待ち時間を有効活用します。
| ライブラリ | 位置づけ | 選定基準 | 難点 |
|---|---|---|---|
concurrent.futures.ThreadPoolExecutor |
最適解 | ・既存の同期処理 (requests 等) をそのまま並列化したい。・スクレイピングやAPI叩きをサクッと高速化したい。 ・スレッド管理 (生成・破棄) を自動化したい。 |
・スレッド間での複雑なデータのやり取りや、細かいロック制御が必要。 ・数千以上の同時接続だと、スレッド生成コストが重くなる。 |
threading |
低レイヤー | ・クラス内部でスレッドを保持し、細かいライフサイクル管理をしたい。 ・ Lock, Event, Condition, Semaphore などで精密な排他制御が必要。・GUIアプリのバックグラウンド処理。 |
・単に「Web APIを並列で叩きたい」だけなら、記述量が多すぎてバグの温床になる。 |
asyncio |
高性能・高コスト | ・WebSocketサーバー、チャットボット、数万規模の同時クローリングを実装したい。 ・最初から非同期設計 ( async/await) で開発している。・パフォーマンスが絶対的な正義である。 |
・既存コードが同期的な場合は、呼び出し元含め全て書き換えが必要である。 ・チームメンバーが非同期処理の概念 (イベントループ、コルーチン) を理解する必要がある。 |
実装例
I/Oバウンド:100個のURLから画像をダウンロード
使用ライブラリ: requests, aiohttp
pip install requests aiohttp
import time
import requests
from concurrent.futures import ThreadPoolExecutor
import asyncio
import aiohttp
# 共通設定
URLS = [f'https://picsum.photos/200/300?random={i}' for i in range(100)]
def download_sync(url):
"""1枚ダウンロードする同期関数"""
response = requests.get(url)
return len(response.content)
# 方法0:逐次処理(ベースライン)
def baseline():
start = time.time()
results = [download_sync(url) for url in URLS]
print(f"逐次処理: {time.time() - start:.2f}秒")
print(f"取得サイズ合計: {sum(results) / 1024:.1f}KB")
# 方法1:ThreadPoolExecutor(最も簡単)
def method_threadpool():
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
results = list(executor.map(download_sync, URLS))
print(f"ThreadPoolExecutor: {time.time() - start:.2f}秒")
print(f"取得サイズ合計: {sum(results) / 1024:.1f}KB")
# 方法2:asyncio(最速)
async def download_async(session, url):
"""非同期ダウンロード"""
async with session.get(url) as response:
return len(await response.read())
async def method_asyncio():
start = time.time()
async with aiohttp.ClientSession() as session:
tasks = [download_async(session, url) for url in URLS]
results = await asyncio.gather(*tasks)
print(f"asyncio: {time.time() - start:.2f}秒")
print(f"取得サイズ合計: {sum(results) / 1024:.1f}KB")
# 実行
if __name__ == "__main__":
baseline()
method_threadpool()
asyncio.run(method_asyncio())
📊 実測結果まとめ
逐次処理: 66.46秒
取得サイズ合計: 963.1KB
ThreadPoolExecutor: 4.57秒
取得サイズ合計: 1005.3KB
asyncio: 1.09秒
取得サイズ合計: 945.9KB
| 方法 | 実行時間 | 高速化倍率 | コード変更量 |
|---|---|---|---|
| 逐次処理 | 66.46秒 | 1x | - |
ThreadPoolExecutor |
4.57秒 | 14.5x | ★☆☆(3行追加) |
asyncio |
1.09秒 | 61x | ★★★(全面書き直し) |
CPUバウンド:1000回のハッシュ計算 × 10000
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import hashlib
# 重い計算タスク:1000回のハッシュ計算 × 10000
def heavy_hash(n):
"""重い計算:SHA256を1000回繰り返す"""
data = str(n).encode()
for _ in range(1000):
data = hashlib.sha256(data).digest()
return data
NUMBERS = range(10000) # 10000個のタスク(各タスクが重い)
# 方法0:逐次処理
def baseline_cpu():
start = time.time()
results = [heavy_hash(n) for n in NUMBERS]
print(f"逐次処理: {time.time() - start:.2f}秒")
# 方法1:ProcessPoolExecutor
def method_processpool():
start = time.time()
with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(heavy_hash, NUMBERS))
print(f"ProcessPoolExecutor: {time.time() - start:.2f}秒")
# 失敗例:ThreadPoolExecutorでCPUバウンド
def method_threadpool():
start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(heavy_hash, NUMBERS))
print(f"ThreadPoolExecutor: {time.time() - start:.2f}秒")
if __name__ == "__main__":
baseline_cpu()
method_processpool()
method_threadpool()
逐次処理: 4.21秒
ProcessPoolExecutor: 1.46秒
ThreadPoolExecutor: 4.64秒
📊 実測結果まとめ
| 方法 | 実行時間 | 高速化倍率 | なぜ? |
|---|---|---|---|
| 逐次処理 | 4.21秒 | 1x | - |
| ProcessPoolExecutor | 1.46秒 | 4x | ⭕ 4コア全部使える |
| ThreadPoolExecutor | 4.64秒 | 1x | ❌ GILで1コアしか使えない |
⚠️ 注意:multiprocessingが遅くなるケース
プロセス生成コストに注意
multiprocessingは万能ではありません。タスクが軽すぎると、プロセス生成のオーバーヘッドで逆に遅くなります。
失敗例:素数判定(計算が軽すぎる)
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
NUMBERS = range(100000, 200000) # 100000個の大きな数
def is_prime(n):
"""素数判定(重い計算)"""
if n < 2:
return False
for i in range(2, int(n**0.5) + 1):
if n % i == 0:
return False
return True
# 方法0:逐次処理
def baseline_cpu():
start = time.time()
results = [is_prime(n) for n in NUMBERS]
print(f"逐次処理: {time.time() - start:.2f}秒")
print(f"素数の個数: {sum(results)}")
# 方法1:ProcessPoolExecutor
def method_processpool():
start = time.time()
with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(is_prime, NUMBERS))
print(f"ProcessPoolExecutor: {time.time() - start:.2f}秒")
print(f"素数の個数: {sum(results)}")
# 失敗例:ThreadPoolExecutorでCPUバウンド
def method_threadpool_fail():
start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(is_prime, NUMBERS))
print(f"ThreadPoolExecutor: {time.time() - start:.2f}秒")
# 実行
if __name__ == "__main__":
baseline_cpu()
method_processpool()
method_threadpool_fail()
逐次処理: 0.15秒
ProcessPoolExecutor: 37.39秒
ThreadPoolExecutor: 0.74秒
なぜこうなる?
- データのPickle化:大量の数値を直列化するコスト
- プロセス間通信:結果を回収するコスト
multiprocessingを使うべき基準
✅ 使うべき: 1タスクが重い。実行時間が長い。
- 画像処理(フィルタ、リサイズ)
- 暗号化・ハッシュ計算(大量データ)
- 機械学習の推論(バッチ処理)
- 動画エンコード
❌ 使うべきでない: 1タスクが軽い。実行時間が短い。
- 単純な数値計算
- 文字列操作
- 軽いループ処理
実際に確かめてみる
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import hashlib
# 重い計算タスク:10万回のハッシュ計算
def heavy_hash(n):
"""重い計算:SHA256を1000回繰り返す"""
data = str(n).encode()
for _ in range(1000):
data = hashlib.sha256(data).digest()
return data
def is_prime(n):
"""素数判定(重い計算)"""
if n < 2:
return False
for i in range(2, int(n**0.5) + 1):
if n % i == 0:
return False
return True
def main():
n = 1000
start = time.time()
heavy_hash(n)
print(f"heavy_hash: {time.time() - start:.6f}秒")
start = time.time()
is_prime(n)
print(f"is_prime: {time.time() - start:.6f}秒")
# 実行
if __name__ == "__main__":
main()
heavy_hash: 0.000466秒
is_prime: 0.000006秒
最後に
ここまで様々なライブラリを紹介してきましたが、並列処理における最大の教訓は「並列化すれば必ず速くなるとは限らない」ということです。
プロセス生成のオーバーヘッド、スレッドのコンテキストスイッチ、通信コスト……これらが計算時間の短縮分を上回れば、「苦労して並列化したのに、シングルスレッドより遅くなった」という悲劇が起こります。
並列処理は強力な武器ですが、用法用量を守って使いましょう。
迷ったときは、このページのフローチャートに戻ってくるか、有識者に相談してください!
次のステップ
この記事で並列処理の基本を理解したら、次はこれ:
-
エラーハンドリングを学ぶ
-
Future.exception()の使い方 - タイムアウトの実装
-
-
プロセス間通信を学ぶ
-
Queueでデータをやり取り -
Managerで共有メモリ
-
-
実践プロジェクトに挑戦
- Webスクレイピングの並列化
- 画像処理の並列化
- APIの並列呼び出し