0
1

More than 1 year has passed since last update.

Python IOバウンドなスクレイピング処理をマルチスレッドやるときの初心者メモ

Last updated at Posted at 2022-02-15

参考

PythonによるWebスクレイピング
Pythonクローリング&スクレイピング-データ収集・解析のための実践開発ガイド
実践 Selenium WebDriver

はじめに

ぼくちんスクレイピング初心者
もっと良い書き方があったら教えていただきたい
30分ほどで書いたので誤字脱字ご容赦願う

やろうとしていること

以下の処理を別スレッドでやろうとおもう

・URLのリクエスト処理
・パース処理
・DBへのインサート処理

意識したこと

・マルチタスク処理で最も注意しなきゃなのがデータの取り扱いなんだなあ
・複数スレッドで1つのデータにアクセスするときはスレッドをロックしたりリリースしたりするんだなあ
・同じスコープ内にある値(スコープが広い値)を複数のスレッドでコネクリ回すような設計しちゃ駄目なんだなあ

このテストは スレッド安全な構成なわけだからいいんだけどさ、わざわざスレッド・アンセーフなことできないか考えていることころなんだなあ

ええんと、

結論

直列と並列の処理速度を比較したら、あたりまえだけど並列処理のほうがいい感じになった

テスト構成は以下の通りだ

疑似URLリクエスト回数: 10回 インターバル 1秒
疑似パース処理 インターバル 0.5秒
疑似インサート処理 インターバル 0.1秒

テスト結果

直列 16秒
並列 11秒

テストコード

import time
import threading
import datetime
from collections import deque


def show_active_thread():
    print(f'thread数: {threading.active_count()}')


def show_current_thread(text: str):
    print(f'{text} {threading.current_thread().name}')


def url_req(req_count: int):
    for i in range(req_count):
        req_interval = 1.0
        time.sleep(req_interval)
        response = f'response{i}'
        print(f'response: {response}')
        yield response


def parse(response: str):
    parse_interval = 0.5
    time.sleep(parse_interval)
    parsed = ''.join(['parsed_', response])
    print(f'parsed: {parsed}')
    return parsed


def insert(parsed: str):
    insert_interval = 0.1
    time.sleep(insert_interval)
    print(f'inserted: {parsed}')


def siries_jobs(req_count: int):
    begin_t = datetime.datetime.now()
    genr_response = url_req(req_count)
    while True:
        try:
            response = next(genr_response)
            parsed = parse(response)
            insert(parsed)
        except StopIteration:
            break
    print(f'処理時間: {datetime.datetime.now() - begin_t}')


class Concurrency():
    def __init__(self) -> None:
        self.url_values = deque([])
        self.parse_values = deque([])
        self.is_reqesting = False
        self.is_parsing = False

    def url_req_thread(self, req_count: int):
        # urlリクエスト スレッド
        show_current_thread('Begin')
        show_active_thread()
        # 処理
        genr_response = url_req(req_count)
        self.is_reqesting = True
        while True:
            try:
                response = next(genr_response)
                self.url_values.append(response)
            except StopIteration:
                self.is_reqesting = False
                break
        show_current_thread('End')

    def parse_thread(self):
        # パース スレッド
        show_current_thread('Begin')
        show_active_thread()
        self.is_parsing = True
        while True:
            if self.url_values:
                # 処理
                response = self.url_values.popleft()
                parsed = parse(response)
                self.parse_values.append(parsed)
            elif not self.url_values and not self.is_reqesting:
                self.is_parsing = False
                break
        show_current_thread('End')

    def insert_thread(self):
        # インサート スレッド
        show_current_thread('Begin')
        show_active_thread()
        while True:
            if self.parse_values:
                # 処理
                parsed = self.parse_values.popleft()
                insert(parsed)
            elif not self.parse_values and not self.is_parsing:
                break
        show_current_thread('End')

    def jobs(self, req_count: int):
        show_active_thread()
        # レスポンスをレスポンスキューに入れる
        url_req_th = threading.Thread(target=self.url_req_thread, args=(req_count,), name='url_req_thread')
        # レスポンスキューになにかあれば取り出してパースする。パースしたら、それをパースキューに入れる
        parse_th = threading.Thread(target=self.parse_thread, args=(), name='parse_thread')
        # パースキューになにかあれば取り出してインサートする
        insert_th = threading.Thread(target=self.insert_thread, args=(), name='insert_thread')

        begin_t = datetime.datetime.now()

        url_req_th.start()
        parse_th.start()
        insert_th.start()

        url_req_th.join()
        parse_th.join()
        insert_th.join()

        show_active_thread()

        print(f'処理時間: {datetime.datetime.now() - begin_t}')


if __name__ == '__main__':
    req_count = 10
    # 並行処理
    Concurrency().jobs(req_count)
    # 直列処理
    siries_jobs(req_count)

response: response0
parsed: parsed_response0
inserted: parsed_response0
response: response1
parsed: parsed_response1
inserted: parsed_response1
response: response2
parsed: parsed_response2
inserted: parsed_response2
response: response3
parsed: parsed_response3
inserted: parsed_response3
response: response4
parsed: parsed_response4
inserted: parsed_response4
response: response5
parsed: parsed_response5
inserted: parsed_response5
response: response6
parsed: parsed_response6
inserted: parsed_response6
response: response7
parsed: parsed_response7
inserted: parsed_response7
response: response8
parsed: parsed_response8
inserted: parsed_response8
response: response9
End url_req_thread
parsed: parsed_response9
End parse_thread
inserted: parsed_response9
End insert_thread
thread数: 6
処理時間: 0:00:11.772894
response: response0
parsed: parsed_response0
inserted: parsed_response0
response: response1
parsed: parsed_response1
inserted: parsed_response1
response: response2
parsed: parsed_response2
inserted: parsed_response2
response: response3
parsed: parsed_response3
inserted: parsed_response3
response: response4
parsed: parsed_response4
inserted: parsed_response4
response: response5
parsed: parsed_response5
inserted: parsed_response5
response: response6
parsed: parsed_response6
inserted: parsed_response6
response: response7
parsed: parsed_response7
inserted: parsed_response7
response: response8
parsed: parsed_response8
inserted: parsed_response8
response: response9
parsed: parsed_response9
inserted: parsed_response9
処理時間: 0:00:16.283500
0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1