1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

[Python]非同期処理テクニックその1

Last updated at Posted at 2025-11-08

概要

このプロジェクトは、Pythonの同期関数から非同期関数を呼び出すための実装とテストケースを提供します。

対象読者

  • 既存の同期コードベースに非同期ライブラリ(例:OpenAI API、httpxなど)を組み込みたい方
  • Pythonのasync/awaitの制約に悩んでいる方
  • Django/Flaskなどの同期フレームワークから非同期処理を使いたい方
  • CLIツールやスクリプトで非同期関数を使いたい方

このプロジェクトに含まれるもの

  1. async_core.py: コア機能(実際のプロジェクトで使い回せる唯一のファイル)
  2. サンプル関数: 非同期関数の実装例
  3. テストケース: 正しい使い方と間違った使い方の具体例
  4. 詳細なドキュメント: 動作原理の完全な解説

なぜこの記事を作ったのか

Pythonの非同期処理は強力ですが、「同期から非同期を呼ぶ」という基本的な問題に対する明確なドキュメントが少ないため、多くの開発者が同じ問題に直面します。このプロジェクトは、その問題を実装からテストまで含めて完全に解説します。

: この実装は、AWS Strands Agentsの処理の一部を参考にしています。Strands Agentsでも同様の方法で同期コンテキストから非同期関数を実行しています。

📋 この記事の構成

この記事は、以下の構成で段階的に理解を深められるように構成されています:

1. 概要と問題提起(現在のセクション)

  • 対象読者の確認
  • 解決する問題の理解
  • なぜこの記事が必要なのか

2. ❓ なぜこのようなコードが必要なのか?

  • Python特有の同期/非同期の壁の説明
  • 他の言語(Go)との比較
  • いつこのコードが必要か/不要かの判断基準

3. 📦 実際のプロジェクトで使い回しできるファイル

  • async_core.py の機能紹介
  • run_async() vs run_async_stream() の使い分け
  • 基本的な使い方

4. 🔍 動作原理の詳細解説

  • ThreadPoolExecutorとasyncio.runの組み合わせ
  • ステップバイステップの動作説明
  • なぜlambdaが必要なのか

