For the past few days I have been developing a web crawler that uses a GPU. It is written in Python 3 and runs on Ubuntu 20.04 + CUDA 11.4.
I wrote it in a few hours with the help of ChatGPT (GPT-4), and went through more than ten versions before arriving at this one.
The script crawls websites with Beautiful Soup, uses BERT to turn token-level keywords (named entities) into JSON, and then writes the results out to CSV.
It is sped up with asynchronous HTTP/HTTPS requests. One other point worth mentioning: processing tokens (keywords) with transformers' BertTokenizer and BertForTokenClassification on the GPU alone was fairly slow, so the script uses a hybrid GPU + CPU configuration, as sketched below.
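Here is a minimal sketch of that split, using the same models as the full script further down (the classify_tokens helper is just for illustration): tokenization happens on the CPU, the forward pass runs on the GPU when one is available, and the argmax plus decoding are handled back on the CPU.

import torch
from transformers import BertTokenizer, BertForTokenClassification

tokenizer = BertTokenizer.from_pretrained('bert-base-cased')
model = BertForTokenClassification.from_pretrained('dbmdz/bert-large-cased-finetuned-conll03-english')
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

def classify_tokens(text):
    # Tokenization itself runs on the CPU
    tokens = tokenizer(text, return_tensors='pt', truncation=True, max_length=128)
    with torch.no_grad():
        logits = model(tokens['input_ids'].to(device)).logits  # forward pass on the GPU if available
    return logits.argmax(dim=2).to('cpu')  # argmax and later decoding are done on the CPU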
The libraries used are Beautiful Soup (bs4), transformers, torch, pandas, json, asyncio, aiohttp, aiofiles, and async_timeout. While developing this script I also tried multiprocessing with Pool, but this version appears to be the fastest. I plan to go back and benchmark the earlier versions to see exactly how much faster it is.
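For illustration only, a multiprocessing.Pool variant might look roughly like the following (a hypothetical sketch, not the actual earlier version): each worker process fetches one page synchronously with requests, and the BERT step would then run over the collected pages.

# Hypothetical sketch of a multiprocessing.Pool fetcher (not the earlier version itself)
from multiprocessing import Pool
import requests
from bs4 import BeautifulSoup

def fetch_text(url):
    # Each worker process fetches and strips one page synchronously
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return url, BeautifulSoup(response.text, 'html.parser').get_text()
    except Exception:
        return url, ""

if __name__ == '__main__':
    with open('urls.txt') as f:
        urls = [line.strip() for line in f]
    with Pool(processes=4) as pool:
        pages = pool.map(fetch_text, urls)  # list of (url, page_text) tuples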
The source code has also been uploaded to GitHub:
https://github.com/stingraze/web-crawler-gpu/
Update 7/4/2023: I have also uploaded the code for a web crawler that follows links recursively (below).
Update 7/5/2023: I have uploaded a demo video (below) recorded on CUDA 11.4 with a GeForce RTX 2080 Ti and a 10th-generation Core i5 (12 threads).
Update 7/9/2023: I have uploaded a version to GitHub that implements an async fetch timeout and a URL exclusion list. https://github.com/stingraze/web-crawler-gpu/blob/main/gpu-crawler-recursive11.py
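That file is not reproduced here, but the two additions can be sketched roughly as follows. This is an assumption about the approach rather than the exact code in gpu-crawler-recursive11.py: the fetch is wrapped in async_timeout.timeout(), and any URL whose host matches an entry in an exclusion list is skipped.

# Rough sketch of an async fetch timeout plus URL exclusion list
# (assumed approach, not the exact code in gpu-crawler-recursive11.py)
import asyncio
import aiohttp
import async_timeout
from urllib.parse import urlparse

EXCLUDED_HOSTS = {'example.com'}  # hypothetical exclusion list

def is_excluded(url):
    # Skip URLs whose host matches (or is a subdomain of) an excluded host
    host = urlparse(url).netloc
    return any(host == h or host.endswith('.' + h) for h in EXCLUDED_HOSTS)

async def fetch_with_timeout(session, url, seconds=10):
    if is_excluded(url):
        return None
    async with async_timeout.timeout(seconds):  # give up on slow fetches
        async with session.get(url) as response:
            response.raise_for_status()
            return await response.text()

async def demo():
    async with aiohttp.ClientSession() as session:
        print(await fetch_with_timeout(session, 'https://example.org/'))

if __name__ == '__main__':
    asyncio.run(demo())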
#(C)Tsubasa Kato - Inspire Search Corporation 2023/6/26
#Created with the help of ChatGPT (GPT-4)
import requests
from bs4 import BeautifulSoup
from transformers import BertTokenizer, BertForTokenClassification
import torch
import pandas as pd
import json
import asyncio
import aiohttp
import aiofiles
import async_timeout
# Load pre-trained BERT tokenizer and model for token classification
tokenizer = BertTokenizer.from_pretrained('bert-base-cased')
model = BertForTokenClassification.from_pretrained('dbmdz/bert-large-cased-finetuned-conll03-english')
# Check if a GPU is available and if not, use a CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Move the model to GPU if available
model = model.to(device)
# Define the labels
labels = ['O', 'B-MISC', 'I-MISC', 'B-PER', 'I-PER', 'B-ORG', 'I-ORG', 'B-LOC', 'I-LOC']
# Function to fetch webpage and process text
async def crawl_and_process(session, url):
    word_labels = []
    try:
        print("Async fetch: " + url)
        # Specify a timeout of 10 seconds for the request
        async with async_timeout.timeout(10):
            async with session.get(url) as response:
                response.raise_for_status()
                soup = BeautifulSoup(await response.text(), 'html.parser')
                text = soup.get_text()

        # Sleep for 1 second
        await asyncio.sleep(1)

        # Tokenize the text and move to GPU
        tokens = tokenizer(text, return_tensors='pt', truncation=True, padding='max_length', max_length=128)
        tokens = tokens.to(device)

        # Predict all tokens
        print("Predicting tokens for " + url)
        with torch.no_grad():
            predictions = model(tokens['input_ids'])
        predicted_index = torch.argmax(predictions[0], dim=2)
        predicted_index = predicted_index.to('cpu')
    except Exception as e:
        print(f"Failed to fetch and process {url}. Error: {e}")
        return (url, word_labels)

    current_word, current_label = "", "O"
    for token, prediction in zip(tokens['input_ids'][0], predicted_index[0]):
        decoded_token = tokenizer.decode([token.item()]).strip()
        if decoded_token.startswith("##"):
            # This token is a subtoken of a larger word
            current_word += decoded_token[2:]
        else:
            # This token is a new word; save the old word (if it's not an 'O' entity)
            if current_label != 'O':
                word_labels.append({current_word: current_label})
            current_word = decoded_token
            current_label = labels[prediction]
    # Save the last word (if it's not an 'O' entity)
    if current_label != 'O':
        word_labels.append({current_word: current_label})
    return (url, word_labels)
# Create a function to run the asyncio event loop
async def main(urls):
    # Create a session
    async with aiohttp.ClientSession() as session:
        tasks = []
        # Assign tasks
        for url in urls:
            tasks.append(crawl_and_process(session, url))
        # Run tasks and gather results
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results
# Read list of URLs from text file
with open('urls.txt', 'r') as f:
    urls = [line.strip() for line in f]
# Run the asyncio event loop
results = asyncio.run(main(urls))
# Process the results and save them to a CSV file
df = pd.DataFrame(columns=['URL', 'Words'])
for res in results:
    url, word_labels = res
    df_temp = pd.DataFrame({'URL': [url], 'Words': [json.dumps(word_labels)]})
    df = pd.concat([df, df_temp], ignore_index=True)
df.to_csv('crawl_results.csv', index=False)
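To run the script, list the URLs to crawl one per line in urls.txt and run it with Python 3; each page's entities are stored as a JSON string in the Words column of crawl_results.csv. A small sketch of reading the results back (file and column names match the script above):

import json
import pandas as pd

# The 'Words' column holds a JSON list of {word: entity_label} dictionaries per URL
df = pd.read_csv('crawl_results.csv')
for _, row in df.iterrows():
    entities = json.loads(row['Words'])
    print(row['URL'], entities)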
Recursive GPU web crawler
#(C)Tsubasa Kato 2023
# Created with the help of ChatGPT (GPT-3.5 & GPT-4)
import requests
from bs4 import BeautifulSoup
from transformers import BertTokenizer, BertForTokenClassification
import torch
import pandas as pd
import json
import asyncio
import aiohttp
from urllib.parse import urljoin
import signal
import atexit
visited_urls = set()
results = [] # Define the results list
append_interval = 10 # Append to CSV every 10 sites
csv_filename = 'crawl_results.csv' # CSV file name
# Function to fetch webpage and process text
async def crawl_and_process(session, url, depth=0):
    word_labels = []
    try:
        # Pause for 1 second
        await asyncio.sleep(1)
        print("Async fetch: " + url)
        # Fetch the URL
        async with session.get(url) as response:
            response.raise_for_status()
            html = await response.text()
        soup = BeautifulSoup(html, 'html.parser')
        text = soup.get_text()

        # Tokenize the text and move to GPU
        tokens = tokenizer(text, return_tensors='pt', truncation=True, padding='max_length', max_length=128)
        tokens = tokens.to(device)

        # Predict all tokens with a timeout of 10 seconds
        signal.alarm(10)
        print("Predicting tokens for " + url)
        with torch.no_grad():
            predictions = model(tokens['input_ids'])
        signal.alarm(0)  # Reset the alarm
        predicted_index = torch.argmax(predictions[0], dim=2)
        predicted_index = predicted_index.to('cpu')

        current_word, current_label = "", "O"
        for token, prediction in zip(tokens['input_ids'][0], predicted_index[0]):
            decoded_token = tokenizer.decode([token.item()]).strip()
            if decoded_token.startswith("##"):
                # This token is a subtoken of a larger word
                current_word += decoded_token[2:]
            else:
                # This token is a new word; save the old word (if it's not an 'O' entity)
                if current_label != 'O':
                    word_labels.append({current_word: current_label})
                current_word = decoded_token
                current_label = labels[prediction]
        # Save the last word (if it's not an 'O' entity)
        if current_label != 'O':
            word_labels.append({current_word: current_label})

        results.append((url, word_labels))  # Append results

        # Extract all links if depth is less than or equal to 1
        if depth <= 1:
            for link in soup.find_all('a'):
                new_url = link.get('href')
                if new_url:
                    new_url = urljoin(url, new_url)
                    if new_url not in visited_urls:
                        visited_urls.add(new_url)
                        await crawl_and_process(session, new_url, depth + 1)  # Recursive call
    except Exception as e:
        print(f"Failed to fetch and process {url}. Error: {e}")
# Create a function to run the asyncio event loop
async def main(seed_urls):
    # Create a session
    async with aiohttp.ClientSession() as session:
        # Assign tasks
        tasks = [crawl_and_process(session, url) for url in seed_urls]
        await asyncio.gather(*tasks)  # Await all tasks
# Load pre-trained BERT tokenizer and model for token classification
tokenizer = BertTokenizer.from_pretrained('bert-base-cased')
model = BertForTokenClassification.from_pretrained('dbmdz/bert-large-cased-finetuned-conll03-english')
# Check if a GPU is available and if not, use a CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Move the model to GPU if available
model = model.to(device)
# Define the labels
labels = ['O', 'B-MISC', 'I-MISC', 'B-PER', 'I-PER', 'B-ORG', 'I-ORG', 'B-LOC', 'I-LOC']
# Raise an exception when the prediction alarm fires (caught in crawl_and_process),
# instead of letting the default SIGALRM action terminate the whole process
def alarm_handler(signum, frame):
    raise TimeoutError("Token prediction timed out")
signal.signal(signal.SIGALRM, alarm_handler)
# Read list of URLs from text file
with open('seeds.txt', 'r') as f:
    seed_urls = [line.strip() for line in f]
# Register the cleanup function to handle saving the data
@atexit.register
def save_data_on_exit():
    # Process the results and save them to a CSV file
    df = pd.DataFrame(columns=['URL', 'Words'])
    for res in results:
        url, word_labels = res
        df_temp = pd.DataFrame({'URL': [url], 'Words': [json.dumps(word_labels)]})
        df = pd.concat([df, df_temp], ignore_index=True)
    # Save the data to the CSV file
    df.to_csv(csv_filename, index=False)
    print(f"Data saved to {csv_filename}")
# Run the asyncio event loop
asyncio.run(main(seed_urls))
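To try the recursive version, list the seed URLs one per line in seeds.txt; the crawler follows links up to two levels deep from each seed, and the @atexit.register handler writes everything collected so far to crawl_results.csv when the script finishes or is interrupted. A hypothetical seeds.txt might look like:

https://example.com/
https://example.org/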
Video of it running with NVIDIA CUDA on a GeForce RTX 2080 Ti:
Video of it running on a MacBook Pro with the M1 Pro:
Tsubasa Kato
Inspire Search Corporation
CEO
https://www.inspiresarch.io
LinkedIn: https://www.linkedin.com/in/tsubasakato/
Twitter: https://twitter.com/_stingraze