参考
PythonによるWebスクレイピング
Pythonクローリング&スクレイピング-データ収集・解析のための実践開発ガイド
実践 Selenium WebDriver
はじめに
ぼくちんスクレイピング初心者
もっと良い書き方があったら教えていただきたい
30分ほどで書いたので誤字脱字ご容赦願う
やろうとしていること
以下の処理を別スレッドでやろうとおもう
・URLのリクエスト処理
・パース処理
・DBへのインサート処理
意識したこと
・マルチタスク処理で最も注意しなきゃなのがデータの取り扱いなんだなあ
・複数スレッドで1つのデータにアクセスするときはスレッドをロックしたりリリースしたりするんだなあ
・同じスコープ内にある値(スコープが広い値)を複数のスレッドでコネクリ回すような設計しちゃ駄目なんだなあ
・
このテストは スレッド安全な構成なわけだからいいんだけどさ、わざわざスレッド・アンセーフなことできないか考えていることころなんだなあ
ええんと、
結論
直列と並列の処理速度を比較したら、あたりまえだけど並列処理のほうがいい感じになった
テスト構成は以下の通りだ
疑似URLリクエスト回数: 10回 インターバル 1秒
疑似パース処理 インターバル 0.5秒
疑似インサート処理 インターバル 0.1秒
テスト結果
直列 16秒
並列 11秒
テストコード
import time
import threading
import datetime
from collections import deque
def show_active_thread():
print(f'thread数: {threading.active_count()}')
def show_current_thread(text: str):
print(f'{text} {threading.current_thread().name}')
def url_req(req_count: int):
for i in range(req_count):
req_interval = 1.0
time.sleep(req_interval)
response = f'response{i}'
print(f'response: {response}')
yield response
def parse(response: str):
parse_interval = 0.5
time.sleep(parse_interval)
parsed = ''.join(['parsed_', response])
print(f'parsed: {parsed}')
return parsed
def insert(parsed: str):
insert_interval = 0.1
time.sleep(insert_interval)
print(f'inserted: {parsed}')
def siries_jobs(req_count: int):
begin_t = datetime.datetime.now()
genr_response = url_req(req_count)
while True:
try:
response = next(genr_response)
parsed = parse(response)
insert(parsed)
except StopIteration:
break
print(f'処理時間: {datetime.datetime.now() - begin_t}')
class Concurrency():
def __init__(self) -> None:
self.url_values = deque([])
self.parse_values = deque([])
self.is_reqesting = False
self.is_parsing = False
def url_req_thread(self, req_count: int):
# urlリクエスト スレッド
show_current_thread('Begin')
show_active_thread()
# 処理
genr_response = url_req(req_count)
self.is_reqesting = True
while True:
try:
response = next(genr_response)
self.url_values.append(response)
except StopIteration:
self.is_reqesting = False
break
show_current_thread('End')
def parse_thread(self):
# パース スレッド
show_current_thread('Begin')
show_active_thread()
self.is_parsing = True
while True:
if self.url_values:
# 処理
response = self.url_values.popleft()
parsed = parse(response)
self.parse_values.append(parsed)
elif not self.url_values and not self.is_reqesting:
self.is_parsing = False
break
show_current_thread('End')
def insert_thread(self):
# インサート スレッド
show_current_thread('Begin')
show_active_thread()
while True:
if self.parse_values:
# 処理
parsed = self.parse_values.popleft()
insert(parsed)
elif not self.parse_values and not self.is_parsing:
break
show_current_thread('End')
def jobs(self, req_count: int):
show_active_thread()
# レスポンスをレスポンスキューに入れる
url_req_th = threading.Thread(target=self.url_req_thread, args=(req_count,), name='url_req_thread')
# レスポンスキューになにかあれば取り出してパースする。パースしたら、それをパースキューに入れる
parse_th = threading.Thread(target=self.parse_thread, args=(), name='parse_thread')
# パースキューになにかあれば取り出してインサートする
insert_th = threading.Thread(target=self.insert_thread, args=(), name='insert_thread')
begin_t = datetime.datetime.now()
url_req_th.start()
parse_th.start()
insert_th.start()
url_req_th.join()
parse_th.join()
insert_th.join()
show_active_thread()
print(f'処理時間: {datetime.datetime.now() - begin_t}')
if __name__ == '__main__':
req_count = 10
# 並行処理
Concurrency().jobs(req_count)
# 直列処理
siries_jobs(req_count)
response: response0
parsed: parsed_response0
inserted: parsed_response0
response: response1
parsed: parsed_response1
inserted: parsed_response1
response: response2
parsed: parsed_response2
inserted: parsed_response2
response: response3
parsed: parsed_response3
inserted: parsed_response3
response: response4
parsed: parsed_response4
inserted: parsed_response4
response: response5
parsed: parsed_response5
inserted: parsed_response5
response: response6
parsed: parsed_response6
inserted: parsed_response6
response: response7
parsed: parsed_response7
inserted: parsed_response7
response: response8
parsed: parsed_response8
inserted: parsed_response8
response: response9
End url_req_thread
parsed: parsed_response9
End parse_thread
inserted: parsed_response9
End insert_thread
thread数: 6
処理時間: 0:00:11.772894
response: response0
parsed: parsed_response0
inserted: parsed_response0
response: response1
parsed: parsed_response1
inserted: parsed_response1
response: response2
parsed: parsed_response2
inserted: parsed_response2
response: response3
parsed: parsed_response3
inserted: parsed_response3
response: response4
parsed: parsed_response4
inserted: parsed_response4
response: response5
parsed: parsed_response5
inserted: parsed_response5
response: response6
parsed: parsed_response6
inserted: parsed_response6
response: response7
parsed: parsed_response7
inserted: parsed_response7
response: response8
parsed: parsed_response8
inserted: parsed_response8
response: response9
parsed: parsed_response9
inserted: parsed_response9
処理時間: 0:00:16.283500