5. ファイル構成

  • コア機能(async_core.py)の詳細説明
  • サンプル関数(async_functions.py)の使い方
  • テストケースの詳細(test_error.pytest_basic.pytest_streaming.pytest_executor_submit.py

6. 実行方法とテスト

  • 全テストの一括実行
  • 個別テストの実行方法
  • 推奨学習順序

7. 重要なポイント

  • 正しい使い方と間違った使い方
  • フロー図による視覚的理解

8. 📝 まとめと実用例

  • 核心的なアイデアの再確認
  • FastAPI/Flaskでの実用例
  • いつ使うべきか/使わなくていいか

9. 🔧 既存のライブラリ・代替手段

  • nest_asyncio などの代替案
  • それぞれの比較と推奨事項

推奨読み方:

  • すぐに使いたい方: セクション3「📦 実際のプロジェクトで使い回しできるファイル」から開始
  • 仕組みを理解したい方: セクション2「❓ なぜこのようなコードが必要なのか?」から順番に読む
  • 実装の詳細を知りたい方: セクション4「🔍 動作原理の詳細解説」を重点的に読む

解決する問題

Pythonでは、以下のようなコードは動きません:

# ❌ エラー:同期関数内でawaitは使えない
def my_sync_function():
    result = await some_async_function()  # SyntaxError!
    return result

このプロジェクトの async_core.py を使えば、以下のように書けます:

# ✅ 動く!
from async_core import run_async

def my_sync_function():
    result = run_async(lambda: some_async_function())
    return result

❓ なぜこのようなコードが必要なのか?

Python特有の問題:同期コンテキストから非同期関数を呼べない

Pythonでは、呼び出し元が同期関数か非同期関数かで大きく異なります。

📌 重要:async関数の呼び出しと実行の違い

多くの人が混乱するポイントですが、async関数は「呼び出す = 実行する」ではありません

async def func_a():
    await asyncio.sleep(1)
    print("実行されました!")
    return "結果"

# ❌ 呼び出すだけ(func_a())→ エラーにはならないが、何も実行されない
result = func_a()
print(type(result))  # <class 'coroutine'>
print(result)        # <coroutine object func_a at 0x...>
# → "実行されました!" は表示されない!

# ✅ 実際に実行するには await func_a() が必要
async def main():
    result = await func_a()  # ここで初めて実行される
    print(result)            # "結果"
    # → "実行されました!" が表示される

asyncio.run(main())

重要なポイント:

  • func_a() → コルーチンオブジェクトを返すだけ(遅延実行のための「予約」)
  • await func_a() → 実際に実行して結果を取得

つまり:

# 通常の関数
def normal_func():
    return "結果"

result = normal_func()  # ✅ 呼び出すだけで実行される
print(result)           # "結果"

# async関数
async def async_func():
    return "結果"

result = async_func()          # ❌ 実行されない(コルーチンオブジェクトが返される)
result = await async_func()    # ✅ 実行される(awaitが必要)

async関数は「awaitする = 実行する」なんです!

これを理解していないと、以下のような間違いをしてしまいます:

# ❌ 間違い:呼び出すだけでは実行されない
async def send_email(to: str):
    # メール送信処理
    await email_client.send(to)

def process_users(users):
    for user in users:
        # これだとメールは送信されない!
        send_email(user.email)  # コルーチンオブジェクトが作られるだけ

# ✅ 正しい:awaitで実行する
async def process_users(users):
    for user in users:
        await send_email(user.email)  # ちゃんと送信される

ケース1: 同期関数から非同期関数を呼ぶ(❌ 問題あり)

# 通常の同期関数
def main():
    # ❌ これはできない(awaitは非同期関数内でしか使えない)
    result = await invoke_async("メッセージ")
    
    # ❌ これもダメ(コルーチンオブジェクトが返されるだけで実行されない)
    result = invoke_async("メッセージ")
    print(result)  # <coroutine object invoke_async at 0x...>
    
    # ✅ この複雑な仕組みが必要
    result = run_async(lambda: invoke_async("メッセージ"))
    print(result)  # 処理完了: メッセージ

→ この問題を解決するのが async_core.py です!(コードは下の方にあります)

ケース2: 非同期関数から非同期関数を呼ぶ(✅ 問題なし)

# 非同期関数
async def main():
    # ✅ 非同期関数内なら普通にawaitできる
    result = await invoke_async("メッセージ")
    print(result)  # 処理完了: メッセージ
    
    # ✅ ストリーミングも簡単
    async for chunk in invoke_async_stream("テスト"):
        print(chunk)

# 非同期関数を実行する時だけasyncio.runが必要
if __name__ == "__main__":
    asyncio.run(main())

→ 非同期関数内なら複雑な処理は不要です!

まとめ:いつ async_core.py が必要?

呼び出し元 呼び出し先 方法 複雑さ
同期関数 同期関数 普通に呼ぶ ⭐ シンプル
非同期関数 非同期関数 await で呼ぶ ⭐ シンプル
非同期関数 同期関数 普通に呼ぶ ⭐ シンプル
同期関数 非同期関数 run_async() ⭐⭐⭐ 複雑

async_core.py が必要なのは最後のケースだけです。

他の言語では?

Go言語の場合、このような複雑な処理は不要です:

// goroutineで簡単に並行処理
func main() {
    go processAsync("メッセージ")  // これだけ!
}

func processAsync(message string) {
    time.Sleep(1 * time.Second)
    fmt.Println("処理完了:", message)
}

// チャネルでストリーミング
func streamData() <-chan string {
    ch := make(chan string)
    go func() {
        defer close(ch)
        for i := 0; i < 5; i++ {
            ch <- fmt.Sprintf("チャンク%d", i)
        }
    }()
    return ch
}

func main() {
    for chunk := range streamData() {
        fmt.Println("受信:", chunk)
    }
}

Python vs Go の比較

項目 Python Go
並行処理の起動 ThreadPoolExecutor + asyncio.run go func()
軽量性 スレッドは重い goroutineは超軽量(数万個可)
ストリーミング async for + yield channel + range
同期/非同期の壁 厳密に分かれている ほぼ意識不要
複雑さ 高い(このコードが必要) 低い(go一つで完結)

結論: この async_core.pyPython特有の制約を解決するため のコードです。
Go言語などでは、このような複雑な仕組みは必要ありません。

📦 実際のプロジェクトで使い回しできるファイル

使い回しできるのは async_core.py のみです。

他のファイルは全て、async_core.py の使い方を説明するためのサンプル/テストコードです。
実際のプロジェクトでは、async_core.py だけをコピーして使用してください。

async_core.py の主要な機能

1. run_async() - 非同期関数を同期的に実行

通常の非同期関数を同期コンテキストから実行し、結果を返します。

def run_async(async_lambda: Callable = None) -> Any:
    """同期コンテキストから非同期関数を実行"""
    # ... 実装

使用例:

result = run_async(lambda: fetch_data())

特徴:

  • ✅ 簡単に使える
  • ❌ 全ての処理が完了するまでブロッキング
  • ❌ フロントエンドへのストリーミング送信は不可

2. run_async_stream() - 非同期ジェネレーターを同期ジェネレーターに変換

⭐️ フロントエンドへのストリーミング対応 - 新機能!

非同期ジェネレーターを同期ジェネレーターに変換し、リアルタイムでデータを yield します。

def run_async_stream(async_gen_lambda: Callable) -> Generator[Any, None, None]:
    """
    非同期ジェネレーターを同期ジェネレーターとして実行
    フロントエンドへのストリーミングレスポンスに使用可能
    """
    # ... 実装(Queueとthreadingを使用)

使用例:

# フロントエンドへリアルタイム送信
for chunk in run_async_stream(lambda: get_llm_stream()):
    send_to_frontend(chunk)  # 各チャンクを即座に送信可能

特徴:

  • ✅ リアルタイムでチャンクを yield
  • ✅ FastAPI/Flaskのストリーミングレスポンスに対応
  • ✅ ユーザーは処理中も途中結果を確認できる

2つの機能の比較

機能 run_async() run_async_stream()
戻り値 最終結果のみ チャンクごとにyield
ブロッキング 完了まで待機 チャンクごとに返す
フロントエンド対応 ❌ 不可 ✅ 可能
使用場面 単純な非同期処理 LLMストリーミング、進捗表示

使い方

# async_core.pyをインポート
from async_core import run_async

# あなた自身の非同期関数
async def my_async_function():
    # 実際の処理
    ...

# 同期コンテキストから実行
result = run_async(lambda: my_async_function())

🔍 動作原理の詳細解説

なぜこのような仕組みが必要なのか?

Pythonの asyncio.run() は、すでにイベントループが実行中の場合にエラーを出します
これは、同じスレッド内でイベントループをネストできないためです。

# ❌ これはエラーになる
async def outer():
    # すでにイベントループが実行中
    asyncio.run(inner())  # RuntimeError: asyncio.run() cannot be called from a running event loop

async def inner():
    await asyncio.sleep(1)

解決策:別スレッドで新しいイベントループを作る

ThreadPoolExecutorを使って別のスレッドで実行することで、この問題を回避します。

事前知識:ThreadPoolExecutorとは?

ステップバイステップの動作説明に入る前に、このコードで使用しているThreadPoolExecutorについて理解しておきましょう。

スレッドとは?

スレッドは、プログラムの実行単位です。1つのプログラム内で複数のスレッドを動かすことで、複数の処理を並行して実行できます。

import threading
import time

def task(name):
    print(f"{name} 開始")
    time.sleep(2)
    print(f"{name} 完了")

# メインスレッド
print("メインスレッド開始")

# 別スレッドで実行
thread1 = threading.Thread(target=task, args=("タスクA",))
thread2 = threading.Thread(target=task, args=("タスクB",))

thread1.start()  # タスクAを別スレッドで開始
thread2.start()  # タスクBを別スレッドで開始

# 両方のスレッドが完了するまで待機
thread1.join()
thread2.join()

print("全てのタスク完了")

重要なポイント:

  • 各スレッドは独立して動作する
  • スレッドごとに独立したスタックとローカル変数を持つ
  • Pythonのイベントループは、各スレッドごとに独立している

ThreadPoolExecutorとは?

ThreadPoolExecutorは、複数のスレッドを効率的に管理するための仕組みです。

通常のスレッド管理の問題:

# ❌ スレッドを毎回作成・破棄するのは非効率
for i in range(100):
    thread = threading.Thread(target=task, args=(i,))
    thread.start()
    thread.join()  # 毎回完了を待つ

ThreadPoolExecutorを使った効率的な管理:

# ✅ スレッドプールを作成し、タスクを投げるだけ
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=5) as executor:
    # スレッドプールが自動的にタスクを割り振る
    for i in range(100):
        executor.submit(task, i)

