Edited at

まだ await 使ってないの?

More than 1 year has passed since last update.


非同期プログラミングが熱い

pythonで並列処理を実現するにはすごくざっくりいって3つの選択肢がある。


  • マルチスレッド方式 (モジュール名: concurrent.futures, threading)

  • マルチプロセス方式 (モジュール名: concurrent.futures, multiprocessing, subprocess)

  • 非同期I/O方式 (モジュール名: asyncio)

マルチスレッド方式はスレッドセーフにプログラム作るのが本当に(本当に!)難しいし、マルチプロセス方式はデータの共有が煩雑。これに比べて非同期I/O方式はめちゃくちゃ簡単に並列処理が書けるので CPU処理が主体でないような処理では、非同期I/O方式による実装が人気を集めている( 非同期I/O方式では1CPU しか使えないのは注意)。非同期I/O方式の興隆は python に限ったことではなく、Javascript などほかの言語でも同様(参考: https://qiita.com/yosgspec/items/abbbc1a0f3d99f51dfb4 )。

Python界ではすごい前から Twisted とかの外部モジュールを利用することで非同期I/O方式を利用した実装が可能だったんだけど、 Python 3.4 くらい?から標準モジュールで非同期I/O方式 の実装ができるようになって、3.5ではpythonの文法も拡張された ( async def ... とか await とか)。これによって本当に簡単に並列処理をかけるようになった。


具体例: はてブをクロール

はてなブックマークは、ホットエントリのRSSフィードを提供するとともに、7つのカテゴリごとのRSSフィードを提供している。この8つのRSSフィードをクロールして、人気順に出力するプログラムを書いてみる。

まずは並列処理しない場合はこんな感じ↓ 4000ms程度時間がかかる。

import feedparser

entries = {}
urls = ["http://b.hatena.ne.jp/hotentry.rss", ]
for cat in ["social", "economics", "life", "knowledge", "it", "entertainment","fun",]:
urls.append("http://b.hatena.ne.jp/hotentry/{}.rss".format(cat))

for url in urls:
root = feedparser.parse(url)
for e in root.entries:
entries[e.link] = int(e['hatena_bookmarkcount'])

ranking = list(entries.items())
ranking.sort(key=lambda x:x[1], reverse=True)

for x in ranking[:3]:
print (x[1],x[0])

非同期I/O方式だとこんな感じ↓ 900ms 程度で終わった

import asyncio, aiohttp

import feedparser

entries = {}
urls = ["http://b.hatena.ne.jp/hotentry.rss", ]
for cat in ["social", "economics", "life", "knowledge", "it", "entertainment","fun",]:
urls.append("http://b.hatena.ne.jp/hotentry/{}.rss".format(cat))

async def get(session, feed):
async with session.get(feed) as resp:
root = feedparser.parse(await resp.text())
for e in root.entries:
entries[e.link] = int(e['hatena_bookmarkcount'])

async def main():
async with aiohttp.ClientSession() as session:
cols = [get(session,x) for x in urls]
await asyncio.gather(*cols)

asyncio.get_event_loop().run_until_complete(main())
ranking = list(entries.items())
ranking.sort(key=lambda x:x[1], reverse=True)

for x in ranking[:3]:
print (x[1],x[0])

スレッド方式だとこんな感じ↓ これは800ms程度

import concurrent.futures

import feedparser

entries = {}
urls = ["http://b.hatena.ne.jp/hotentry.rss", ]
for cat in ["social", "economics", "life", "knowledge", "it", "entertainment","fun",]:
urls.append("http://b.hatena.ne.jp/hotentry/{}.rss".format(cat))

def process(url):
root = feedparser.parse(url)
for e in root.entries:
entries[e.link] = int(e['hatena_bookmarkcount'])

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as ex:
ex.map(process, urls)

ranking = list(entries.items())
ranking.sort(key=lambda x:x[1], reverse=True)
for x in ranking[:3]:
print (x[1],x[0])


スレッドセーフ性について

pythonは標準ライブラリの関数ですらスレッドセーフ性がドキュメント化されていない。例えば urllib.request.urlopen() なんかもスレッドセーフではない?みたいな議論もある(https://stackoverflow.com/questions/5825151/are-urllib2-and-httplib-thread-safe とは言えpythonの公式ドキュメントですらマルチスレッド環境で利用する例も載っているが、、、)。今回利用した feedparser モジュールもドキュメント化されていないので、スレッドセーフかは保障できない。一般にある操作がスレッドセーフか否かは、その操作に関連する各種リソースがどのように使われているかを pvm のバイトコードレイヤーまで想像して見なければいけないので、マルチスレッド方式の実装は本当に注意しなければいけない。

このような懸念があるため、マルチスレッド方式で書くのは個人的にものすごく抵抗感がある。async await を使った非同期I/O方式で実装できる場合は、こっちで実現した方が精神衛生上よいと思うが、どうおもう?