0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

FastAPIで学ぶPythonの非同期IOの極意

Last updated at Posted at 2024-12-27

0_06.png

Pythonはインタプリタ言語であるため、バックエンド開発に使用する場合、例えばPython + Djangoの組み合わせでは、Java + Springと比べると、その応答時間が少し長くなります。ただし、コードが適切であれば、その違いはあまり顕著ではありません。Djangoがマルチプロセスモードを使用しても、その並行処理能力は依然としてかなり弱いです。Pythonには並行処理能力を向上させるいくつかの解決策があります。たとえば、非同期フレームワークのFastAPIを使用すると、その非同期機能により、I/O集中型タスクの並行処理能力を大幅に強化できます。FastAPIは最速のPythonフレームワークの一つです。

FastAPIを例に

まず、FastAPIの使い方を簡単に見てみましょう。

例1: デフォルトのネットワーク非同期I/O

インストール:

pip install fastapi

シンプルなサーバー側コード:

# main.py
from typing import Union

from fastapi import FastAPI

app = FastAPI()


@app.get("/")
async def read_root():
    return {"Hello": "World"}

起動:

uvicorn main:app --reload

他のフレームワークと比べると、FastAPIのインターフェイスには追加のasyncキーワードがあることがわかります。asyncキーワードは、インターフェイスを非同期と定義します。戻り結果だけからは、FastAPIと他のPythonフレームワークの違いを見分けることはできません。違いは並行アクセスにあります。FastAPIのサーバースレッドがルートリクエストを処理するとき、例えばhttp://127.0.0.1:8000/のような場合、ネットワークI/Oに遭遇した場合、それを待つことなく、他のリクエストを処理します。ネットワークI/Oが完了すると、実行が再開されます。この非同期機能により、I/O集中型タスクの処理能力が向上します。

例2: 明示的なネットワーク非同期I/O

もう一つの例を見てみましょう。ビジネスコードでは、明示的な非同期ネットワークリクエストが発行されます。このネットワークI/Oについても、ルートリクエストと同様に、FastAPIはそれを非同期で処理します。

# app.py
from fastapi import FastAPI, HTTPException
import httpx

app = FastAPI()

# 非同期GETリクエストの例
@app.get("/external-api")
async def call_external_api():
    url = "https://leapcell.io"
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        if response.status_code!= 200:
            raise HTTPException(status_code=response.status_code, detail="Failed to fetch data")
        return response.json()

データベースI/Oを非同期にするには、データベースドライバまたはORMからの非同期操作のサポートが必要です。

非同期I/O

FastAPIの非同期性のコア実装は「非同期I/O」です。FastAPIを使用せずに、直接非同期I/Oを使用して、非同期処理能力を持つサーバーを起動することができます。

import asyncio

from aiohttp import web

async def index(request):
    await asyncio.sleep(1)  # I/O操作をシミュレート
    return web.Response(text='{"Hello": "World"}', content_type='application/json')

async def init(loop):
    # イベントループを使用してWebリクエストを監視
    app = web.Application(loop=loop)
    app.router.add_route('GET', '/', index)
    # サーバーを起動し、イベントループがWebリクエストを監視および処理
    srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
    print('Server started at http://127.0.0.1:8000...')
    return srv

# 明示的にイベントループを取得
loop = asyncio.get_event_loop()
# イベントループを起動
loop.run_until_complete(init(loop))
loop.run_forever()

この例を起動すると、http://127.0.0.1:8000/の戻り結果は例1と同じです。非同期I/Oの基本的な実装原理は「コルーチン」と「イベントループ」です。

コルーチン

async def index(request):
    await asyncio.sleep(1)  # I/O操作をシミュレート
    return web.Response(text='{"Hello": "World"}', content_type='application/json')

