187
165

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【3.14対応】Pythonをいろんな意味で高速化する個人的TIPS30選

Posted at

はじめに

こんにちは。
私は某SIerに入社し5年目となりますが、これまでざっくりと以下のような業務に取り組んでまいりました。

  • 数理最適化エンジンの開発、システム導入
  • 量子コンピュータ周辺技術のR&D
  • AIエージェント/生成AIシステムの開発・導入

また、プライベートで以下にも取り組みました。

  • 競技プログラミング(AtCoder等)
  • 書籍翻訳(O'Reilly)

私は、これらの多くの場面で、Pythonを利用してきましたが、その中でPythonの良いところ/悪いところを多く学ばせていただきました。この記事では、これまで学んできたPythonのTIPS、特にPythonでよく課題となる速度にフォーカスし、様々な観点でPythonを速くするためにはどうすれば良いのかについて、紹介したいと思います。

そもそも、「性能が求められる場面でPythonを利用するな」というツッコミもある気がしますが(とても効くのでやめてください)、一方で、特に昨今ブームである生成AIの文脈で、とりあえずPythonでスピーディに開発をした後、スケールさせようとして性能問題に直面するというような事例もちらほら見かけるので、意外と需要があるのではないかと思っています。

あくまで個人的なTIPSであるため、もし「こっちの方が速いですよ」とか、「こういうケースの時は、この書き方の方がよい」等ありましたら、コメントいただけると嬉しいです :bow:

本記事は長いです。

そのため、全てを読むのが辛い方は、LLMに食わせてみるのが良いと思います(画像や埋め込みの図表は無く、すべてテキストベースで書かれておりますので、読み込ませやすいと思います)。

なお、LLMについては、諸々の規約・ルールに従ったうえでご利用をお願いします。

TIPS一覧

本記事では計8編に分け各編2~6個ずつ合計30TIPSを紹介します。具体的に紹介するTIPSは以下の表の通りです。★の数は、主にシステム開発を頭に浮かべつつ、効果と適用のしやすさを考慮したなんとなくのおすすめ度です。

TIPS(記法編)

# TIPS名
1 [★★]for文を使う際に内包表記を検討する
2 [★★★] if hoge in listを連続で使う場合、if hoge in setを検討する
3 [★★★] 二つのリストの共通要素を求めたい場合、setの集合演算利用を検討する
4 [★★★] 文字列の結合は+=ではなく.joinを使う

TIPS(並行・並列処理編)

# TIPS名
5 [★★★] CPUバウンドな処理を複数同時に実行したいときは、マルチスレッドではなく、マルチプロセスで行う
6 [★] (Python3.13~)GILが解除されたFree-Threadingを検討する
7 [★★] IOバウンドな処理を複数同時に実行したいときはマルチスレッドでもよい
8 [★★★] IOバウンドな処理を大量に同時実行したいときは非同期処理を検討する
9 [★★★] 非同期関数内でIO待ちの長い同期処理を実行しないようにする
10 [★★] それでも非同期関数内でIO待ちの長い同期処理を実行したい時は、asyncio.to_threadを活用する

TIPS(Webフレームワーク編)

# TIPS名
11 [★★] CPUバウンドな処理を行うエンドポイントに対してはワーカー数増加を検討する
12 [★★] IOバウンドな処理を行うエンドポイントに対してはスレッド数増加を検討する
13 [★★] FastAPIでIOバウンドな処理を扱う時は非同期エンドポイントの利用を検討する

TIPS(アルゴリズム編)

# TIPS名
14 [★★★] ソート済みのリストを検索するときは二分探索を利用する
15 [★★] 最小値(最大値)の取得に優先度付きキューを検討する
16 [★★] 効率的なアルゴリズムを使えないか検討する
17 [★] 数理最適化ソルバーの利用を検討する

TIPS(他言語の活用編)

# TIPS名
18 [★★★] 複数データの操作にNumpyを利用する
19 [★★] 数値計算にScipyを利用する
20 [★★★] バリデーションにPydanticを利用する
21 [★★] Pandasの代わりにPolarsを利用する
22 [★] 自力でC・C++・Rustで実装して、Pythonでラップする

TIPS(言語編)

# TIPS名
23 [★★★] Pythonのバージョンアップをする
24 [★] 別のPython実装・コンパイラの利用を検討する
25 [★] (Python3.13~)JITコンパイラ導入を検討する
26 [★] CPythonのビルドオプションを変更する

TIPS(ハード編)

# TIPS名
27 [★★] サーバーのスペックを見直す
28 [★] GPUの利用を検討する

TIPS(番外編)

# TIPS名
29 [★★★] プロジェクト作成・ライブラリ管理にuvを使う
30 [★★★] リンター・フォーマッターとしてRuffを使う

本記事で紹介する・しない内容

本記事では紹介する内容は以下の通りです。

  • 筆者の個人的なPythonの高速化TIPSとその概要
  • 紹介する処理が速い理由(初中級者向けにざっくり)
  • 遅い処理との比較用コードと実測時間 ※一部の内容のみ

逆に、以下は触れないつもりです。

  • Pythonの速度以外のパフォーマンス(メモリ等)に関するTIPS
  • Pythonの高速化方法の体系的・網羅的な説明
  • 紹介する処理が速い/遅い理由の詳細・厳密な仕様の説明

この記事では触れない、効率的なPythonの体系的なTIPSや、詳細・厳密な仕様について知りたい方は、他書籍・他記事をご参照いただけるとよいと思います。個人的なおすすめ書籍、記事を以下に示しますので、よろしければご覧ください。

実行環境

一部のTIPSについては実際にコードを実行し、その時間の測定・検証を行っています。これらの検証は、注記がない限り、以下の環境で実施しています。

  • OS: Windows 11
  • CPU: 12th Gen Intel(R) Core(TM) i5-1235U (2.50 GHz)
  • メモリ: 16.0 GB
  • Python 3.14.0

せっかくなので、今年(2025年)の10月にリリースされたばかりのPython 3.14を使っています。

なお、Pythonの仮想環境の構築、ライブラリの追加はuvを利用しました(uvについてはこの後のTIPSでも紹介しています)。uvのバージョンは以下の通りです。

> uv --version
uv 0.9.9 (4fac4cb7e 2025-11-12)

また、検証のためインストールしたライブラリのバージョンは以下の通りです。

fastapi 0.122.0
httpx 0.28.1
numpy 2.3.4
pandas 2.3.3
polars 1.35.2
uvicorn 0.38.0

一応、検証に利用したコードは以下にあげています。

TIPS(記法編)

まずは、基本的な記法についてのTIPSです。すでに他の記事で触れられているような内容も多い気がしますが悪しからず。

1. [★★] for文を使う際に内包表記を検討する

for文でリスト・辞書・setを作成する場合、内包表記を使うことで、多少処理速度が向上します。
内包表記とは、以下のように[]の内部にfor文が記述されるような記法で、辞書やsetでも利用できます。

内包表記の例
# 0~9までのリストを作成
example_list = [n for n in range(10)]

# キーとバリューが0~9の辞書を作成
example_dict = {n: n for n in range(10)}

# 0~9までのsetを作成
example_set = {n for n in range(10)}

例えば、0~100000000までの偶数のリストを作成するコードで、内包表記の処理時間を見てみます。

0~100000000までの偶数のリストを作成するコード
import time

N = 100000000

# 内包表記を使わない場合
start_time = time.perf_counter()
result = []
for i in range(N + 1):
    if i % 2 == 0:
        result.append(i)
elapse_time = time.perf_counter() - start_time
print(f"内包表記を使わない場合: {elapse_time:.6f}s")

# 内包表記を使う場合
start_time = time.perf_counter()
result = [i for i in range(N + 1) if i % 2 == 0]
elapse_time = time.perf_counter() - start_time
print(f"内包表記を使う場合: {elapse_time:.6f}s")

以上のコードを実行すると、下記が出力されます。内包表記を使うことで、3-4割ほど処理速度が向上していますね。

実行時間
内包表記を使わない場合: 8.470001s
内包表記を使う場合: 5.606873s

個人的には、処理が速いうえにコード量も減るので、内包表記を使える箇所は使うべきだと思っておりますが、特に演算やif文の条件が複雑だったりネストしてたりすると、可読性が下がることもあるので、内包表記を使う程度はなんとなく自分の中でor開発メンバーと足並みを整えておくとよいかもです。

ちなみに、偶数のリストを作るだけであれば、以下の記法の方が速いです。
手元で計測したところ、実行時間は1-2秒程度でした。

もっと速い版
result = list(range(0, N + 1, 2))

2. [★★★] if hoge in listを連続で使う場合、if hoge in setを検討する

例えば、以下の二つのリストがあるとしましょう。

  • file_ids_a: ストレージAに保存されたファイルのIDのリスト
  • file_ids_b: ストレージBに保存されたファイルのIDのリスト

コードで示すと以下の通りです。(ここでは、それぞれIDが0~9999と5000~14999の一意なIDを持つファイル群を考えます。)

ファイルIDのリスト
import random
    
N = 10000
file_ids_a = list(range(10000))
file_ids_b = list(range(5000, 15000))
# シャッフル
random.shuffle(file_ids_a)
random.shuffle(file_ids_b)

この時、ストレージAB両方に含まれるかつファイルIDが偶数のファイルの数を求めるコードを考えます。
シンプルに考えると以下のようなコードになるかもしれません。

in listを利用
count = 0
for file_id in file_ids_a:
    if file_id in file_ids_b and file_id % 2 == 0:
        count += 1

しかし、ストレージBに含まれるかどうかを判定するif file_id in file_ids_b線形探索(つまり、リストの最初の要素から順に比較)となり、これをストレージAのファイルごとにしているため、計算コストがかかります。計算量1でいうと$O(N)$ 、全体で$O(N^2)$ですね。
今回の例だと、file_ids_aが1万件、file_ids_bも1万件であるため、最悪で1万 × 1万 = 1億回の比較が発生してしまいます。

これは、setを使うことで改善できます。具体的には以下に示す通りで、file_ids_bを事前にset化しておき、if文でin listの代わりに、in setを利用します。

in setを利用
file_ids_b_set = set(file_ids_b)  # set化
count = 0
for file_id in file_ids_a:
    if file_id in file_ids_b_set and file_id % 2 == 0:  # ここでset利用
        count += 1

setに対するin演算はハッシュテーブルを使うため、リストのそれよりも高速に行えます。計算量は$O(1)$です。
実際、実行時間を比較すると以下のようになります。なんと、数百倍も速くなっていますね。

実行時間
in listを利用: 0.511834s
in setを利用: 0.001097s

個人的には、内包表記と同様、積極的に利用すべきだと思いますが、注意点として以下があります。

  • set化するには、リスト内の要素がハッシュ可能である必要あるため、特にオブジェクトが要素であるときは注意が必要
  • set化自体にコストがかかるので、リストの要素が少なかったり比較回数が少ないと効果が出ないこともある
  • リストとは異なり、setは順序保持をしないため、リスト内の順番が重要な処理では、リストとset両方呼び出せるようにしておく等の工夫が必要

3. [★★★] 二つのリストの共通要素を求めたい場合、setの集合演算利用を検討する

二つのリストの共通要素を求めたい場合、in listin setを利用しても実装できますが、特に要素の重複が多い場合setの集合演算を使うことでさらに高速化できます。

例えば、先ほどの例と関連して、以下のようなリストを考えます。

  • file_tags_a: ストレージAのファイルに付加されたタグのリスト
  • file_tags_b: ストレージBのファイルに付加されたタグのリスト

具体的には、以下を考えます(ファイルの数10000に対し、タグの種類は10種類であるため、それなりにタグの重複が発生する例となっています)。

タグのリスト
file_tags_a = [random.randint(0, 10) for _ in range(10000)]
file_tags_b = [random.randint(5, 15) for _ in range(10000)]

これらのリストに対し、両方のストレージで使われているタグを求めるコードを考えます。

この時、setの集合演算を利用すると以下のように記述できます。

setの集合演算を利用
file_tags_ab = list(set(file_tags_a) & set(file_tags_b))

set同士を&で接続することで、いわゆる積集合を求める形で、ストレージAB両方で使われているタグを求めています。(ほかにも、|を利用することで和集合を、-を利用することで差集合を求めることができます。)

実行時間の計測コードと、結果は以下の通りです。in setと比較し半分以下の実行時間になっています。

実行時間の計測コード(折り畳み)
import random
import time

file_tags_a = [random.randint(0, 10) for _ in range(10000)]
file_tags_b = [random.randint(5, 15) for _ in range(10000)]

# in listを利用
start_time = time.perf_counter()
file_tags_ab = [file_tag for file_tag in file_tags_a if file_tag in file_tags_b]
elapse_time = time.perf_counter() - start_time
print(f"in listを利用: {elapse_time:.6f}s")

# in setを利用
start_time = time.perf_counter()
file_tags_b_set = set(file_tags_b)
file_tags_ab = [file_tag for file_tag in file_tags_a if file_tag in file_tags_b_set]
elapse_time = time.perf_counter() - start_time
print(f"in setを利用: {elapse_time:.6f}s")

# set演算を利用
start_time = time.perf_counter()
file_tags_ab = list(set(file_tags_a) & set(file_tags_b))
elapse_time = time.perf_counter() - start_time
print(f"set演算を利用: {elapse_time:.6f}s")
実行時間
in listを利用: 0.301911s
in setを利用: 0.000796s
set演算を利用: 0.000271s

ちなみに、リストの要素に重複がない場合はin setが速いケースの方が多いと思いますが、それでもその差は誤差であることと、setの集合演算を利用した方が可読性も高いと思うので、setの集合演算を使えるのであれば、重複があるかどうかは気にせず、使っちゃってもよい気がしてます。

4. [★★★] 文字列の結合は+ではなく.joinを使う

Pythonでは、文字列結合を+で記述することができます。
例えば、リストword_listに格納された文字列をカンマで結合する処理は以下のようになります。

+=で文字列結合
result = ""
for word in word_list:
    result += word + ","  # ※最後にもカンマがついてしまいますが、検証なのでこのまま

個人的には直観的で分かりやすいように感じますが、このコードは以下の理由で遅いです。

  • 文字列型(str型)はイミュータブル(不変)
  • よって、毎回の連結(+=)毎に、新しいオブジェクトが生成2される
    • 例えば、word_listの長さが100000の時、99999回文字列オブジェクトが再生成される

この問題に対しては、join関数が有効です。先ほどの例をjoin関数で書き換えた例を以下に示します。

join関数で文字列結合
result = ",".join(word_list)

join関数は、内部でまず最初に必要なメモリを確保するとのことで、オブジェクト作成も一回で済み、結合が高速に行えるようです。

word_listの長さが100000の下での、実行時間を以下に示しますが、join関数を使うことで数千倍になっていることが分かるかと思います。

実行時間
+=を利用: 1.424383s
joinを利用: 0.001037s

TIPS(並行・並列処理編)

記法編では、ある意味シングルスレッドの下で処理を速くする方法を紹介しましたが、ここでは、マルチスレッド、マルチプロセス、非同期処理といった並行・並列処理を活用して、実行時間を短縮するTIPSについて述べたいと思います。

5. [★★★] CPUバウンドな処理を複数同時に実行したいときは、マルチスレッドではなく、マルチプロセスで行う

時に、複数の処理を並列で実行したいときもあるでしょう。そのような時、マルチスレッドの利用が検討されるかと思いますが、Pythonでは注意が必要です。

例えば、以下の文字列を連結する関数を考えます。この処理は、プログラムの速度がCPUの性能に依存するいわゆるCPUバウンドな処理です。

CPUバウンドな処理を実行する関数
# 文字列をn回連結
def cpu_bound_task(n):
    s = ""
    for _ in range(n):
        s += "A"  # ※検証のため、あえてjoinではなく、遅い`+`を使用
    return s

この関数を、2スレッドで1回ずつ、合計2回実行したいと思います。Pythonにおいて、マルチスレッドの実装はconcurrent.futuresモジュールのThreadPoolExecutorが利用できます。具体的なコードは以下の通りです(以下では2スレッドの下で10の7乗回の文字列連結を2回実行します。)

マルチスレッドの実装例
import time
from concurrent.futures import ThreadPoolExecutor, wait

def test_multithreading(exe_num, max_workers):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        tasks = [executor.submit(cpu_bound_task, 10**7) for _ in range(exe_num)]
        wait(tasks)

# ※2スレッド・2回実行の場合は、以下のように呼び出す
# test_multithreading(2, 2)

比較対象として、関数cpu_bound_taskを逐次的に2回実行する関数も用意します。コードは以下の通りです。

逐次実行の実装例
def test_sequential(exe_num):
    for _ in range(exe_num):
        cpu_bound_task(10**7)

# ※2回実行の場合は、以下のように呼び出す
# test_sequential(2)

これら逐次実行とマルチスレッドでの実行時間を計測すると以下のようになります。

実行時間
逐次実行: 7.835927s
マルチスレッド: 8.177336s

なんと、マルチスレッドの方が遅くなるという結果になりました。これはPythonのGIL(Global Interpreter Lock) と呼ばれる仕掛けが原因です。GILの説明の詳細は省略しますが簡単に言うと、GILによって、Pythonでは同一プロセスで複数のCPU処理(バイトコード)を同時に実行できないようになっています。そのうえで、スレッド生成のコストがかかるため、マルチスレッドの方が遅くなったと考えられます。

もし、それでもPythonで、同時に複数の処理を並列で実行したいときは、マルチプロセスを利用するのが良いでしょう。Pythonでは、同じくconcurrent.futuresモジュールのProcessPoolExecutorを利用してマルチプロセスを実装できます。

マルチプロセスの実装例
from concurrent.futures import ProcessPoolExecutor, wait

def test_multiprocessing(exe_num, max_workers):
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        tasks = [executor.submit(cpu_bound_task, 10**7) for _ in range(exe_num)]
        wait(tasks)

# ※2プロセス・2回実行の場合は、以下のように呼び出す
# test_multithreading(2, 2)

マルチプロセス(2プロセス)を含む3条件で実行時間を計測すると以下のようになります。マルチスレッドとは異なり、マルチプロセスの実行時間が逐次実行の半分程度になっており、2つのcpu_bound_taskが同時に実行されていることが分かるかと思います。

実行時間の計測コード(折り畳み)
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, wait


# 文字列をn回連結
def cpu_bound_task(n):
    s = ""
    for _ in range(n):
        s += "A"  # ※検証のため、あえてjoinではなく、遅い`+`を使用
    return s


def test_sequential(exe_num):
    for _ in range(exe_num):
        cpu_bound_task(10**7)


def test_multithreading(exe_num, max_workers):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        tasks = [executor.submit(cpu_bound_task, 10**7) for _ in range(exe_num)]
        wait(tasks)


def test_multiprocessing(exe_num, max_workers):
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        tasks = [executor.submit(cpu_bound_task, 10**7) for _ in range(exe_num)]
        wait(tasks)


if __name__ == "__main__":  # if __name__ == "__main__"内で記述しないとエラーになる
    num_exe = 2
    max_workers = 2

    # 逐次実行
    start_time = time.perf_counter()
    test_sequential(num_exe)
    sequential_time = time.perf_counter() - start_time
    print(f"逐次実行: {sequential_time:.6f}s")

    # マルチスレッド
    start_time = time.perf_counter()
    test_multithreading(num_exe, max_workers)
    threading_time = time.perf_counter() - start_time
    print(f"マルチスレッド: {threading_time:.6f}s")

    # マルチプロセス
    start_time = time.perf_counter()
    test_multiprocessing(num_exe, max_workers)
    multiprocessing_time = time.perf_counter() - start_time
    print(f"マルチプロセス: {multiprocessing_time:.6f}s")

実行時間
逐次実行: 7.835927s
マルチスレッド: 8.177336s
マルチプロセス: 4.137919s

ただし、マルチプロセスの利用する際は、いくつか注意点があります。以下にその一例を示します。

  • 読んで字のごとくプロセスを複数使うため、その分CPU・メモリ等のリソースを多く消費する
  • プロセスの起動にオーバーヘッドがかかるため、短い処理の場合は効果がない(なんなら遅くなる)

これらの注意点を認識したうえで、活用するのが良いと思います。

その他、Pythonのマルチプロセスを利用する上でのお作法は、公式のプログラミングガイドラインをご参照ください。

6. [★] (Python3.13~)GILが解除されたFree-Threadingを検討する

ところで、Python 3.13より、GILが解除されたバージョン(Free-Threaded) が実験的に導入され、3.14からオプションとしてサポート3されるようになりました。

ここでも、先ほどのCPUバウンドな処理をFree-Threaded版で実行してみましょう。
現状、GIL解除は3.14のデフォルト機能ではないため、別のバージョンとしてインストールする必要があります。(例えばuvでは3.14tと指定することで、インストールできます。)

Free-Threaded版をインストールした後、先ほどのCPUバウンドな処理を、同じ条件の下、マルチスレッド・マルチプロセスの両方で実行すると、実行時間4は下記のようになります。先ほどは逐次実行よりも時間がかかっていたマルチスレッドが、逐次実行の半分程度の実行時間になっています。

実行時間
逐次実行: 0.907857s
マルチスレッド: 0.465390s
マルチプロセス: 0.570125s

注意点として、スレッドセーフであるかに気を付ける必要がある(場合によっては排他ロックなどを追加する必要ある)こと、また、まだデフォルトの機能ではなくライブラリによってはFree-Threaded版に追従できてないケースもある5とのことなので、2025年12月時点では、特に本番でのFree-Threaded版の採用は非推奨かと思います。

7. [★★] IOバウンドな処理を複数同時に実行したいときはマルチスレッドでもよい

CPUバウンドな処理の場合は、GILの影響でマルチスレッドを利用しても効果が無いことをTIPS#5で確認しました。それでは、IOバウンドな処理(ファイル入出力やDB操作、LLMリクエストといったIO処理に依存する処理)の場合はどうでしょうか。

ここでは、最も単純なIO処理であるsleep関数を利用し検証したいと思います。CPUバウンドの検証と同様に、逐次実行とマルチスレッドの実行時間を比較します。比較用のコードは以下の通りです(以下では、スリープ10秒を2回実行しており、マルチスレッドでは2スレッド使用しています)。

実行時間の計測コード(折り畳み)
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, wait


# 10秒スリープ
def io_bound_task():
    time.sleep(10)


# 逐次実行
def test_sequential(exe_num):
    for _ in range(exe_num):
        io_bound_task()


# マルチスレッド
def test_multithreading(exe_num, max_workers):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        tasks = [executor.submit(io_bound_task) for _ in range(exe_num)]
        wait(tasks)


# 非同期
async def io_bound_task_async():
    # await asyncio.sleep(10)
    # time.sleep(10)
    await asyncio.to_thread(time.sleep, 10)


async def test_async(exe_num):
    # タスクを作成
    tasks = [io_bound_task_async() for _ in range(exe_num)]

    # すべてのタスクを並行実行
    await asyncio.gather(*tasks)


if __name__ == "__main__":
    exe_num = 2
    max_workers = 2

    # 逐次実行
    start_time = time.perf_counter()
    test_sequential(exe_num)
    sequential_time = time.perf_counter() - start_time
    print(f"逐次実行: {sequential_time:.6f}s")

    # マルチスレッド
    start_time = time.perf_counter()
    test_multithreading(exe_num, max_workers)
    threading_time = time.perf_counter() - start_time
    print(f"マルチスレッド: {threading_time:.6f}s")

結果は以下の通りになります。

実行時間
逐次実行: 20.000643s
マルチスレッド: 10.002704s

CPUバウンドな処理とは異なり、IOバウンドではマルチスレッドでも実行時間が逐次実行の半分となっています。これもGILの仕様で、大抵のIO処理では、IO待ちの間GILが解放されます。この仕様により、IOバウンドな処理をマルチスレッドで実行すると、同時に実行され、実行時間が短縮されます。

8. [★★★] IOバウンドな処理を大量に同時実行したいときは非同期処理を検討する

先ほどはsleep関数を2スレッドのもとで2回実行しましたが、大量に実行する場合はどうなるでしょうか。例えば、2スレッドの下でsleep関数(10秒) を10回実行してみます。すると、実行時間は以下のようになります。

実行時間
マルチスレッド: 50.006558s

これは、各sleep関数(10秒)の実行が完了されるまでスレッドが解放されず、2スレッドで5回ずつ順番に実行されてしまっていることを示しています。そのため、各スレッドのIO待ちの間の時間が無駄になってしまっています。

この例では、せいぜい10回なのでスレッド上限を10にすればよさそうですが、もっと大量に実行する場合は、スレッド作成等のオーバーヘッドが大きくなってしまいます。そんな時は、マルチスレッドではなく、非同期関数(async def) を活用するのが良いです。非同期関数を利用することで、先ほど発生したIO待ちの時間を有効活用できます。

例えば、先ほどのsleep処理の場合、非同期にすると以下のようになります(先ほどとは異なり、asyncioモジュールのsleep関数を利用することに注意してください)。

スリープ処理をasyncにした例
import asyncio


async def io_bound_task_async():
    await asyncio.sleep(10)


async def test_async(exe_num):
    tasks = [io_bound_task_async() for _ in range(exe_num)]
    await asyncio.gather(*tasks)

上記の実装で、スリープを10回実行させると、実行時間は以下のようになります。

実行時間
非同期: 10.022089s

非同期関数を利用することで、10回のsleep処理を待ち時間なくさばけていることが分かるかと思います。

注意点として、ライブラリによっては非同期処理を実装していないこともあるため、なんでもかんでも非同期化できるわけではないことに留意してください。また、後述のブロッキングを発生されるような実装をしてしまうと、大幅な性能劣化につながるため、ある程度async周りの仕様を理解したうえで利用することをお勧めします。

9. [★★★] 非同期関数内でIO待ちの長い同期処理を実行しないようにする

ところで、先ほどの関数io_bound_task_asyncの中で、非同期のsleep(asyncio.sleep)ではなく同期sleep(time.sleep)を呼び出すとどうなるでしょうか。

ここでは、以下のコードを利用し検証します。中でasyncio.sleepを実行するtest_async1と、time.sleepを実行するtest_async2を用意しています。

asyncの中で非同期sleepと同期sleep
import asyncio
import time


# asyncio.sleepを利用
async def io_bound_task_async1():
    await asyncio.sleep(10)


async def test_async1(exe_num):
    tasks = [io_bound_task_async1() for _ in range(exe_num)]
    await asyncio.gather(*tasks)


# time.sleepを利用
async def io_bound_task_async2():
    time.sleep(10)


async def test_async2(exe_num):
    tasks = [io_bound_task_async2() for _ in range(exe_num)]
    await asyncio.gather(*tasks)

exe_num=2を指定(つまりスリープ処理を2回実行)し、それぞれの関数の実行時間を計測すると以下のようになりました。

実行時間
非同期(asyncio.sleep): 10.018486s
非同期(time.sleep): 20.018492s

なんと、time.sleepを実行した方は20秒かかってしまっています。これは、非同期関数(async def)内で同期的なIO処理を呼び出すと、イベントループ(非同期処理の制御元)がブロックされてしまうことが原因です(そういえば、同期IO処理はブロッキングIOとも呼ばれますね)。

非同期処理では、awaitを使うことで「この処理は待ち時間がある」ことをイベントループに伝え、その間に他のタスクを実行できるような仕組みとなっています。しかし、time.sleep(10)のような同期処理はawaitできないため、イベントループにIO待ちであることを伝えられず、その10秒間は他のタスクが一切実行できなくなってしまいます

そのため、test_async2では2つのタスクが順次実行され、結果として合計20秒かかってしまったということです。

余談ですが、これはFastAPIの非同期エンドポイント実装の時も意識すべきで、ブロッキングIOを非同期エンドポイントの中で実装してしまうと、そのIO待ちの間は(そのワーカー内での)リクエストを受け付けなくなってしまいます

10. [★★] それでも非同期関数内でIO待ちの長い同期処理を実行したい時は、asyncio.to_threadを活用する

一方で、利用したい関数やライブラリが非同期に対応してない場合等、非同期関数内でIO待ちの長い同期処理を実行したい時もあるでしょう。

そんな時は、Python 3.9以降であれば、asyncio.to_threadが有効です。例えば、先ほどのio_bound_task_async2asyncio.to_threadを適用すると以下のようになります。

asyncio.to_threadを適用
async def io_bound_task_async2():
    await asyncio.to_thread(time.sleep, 10)

これにより、スリープ処理を別のスレッドに投げ、ブロッキングIOによるイベントループのブロックを回避することができます。

実際、TIPS#9と同じ処理に対する実行時間を計測してみると、以下のようになり、10秒程度で実行できていることが分かります。

実行時間
非同期(time.sleep+asyncio.to_thread): 10.010455s

より低レベルの関数として、loop.run_in_executorがあります。これは、実行するスレッドプールを個別に指定したり、マルチプロセスで処理させる等、asyncio.to_threadと比較し細かい制御が可能です。

asyncio.to_threadは内部でThreadPoolExecutorを使用しており、このスレッド上限数が少なめなので、大量の同期IO処理を同時に実行する場合は、loop.run_in_executorでスレッド上限を拡張させたスレッドプールを使う等の工夫が必要です。詳細は、公式ドキュメントや、asyncio.to_threadの利用によりフリーズしてしまった事例の記事6をご覧ください。

TIPS(Webフレームワーク編)

Pythonにも著名なWebフレームワークがいくつか提供されています。ここでは、私が触った経験のあるFlaskFastAPIについて、特にレスポンスタイムの短縮に着目したTIPSを少し紹介します。

11. [★★] CPUバウンドな処理を行うエンドポイントに対してはワーカー数増加を検討する

Webアプリケーションのエンドポイント処理がCPUバウンドな場合、TIPS#5でも述べた通り、GILにより1プロセスでは1つのCPUコアしか活用できません。

そのため、複数のCPUコアを活用し、同時にさばくリクエスト数を上げたい場合は、ワーカー数(プロセス数)を増やす必要があります。

Flaskのサーバーとしてよく利用されるGunicornでは、以下のように起動コマンドのオプションでワーカー数を指定できます7。以下では4ワーカーでサーバーが起動されます。

4ワーカーでサーバー起動(gunicorn)
gunicorn -w 4 app:app

FastAPIの場合、Gunicorn利用の場合は上記オプションを、Uvicorn利用の場合は--workersオプションでワーカー数を設定できます8

4ワーカーでサーバー起動(uvicorn)
uvicorn app:app --workers 4

適切なワーカー数の設定については公式ドキュメント9を参照ください。ただし、処理内容や環境によって適切な数は異なるはずなので、実際は負荷テストを行いながら最適な値を見つけるのが良いと思います。

本番環境でFastAPIアプリを起動する場合、↑のuvicornコマンドではなく、gunicornコマンドでワーカークラスにuvicorn.workers.UvicornWorkerを指定する方法がuvicornの公式ドキュメントで推奨されています10。具体的なコマンドは以下の通りです。

gunicorn app:app -w 4 -k uvicorn.workers.UvicornWorker

12. [★★] IOバウンドな処理を行うエンドポイントに対してはスレッド数増加を検討する

エンドポイント処理がIOバウンドな場合、スレッド上限を増やすことで、IO待ち中に処理できるリクエスト数を増やすことできます(ここはTIPS#7で紹介した話と同じような話です)。

Gunicornでは、起動コマンド実行時に--threadsオプションを指定することでスレッド上限を指定できます(以下はスレッド上限2の例です)。

スレッド上限2でサーバー起動(gunicorn)
gunicorn -w 4 --threads 2 app:app

一方FastAPIの場合は、起動オプションではなく、内部的に使われているAnyIOのオプションを変更することで、スレッド上限を制御できます11。具体的には、以下のようにlifespanを使いアプリ起動前に設定できます(以下はスレッド上限を120にしています。)。

スレッド上限120でサーバー起動(FastAPI)
from contextlib import asynccontextmanager

import anyio.to_thread
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # 起動時の設定
    limiter = anyio.to_thread.current_default_thread_limiter()
    limiter.total_tokens = 120
    yield

app = FastAPI(lifespan=lifespan)

なお、FastAPIでこのオプションが関わるのは、基本的に同期エンドポイント(defで定義されたエンドポイント)のみで、非同期エンドポイント(async def)の場合は、内部でスレッドに投げる処理を記述していない限りは、直接の影響は無いことに注意してください。

13. [★★] FastAPIでIOバウンドな処理を扱う時は非同期エンドポイントの利用を検討する

FastAPIでは、async defでエンドポイントを定義し、内部のIO処理を非同期化することで、さらに効率的なIO処理が可能になります(以下はHTTPXの非同期クライアントを使用したコードの例です)。

非同期IO処理を行うエンドポイントの例(FastAPI)
from fastapi import FastAPI
import httpx

app = FastAPI()

@app.get("/fetch-data")
async def fetch_data():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com/data")
        return response.json()

非同期エンドポイントを使うことで、IO待ち中にイベントループが他のリクエストを処理できるため、スレッド数を増やすよりも効率的にリクエストを捌けるようになります。

ただし、TIPS#9で述べた通り、非同期エンドポイント内で同期的なIO処理を実行すると、イベントループがブロックされてしまうため注意が必要です。

非同期エンドポイント内で同期的なIO処理を実行したい場合は、TIPS#9で紹介したasyncio.to_threadではなく、fastapi.concurrency.run_in_threadpool(もしくは、starlette.concurrency.run_in_threadpool)を使うのが良いと思います12。これは、TIPS#12でAnyIOのスレッド上限を設定しましたが、そのスレッドプールと、asyncio.to_threadで利用されるスレッドプールが異なるためです。

Flaskでもバージョン2.0以降でasync defによるエンドポイントの定義が可能ですが、内部的には各リクエストごとに新しいイベントループを作成して実行する仕組みのため、複数リクエスト間でのイベントループ共有ができず、FastAPIのようなレスポンスタイム短縮の効果はありません

もしFlaskライクな記法で効果を得たい場合は、Flaskと互換性がある非同期フレームワークであるQuartの利用を検討すると良いと思います13

TIPS(アルゴリズム編)

並行・並列処理を活用しなくとも、アルゴリズムを改善することで、パフォーマンスを改善できることもあります。ここでは、効率的なアルゴリズムとPythonでの実装例を紹介します(複雑な話もあるので、できるだけキャッチーな例題を添えて紹介したいと思います)。

14. [★★★] ソート済みのリストを検索するときは二分探索を利用する

例えば、ログファイルから指定した時刻以降に出力されたログを検索したい時と考えます。ここでは、簡単のため以下のような時刻情報を含む辞書のリスト形式から検索することを考えます。

ログ情報を含む辞書のリスト
logs = [
 {'level': 'WARNING',
  'message': 'Log message 0',
  'timestamp': datetime.datetime(2024, 1, 1, 0, 0)},
 {'level': 'ERROR',
  'message': 'Log message 1',
  'timestamp': datetime.datetime(2024, 1, 1, 0, 0, 1)},
 {'level': 'ERROR',
  'message': 'Log message 2',
  'timestamp': datetime.datetime(2024, 1, 1, 0, 0, 2)},
 # 省略、↑の形式でログが続くイメージ
]

この時、シンプルに考えると以下のように古いログから順に見ていき該当時刻以降のログを検索する実装になるかと思います。

該当時刻以降のログを検索(シンプル)
def find_logs_linear(logs, target_time):
    result = []
    for log in logs:
        if log['timestamp'] >= target_time:
            result.append(log)
    return result

このような、順番に並んだ(ソート済みである)データ群の探索は、二分探索を使うことで高速化できます。二分探索自体の説明は他記事に委ねますが、Pythonではbisectモジュールを使うことで簡単に二分探索を実装できます。

該当時刻以降のログを検索(二分探索を利用)
import bisect

def find_logs_binary(logs, target_time):
    start_index = bisect.bisect_left(logs, target_time, key=lambda x: x['timestamp'])
    return logs[start_index:]

↑のコードでは、bisect.bisect_left関数を使用していますが、他にbisect.bisect_right関数もあります。これらの違いは、find_logs_binary関数の返り値のリストに target_timeと同じ時刻のログが含まれるかどうか で、bisect.bisect_leftの場合は含まれるようになります(bisect.bisect_rightの場合は含まれません)。

実際に100万行のダミーのログデータを作成し、それぞれの実行時間を計測すると以下のようになりました。24倍ほど高速化されています。

実行時間の計測コード(折り畳み)
実行時間の計測コード(二分探索)
import bisect
import random
import time
from datetime import datetime, timedelta


def find_logs_linear(logs, target_time):
    result = []
    for log in logs:
        if log["timestamp"] >= target_time:
            result.append(log)
    return result


def find_logs_binary(logs, target_time):
    start_index = bisect.bisect_left(logs, target_time, key=lambda x: x["timestamp"])
    return logs[start_index:]


def generate_dummy_logs(num_logs=1000000):
    base_time = datetime(2024, 1, 1, 0, 0, 0)
    return [
        {
            "timestamp": base_time + timedelta(seconds=i),
            "level": random.choice(["INFO", "WARNING", "ERROR"]),
            "message": f"Log message {i}",
        }
        for i in range(num_logs)
    ]


# ダミーのログデータを生成
logs = generate_dummy_logs()

# 中間あたりの時刻を検索対象とする
target_time = logs[len(logs) // 2]["timestamp"]

# 線形探索の計測
start = time.perf_counter()
result_linear = find_logs_linear(logs, target_time)
elapsed_linear = time.perf_counter() - start

# 二分探索の計測
start = time.perf_counter()
result_binary = find_logs_binary(logs, target_time)
elapsed_binary = time.perf_counter() - start

print(f"線形探索: {elapsed_linear:.6f}s")
print(f"二分探索: {elapsed_binary:.6f}s")
実行時間
線形探索: 0.070348s
二分探索: 0.002930s

注意点として、検索対象のリストはソート済みである必要があります。ソートされてないリストを、ソートしてから二分探索してしまうと、反って遅くなってしまいます。

15. [★★] 最小値(最大値)の取得に優先度付きキューを検討する

最小値(最大値)を取得したい時、よくmin関数(max関数)が使われるかと思いますが、特に最小値(または最大値)を取得したい対象のデータ群が動的に変わる中で何度も取得したい場合は、優先度付きキューが有効です。

Pythonにおいて、優先度付きキューは heapq モジュールに実装されています。これは最小ヒープ14のため、最大値を取得したい場合はヒープに加える値を負の値にすることで最大ヒープとして使用できます。

例えば、救急外来の例を考えます。救急外来では、到着順ではなく患者の症状の緊急度に基づいて治療の順番を決定することもあるようです15
ここでは、単純化して以下のような処理を繰り返すとします:

  1. 新しい患者が来院する(待機リストに追加)
  2. 待機中の患者の中から最も緊急度の高い患者を選んで治療する(待機リストから選択)

このように、患者の追加と緊急度が最も高い患者の選択を繰り返す処理であるため、優先度付きキューによって効率的に処理できそうです。

以下のコードでは、「5人の患者が到着、かつ待機中の患者の中から最も緊急度の高い1人を選ぶ」という操作を1イテレーションとして、5000回イテレーションを回す処理の実行時間を、リストとmax関数を使った実装と、heapqを使った実装で比較しています(最大値の取得のため、優先度付きキューに加える患者データの緊急度を負にしています)。

実行時間の計測コード(折り畳み)
実行時間の計測コード(優先度付きキュー)
import heapq
import random
import time


# 優先度付きキューを使わない実装
def treat_without_heap(waiting_patients):
    if len(waiting_patients) == 0:
        return None
    # 最も緊急度の高い患者を取得
    patient = max(waiting_patients, key=lambda p: p["severity"])
    # 待機リストから削除
    waiting_patients.remove(patient)
    return patient


def arrive_without_heap(arrival, waiting_patients):
    # 患者を待機リストに追加
    waiting_patients.append(arrival)


def test_without_heap(arrivals, num_iterations):
    waiting_patients = []
    treated_list = []

    arrival_index = 0
    for _ in range(num_iterations):
        # 5人到着
        for _ in range(5):
            if arrival_index < len(arrivals):
                arrive_without_heap(arrivals[arrival_index], waiting_patients)
                arrival_index += 1

        # 1人治療
        patient = treat_without_heap(waiting_patients)
        if patient is not None:
            treated_list.append(patient)

    return treated_list


# 優先度付きキューを使う実装
# 比較のため、あえて優先度付きキューを使わない実装と似た構成にしてます
def treat_with_heap(waiting_heap):
    if len(waiting_heap) == 0:
        return None
    # 最も緊急度の高い患者を取得&待機用ヒープから削除
    _, _, patient = heapq.heappop(waiting_heap)
    return patient


def arrive_with_heap(arrival, waiting_heap, index):
    # 待機用ヒープに患者を追加
    # ・緊急度を負の値にする
    # ・緊急度が同じときはindexが小さい方が有利となる
    heapq.heappush(waiting_heap, (-arrival["severity"], index, arrival))


def test_with_heap(arrivals, num_iterations):
    waiting_heap = []
    treated_list = []

    arrival_index = 0
    for _ in range(num_iterations):
        # 5人到着
        for _ in range(5):
            if arrival_index < len(arrivals):
                arrive_with_heap(arrivals[arrival_index], waiting_heap, arrival_index)
                arrival_index += 1

        # 1人治療
        patient = treat_with_heap(waiting_heap)
        if patient is not None:
            treated_list.append(patient)

    return treated_list



if __name__ == "__main__":
    # テストデータの準備
    num_iterations = 5000  # イテレーション回数
    num_patients = num_iterations * 5  # 患者数(25,000人が到着)
    patients = [
        {
            "name": f"patient_{i}",
            "severity": random.randint(1, 10),
        }
        for i in range(num_patients)
    ]

    # 優先度付きキューを使わない場合
    start_time = time.perf_counter()
    test_without_heap(patients.copy(), num_iterations)
    time_without_heap = time.perf_counter() - start_time
    print(f"優先度付きキューなし: {time_without_heap:.6f}s")

    # 優先度付きキューを使う場合
    start_time = time.perf_counter()
    test_with_heap(patients.copy(), num_iterations)
    time_with_heap = time.perf_counter() - start_time
    print(f"優先度付きキュー使用: {time_with_heap:.6f}s")

計算時間は以下の通りになります。300倍の高速化となっています。

実行時間
優先度付きキューなし: 3.936145s
優先度付きキュー使用: 0.017181s

この辺り馴染みの無い人は「使い道あるのか?」と思うかもしれませんが、他にもプリンタのジョブ管理システムやネットワークルーターのパケット処理にも応用されているそうです16。あと、アルゴリズムの中で優先度付きキューを使うものもありますね(最短路問題のダイクストラ法や最小全域木問題のプリム法など)。

16. [★★] 効率的なアルゴリズムを使えないか検討する

複雑な処理をする際は、効率的なアルゴリズムを使えないか検討してみると良いと思います。最近だと生成AIにアドバイスを求めるのもよいかもしれませんね。

以下、ぱっと思いついた活用例と具体的なアルゴリズムの例です。

また個人的にそんな例もあるんだと思ったものを以下に示します。

  • 野球の自力優勝消滅判定18
    • 最大流問題として計算
  • 披露宴の席次決定19
    • 最大輸送問題として計算

システム開発の文脈においては、0からアルゴリズムを品質高く実装できるチームを組むのは中々大変だと思うので、既存のライブラリの実装があれば、そちらを活用するのが良いと思います(例えば、ハンガリアン法は、TIP#19でも紹介するScipyに実装があったりします20)。

17. [★] 数理最適化ソルバーの利用を検討する

複雑な問題に直面したとき、自分でアルゴリズムを実装するのではなく、数理最適化ソルバーを利用することで効率的に解決できる場合があります。

私も愛読させていただいている「しっかり学ぶ数理最適化21」では、「数理最適化」の説明を以下のように述べています。

与えられた制約条件の下で目的関数の値を最小(もしくは最大)にする解を求める最適化問題を通じて、現実社会における意思決定や問題解決を実現する手段である。

そして、数理最適化ソルバーとは、最適化問題を解くためのアルゴリズムを実装したソフトウェアのことです。

数理最適化問題をソルバーで解く際は、ソルバーを直接実行するのではなく、インプットとなる問題のモデリングやソルバーの計算結果取得を楽にするツール/インターフェースを介して利用することが多いです。これらはよくPythonのライブラリとして提供されています。以下にライブラリの例を示します。

  • PuLP
    • CBC 等のソルバーを手掛けるCOIN-ORから提供されているPythonライブラリ
    • デフォルトではCBCがインストールされるが、別のソルバーを使うこともできる
    • 無料でライセンスはMIT license
  • Python-MIP
    • PuLPと同じく、COIN-ORから提供されているライブラリ(こちらの方が新しく、高性能22だが、ドキュメントが少ない印象)
    • こちらも無料で、Eclipse Public License 2.0
  • gurobipy
    • 商用ソルバーGurobiのPythonライブラリ
    • gurobipyのインストールはできるが、中身のソルバーを実行するためにはライセンス契約が必要

ただし、利用する上で、モデリングやアルゴリズム等、数理最適化に対する一定の理解が必要となるため、システム開発での汎用的な利用という意味ではハードルが高いというのが現状でしょうか。私も勉強中の身ではありますが、とても勉強になった書籍・Webページを以下に記載しますので、もし勉強したいという方はご参考にしていただければと思います。

  • 書籍(敬称略)
    • 梅谷俊治. 『しっかり学ぶ数理最適化 モデルからアルゴリズムまで』講談社. 21
    • 岩永二郎・石原響太・西村直樹・田中一樹.『Pythonではじめる数理最適化 ケーススタディでモデリングのスキルを身につけよう』オーム社 23
  • Webページ(敬称略)
    • 久保幹雄. 『Python言語による実務で使える100+の最適化問題』 24

TIPS(他言語の活用編)

場合によっては、ロジック全てをPython記述するのではなく、中身の処理が高速な言語(C++/Rust等)で記述された処理やライブラリを活用するのが有効なケースもあるかと思います。ここでは、そのような他言語を活用した例を紹介します。

18. [★★★] 複数データの操作にNumpyを利用する

皆さんご存じ、昨今のPython人気に大きな影響を与えたと思われるNumpyです。Numpyの中身はC言語で書かれています。

大量のデータを扱う際は、Pythonの標準的なリストよりも、Numpyの配列(ndarray)を使った方が効率の良いことが多いです。極々簡単な例として、0から1000万までの数値を格納したリストに対して、全要素に2を掛け1を加算する処理を考えてみましょう。

Pythonの標準リストを使う場合の実装を以下に示します。

標準リストで実装
def pure_python_calc():
    data = list(range(10**7))
    result = [x * 2 + 1 for x in data]
    return result

一方で、Numpyで実装すると以下のようになるかと思います。(Pythonの標準リストとは異なり、for文を使わず、リストの全要素を一括で操作しています。)

Numpyで実装
import numpy as np

def numpy_calc():
    data = np.arange(10**7)
    result = data * 2 + 1
    return result

これらの実行結果は以下の通りです。10倍以上高速になっています。

実行時間
純粋なPython: 0.916196s
Numpy: 0.063326s

ただし、Pythonの標準リストからndarrayへの変換がコストになることがあるので、他ライブラリとの連携等の理由で、リストからの変換が多くなるようであれば、無理にndarrayを使わなくてもよいかなと思います。

19. [★★] 数値計算にScipyを利用する

よくNumpyとセットで使われるのがScipyです。Scipyは主に科学技術計算向けの様々な実装を用意しており、線形代数、最適化、統計処理、信号処理などの高度な数値計算が可能です。内部ではC・C++やFortranの実装も利用されているそうです。

Pythonのプレーンな実装との速度比較は一概に行うのが難しいため、ここでは割愛しますが、他記事25では数十倍~数百倍速いと述べられていたりします。

20. [★★★] バリデーションにPydanticを利用する

Pydanticは、Pythonのデータバリデーションライブラリです。データモデルを定義し、型ヒントを使って自動的にバリデーションを実行できます。

以下の例では、Userクラスのそれぞれのフィールドの型が想定通りかに加え、nameフィールドが100文字以内であることを自動で検証しています。

Pydanticモデルとバリエーションの例
from pydantic import BaseModel, Field, ValidationError

class User(BaseModel):
    name: str = Field(..., max_length=100)  # 最大100文字
    age: int = Field(..., ge=0, le=150)
    email: str

# 正常なデータ
user = User(name="山田太郎", age=30, email="yamada@example.com")

# 以下はバリデーションエラーになる例(nameが101文字)
# invalid_user = User(name="あ" * 101, age=30, email="invalid")

Pydanticの内部実装はv2からRustとなり、大幅な高速化が実現されています。こちらも、実際のコードによる比較は省略しますが、Pydantic v2リリース当時に執筆された記事26では、「v1と比べて4倍から50倍速くなっていて、通常、約17倍速くなっている」と述べられています。

21. [★★] Pandasの代わりにPolarsを利用する

PandasはNumpyに並ぶPythonを代表するライブラリで、主にデータ分析の文脈でよく使われます。昨今、そのPandasと同等の役割を果たすライブラリとして、Polarsが注目されています。PolarsはRustで実装されており、その実行時間はPandasの100倍に上ることもあるとのことです27

ここでは、100万行のDataFrameをCSVに保存しそれを再読み込みする処理をPandasとPolarsで実装して比較してみましょう。

実行時間の計測コード(折り畳み)
PolarsとPandasの比較コード
import time

import numpy as np
import pandas as pd
import polars as pl

# 100万行×10列のデータを生成
n_rows = 1000000
n_cols = 10

data = {
    f"col_{i}": np.random.randn(n_rows)
    if i % 2 == 0
    else np.random.choice(["A", "B", "C", "D", "E"], n_rows)
    for i in range(n_cols)
}


# Polars
df_polars = pl.DataFrame(data)

# CSV保存
start = time.perf_counter()
df_polars.write_csv("polars_output.csv")
polars_write_time = time.perf_counter() - start
print(f"Polars CSV保存時間: {polars_write_time:.6f}s")

# CSV読み込み(確認用)
start = time.perf_counter()
df_read = pl.read_csv("polars_output.csv")
polars_read_time = time.perf_counter() - start
print(f"Polars CSV読み込み時間: {polars_read_time:.6f}s")


# Pandas
df_pandas = pd.DataFrame(data)

# CSV保存
start = time.perf_counter()
df_pandas.to_csv("pandas_output.csv")
pandas_write_time = time.perf_counter() - start
print(f"Pandas CSV保存時間: {pandas_write_time:.6f}s")

# CSV読み込み(確認用)
start = time.perf_counter()
df_read = pd.read_csv("pandas_output.csv")
pandas_read_time = time.perf_counter() - start
print(f"Pandas CSV読み込み時間: {pandas_read_time:.6f}s")

実行結果は以下の通りです。CSV保存時間についてはPandasの約20倍、CSVの読み込み時間は約7倍高速という結果になりました。

実行時間
Polars CSV保存時間: 0.266396s
Polars CSV読み込み時間: 0.153843s
Pandas CSV保存時間: 5.437147s
Pandas CSV読み込み時間: 1.073464s

Polarsは、Pandasと比較するとまだまだコミュニティが成熟しきってないとは思いますが、Githubのスター数でPandasに猛追(2025年12月時点でPandasが約4.7万、約Polarsが3.6万) していたり、バージョンアップも頻繁に行われている状況であり、もしこれからデータ分析系のプロジェクトが始まるのであれば、ぜひ使ってみてほしいライブラリです。

22. [★] 自力でC・C++・Rustで実装して、Pythonでラップする

最終手段として、パフォーマンスを向上させたい処理を他言語で自前実装し、Pythonから呼び出す方法があります。
例えば、C/C++とRustについては、Pythonからの呼び出しをサポートする以下のようなツールが提供されています。

  • C/C++
    • SWIG
      • GitHubに様々な使用例があります28
    • pybind11
      • GitHubにビルドツールごとの例があります29
  • Rust
    • maturin
      • 手前味噌ですみませんが弊記事30でも紹介しています

注意点として、Pythonだけを利用した開発に比べて、両言語の知見を要する点、またクロスプラットフォーム対応が必要な点等の理由で、開発/保守難易度が大幅に上がってしまうため、Pythonを使わざるを得ない場面でパフォーマンスの要求が高い場合にのみ取るべき手段かなと思います。

TIPS(言語編)

今までは、ソースコードレベルの話をいろいろ述べてきましたが、ここでは、Pythonの言語自体の機能や実装にフォーカスした内容について述べたいと思います。

23. [★★★] Pythonのバージョンアップをする

Pythonはバージョンアップごとに性能を向上させているため、ライブラリや環境面等の特別な事情がない限りは、できるだけ最新のバージョンを使うべきでしょう。

2025年12月現在、3.14までが正式リリースされており、3.15がプレリリースといった状況です。各バージョンごとのstatusは以下で確認できます(もう3.9がEOLなんですねえ・・・)。

バージョンごとの性能差については様々な記事で述べられているため、あえてこの記事で比較は行いません。以下の記事では3.9~3.14に加え、PyPy(後述)等他の実装との比較をいくつかのベンチマークで行っており、バージョンごとの差異を広く確認できます。

例えば、シングルスレッドで40番目のフィボナッチ数を求めるベンチマークにおいては、Python3.10では16.24sかかるのに対し、3.14では6.59sと実行時間が半分以下になっていることが確認できます。

24. [★] 別のPython実装・コンパイラの利用を検討する

Python自体の標準的な実装はCPythonと呼ばれています。これはC言語で書かれたPythonの標準インタプリタです。CPythonが、C++/Rust等と比較し遅い理由を挙げると、例えば以下があるでしょう:

  • インタプリタがバイトコードを1つずつ解釈・実行するため、毎回オーバーヘッドが発生
    • バイトコードの命令ごとに処理を切り替えるために、都度巨大なSwitch文が動いているイメージとのこと31
  • 動的型付けであるため、実行時の型チェックでオーバーヘッドが発生

これらの課題を解決するため、実装レベルでの代替案がいくつか存在します。ここでは、PyPyNumbaCythonについて簡単に紹介します:

  • PyPy
    • CPythonのまるっと代替するインタプリタ実装
    • 後述のuvでもインストール可能
    • JITコンパイラを搭載しており、CPythonと比較し、平均で3倍程度の高速化
  • Numba
    • JITコンパイラを利用し、関数単位で高速化可能(@numba.jit等を追加するだけ)
    • CPython上で動作し、pip installでインストールできる
    • 数十倍~数百倍の高速化が見込めるという記事も32
  • Cython
    • PythonをC言語に変換し、Cコンパイラで機械語に変換するツール
    • Numbaと同様、pip installでインストール可能だが、追加でCコンパイラも必要
    • 型宣言等の追加の記述が必要だが、C言語に変換するので速い

なお、JITコンパイラとは、プログラム実行時に頻繁に使われる部分を機械語に変換し、次回から高速実行する仕組みで、PyPyやNumbaは、これにより高速化を実現しています。

ただし、実行環境やライブラリに制限があったり、もとのPythonとは異なる独自の記法が必要な場合もあるので、採用の際は慎重に吟味したうえ利用することを推奨します。

↑以外にも、Pythonライクで高速な言語があるそうなので、気になる人は調べてみてください(CodonMojoNim等、自分も追い切れてないですが・・・)。

ちなみに、2年前に執筆されたこれらを比較した記事33によると、Codonが一番速い結果となっていました。

ちなみに、競技プログラミングのAtCoderでは2025年現在、実行基盤としてCPythonのほかPyPyやCython、Codon(2025年10月から)も指定できるそうです。私自身、CPythonでTLE(タイムアウト)したけど、PyPyだとAC(成功)みたいなことになった覚えがあります笑

25. [★] (Python3.13~)JITコンパイラ導入を検討する

ところで、Python 3.13からJITコンパイラのオプションが導入されました。

バイナリインストールではなく、CPythonのビルド時にオプションを設定することで、JITコンパイラを利用できます。

ちなみに、Python 3.14では実験的にJITコンパイラ組み込み済みのバイナリが公式提供されているそうです34

ただし、性能改善は2~9% と述べているブログ35もあれば、場合によっては性能が低下したと述べているブログ36もあり、今後に期待といった機能だと思います。

26. [★] CPythonのビルドオプションを変更する

他にも、CPythonには、パフォーマンスに関連するビルドオプション(--enable-optimizations--with-ltoなど)がいくつか用意されています。詳細は以下に載っています。

個人的に気になるのが、3.14から試験的に導入された--with-tail-call-interpというオプションで、これは新型インタプリタを有効にするもののようです37

TIPS#24で、CPythonの仕組みとして、バイトコード毎に「巨大なSwitch文が動いているイメージ」と述べましたが、そちらを改善したとのこと。オプション名の通り Tail Call(末尾呼び出し) を活用したらしいです。

こちらも導入により、平均で3%~5%の改善が見られたようですが、まだ試験的機能であるため、JITコンパイラと同じく今後に期待といった内容かと思います。

TIPS(ハード編)

ここでは、ハードウェアに関する性能改善のためのTIPSアプローチを紹介します。これまで紹介したソフト的な最適化を行ったうえでさらなる性能向上が必要な場合や、応急処置的な対応が求められる場合の選択肢として検討してください。

27. [★★] サーバーのスペックを見直す

決して、ネタ切れというわけではありません!

ここへきて原点回帰のようなTIPSですが、特にクラウド利用の下で、コストに余裕があり、かつ応急処置的な対応でよい場合、スケールアップなどPythonを実行するサーバーのスペック強化が、手っ取り早いケースも多いと思います。

特に、Webアプリのような並列・並行処理を扱っており、それらの並列・並行数を現状のスペックの限界を超えた値に設定したい場合は、スペックをあげた上で、コア数やスレッド数上限を引き上げればよいかと思います(ここはTIPS#11やTIPS#12で述べたような話です)。

注意点として、あくまで対症療法であり、根本的な性能改善が必要な場合は、これまで述べたようなソフト的な改善を優先的に検討するのが良いと思います。

28. [★] GPUの利用を検討する

大規模で並列可能な数値計算を行う場合、GPU(Graphics Processing Unit) を活用することで、劇的な高速化を実現できるケースもあると思います。

主なGPUのユースケースとして3DCGや、ディープラーニング/生成AIの印象が強いかと思いますが、NumPy互換のCuPyや、Pandas互換のcuDF等、汎用的に使えそうなライブラリも提供されていたりします。

ただ、当然CPUよりもコストがかかること、また環境構築の難易度も上がることに注意が必要です(自分もよく詰まってます・・・)。

TIPS(番外編)

最後にPython自体ではなく、Python開発を速くするであろうツールを紹介します。

29. [★★★] プロジェクト作成・ライブラリ管理にuvを使う

uvは、Astral社提供のRustで書かれた高速なPythonパッケージマネージャー・プロジェクト管理ツールです。uvの主な特徴は以下の通りです。

  • Rustで実装されており高速。公式ドキュメントでは、従来のpipやpoetryと比べて、10〜100倍高速に動作すると言われている
  • 一つのツールで、仮想環境作成、パッケージ管理、Pythonバージョン管理が可能
  • 既存のrequirements.txtやpyproject.tomlをそのまま利用可能

本記事の検証環境構築時にも使っていますが、動作も速く、非常に使いやすいです。
もし、まだ使っていない方がいたら、ぜひ使ってほしいツールです(以下にインストール方法が載っています)。

30. [★★★] リンター・フォーマッターとしてRuffを使う

Ruff とは、uvと同じくAstral社提供のリンター・フォーマッターツールで、Flake8、Black、isort、pylintなどの代替となるツールです。Ruffも、Rustで実装されており、高速に動作することが特徴です(公式ドキュメントによると既存のツールの10~100倍高速とのことです)。

Ruffの設定自体、かなり柔軟に設定可能で、Flake8などの他のツールに対応した設定も可能です。詳細は、以下のページをご覧ください:

また、VSCodeの拡張機能も提供されており、こちらもおすすめです。

ちなみに、tyという、高速な型チェックツール(mypyの高速版のようなツール)もAstral社から提供されています(すごいですね)。

最後に

これで以上です。軽い気持ちで始めた記事ですが、最終的にとんでもない文量になってしまいました(もしすべてに目を通した方がいれば、本当にありがとうございますm(__)m)。読んでいただいた皆様にとって、少しでも参考になった箇所があればとても嬉しいです。

一応、いろいろチェックしていますが、もし誤りなどありましたらコメント等でご指摘いただけると幸いです。

改めて、ありがとうございました!

  1. Pythonのデータ構造における計算量については以下が参考になります:
    https://qiita.com/Hironsan/items/68161ee16b1c9d7b25fb

  2. https://docs.python.org/3/faq/programming.html#what-is-the-most-efficient-way-to-concatenate-many-strings-together

  3. https://docs.python.org/3.14/whatsnew/3.14.html#pep-779-free-threaded-python-is-officially-supported

  4. そもそも、Free-Threaded版に変更したことにより、実行時間が数倍速くなっていますが、おそらくこれは参照カウント周りの仕様変更によるものなんじゃないかと思っています(違ったらご指摘くださいm(__)m)。詳しくは以下を参照:
    https://docs.python.org/3/howto/free-threading-python.html#immortalization

  5. https://qiita.com/__Kat__/items/a09838da8fa3d40b9167

  6. https://zenn.dev/o_kai/articles/0f82eb5920a590

  7. https://msiz07-flask-docs-ja.readthedocs.io/ja/latest/deploying/gunicorn.html#running

  8. https://fastapi.tiangolo.com/ja/deployment/server-workers

  9. https://docs.gunicorn.org/en/stable/design.html#how-many-workers その他、この内容に触れた記事も参考になるかもです:https://tech.visasq.com/gunicorn-tuning

  10. https://uvicorn.dev/#running-with-gunicorn

  11. https://anyio.readthedocs.io/en/stable/threads.html#adjusting-the-default-maximum-worker-thread-count

  12. run_in_threadpoolの使用例はこちら:https://zenn.dev/unemployed/articles/fast-api-event-loop#

  13. https://msiz07-flask-docs-ja.readthedocs.io/ja/latest/async-await.html#when-to-use-quart-instead

  14. 最小ヒープや最大ヒープについては以下などを参照
    https://medium.com/@yasufumy/data-structure-heap-ecfd0989e5be

  15. このような手法はトリアージと呼ばれるそうです。トリアージについては下記等を参照
    https://kango.mynavi.jp/contents/nurseplus/career_skillup/20220322-2147624/

  16. https://trends.codecamp.jp/blogs/media/terminology492

  17. BF法は近似アルゴリズムであることに注意してください。

  18. https://coursera.cs.princeton.edu/algs4/assignments/baseball/specification.php

  19. https://zenn.dev/akira_t/articles/seat-opt-via-gw

  20. https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.linear_sum_assignment.html

  21. https://www.kspub.co.jp/book/detail/5212707.html 2

  22. https://python-mip.readthedocs.io/en/latest/bench.html

  23. https://www.ohmsha.co.jp/book/9784274231759/ 第2版がでているようですね。

  24. Webページ:https://scmopt.github.io/opt100/ なお、他に書籍も3種発売されています(←のページに書籍のリンクも載っています)

  25. https://kawap23.hatenablog.com/entry/2019/10/06/143159https://qiita.com/jabberwocky0139/items/57637c1368ea1ebc9b45

  26. https://qiita.com/ksato9700/items/053e06f795d8a9b5d706

  27. https://pola.rs/posts/benchmarks/

  28. https://github.com/swig/swig/tree/master/Examples/python

  29. https://github.com/pybind/python_examplehttps://github.com/pybind/cmake_example

  30. https://qiita.com/nukipei/items/f096a1df6c8074b16150

  31. https://tonybaloney.github.io/posts/python-gets-a-jit.html

  32. https://zenn.dev/maiwb/articles/10f0b96202bba8

  33. https://qiita.com/n4mlz/items/fbadc02b7864f62b9622#6-nim

  34. https://docs.python.org/3.14/whatsnew/3.14.html#binary-releases-for-the-experimental-just-in-time-compiler

  35. https://tonybaloney.github.io/posts/python-gets-a-jit.html#is-it-faster

  36. https://fidget-spinner.github.io/posts/jit-reflections.html#could-be-improved-inaccurate-coverage

  37. https://docs.python.org/3.14/using/configure.html#cmdoption-with-tail-call-interp

187
165
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
187
165

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?