Edited at

asyncioで大きなファイルを並行ダウンロード

More than 1 year has passed since last update.


はじめに

サイトのクローリングやスクレイピングに並行処理は必須。だけどasyncやaiohttpは記法が独特で記事も少ないしレファレンスもわかりづらい。ま、scrapyあるしいっか...

そう思っていた時期が私にもありました。

最近になってなんとなく動かせるようになったのでなんとなく記事にします。

レファレンスはこちら

asyncioの詳しい説明はこちら(Pythonにおける非同期処理: asyncio逆引きリファレンス)が詳しいです。


インストール

asyncioは標準モジュールですが、aiohttpはインストールしないといけません。

pip install aiohttp


簡単な使い方

以下は公式のexampleファイルです。


example.py

import aiohttp

import asyncio
import async_timeout

async def fetch(session, url):
with async_timeout.timeout(10):
async with session.get(url) as response:
return await response.text()

async def main():
async with aiohttp.ClientSession() as session:
html = await fetch(session, 'http://python.org')
print(html)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())


asyncioの基本は非同期処理したい部分をasyncで修飾し、それらを含む関数もasyncで修飾します。処理の結果をawaitで取得します。asyncで書いて、awaitで取得です。また、公式では非同期な部分をasync with~ でラッピングするような記法によって非同期処理を自動的にクローズするような書き方をしています。

exampleを以下のように書き換えることもできます。


example2.py

import asyncio

import async_timeout

async def main():
async with aiohttp.ClientSession() as session:
with async_timeout.timeout(10):
async with session.get('http://python.org') as response:
html = await response.text()
print(html)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())



重いファイルの並行ダウンロード

今回作ったプログラムは以下の構成となっています。

async def download(url):

"""
ダウンロード処理のメイン。
urlから一つのファイルをダウンロードしてくる。
"""

def download_many(url_list):
"""
イベントループを開始し、
受け取ったurl_listからひとつずつdownload(url)を起動する。
"""

def main(download_many):
"""
download_many()を引数に取り、関数の実行にかかった時間やファイル数を計測する。
"""

実際に非同期的な処理をするのはdownload()のみです。なので他の関数はasyncで修飾しません。

では実際のプログラムです。

import asyncio

import aiohttp
import time
import os

async def download(url):
start = time.time() #1
chunk_size = 10 #2
extention = 'zip'
filename = url.split('/')[-1]

async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=600) as resp: #3
print('start: {}: {}'.format(resp.status,url))

save_path = './{}.{}'.format(filename, extention)
with open(save_path, 'wb') as fd:
while True:
chunk = await resp.content.read(chunk_size) #4
if not chunk:
break
fd.write(chunk)

elapsed = round(time.time() - start)
print('end: {}: {}s'.format(filename, elapsed))

def download_many(url_list):
loop = asyncio.get_event_loop() #5
to_do = [download(url) for url in url_list] #6
wait_coro = asyncio.wait(to_do) #7
res, _ = loop.run_until_complete(wait_coro) #8
loop.close()
return len(res) #9

def main(download_many):
start = time.time() #10
count = download_many(url_list)
elapsed = time.time() - start
msg = '\n{} files downloaded in {:.2f}s'
print(msg.format(count, elapsed))

if __name__ == "__main__":
main(download_many)

1.各コルーチンの処理時間を計測します。

2.chunk_sizeは各リクエスト処理におけるレスポンスデータのバイト列の読み込み行数を指定しています。レスポンスデータは一度メモリ空間に展開されるのでメモリサイズを超えないような読み込み量に設定しなければなりません。あらかじめダウンロードファイルのサイズをみて調整するようですがあまり多くは言及されていません。数GB以上のファイルであれば値を小さく取るようにして下さい。

3.async with~で各urlごとにセッションの開始・リクエスト処理を実行しています。timeoutは指定しなければ300sで、None or 0 に設定するとタイムアウトの監視を無視します。

4.chunk_sizeごとに取得してきたバイト列を順次保存していきます。

5.イベントループ実装への参照を取得します。

6.asyncで修飾した関数はジェネレータオブジェクトです。各urlごとにジェネレータオブジェクトのリストを作成します。

7.渡されたすべてのコルーチンが完了したときに完了するコルーチンです。6のコルーチンリストをひとつのコルーチンとしてloop.run_until_complete(...)に渡します。

8.引数のコルーチンを駆動します。wait_coroが完了まで実行されると、2要素のタプルが返ってきます。1番目には完了したもの、二番目には未完のものがそれぞれ収容されています。

9.完了したコルーチンの数を返します。

10.非同期処理全体の処理時間を計測します。


おわりに

requestsでひとつひとつダウンロードするよりこっちのほうが早くていいですね!