【PyCharm5.0新機能】async/awaitのスレッド実行状況を可視化してデバッグする

  • 25
    いいね
  • 0
    コメント
この記事は最終更新日から1年以上が経過しています。

2015/11/23にPyCharm5.0がリリースされました。リリースノートにはPython3.5サポートと非同期実行の可視化機能などが記載されています。非同期実行の可視化機能が便利そうだったので早速使ってみた記事です。

Thread Concurrency Visualization(非同期実行の可視化機能)

以前書いたasync/awaitの記事を試しにPyCharmで実行して可視化してみた。
pyc5.gif

今回作るモノ

Wikipediaの13記事を同時にダウンロードしてファイルに保存するプログラム
スクリーンショット 2015-11-26 17.52.22.png

改善前のプログラム

ざっくりざっくりで書いてみました。

async_web.py
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import aiohttp
import asyncio


async def download_file(title, url):
    """
    urlをダウンロードしてファイルに保存
    :param title: str
    :param url: str
    """
    local_filename = title + ".txt"
    with aiohttp.ClientSession() as client:
        async with client.get(url) as resp:
            assert resp.status == 200
            data = await resp.text()
            await save_file(local_filename, data)
    return local_filename

async def save_file(filename, text):
    """
    textをfileに保存する
    :param filename: str
    :param text: str
    """
    path = "./data/{}".format(filename)
    f = open(path, 'w')
    f.write(text)
    f.close()


async def task_download(url, title):
    await download_file(url, title)


urls = [
     ['ヤクルト', 'https://ja.wikipedia.org/wiki/%E6%9D%B1%E4%BA%AC%E3%83%A4%E3%82%AF%E3%83%AB%E3%83%88%E3%82%B9%E3%83%AF%E3%83%AD%E3%83%BC%E3%82%BA'],
     ['巨人', 'https://ja.wikipedia.org/wiki/%E8%AA%AD%E5%A3%B2%E3%82%B8%E3%83%A3%E3%82%A4%E3%82%A2%E3%83%B3%E3%83%84'],
     ['阪神', 'https://ja.wikipedia.org/wiki/%E9%98%AA%E7%A5%9E%E3%82%BF%E3%82%A4%E3%82%AC%E3%83%BC%E3%82%B9'],
     ['広島', 'https://ja.wikipedia.org/wiki/%E5%BA%83%E5%B3%B6%E6%9D%B1%E6%B4%8B%E3%82%AB%E3%83%BC%E3%83%97'],
     ['中日', 'https://ja.wikipedia.org/wiki/%E4%B8%AD%E6%97%A5%E3%83%89%E3%83%A9%E3%82%B4%E3%83%B3%E3%82%BA'],
     ['横浜', 'https://ja.wikipedia.org/wiki/%E6%A8%AA%E6%B5%9CDeNA%E3%83%99%E3%82%A4%E3%82%B9%E3%82%BF%E3%83%BC%E3%82%BA'],
     ['ソフバン', 'https://ja.wikipedia.org/wiki/%E7%A6%8F%E5%B2%A1%E3%82%BD%E3%83%95%E3%83%88%E3%83%90%E3%83%B3%E3%82%AF%E3%83%9B%E3%83%BC%E3%82%AF%E3%82%B9'],
     ['日ハム', 'https://ja.wikipedia.org/wiki/%E5%8C%97%E6%B5%B7%E9%81%93%E6%97%A5%E6%9C%AC%E3%83%8F%E3%83%A0%E3%83%95%E3%82%A1%E3%82%A4%E3%82%BF%E3%83%BC%E3%82%BA'],
     ['ロッテ', 'https://ja.wikipedia.org/wiki/%E5%8D%83%E8%91%89%E3%83%AD%E3%83%83%E3%83%86%E3%83%9E%E3%83%AA%E3%83%BC%E3%83%B3%E3%82%BA'],
     ['西武', 'https://ja.wikipedia.org/wiki/%E5%9F%BC%E7%8E%89%E8%A5%BF%E6%AD%A6%E3%83%A9%E3%82%A4%E3%82%AA%E3%83%B3%E3%82%BA'],
     ['オリックス', 'https://ja.wikipedia.org/wiki/%E3%82%AA%E3%83%AA%E3%83%83%E3%82%AF%E3%82%B9%E3%83%BB%E3%83%90%E3%83%95%E3%82%A1%E3%83%AD%E3%83%BC%E3%82%BA'],
     ['楽天', 'https://ja.wikipedia.org/wiki/%E6%9D%B1%E5%8C%97%E6%A5%BD%E5%A4%A9%E3%82%B4%E3%83%BC%E3%83%AB%E3%83%87%E3%83%B3%E3%82%A4%E3%83%BC%E3%82%B0%E3%83%AB%E3%82%B9'],
     ['サッカー日本代表', 'https://ja.wikipedia.org/wiki/%E3%82%B5%E3%83%83%E3%82%AB%E3%83%BC%E6%97%A5%E6%9C%AC%E4%BB%A3%E8%A1%A8'],
]