ThreadPoolExecutorの利点:

  1. スレッドの再利用: 毎回スレッドを作成/破棄しない
  2. 同時実行数の制御: max_workersで同時実行数を制限
  3. 簡単な結果取得: Futureオブジェクトで結果を受け取れる
  4. 自動クリーンアップ: with文で自動的にスレッドを終了

ThreadPoolExecutorの基本的な使い方

from concurrent.futures import ThreadPoolExecutor
import time

def calculate(x):
    print(f"計算開始: {x}")
    time.sleep(1)
    result = x * x
    print(f"計算完了: {x}{result}")
    return result

# スレッドプールを作成
with ThreadPoolExecutor(max_workers=3) as executor:
    # タスクを投げる(Futureオブジェクトが返される)
    future1 = executor.submit(calculate, 10)
    future2 = executor.submit(calculate, 20)
    future3 = executor.submit(calculate, 30)
    
    # 結果を取得(完了するまで待機)
    result1 = future1.result()  # 100
    result2 = future2.result()  # 400
    result3 = future3.result()  # 900
    
    print(f"結果: {result1}, {result2}, {result3}")

実行の流れ:

  1. executor.submit()でタスクを投げる → すぐに返る(ノンブロッキング)
  2. スレッドプールが利用可能なスレッドでタスクを実行
  3. future.result()で結果を取得 → 完了まで待機(ブロッキング)

なぜThreadPoolExecutorを使うのか?

このコードでは、以下の理由でThreadPoolExecutorを使用しています:

1. 別スレッドで新しいイベントループを作るため

# ❌ 同じスレッド内ではイベントループがネストできない
async def outer():
    asyncio.run(inner())  # RuntimeError!

# ✅ 別スレッドなら新しいイベントループを作れる
def run_in_thread():
    asyncio.run(inner())  # OK!

with ThreadPoolExecutor() as executor:
    executor.submit(run_in_thread)

2. 同期的に結果を取得するため

# 別スレッドで非同期処理を実行し、結果を同期的に取得
with ThreadPoolExecutor() as executor:
    future = executor.submit(asyncio.run, async_function())
    result = future.result()  # 完了まで待機して結果を取得
    return result  # 同期関数に結果を返す

3. リソースの自動管理

# with文を使えば自動的にスレッドをクリーンアップ
with ThreadPoolExecutor() as executor:
    # 処理...
    pass
# ここでスレッドプールが自動的に終了

まとめ:このコードにおけるThreadPoolExecutorの役割

def run_async(async_lambda):
    with ThreadPoolExecutor() as executor:
        # 別スレッドでexecute関数を実行
        future = executor.submit(execute, async_lambda)
        # 結果を同期的に取得
        return future.result()

def execute(async_func):
    # 別スレッド内で新しいイベントループを作成
    return asyncio.run(async_func())

3つの重要なポイント:

  1. 別スレッド = イベントループの衝突を回避
  2. Future = 非同期処理の結果を同期的に取得
  3. with文 = リソースの自動管理

これで、ThreadPoolExecutorがなぜ必要なのか、どのように動作するのかが理解できました。次のステップバイステップの説明で、実際の動作を見ていきましょう。

ステップバイステップの動作説明

result = run_async(lambda: my_async_function())

この1行が実行されると、以下の流れで処理されます:

ステップ1: lambda チェック

if async_lambda is None:
    raise ValueError("async_lambda引数は必須です")
  • lambdaがないと、非同期関数が即座に実行されてしまう
  • lambdaで包むことで「関数オブジェクト」として渡し、実行を遅延させる

ステップ2: ThreadPoolExecutor作成