index関数はasync defで定義されており、これはコルーチンです。I/O操作の前にawaitキーワードが使用されており、実行スレッドにこのI/O操作を待つことなく、他のタスクを実行するように指示します。通常の関数の呼び出しはスタックを通じて実装されており、関数は一つずつ呼び出されて実行されます。一方、コルーチンは特殊な種類の関数(コラボレーションスレッドではない)です。それは、awaitマークでスレッドが実行を一時停止し、他のタスクを実行するように切り替えることを可能にします。I/O操作が完了すると、実行が続きます。

複数のコルーチンが並行して実行される効果を見てみましょう。

import asyncio
from datetime import datetime

async def coroutine3():
    print(f"Coroutine 3 started at {datetime.now()}")
    await asyncio.sleep(1)  # I/O操作をシミュレート
    print(f"Coroutine 3 finished at {datetime.now()}")

async def coroutine2():
    print(f"Coroutine 2 started at {datetime.now()}")
    await asyncio.sleep(1)  # I/O操作をシミュレート
    print(f"Coroutine 2 finished at {datetime.now()}")

async def coroutine1():
    print(f"Coroutine 1 started at {datetime.now()}")
    await asyncio.sleep(1)  # I/O操作をシミュレート
    print(f"Coroutine 1 finished at {datetime.now()}")

async def main():
    print("Main started")

    # タスクを作成してコルーチンを並行実行
    task1 = asyncio.create_task(coroutine1())
    task2 = asyncio.create_task(coroutine2())
    task3 = asyncio.create_task(coroutine3())

    # すべてのタスクが完了するのを待つ
    await task1
    await task2
    await task3

    print("Main finished")

# メインコルーチンを実行
asyncio.run(main())

output:

Main started
Coroutine 1 started at 2024-12-27 12:28:01.661251
Coroutine 2 started at 2024-12-27 12:28:01.661276
Coroutine 3 started at 2024-12-27 12:28:01.665012
Coroutine 1 finished at 2024-12-27 12:28:02.665125
Coroutine 2 finished at 2024-12-27 12:28:02.665120
Coroxide 3 finished at 2024-12-27 12:28:02.665120
Main finished

スレッドは3つのタスクを順番に実行していないことがわかります。I/O操作に遭遇すると、他のタスクを実行するように切り替えます。I/O操作が完了すると、続けて実行します。また、3つのコルーチンは基本的に同じ時間にI/O操作を待ち始めており、最終的な実行完了時間も基本的に同じです。ここでは明示的にイベントループを使用していませんが、asyncio.runは暗黙的にそれを使用します。

ジェネレータ

コルーチンはジェネレータを通じて実装されています。ジェネレータは関数の実行を一時停止し、再開することができます。これはコルーチンの特性です。

def simple_generator():
    print("First value")
    yield 1
    print("Second value")
    yield 2
    print("Third value")
    yield 3

# simple_generatorはジェネレータ関数、genはジェネレータ
gen = simple_generator() 

print(next(gen))  # output: First value \n 1
print(next(gen))  # output: Second value \n 2
print(next(gen))  # output: Third value \n 3

next()でジェネレータを実行すると、yieldに遭遇すると、一時停止します。next()を再度実行すると、前回一時停止したyieldから続けて実行されます。Python 3.5以前では、コルーチンは「注釈」+ yeildで書かれていました。Python 3.5からは、async def + awaitが使用されています。

import asyncio
from datetime import datetime

@asyncio.coroutine
def my_coroutine():
    print("Start coroutine", datetime.now())
    # asyncio.sleep(1)の非同期呼び出し:
    yield from asyncio.sleep(1)
    print("End coroutine", datetime.now())

# イベントループを取得
loop = asyncio.get_event_loop()
# コルーチンを実行
loop.run_until_complete(my_coroutine())
loop.close()

ジェネレータの一時停止と再開機能は、コルーチン以外にも多くのことに使用できます。例えば、ループ中に計算を行い、アルゴリズムを保存することができます。例えば、パスカルの三角形を実装する場合(各行の両端は1で、他の位置の数は上の2つの数の合計)。