loop = asyncio.get_event_loop()
tasks = asyncio.wait([task_download(title, url) for title, url in urls])
loop.run_until_complete(tasks)
loop.close()

PyCharmで可視化する

1. タブを右クリックした『Concurrency Diaglam』を押します

スクリーンショット 2015-11-26 17.35.50.png

2. ダイアグラムが表示されます

■ Threading graph
pc5_before.gif

■ Asyncio graph
pc5_before_graph.gif

3. 遅い原因を考える

Asyncio graphを見るとTask15-27部分はaiohttpでHTTP通信している箇所なのですが、上手くTaskを切り替えて動作しています。しかしその後の処理で時間が掛かっていることが判ります。ディスクに書き込んでいるsave_file関数がブロッキングIOなので書き込み待ちで時間が掛かったと考えられます。つまりsave_file関数をnon-blocking IO で書き換えれば改善しそうです。

改善後のプログラム

ディスクに書き込んでいるsave_file関数をnon-blocking IO で書き換えたかったのですが、やり方が判らなかったので昔ながらのマルチプロセス処理で書き換えてみました。

async_web_mp.py
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import random
import aiohttp
import asyncio
import multiprocessing as mp


async def task_download(i, title, url):
    local_filename = title + "_mp.txt"
    with aiohttp.ClientSession() as client:
        async with client.get(url) as resp:
            assert resp.status == 200
            # print(await resp.text())
            data = await resp.text()
            print(i, title, url)
    process = mp.Process(target=save_file, args=(local_filename, data))
    process.start()
    return local_filename


def save_file(filename, text):
    """
    textをfileに保存する
    :param filename: str
    :param text: str
    """
    path = "./data/{}".format(filename)
    f = open(path, 'w')
    f.write(text)
    f.close()