with ThreadPoolExecutor() as executor:
  • スレッドプールを作成(デフォルトでCPUコア数分のワーカー)
  • with文で自動的にクリーンアップ

ステップ3: 別スレッドでexecuteを実行

future = executor.submit(execute, async_lambda)

executor.submit() とは?

executor.submit(関数, 引数1, 引数2, ...) は、指定した関数を別のスレッドで実行するメソッドです。

具体的な動作:

# この呼び出しは以下と同等
future = executor.submit(execute, async_lambda)

# 別スレッドで以下が実行される:
# execute(async_lambda)

3つの重要なポイント:

  1. 即座に返る(ノンブロッキング)

    • submit() はすぐに制御を返す
    • 別スレッドでの実行完了を待たない
    • 「タスクを投げる」だけ
  2. Futureオブジェクトが返される

    • Future = 「未来の結果」を表すオブジェクト
    • 別スレッドでの処理がまだ完了していなくてもOK
    • 結果の「引換券」のようなもの
  3. 引数の渡し方

    # 第1引数: 実行する関数オブジェクト(特別な役割)
    # 第2引数以降: その関数に渡す引数
    executor.submit(execute, async_lambda)
    #              ↑       ↑
    #              関数     引数
    

    重要な理解:

    # この2つは同じ意味
    executor.submit(func, arg1, arg2, arg3)
    # ↓ 別スレッドで実行される内容
    func(arg1, arg2, arg3)
    

    つまり:

    • 第1引数 = 「何を実行するか」(関数そのもの)
    • 第2引数以降 = 「その関数の引数」(関数に渡す値)

    具体例で確認:

    def add(a, b, c):
        return a + b + c
    
    # 通常の呼び出し
    result = add(10, 20, 30)  # → 60
    
    # executor.submitでの呼び出し
    future = executor.submit(add, 10, 20, 30)
    #                        ↑    ↑   ↑   ↑
    #                        関数  第1 第2 第3引数
    result = future.result()  # → 60(別スレッドで実行された結果)
    

具体例で理解する:

from concurrent.futures import ThreadPoolExecutor
import time

def heavy_task(name, duration):
    print(f"{name} 開始")
    time.sleep(duration)
    print(f"{name} 完了")
    return f"{name}の結果"

with ThreadPoolExecutor() as executor:
    # 3つのタスクを別スレッドで実行
    future1 = executor.submit(heavy_task, "タスクA", 1)
    future2 = executor.submit(heavy_task, "タスクB", 2)
    future3 = executor.submit(heavy_task, "タスクC", 3)
    
    print("全てのタスクを投げました(まだ完了していない)")
    
    # 結果を取得(ここでブロッキング)
    result1 = future1.result()  # タスクAが完了するまで待機
    result2 = future2.result()  # タスクBが完了するまで待機
    result3 = future3.result()  # タスクCが完了するまで待機
    
    print(result1, result2, result3)

このコードでの使い方:

  • メインスレッド: run_async() を実行中
  • 別スレッド: execute(async_lambda) を実行
  • Future: 別スレッドの結果を保持する入れ物

ステップ4: execute関数内で新しいイベントループ作成

def execute(async_func: Callable) -> Any:
    result = asyncio.run(async_func())
    return result
  • 別スレッド内なので、asyncio.run()が問題なく動作
  • 新しいイベントループを作成
  • async_func()を実行(ここで初めてlambdaが実行される)
  • awaitで非同期処理を完了まで待機
  • 結果を返す

ステップ5: 結果を取得

result = future.result()
  • future.result()は、別スレッドの処理が完了するまでブロッキングで待機
  • 別スレッドで実行されたexecute()の戻り値を取得
  • 呼び出し元(同期関数)に結果を返す

重要なポイント

  1. なぜlambdaが必要?

    lambdaは2つの重要な役割を果たします:

    ① 実行タイミングを遅延させる

    # ❌ lambda なし - 即座に実行されてコルーチンオブジェクトが返される
    result = run_async(invoke_async("メッセージ"))
    #                  ↑ この時点で invoke_async() が実行される
    
    # ✅ lambda あり - 実行を遅延させる
    result = run_async(lambda: invoke_async("メッセージ"))
    #                  ↑ 関数オブジェクトとして渡され、execute内で初めて実行される
    

    ② 引数付きで非同期関数を渡す

    # lambdaがないと引数を渡せない
    async def fetch_data(user_id, token):
        # 処理...
    
    # ✅ lambdaで引数をキャプチャ
    result = run_async(lambda: fetch_data("user123", "token456"))
    #                          ↑ 引数を含めて関数オブジェクトとして渡す
    

    まとめ:

    • 実行タイミング: execute() 内(別スレッド)で実行させるため
    • 引数の保持: 引数付きの関数呼び出しを「パッケージ化」して渡すため

    lambdaがないと、executor.submit(execute, ???) で非同期関数と引数を一緒に渡せません。

  2. なぜThreadPoolExecutorが必要?

    • メインスレッドにイベントループがある場合の衝突を避けるため
    • 新しいスレッドで独立したイベントループを作るため
  3. なぜasyncio.run()を別スレッドで実行?

    • 同じスレッド内でイベントループはネストできないため
    • 新しいスレッド = 新しいイベントループを作れる
  4. future.result()の役割

    • 別スレッドの処理完了を待つ(同期的に)
    • 非同期処理の結果を同期関数に返す

ファイル構成

コア機能

async_core.py - コア機能(実際のプロジェクトで使い回せる唯一のファイル)