def pascal_triangle():
    row = [1]
    while True:
        yield row
        new_row = [1]  # 各行の最初の要素は常に1
        for i in range(1, len(row)):
            new_row.append(row[i - 1] + row[i])
        new_row.append(1)  # 各行の最後の要素は常に1
        row = new_row

# パスカルの三角形の最初の5行を生成して表示
triangle = pascal_triangle()
for _ in range(5):
    print(next(triangle))

output:

[1]
[1, 1]
[1, 2, 1]
[1, 3, 3, 1]
[1, 4, 6, 4, 1]

イベントループ

コルーチンの実行は一時停止できるため、コルーチンはいつ再開されるのでしょうか?これには、イベントループを使用して、実行スレッドに指示する必要があります。

# イベントループを取得
loop = asyncio.get_event_loop()
# イベントループがコルーチンを実行
loop.run_until_complete(my_coroutine())
loop.close()

イベントループはI/Oマルチプレクシング技術を使用して、コルーチンが続行できるイベントを常に監視するようにループします。実行可能になった場合、スレッドはコルーチンの実行を続けます。

I/Oマルチプレクシング技術

簡単にI/Oマルチプレクシングを理解するには、宅配便の拠点のボスだと想像してください。私は各宅配員に彼らのタスクの完了状況を積極的に尋ねる必要はありません。代わりに、宅配員はタスクを完了した後、自動的に私に報告してきます。

ezgif-4-2b53d0af38.jpg

これにより、私のタスク処理能力が向上し、より多くのことができるようになります。
selectpollepollはすべてI/Oマルチプレクシングを実現できます。selectpollと比較して、epollの性能がより良いです。Linuxでは一般的にデフォルトでepollが使用され、macOSではkqueueが使用されます。kqueueepollと似ており、同様の性能を持っています。

イベントループを使用したソケットサーバー

import selectors
import socket

# selectorsオブジェクトを作成する。Linuxで実行する場合、epollの実装と同等
sel = selectors.DefaultSelector()

