前提
本日のお題
13. 並列 Web ダウンローダ(asyncio)
何を作る?
URL リストを渡すと、非同期で一気にダウンロードするスクリプト。
学べること
-
asyncio/aiohttpの基本 - 同時接続数の制限(セマフォ)
- 例外発生時のリトライ
面白いところ
- 同じ処理を同期と非同期で書いて速度差を体感できる
- 「ネットワーク IO で非同期が効く」実感が得られる
回答
準備
pip install aiohttp
コード
13_async_downloader.py
"""
並列 Web ダウンローダ(asyncio + aiohttp)
機能:
- URL リストを非同期で並列ダウンロード
- 同時接続数をセマフォで制限
- 失敗時はリトライ
使い方例:
# 単純に URL を列挙
python async_downloader.py \
--out downloads \
https://example.com/file1.jpg \
https://example.com/file2.jpg
# URL を書いたテキストファイルからまとめて読み込む
python async_downloader.py --out downloads --urls-file urls.txt
"""
import argparse
import asyncio
from pathlib import Path
from typing import List, Optional
from urllib.parse import urlparse, unquote
import aiohttp
from aiohttp import ClientError
DEFAULT_CONCURRENCY = 5
DEFAULT_RETRY = 3
DEFAULT_TIMEOUT = 20.0
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="並列 Web ダウンローダ(asyncio + aiohttp)"
)
parser.add_argument(
"urls",
nargs="*",
help="ダウンロードする URL(スペース区切り)。--urls-file も併用可。",
)
parser.add_argument(
"--urls-file",
type=Path,
help="1 行 1 URL で書かれたテキストファイル",
)
parser.add_argument(
"--out",
"-o",
type=Path,
default=Path("downloads"),
help="出力先ディレクトリ(デフォルト: ./downloads)",
)
parser.add_argument(
"--concurrency",
"-c",
type=int,
default=DEFAULT_CONCURRENCY,
help=f"同時接続数(デフォルト: {DEFAULT_CONCURRENCY})",
)
parser.add_argument(
"--retry",
"-r",
type=int,
default=DEFAULT_RETRY,
help=f"失敗時のリトライ回数(デフォルト: {DEFAULT_RETRY})",
)
parser.add_argument(
"--timeout",
type=float,
default=DEFAULT_TIMEOUT,
help=f"1 リクエストあたりのタイムアウト秒数(デフォルト: {DEFAULT_TIMEOUT})",
)
return parser.parse_args()
def load_urls(args: argparse.Namespace) -> List[str]:
urls: List[str] = []
if args.urls_file:
if not args.urls_file.exists():
raise FileNotFoundError(f"URL ファイルが見つかりません: {args.urls_file}")
with args.urls_file.open("r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#"):
urls.append(line)
urls.extend(args.urls)
# 重複をざっくり削る(順序は一応維持)
seen = set()
unique_urls = []
for u in urls:
if u not in seen:
seen.add(u)
unique_urls.append(u)
return unique_urls
def filename_from_url(url: str) -> str:
"""
URL からファイル名を推定する。
パスが / で終わっている場合は "index.html" を付ける簡単仕様。
"""
parsed = urlparse(url)
path = parsed.path
if not path or path.endswith("/"):
return "index.html"
name = path.rsplit("/", 1)[-1]
if not name:
return "downloaded_file"
return unquote(name)
async def fetch_one(
session: aiohttp.ClientSession,
url: str,
out_dir: Path,
sem: asyncio.Semaphore,
retry: int,
timeout: float,
) -> None:
"""
1 つの URL をダウンロードするタスク。
- 同時実行数は sem によって制限
- 失敗時は retry 回までリトライ
"""
filename = filename_from_url(url)
out_path = out_dir / filename
for attempt in range(1, retry + 1):
try:
async with sem:
print(f"[{attempt}/{retry}] GET {url}")
async with session.get(url, timeout=timeout) as resp:
if resp.status != 200:
raise ClientError(f"HTTP {resp.status}")
data = await resp.read()
out_dir.mkdir(parents=True, exist_ok=True)
out_path.write_bytes(data)
print(f" -> saved to {out_path}")
return
except (asyncio.TimeoutError, ClientError, OSError) as e:
print(f" !! Error on {url}: {e}")
if attempt == retry:
print(f" xx Giving up: {url}")
else:
await asyncio.sleep(1.0) # 簡易バックオフ
async def download_all(
urls: List[str],
out_dir: Path,
concurrency: int,
retry: int,
timeout: float,
) -> None:
sem = asyncio.Semaphore(max(concurrency, 1))
timeout_cfg = aiohttp.ClientTimeout(total=None) # 個別の get() で timeout をかける
headers = {
"User-Agent": "AsyncDownloader/0.1",
"Accept": "*/*",
}
async with aiohttp.ClientSession(timeout=timeout_cfg, headers=headers) as session:
tasks = [
fetch_one(session, url, out_dir, sem, retry, timeout)
for url in urls
]
await asyncio.gather(*tasks)
def main():
args = parse_args()
try:
urls = load_urls(args)
except FileNotFoundError as e:
print(e)
return
if not urls:
print("ダウンロード対象の URL がありません。")
print("・コマンドライン引数で URL を指定する")
print("・--urls-file で URL 一覧ファイルを指定する")
return
print("=== 並列 Web ダウンローダ ===")
print(f"URL 件数 : {len(urls)}")
print(f"出力ディレクトリ: {args.out.resolve()}")
print(f"同時接続数 : {args.concurrency}")
print(f"リトライ回数 : {args.retry}")
print()
try:
asyncio.run(
download_all(
urls=urls,
out_dir=args.out,
concurrency=args.concurrency,
retry=args.retry,
timeout=args.timeout,
)
)
except KeyboardInterrupt:
print("\n[INFO] 中断されました。")
if __name__ == "__main__":
main()
実行例
※ダウンロードするものはテキトウデス。全く意味はありません。
>python 13_async_downloader.py --out 13_downloads https://v.ftcdn.net/06/96/17/82/700_F_696178212_UcAeKXiXhOn8J8zkQQbPtHteCWkX3T3l_ST.mp4 https://v.ftcdn.net/17/66/22/45/700_F_1766224505_XjLDbNG5Kkpk2QDd0oYedf9afsT0qu2A_ST.mp4
=== 並列 Web ダウンローダ ===
URL 件数 : 2
出力ディレクトリ: C:\path\to\13_downloads
同時接続数 : 5
リトライ回数 : 3
[1/3] GET https://v.ftcdn.net/06/96/17/82/700_F_696178212_UcAeKXiXhOn8J8zkQQbPtHteCWkX3T3l_ST.mp4
[1/3] GET https://v.ftcdn.net/17/66/22/45/700_F_1766224505_XjLDbNG5Kkpk2QDd0oYedf9afsT0qu2A_ST.mp4
-> saved to 13_downloads\700_F_1766224505_XjLDbNG5Kkpk2QDd0oYedf9afsT0qu2A_ST.mp4
-> saved to 13_downloads\700_F_696178212_UcAeKXiXhOn8J8zkQQbPtHteCWkX3T3l_ST.mp4
感想
- aiohttpは使用したことがなかったので勉強になった。
- 非同期でhttpを使用する場面が(Pythonを使っていて)どれだけあるかは図りかねますが頭の片隅に置いておきたい。