このファイルには2つの主要な関数があります:

  1. run_async(async_lambda: Callable) -> Any

    • 同期コンテキストから非同期関数を実行
    • 結果を返すまでブロッキング
    • 使用例: result = run_async(lambda: fetch_data())
  2. run_async_stream(async_gen_lambda: Callable) -> Generator

    • 非同期ジェネレーターを同期ジェネレーターに変換
    • リアルタイムでチャンクをyield
    • FastAPI/Flaskのストリーミングレスポンスに対応
    • 使用例: for chunk in run_async_stream(lambda: get_llm_stream()): ...

補助関数:

  • execute(async_func: Callable) -> Any: ThreadPoolExecutor内で実行される内部関数

サンプル関数

async_functions.py - サンプル非同期関数(テスト用)

テストやデモンストレーション用のサンプル関数群です。実際のプロジェクトではこれらを自分の非同期関数に置き換えます。

  1. invoke_async(message: str) -> str

    • 通常の非同期関数のサンプル
    • 1秒待機してメッセージを返す
    • 使用例: result = await invoke_async("こんにちは")
    async def invoke_async(message: str) -> str:
        await asyncio.sleep(1)
        return f"処理完了: {message}"
    
  2. invoke_async_stream(message: str) -> AsyncGenerator

    • 非同期ジェネレーターのサンプル(LLMストリーミングをシミュレート)
    • 単語単位でストリーミング配信
    • 使用例: async for word in invoke_async_stream("テスト"): ...
    async def invoke_async_stream(message: str):
        words = ["Pythonの", "非同期処理は", "強力な", "機能です。"]
        for word in words:
            await asyncio.sleep(0.3)
            yield word
    
  3. invoke_sync_stream(message: str) -> Generator

    • 通常のジェネレーター(非async)のサンプル
    • ⚠️ 注意: async forでは使えない(通常のforで使用)
    • 使用例: for chunk in invoke_sync_stream("テスト"): ...
    def invoke_sync_stream(message: str):
        chunks = ["[開始]", "[処理中]", "[完了]"]
        for chunk in chunks:
            yield chunk
    

テストケース

test_error.py - エラーパターン

間違った使い方とそのエラーを確認できるテストです。

テスト関数:

  1. test_without_lambda() - ❌ lambdaなしのエラー

    try:
        # Noneを渡すとValueErrorが発生
        result = run_async(None)
    except ValueError as e:
        print(f"❌ エラー: {e}")
        # → ValueError: async_lambda引数は必須です
    

    学習ポイント: lambdaなしでは非同期関数を遅延実行できない

  2. test_direct_call() - ❌ 非同期関数を直接呼び出す

    # awaitなしで直接呼び出し
    result = invoke_async("テストメッセージ")
    print(type(result))
    # → <class 'coroutine'>(コルーチンオブジェクト)
    print(result)
    # → <coroutine object invoke_async at 0x...>
    

    学習ポイント: 非同期関数は直接呼び出すだけでは実行されない

実行方法:

python test_error.py

test_basic.py - 基本的な非同期処理

基本的な非同期関数の呼び出しパターンをテストします。

テスト関数:

  1. test_with_lambda() - ✅ 正しいパターン

    # lambda内で非同期関数を呼び出す
    result = run_async(lambda: invoke_async("こんにちは"))
    # → 結果: 処理完了: こんにちは
    
  2. test_multiple_async() - ✅ 複数の非同期処理を順次実行

    async def process_multiple():
        result1 = await invoke_async("処理1")
        result2 = await invoke_async("処理2")
        return f"{result1} | {result2}"
    
    result = run_async(lambda: process_multiple())
    # → 結果: 処理完了: 処理1 | 処理完了: 処理2
    

📌 重要:asyncio.gather() で複数の非同期処理を並列実行

上記の例では、複数の非同期処理を順次実行していますが、並列実行したい場合は asyncio.gather() を使います。

順次実行 vs 並列実行の違い:

# ❌ 順次実行(遅い)- 各処理が完了するまで次に進まない
async def sequential_process():
    result1 = await fetch_data("API1")  # 2秒待つ
    result2 = await fetch_data("API2")  # 2秒待つ
    result3 = await fetch_data("API3")  # 2秒待つ
    return [result1, result2, result3]
    # 合計: 6秒かかる

# ✅ 並列実行(速い)- 全て同時に実行
async def parallel_process():
    results = await asyncio.gather(
        fetch_data("API1"),  # 同時に実行
        fetch_data("API2"),  # 同時に実行
        fetch_data("API3"),  # 同時に実行
    )
    return results
    # 合計: 2秒で完了(最も遅い処理の時間)

asyncio.gather() の使い方:

import asyncio

async def fetch_user(user_id: int):
    await asyncio.sleep(1)  # API呼び出しをシミュレート
    return f"ユーザー{user_id}のデータ"

async def main():
    # ✅ 複数の非同期処理を並列実行
    results = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3),
    )
    print(results)
    # → ['ユーザー1のデータ', 'ユーザー2のデータ', 'ユーザー3のデータ']
    # 1秒で完了(順次なら3秒かかる)

asyncio.run(main())

リストを使った動的な並列実行:

async def process_all_users(user_ids: list[int]):
    # リスト内包表記でタスクを作成
    tasks = [fetch_user(user_id) for user_id in user_ids]
    
    # 全てのタスクを並列実行
    results = await asyncio.gather(*tasks)
    return results

# 使用例
user_ids = [1, 2, 3, 4, 5]
results = await process_all_users(user_ids)

エラーハンドリング:

async def safe_parallel_process():
    # return_exceptions=True で、エラーが発生しても他の処理を続行
    results = await asyncio.gather(
        fetch_data("API1"),
        fetch_data("API2"),
        fetch_data("API3"),
        return_exceptions=True  # エラーもリストに含まれる
    )
    
    # 結果を処理
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"タスク{i}でエラー: {result}")
        else:
            print(f"タスク{i}の結果: {result}")

run_async() と組み合わせる:

from async_core import run_async

def sync_function():
    """同期関数から複数の非同期処理を並列実行"""
    async def parallel_tasks():
        results = await asyncio.gather(
            invoke_async("タスク1"),
            invoke_async("タスク2"),
            invoke_async("タスク3"),
        )
        return results
    
    # 並列実行された結果を取得
    results = run_async(lambda: parallel_tasks())
    print(results)
    # → ['処理完了: タスク1', '処理完了: タスク2', '処理完了: タスク3']

まとめ:

  • 順次実行: await func1()await func2()await func3() (合計時間が加算される)
  • 並列実行: await asyncio.gather(func1(), func2(), func3()) (最も遅い処理の時間だけ)
  • 用途: 複数のAPI呼び出し、複数のDB クエリ、複数のファイル読み込みなど

実行方法:

python test_basic.py

test_streaming.py - ストリーミング処理

LLMのストリーミングレスポンスやリアルタイム処理のパターンをテストします。

テスト関数:

  1. test_async_streaming() - ✅ LLMレスポンスのストリーミング

    async def process_stream():
        print("🤖 AI: ", end="", flush=True)
        async for word in invoke_async_stream("ストリーミングテスト"):
            print(word, end="", flush=True)  # リアルタイム表示
        return "完了"
    
    result = run_async(lambda: process_stream())
    # 出力: 🤖 AI: これは ストリーミング テストです
    

    用途: チャットボットのリアルタイム表示

  2. test_sync_stream_with_for() - ✅ データ処理の進捗表示

    async def process_sync_stream():
        for chunk in invoke_sync_stream("データ処理"):
            print(f"⏳ 進捗: 処理中...")
        return "完了"
    
    result = run_async(lambda: process_sync_stream())
    

    用途: バッチ処理の進捗表示

  3. test_frontend_streaming() - ✅ フロントエンドへのストリーミング ⭐️重要

    # フロントエンドへリアルタイム送信
    for chunk in run_async_stream(lambda: invoke_async_stream("フロントエンド")):
        print(f"📡 チャンク送信: {chunk}")  # 各チャンクを即座に送信
    

    用途: FastAPI/Flaskのストリーミングレスポンス(推奨パターン)

  4. test_comparison_blocking_vs_streaming() - ✅ ブロッキング vs ストリーミングの比較

    • run_async(): 全て完了するまで待機(❌ フロントエンドは待たされる)
    • run_async_stream(): チャンクごとに送信(✅ リアルタイム表示可能)
  5. test_sync_stream_with_async_for() - ❌ エラー例

    # 通常のジェネレーターをasync forで使うとエラー
    async for chunk in invoke_sync_stream("テスト"):  # TypeError!
        print(chunk)
    

実行方法:

python test_streaming.py

test_executor_submit.py - ThreadPoolExecutorの動作理解

executor.submit() の動作を具体例で確認できるテストです。
async_core.py の仕組みを理解するための補助教材として使用します。

テスト関数:

  1. test1_basic_submit() - 基本的なsubmitの使い方

    with ThreadPoolExecutor() as executor:
        future = executor.submit(heavy_task, "タスクA", 2)
        # → すぐに返る(ノンブロッキング)
        
        # メインスレッドで他の処理ができる
        print("他の処理...")
        
        # 結果を取得(完了まで待機)
        result = future.result()
    

    学習ポイント: submit() は即座に返り、result() でブロッキング

  2. test2_multiple_tasks() - 複数タスクの並列実行

    future1 = executor.submit(heavy_task, "タスクA", 1)
    future2 = executor.submit(heavy_task, "タスクB", 2)
    future3 = executor.submit(heavy_task, "タスクC", 3)
    # → 全て並列で実行される
    

    学習ポイント: 複数タスクが同時に別スレッドで実行される

  3. test3_sequential_vs_parallel() - 逐次実行 vs 並列実行の比較

    • 逐次実行: 3秒かかる(1秒 + 1秒 + 1秒)
    • 並列実行: 1秒で完了(同時実行)
      学習ポイント: ThreadPoolExecutorの高速化効果を体感
  4. test4_with_arguments() - 複数の引数を渡す方法

    def calculate(operation: str, a: int, b: int):
        # 計算処理
    
    future = executor.submit(calculate, "add", 10, 20)
    #                        ↑          ↑    ↑   ↑
    #                        関数       引数1 引数2 引数3
    

    学習ポイント: submit(関数, 引数1, 引数2, ...) の使い方

  5. test5_exception_handling() - エラーハンドリング

    future = executor.submit(task_with_error, True)
    try:
        result = future.result()
    except Exception as e:
        print(f"エラー: {e}")
    

    学習ポイント: 別スレッドのエラーは result() で受け取れる

実行方法:

python test_executor_submit.py

このテストファイルの目的:

  • async_core.py で使用している executor.submit() の動作を理解する
  • ThreadPoolExecutorの基本的な使い方を学ぶ
  • 「なぜ別スレッドで実行するのか」を体感する

実行スクリプト

  • run_all_tests.py - 全てのテストを一括実行

実行方法

全てのテストを一括実行

python run_all_tests.py

実行内容:

  • test_error.py - エラーパターンのテスト
  • test_basic.py - 基本的な非同期処理のテスト
  • test_streaming.py - ストリーミング処理のテスト
  • test_executor_submit.py - ThreadPoolExecutorの動作確認