# 要求受信イベントハンドリング関数。新しい接続を受け入れ、読み取りイベントを登録
def accept(sock, mask):
    conn, addr = sock.accept()  # 接続を受け入れる
    print('Accepted connection from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)  # 読み取りイベントを登録

# Request reading event handling function. Read request data and send an HTTP response, then close the connection.
def read(conn, mask):
    data = conn.recv(100)  # 接続からデータを読み取る
    print('response to')
    response = "HTTP/1.1 200 OK\r\n" \
              "Content-Type: application/json\r\n" \
              "Content-Length: 18\r\n" \
              "Connection: close\r\n" \
              "\r\n" \
              "{\"Hello\": \"World\"}"
    conn.send(response.encode())  # データを返信
    print('Closing connection')
    sel.unregister(conn)  # イベントを登録解除
    conn.close()  # 接続を閉じる

# Create a server socket
sock = socket.socket()
sock.bind(('localhost', 8000))
sock.listen()
sock.setblocking(False)

# 受信イベントを登録
sel.register(sock, selectors.EVENT_READ, accept)

print("Server is running on port 8000...")

# イベントループ
while True:
    # リクエストがないときはここでブロックする
    events = sel.select()  # 準備ができているファイルディスクリプタ(イベント)を選択
    print("events length: ", len(events))
    for key, mask in events:
        callback = key.data  # イベントハンドリング関数を取得
        print("handler_name:", callback.__name__)
        callback(key.fileobj, mask)  # イベントハンドリング関数を呼び出す

サーバーソケットを起動して指定ポートを監視します。Linuxシステムで実行する場合、selectorsはデフォルトでepollを使用して実装されています。このコードはepollを使って、要求受信イベント(acceptイベント)を登録しています。新しい要求が到着すると、epollがトリガーされてイベントハンドリング関数が実行され、同時に要求データを処理して応答するための読み取りイベント(readイベント)を登録します。http://127.0.0.1:8000/ からWeb側からアクセスすると、返却結果は例1と同じになります。サーバーの実行ログ:

Server is running on port 8000...
events length:  1
handler_name: accept
Accepted connection from ('127.0.0.1', 60941)
events length:  1
handler_name: read
response to
Closing connection

ソケットサーバー

直接ソケットを使ってサーバーを起動する場合、ブラウザでhttp://127.0.0.1:8080/にアクセスするか、curl http://127.0.0.1:8080/を使うと、{"Hello": "World"}が返されます。

import socket
from datetime import datetime

# TCPソケットを作成
server_socket = socket.socket()

# ソケットを指定したIPアドレスとポート番号にバインド
server_socket.bind(('127.0.0.1', 8001))

# 着信接続をリッスンするように開始
server_socket.listen(5)

# クライアント接続を受け入れるループ
while True:
    print("%s Waiting for a connection..." % datetime.now())
    client_socket, addr = server_socket.accept() # ここでブロックし、クライアント接続を待つ
    print(f"{datetime.now()} Got connection from {addr}")

    # クライアントデータを受信
    data = client_socket.recv(1024)
    print(f"Received: {data.decode()}")

    # 応答データを送信
    response = "HTTP/1.1 200 OK\r\n" \
               "Content-Type: application/json\r\n" \
               "Content-Length: 18\r\n" \
               "Connection: close\r\n" \
               "\r\n" \
               "{\"Hello": \"World\"}"

    client_socket.sendall(response.encode())

    # クライアントソケットを閉じる
    client_socket.close()

curl http://127.0.0.1:8001/でアクセスすると、サーバーの実行ログは以下の通りです:

2024-12-27 12:53:36.711732 Waiting for a connection...
2024-12-27 12:54:30.715928 Got connection from ('127.0.0.1', 64361)
Received: GET / HTTP/1.1
Host: 127.0.0.1:8001
User-Agent: curl/8.4.0
Accept: */*

まとめ

非同期I/Oは、基本的に「コルーチン」と「イベントループ」を使って実装されています。「コルーチン」は、スレッドが実行中にマークされたI/O操作に遭遇したとき、そのI/Oが完了するのを待つ必要がなく、一時停止して、ブロックせずに他のタスクを実行できるようにするものです。「イベントループ」はI/Oマルチプレクシング技術を使い、常にループしてI/Oイベントを監視します。特定のI/Oイベントが完了すると、対応するコールバックがトリガーされ、コルーチンの実行を続けることができます。


Leapcell:FastAPIやその他のPythonアプリケーション向けの理想的なプラットフォーム

最後に、Flask/FastAPIをデプロイする理想的なプラットフォームであるLeapcellを紹介しましょう。

Leapcellは、現代の分散アプリケーション向けに設計されたクラウドコンピューティングプラットフォームです。その従量制の価格設定モデルにより、アイドルコストが発生せず、ユーザーは実際に使用したリソースだけを支払うことになります。

barndpic.png

LeapcellがWSGI/ASGIアプリケーションに対して持つユニークな利点:

1. 多言語サポート

  • JavaScript、Python、Go、またはRustでの開発をサポートしています。

2. 無制限プロジェクトの無料デプロイ

  • 使用量に基づいて課金のみ。リクエストがない場合は課金されません。

3. 比類なきコスト効率

  • 従量制で、アイドル料金はかかりません。
  • 例えば、25ドルで694万件のリクエストをサポートし、平均応答時間は60ミリ秒です。

4. 簡素化された開発者体験

  • 直感的なユーザーインターフェイスで簡単にセットアップ可能。
  • 完全自動化されたCI/CDパイプラインとGitOps統合。
  • リアルタイムのメトリクスとログで、実行可能な洞察を提供します。

5. 容易なスケーラビリティと高性能

  • 自動スケーリングで、高い並列性を簡単に処理できます。
  • オペレーションオーバーヘッドがゼロで、開発者は開発に集中できます。

詳細はドキュメントで確認できます。
LeapcellのTwitter:https://x.com/LeapcellHQ

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?