今回は、httpx(非同期処理)についてまとめてみました。
これから業務で非同期処理を書いてく方や、勉強をしていくエンジニアの参考になれば幸いです。
非同期処理とは
複数のタスクを並行して処理するための仕組みです。
特に「I/O待ち(ネットワーク通信、ファイル読み書き、DBアクセスなど)」が発生する処理を効率よく実行するために使われます。
…具体的な例を用いて話ますね。
たとえば、「お客様の情報をデータベースに書き込む」 という処理があるとします。
この書き込みには 10秒 かかるとしましょう。
通常(同期処理の場合)、プログラムはこの 10秒間じっと待っている ので、その間は 他の処理を進めることができません。
その結果、注文確定やメール送信などの処理が 10秒間遅れてしまう ことになります。
しかし、非同期処理を使えば、データベースの書き込みを待っている間に、他の処理を並行して実行 できます!
つまり、「待ち時間をムダにせず、他の処理を進めることで、全体の処理を速くする」 という考え方が非同期処理になります。
この方が非常にわかりやすくまとめてくださっているので、一読を推奨します。
非同期処理とは何か、何が嬉しいの?
非同期処理の基本的な書き方
非同期処理の基本 (async / await)
- Python で非同期関数を作成するには async def を使い、非同期処理を実行するときに await を使います。
async defで書かれた関数を「コルーチン」と呼びます。
一時的に別の処理に移したいタイミングで await を書きます。
import asyncio # 非同期処理用のライブラリ
async def my_task():
print("処理開始")
await asyncio.sleep(2)
print("処理終了")
async def main():
await my_task()
asyncio.run(main())
非同期処理を並列実行する (asyncio.gather)
- asyncio.gather() を使うと、複数の非同期処理を並行実行できます。
import asyncio
async def task1():
print("タスク1 開始")
await asyncio.sleep(3) # 3秒待つ
print("タスク1 終了")
async def task2():
print("タスク2 開始")
await asyncio.sleep(2) # 2秒待つ
print("タスク2 終了")
async def main():
await asyncio.gather(task1(), task2()) # タスク1 & タスク2 を並行実行
asyncio.run(main())
非同期処理の同時実行を制限 (asyncio.Semaphore())
- 非同期処理では 「サーバーへの負荷を制御する」 ことが重要になります!
asyncio.Semaphore() を使うと、同時に実行できる非同期処理の数を制限 できます。
import asyncio
semaphore = asyncio.Semaphore(2) # 最大2つのタスクを同時に実行
async def limited_task(name, seconds):
async with semaphore: # セマフォを使って制限
print(f"{name} 開始")
await asyncio.sleep(seconds)
print(f"{name} 終了")
async def main():
tasks = [
limited_task("タスク1", 3),
limited_task("タスク2", 2),
limited_task("タスク3", 1), # これは最初待機する
limited_task("タスク4", 1) # これも待機
]
await asyncio.gather(*tasks)
asyncio.run(main())
aenter と aexit
- aenter と aexit は、非同期のコンテキストマネージャ を作るための特殊メソッドです。
- 「リソースの確保・解放」を自動化するために使用します。
- aenter → async with の 開始時 に実行される
- aexit → async with の 終了時 に実行される(後片付けや終了処理)
import asyncio
class MyAsyncContext:
async def __aenter__(self):
print("🔹 `async with` の開始")
await asyncio.sleep(1)
return self
async def __aexit__(self, exc_type, exc_val, traceback):
print("🔹 `async with` の終了")
await asyncio.sleep(1)
async def main():
async with MyAsyncContext() as ctx: # `__aenter__` が実行される
print("✨ 中の処理実行中...")
# `async with` ブロックを抜けたら `__aexit__` が実行される
asyncio.run(main())
非同期処理のサンプルプログラム
- 実際に手で描いて動かしてみたので、まずは全体のコードをお見せします。
下記プログラムは、有名なアニメまとめサイト「ねいろ速報」さんのアニメスレッド一覧とその中身のスレ内容を非同期で取得してテキストに書き込むプログラムです。
このプログラムは「ねいろ速報」さんのサイトを 学習目的でスクレイピングする例 です。
⚠本番環境での運用や、大量のデータ取得は推奨しません。⚠
この非同期処理は、設定を間違えると 「ねいろ速報」 さんのサイトに対して
大量のリクエストが飛び、非常に大きな負荷をかける危険性 があります! ⚠️
✅ 安全に実行するためのポイント
-
同時リクエスト数を制限する(例:
asyncio.Semaphore(3)
に設定) -
await asyncio.sleep(1)
などを入れて、リクエスト間隔を空ける -
長時間・大量のデータを取得しない(適度なページ数で止める)
-
このコードを実行する際は、必ず設定を確認し、必要以上に負荷をかけないようにしてください。
-
万が一トラブルが発生しても、私は一切責任を負いません ので、ご理解の上で自己責任で実行をお願いします。
❌ 負荷がかかる危険な設定
# ⚠️ これは危険な例! (同時リクエスト数が多すぎる)
self.semaphore = asyncio.Semaphore(50) # ❌ 同時50リクエストは負荷が大きい
import httpx
import asyncio
import bs4
import json
from bs4 import BeautifulSoup
import re
class AsyncAnimeThreadScraper:
def __init__(self, max_concurrent_requests=5):
self.base_url = 'http://animesoku.com/archives/cat_234052.html'
self.client = httpx.AsyncClient()
self.semaphore = asyncio.Semaphore(max_concurrent_requests)
return
async def get_thread_titles_and_urls(self, url):
if url == 'http://animesoku.com/archives/cat_234052.html?p=1':
url = self.base_url
try:
response = await self.client.get(url, timeout=30)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
scripts = soup.find_all("script")
thread_info = []
for script in scripts:
script_content = script.string
if not script_content:
continue
try:
json_str = script_content.split("ld_blog_vars.articles.push(")[-1].rsplit(");", 1)[0]
if 'permalink' and 'title' and 'id' and 'categories' in json_str:
thread_info.append(json_str)
except Exception as e:
print(f"JSON抽出エラー: {e}")
continue
result_dict = {}
for item in thread_info:
permalink_match = re.search(r"permalink\s*:\s*'([^']+)'", item)
permalink = permalink_match.group(1) if permalink_match else None
title_match = re.search(r"title\s*:\s*'([^']+)'", item)
title = title_match.group(1) if title_match else None
if permalink and title:
result_dict[title] = permalink
return result_dict
except httpx.HTTPStatusError as e:
print(f"HTTPエラー: {self.base_url} => {e}")
async def get_anime_thread_comments(self, title, url):
try:
response = await self.client.get(url, timeout=30)
soup = BeautifulSoup(response.text, "html.parser")
response.raise_for_status()
comments = []
comment_blocks = soup.find_all("div", class_="t_h")
for block in comment_blocks:
comment_text = block.find("b")
if comment_text and comment_text.text.strip():
comments.append(comment_text.text.strip())
comments.insert(0, f"スレッドURL: {url}\n")
for i, comment in enumerate(comments, 1):
print(f"""
スレッドタイトル : {title}
{i}: {comment}""")
with open(f'./thread_dir/{title}.txt', 'w')as f:
f.write("\n".join(comments))
except Exception as e:
print(e)
return
def create_urls(self, page_nums):
urls = []
for page_num in range(1, page_nums + 1):
page = f'{self.base_url}?p={str(page_num)}'
print(page)
urls.append(page)
return urls
async def __aenter__(self):
self.client = httpx.AsyncClient()
return self
async def __aexit__(self, exc_type, exc_val, traceback):
return await self.client.aclose()
async def main(self):
urls = self.create_urls(3)
tasks = []
for url in urls:
tasks.append(self.get_thread_titles_and_urls(url))
results = await asyncio.gather(*tasks)
combined_results = {}
for result in results:
if result:
combined_results.update(result)
print(f"取得結果: {combined_results}")
comment_tasks = []
for title, url in combined_results.items():
comment_tasks.append(self.get_anime_thread_comments(title, url))
await asyncio.gather(*comment_tasks)
return
async def run_scraper():
async with AsyncAnimeThreadScraper(max_concurrent_requests=3) as scraper:
await scraper.main()
if __name__ == "__main__":
asyncio.run(run_scraper())