個別のテストを実行

エラーパターンのテスト

python test_error.py

内容:

  • ❌ lambdaなしのエラー
  • ❌ 非同期関数を直接呼び出すエラー

基本的な非同期処理のテスト

python test_basic.py

内容:

  • ✅ lambdaを使った基本パターン
  • ✅ 複数の非同期処理の順次実行

ストリーミング処理のテスト

python test_streaming.py

内容:

  • ✅ LLMレスポンスのストリーミング表示
  • ✅ データ処理の進捗表示
  • ✅ フロントエンドへのストリーミング(重要)
  • ✅ ブロッキング vs ストリーミングの比較

ThreadPoolExecutorの動作確認

python test_executor_submit.py

内容:

  • 基本的なsubmitの使い方
  • 複数タスクの並列実行
  • 逐次実行 vs 並列実行の比較
  • 複数の引数を渡す方法
  • エラーハンドリング

推奨学習順序:

  1. まず test_error.py で間違った使い方を確認(何がNGかを理解)
  2. 次に test_basic.py で正しい基本パターンを学習
  3. その後 test_executor_submit.py で ThreadPoolExecutor の動作を理解(オプション)
  4. 最後に test_streaming.py でストリーミング処理を学習(実用的)

重要なポイント

✅ 正しい使い方

# lambdaで非同期関数をラップ
result = run_async(lambda: invoke_async("メッセージ"))

# async/awaitを使った非同期ジェネレーターはasync forで
# LLMレスポンスのストリーミング表示
async def process():
    print("🤖 AI: ", end="", flush=True)
    accumulated_text = ""
    async for word in invoke_async_stream("テスト"):
        print(word, end="", flush=True)  # リアルタイム表示
        accumulated_text += word
    print()
    return accumulated_text

❌ 間違った使い方

# lambdaなしで呼び出し
result = run_async(None)  # ValueError

# 非同期関数を直接呼び出し(awaitなし)
result = invoke_async("テスト")  # コルーチンオブジェクトが返される

# 通常のジェネレーターをasync forで使用
async for chunk in invoke_sync_stream("テスト"):  # TypeError
    print(chunk)

フロー図

📝 まとめ

このコードが解決する問題

Pythonの async/await には「同期関数から非同期関数を呼べない」という根本的な制約があります。
このコードは、その制約を ThreadPoolExecutorasyncio.run を組み合わせることで解決します。

核心的なアイデア

  1. lambda で非同期関数をラップ: すぐに実行せず、関数オブジェクトとして渡す
  2. 別スレッドで実行: ThreadPoolExecutorで新しいスレッドを起動
  3. 新しいイベントループを作成: asyncio.runで独立したイベントループを作る
  4. 結果を同期的に取得: future.resultで結果を待機して返す

いつ使うべきか

使うべき場合:

  • 既存の同期コードベースに非同期ライブラリを統合する時
  • フレームワークの制約で同期関数しか使えない時
  • メイン関数が同期で、一部だけ非同期処理を使いたい時

使わなくていい場合:

  • プロジェクト全体を非同期で書ける時(asyncio.run(main()) で完結)
  • 非同期関数内から別の非同期関数を呼ぶ時(await で十分)
  • Go言語など、並行処理がシンプルな言語を使える時

重要な注意点

  • オーバーヘッド: スレッド起動とイベントループ作成にコストがかかる
  • パフォーマンス: 高頻度の呼び出しには向かない
  • 設計: できれば非同期関数内で完結させる設計を推奨

実用例

例1: 非同期APIクライアント

# FastAPIのような非同期フレームワークを同期CLIで使う
from async_core import run_async
from my_async_api import fetch_data

def cli_command(user_id: str):
    """CLIコマンド(同期)"""
    # 非同期APIクライアントを同期コンテキストで使用
    result = run_async(lambda: fetch_data(user_id))
    print(f"データ取得: {result}")

例2: 同期フレームワークから非同期DB

# 同期的なWebフレームワークから非同期DBを使う
def handle_request(request):
    """Django/Flaskのビュー関数(同期)"""
    async def query_db():
        async with async_db.connect() as conn:
            return await conn.fetch("SELECT * FROM users")
    
    users = run_async(lambda: query_db())
    return render_template("users.html", users=users)

例3: FastAPIでのストリーミングレスポンス ⭐️ 推奨

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from async_core import run_async_stream
from openai import AsyncOpenAI

app = FastAPI()

@app.get("/chat")
def chat_endpoint(prompt: str):
    """
    FastAPIのストリーミングエンドポイント
    ⭐️ run_async_stream を使うことでフロントエンドへリアルタイム送信
    """
    async def get_openai_stream():
        client = AsyncOpenAI()
        stream = await client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )
        
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                yield f"data: {content}\n\n"  # SSE形式
    
    # ✅ run_async_streamで同期ジェネレーターに変換
    return StreamingResponse(
        run_async_stream(lambda: get_openai_stream()),
        media_type="text/event-stream"
    )

# フロントエンド側(JavaScript)
# const response = await fetch('/chat?prompt=こんにちは');
# const reader = response.body.getReader();
# while (true) {
#     const {done, value} = await reader.read();
#     if (done) break;
#     console.log(new TextDecoder().decode(value)); // リアルタイム表示
# }

例4: Flaskでのストリーミングレスポンス

from flask import Flask, Response
from async_core import run_async_stream
from openai import AsyncOpenAI

app = Flask(__name__)