n = "?{}".format(str(random.randint(1, 100000)))  # キャッシュ化を防ぐ
urls = [
     ['ヤクルト', 'https://ja.wikipedia.org/wiki/%E6%9D%B1%E4%BA%AC%E3%83%A4%E3%82%AF%E3%83%AB%E3%83%88%E3%82%B9%E3%83%AF%E3%83%AD%E3%83%BC%E3%82%BA'],
     ['巨人', 'https://ja.wikipedia.org/wiki/%E8%AA%AD%E5%A3%B2%E3%82%B8%E3%83%A3%E3%82%A4%E3%82%A2%E3%83%B3%E3%83%84'],
     ['阪神', 'https://ja.wikipedia.org/wiki/%E9%98%AA%E7%A5%9E%E3%82%BF%E3%82%A4%E3%82%AC%E3%83%BC%E3%82%B9'],
     ['広島', 'https://ja.wikipedia.org/wiki/%E5%BA%83%E5%B3%B6%E6%9D%B1%E6%B4%8B%E3%82%AB%E3%83%BC%E3%83%97'],
     ['中日', 'https://ja.wikipedia.org/wiki/%E4%B8%AD%E6%97%A5%E3%83%89%E3%83%A9%E3%82%B4%E3%83%B3%E3%82%BA'],
     ['横浜', 'https://ja.wikipedia.org/wiki/%E6%A8%AA%E6%B5%9CDeNA%E3%83%99%E3%82%A4%E3%82%B9%E3%82%BF%E3%83%BC%E3%82%BA'],
     ['ソフバン', 'https://ja.wikipedia.org/wiki/%E7%A6%8F%E5%B2%A1%E3%82%BD%E3%83%95%E3%83%88%E3%83%90%E3%83%B3%E3%82%AF%E3%83%9B%E3%83%BC%E3%82%AF%E3%82%B9'],
     ['日ハム', 'https://ja.wikipedia.org/wiki/%E5%8C%97%E6%B5%B7%E9%81%93%E6%97%A5%E6%9C%AC%E3%83%8F%E3%83%A0%E3%83%95%E3%82%A1%E3%82%A4%E3%82%BF%E3%83%BC%E3%82%BA'],
     ['ロッテ', 'https://ja.wikipedia.org/wiki/%E5%8D%83%E8%91%89%E3%83%AD%E3%83%83%E3%83%86%E3%83%9E%E3%83%AA%E3%83%BC%E3%83%B3%E3%82%BA'],
     ['西武', 'https://ja.wikipedia.org/wiki/%E5%9F%BC%E7%8E%89%E8%A5%BF%E6%AD%A6%E3%83%A9%E3%82%A4%E3%82%AA%E3%83%B3%E3%82%BA'],
     ['オリックス', 'https://ja.wikipedia.org/wiki/%E3%82%AA%E3%83%AA%E3%83%83%E3%82%AF%E3%82%B9%E3%83%BB%E3%83%90%E3%83%95%E3%82%A1%E3%83%AD%E3%83%BC%E3%82%BA'],
     ['楽天', 'https://ja.wikipedia.org/wiki/%E6%9D%B1%E5%8C%97%E6%A5%BD%E5%A4%A9%E3%82%B4%E3%83%BC%E3%83%AB%E3%83%87%E3%83%B3%E3%82%A4%E3%83%BC%E3%82%B0%E3%83%AB%E3%82%B9'],
     ['サッカー日本代表', 'https://ja.wikipedia.org/wiki/%E3%82%B5%E3%83%83%E3%82%AB%E3%83%BC%E6%97%A5%E6%9C%AC%E4%BB%A3%E8%A1%A8'],
]


loop = asyncio.get_event_loop()
tasks = asyncio.wait([task_download(i, x[0], x[1] + n) for i, x in enumerate(urls)])
loop.run_until_complete(tasks)
loop.close()

改善後の結果

pc5_after.gif

11.5秒 >> 5.5秒と改善しました。

ご注意ください!

  1. 判りやすいデータを取るためにディスクアクセスが劣悪な環境で実行してデータを取りました。
  2. Task10万個起動して可視化したらPyCharmがフリーズしました...

おまけ:キレイなやつ

jawiki-latest-all-titles-in-ns0
be.gif

async_web_counter.py
# -*- coding: utf-8 -*-
import threading
import time
import urllib


class WikipediaCrawler(object):
    """
    wikipediaのURLを応答するクラス
    """
    PATH = './data/jawiki-latest-all-titles-in-ns0'

    def __init__(self):
        self.lock = threading.Lock()

    def get_url(self):
        f = open(WikipediaCrawler.PATH, 'r')
        for title in f:
            time.sleep(0.5)
            if not self.lock.acquire(timeout=3):
                # ロックを取れないとき
                print('%s: Cannot acquire lock (timed out)' % t.name)
                continue

            # ロックが取れた
            try:
                time.sleep(0.5)
                url = WikipediaCrawler.get_url(title)
                print(url)
                yield url
            finally:
                # ロック解放
                self.lock.release()

    @classmethod
    def get_wikipedia_url(cls, title):
        """
        titleからwikipediaのURLを生成
        """
        _base_url = "https://ja.wikipedia.org/wiki/{}"
        url = _base_url.format(urllib.quote_plus(title))
        return url[:-3]


def worker(crawler):
    t = threading.current_thread()
    for url in crawler.get_url():
        print(url)

threads_count = 3
threads = []
crawler = WikipediaCrawler()

for i in range(threads_count):
    t = threading.Thread(target=worker, args=(crawler,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

参考

What’s New in PyCharm 5
Python 標準ライブラリ探訪 (18) ~ threading.Lock編 ~