@app.route('/chat')
def chat_endpoint():
    """
    Flaskのストリーミングエンドポイント
    ⭐️ run_async_stream を使うことでフロントエンドへリアルタイム送信
    """
    prompt = request.args.get('prompt')
    
    async def get_openai_stream():
        client = AsyncOpenAI()
        stream = await client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )
        
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content
    
    # ✅ run_async_streamで同期ジェネレーターに変換
    return Response(
        run_async_stream(lambda: get_openai_stream()),
        mimetype='text/event-stream'
    )

例5: CLIツールでのブロッキング処理(run_async)

# CLIツールでは、ストリーミングが不要な場合もある
from async_core import run_async
from openai import AsyncOpenAI

def chat_cli(prompt: str):
    """CLIツール(ストリーミング不要の場合)"""
    async def get_response():
        client = AsyncOpenAI()
        stream = await client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )
        
        full_response = ""
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                full_response += chunk.choices[0].delta.content
        
        return full_response
    
    # ❌ ブロッキング処理:完了まで待機
    return run_async(lambda: get_response())

# 使用例
response = chat_cli("Pythonの非同期処理について教えて")
print(f"レスポンス: {response}")

このコードは Pythonの同期/非同期の壁を越えるための橋渡し です。
シンプルではありませんが、避けられない制約を実用的に解決します。

🔧 既存のライブラリ・代替手段

「このような複雑な処理は毎回同じような形になるのでは?」という疑問は正しいです。
実際、いくつかのライブラリや代替手段があります。

⚠️ 重要: このコードの目的は「同期関数から非同期関数を呼ぶ」ことです。
逆の「非同期関数から同期関数を呼ぶ」ための標準機能(asyncio.to_thread() など)とは用途が異なります

このコードと同じ用途(同期→非同期)のライブラリ

1. nest_asyncio

イベントループのネスト問題を解決するライブラリ。

pip install nest_asyncio
import asyncio
import nest_asyncio

# グローバルに一度だけ適用
nest_asyncio.apply()

# 同期関数から非同期関数を呼べるようになる
def sync_function():
    # 通常はエラーだが、nest_asyncioで可能になる
    result = asyncio.run(async_function())
    return result

async def async_function():
    await asyncio.sleep(1)
    return "完了"

メリット:

  • コードがシンプル
  • 既存コードの変更が少ない

デメリット:

  • グローバルにイベントループの動作を変更する
  • 予期しない副作用の可能性がある
  • Jupyter Notebookなど、既にイベントループが動いている環境で問題になることも

使用場面: JupyterやIPythonで非同期コードを実行する時

2. asyncio.run() の繰り返し呼び出し

標準ライブラリのみで、シンプルに使う方法。

import asyncio

def sync_function():
    # 毎回新しいイベントループで実行
    result = asyncio.run(async_function())
    return result

async def async_function():
    await asyncio.sleep(1)
    return "完了"

メリット:

  • 依存ゼロ
  • 非常にシンプル

デメリット:

  • 既にイベントループが動いている場合は使えない
  • 毎回イベントループを作成/破棄するオーバーヘッド

使用場面: CLIツールやシンプルなスクリプト

3. カスタムソリューション(このコード)

# このプロジェクトのasync_core.py
from async_core import run_async

# 同期関数から非同期関数を実行
def sync_function():
    result = run_async(lambda: async_function())
    return result

メリット:

  • 依存関係なし(標準ライブラリのみ)
  • 既存のイベントループがあっても動作する
  • 完全なコントロール
  • 仕組みが理解しやすい

デメリット:

  • 自分で実装・メンテナンスが必要
  • ThreadPoolExecutorのオーバーヘッド

使用場面: 学習目的、既存の同期コードベースへの統合

参考:逆方向(非同期→同期)のライブラリ

以下は逆方向(非同期関数内から同期関数を呼ぶ)の場合に使います。

asyncio.to_thread() (Python 3.9+)

# ❌ このコードの代替にはならない
async def async_function():
    # 同期関数を別スレッドで実行(逆方向)
    result = await asyncio.to_thread(sync_blocking_function)

asyncio.run_in_executor()

# ❌ このコードの代替にはならない
async def async_function():
    loop = asyncio.get_event_loop()
    # 同期関数を別スレッドで実行(逆方向)
    result = await loop.run_in_executor(None, sync_blocking_function)

比較表

方法 用途 依存 イベントループ衝突 複雑さ 推奨度
async_core.py 同期→非同期 なし 解決済み ⭐⭐⭐⭐⭐
nest_asyncio 同期→非同期 外部 解決済み ⭐⭐⭐⭐
asyncio.run() 同期→非同期 なし 問題あり ⭐⭐⭐
asyncio.to_thread() 非同期→同期 なし N/A (用途が違う)
run_in_executor() 非同期→同期 なし N/A (用途が違う)

推奨事項

このコードと同じ用途(同期→非同期)の場合:

  1. 学習・理解目的async_core.py を使う
  2. シンプルなCLI/スクリプトasyncio.run() を直接使う
  3. Jupyter/IPythonnest_asyncio を使う
  4. 本番環境・既存コード統合async_core.py をコピーして使う

結論

実は、「同期→非同期」の橋渡しのための汎用的な標準ライブラリは存在しません
これは設計上の理由で、Pythonの非同期は「最初から非同期で設計する」ことが推奨されているためです。

そのため、このようなカスタム実装nest_asyncio のような特殊なライブラリが必要になります。

重要なのは、どのソリューションも根本的には同じアプローチ(別スレッドで新しいイベントループを作る)を使っているということです。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?