A GPU-Based Web Crawler (using CUDA, BERT, and PyTorch)

Posted at 2023-06-26

Over the past few days I have been developing a web crawler that uses the GPU. It is written in Python 3 and runs on Ubuntu 20.04 with CUDA 11.4.
I wrote it in a few hours with the help of ChatGPT (GPT-4), and it went through more than ten versions before reaching this one.

This script crawls websites with Beautiful Soup, uses BERT to extract keyword tokens as JSON, and then writes the results to a CSV file.
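
To make the output concrete, here is an illustrative sketch (the URL and entities are made up, not real crawl results) of the structure the script builds: a list of {word: entity-label} dictionaries per page, JSON-encoded into the Words column of the CSV.

import json

# Illustrative only: the shape of one result row as built by the script below.
# The words and URL are invented; the labels come from the CoNLL-2003 tag set used by the model.
word_labels = [{"Tsubasa": "B-PER"}, {"Kato": "I-PER"}, {"Tokyo": "B-LOC"}]
row = {"URL": "https://example.com/", "Words": json.dumps(word_labels)}
print(row)  # crawl_results.csv holds one such row (columns URL, Words) per crawled page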

For speed, it uses asynchronous HTTP/HTTPS requests. One other point worth noting: running the token (keyword) processing with transformers' BertTokenizer and BertForTokenClassification entirely on the GPU turned out to be fairly slow, so the script uses a hybrid GPU + CPU configuration, as sketched below.
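
A minimal sketch of that split, using the same model names as the full script below: tokenization and label decoding stay on the CPU, and only the forward pass runs on the GPU when one is available.

import torch
from transformers import BertTokenizer, BertForTokenClassification

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
tokenizer = BertTokenizer.from_pretrained('bert-base-cased')  # tokenization stays on the CPU
model = BertForTokenClassification.from_pretrained(
    'dbmdz/bert-large-cased-finetuned-conll03-english').to(device)  # inference on the GPU if available

def predict_label_ids(text):
    tokens = tokenizer(text, return_tensors='pt', truncation=True,
                       padding='max_length', max_length=128).to(device)
    with torch.no_grad():
        logits = model(tokens['input_ids'])[0]    # forward pass on the GPU
    return torch.argmax(logits, dim=2).to('cpu')  # move predictions back to the CPU for decoding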

The libraries used are Beautiful Soup (bs4), transformers, torch, pandas, json, asyncio, aiohttp, aiofiles, and async_timeout. While developing this script I also tried multiprocessing with Pool, but this version appears to be the fastest. I plan to run the earlier versions again to measure exactly how much faster it is; a simple timing harness is sketched below.
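
For that comparison, a simple wall-clock harness around the asyncio entry point should be enough. This is only a hypothetical sketch, not code from the repository:

import asyncio
import time

def benchmark(label, coroutine):
    # Hypothetical helper: time one complete crawl run end to end
    start = time.perf_counter()
    asyncio.run(coroutine)
    print(f"{label}: {time.perf_counter() - start:.1f} s")

# Usage (assuming main() and urls from the script below):
# benchmark("asyncio + GPU/CPU hybrid", main(urls))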

The source code is also available on GitHub:
https://github.com/stingraze/web-crawler-gpu/

Update 2023-07-04: I have also uploaded the code for a web crawler that recursively follows links (included below).
Update 2023-07-05: I have added demo videos below, recorded with CUDA 11.4, a GeForce RTX 2080 Ti, and a 10th-generation Core i5 CPU (12 threads).
Update 2023-07-09: I have uploaded a version to GitHub that adds an async fetch timeout and a URL exclusion list: https://github.com/stingraze/web-crawler-gpu/blob/main/gpu-crawler-recursive11.py
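
The exact implementation is in gpu-crawler-recursive11.py linked above; as a rough, hedged sketch, those two additions look something like the following (the exclusion-list entries and the function name are illustrative, not the repository's):

import asyncio
import aiohttp
import async_timeout

EXCLUDED_SUBSTRINGS = ["facebook.com", ".pdf"]  # illustrative URL exclusion list

async def fetch_with_timeout(session, url):
    # Skip anything on the exclusion list
    if any(part in url for part in EXCLUDED_SUBSTRINGS):
        return None
    try:
        # Give up on the fetch after 10 seconds
        async with async_timeout.timeout(10):
            async with session.get(url) as response:
                response.raise_for_status()
                return await response.text()
    except (asyncio.TimeoutError, aiohttp.ClientError) as e:
        print(f"Skipping {url}: {e}")
        return None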

gpu-webcrawler.py
#(C)Tsubasa Kato - Inspire Search Corporation 2023/6/26
#Created with the help of ChatGPT (GPT-4)
from bs4 import BeautifulSoup
from transformers import BertTokenizer, BertForTokenClassification
import torch
import pandas as pd
import json
import asyncio
import aiohttp
import aiofiles
import async_timeout

# Load pre-trained BERT tokenizer and model for token classification
tokenizer = BertTokenizer.from_pretrained('bert-base-cased')
model = BertForTokenClassification.from_pretrained('dbmdz/bert-large-cased-finetuned-conll03-english')

# Check if a GPU is available and if not, use a CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Move the model to GPU if available
model = model.to(device)

# Define the labels
labels = ['O', 'B-MISC', 'I-MISC', 'B-PER', 'I-PER', 'B-ORG', 'I-ORG', 'B-LOC', 'I-LOC']

# Function to fetch webpage and process text
async def crawl_and_process(session, url):
    word_labels = []
    
    try:
        print("Async fetch:" + url)
        # Specify a timeout of 10 seconds for the request
        async with async_timeout.timeout(10):
            async with session.get(url) as response:
                response.raise_for_status()
                soup = BeautifulSoup(await response.text(), 'html.parser')
                text = soup.get_text()
        #Sleep for 1 second
        await asyncio.sleep(1)
        # Tokenize the text and move to GPU
        tokens = tokenizer(text, return_tensors='pt', truncation=True, padding='max_length', max_length=128)
        tokens = tokens.to(device)

        # Predict all tokens
        print("Predicting tokens" + url)
        with torch.no_grad():
            predictions = model(tokens['input_ids'])

        predicted_index = torch.argmax(predictions[0], dim=2)
        predicted_index = predicted_index.to('cpu')

    except Exception as e:
        print(f"Failed to fetch and process {url}. Error: {e}")
        return (url, word_labels)

    current_word, current_label = "", "O"
    for token, prediction in zip(tokens['input_ids'][0], predicted_index[0]):
        decoded_token = tokenizer.decode([token.item()]).strip()

        if decoded_token.startswith("##"):
            # This token is a subtoken of a larger word
            current_word += decoded_token[2:]
        else:
            # This token is a new word; save the old word (if it's not an 'O' entity)
            if current_label != 'O':
                word_labels.append({current_word: current_label})
            current_word = decoded_token
            current_label = labels[prediction]

    # Save the last word (if it's not an 'O' entity)
    if current_label != 'O':
        word_labels.append({current_word: current_label})

    return (url, word_labels)

# Create a function to run the asyncio event loop
async def main(urls):
    # Create a session
    async with aiohttp.ClientSession() as session:
        tasks = []
        # Assign tasks
        for url in urls:
            tasks.append(crawl_and_process(session, url))
        # Run tasks and gather results
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

# Read list of URLs from text file
with open('urls.txt', 'r') as f:
    urls = [line.strip() for line in f]

# Run the asyncio event loop
results = asyncio.run(main(urls))

# Process the results and save them to a CSV file
df = pd.DataFrame(columns=['URL', 'Words'])
for res in results:
    url, word_labels = res
    df_temp = pd.DataFrame({'URL': [url], 'Words': [json.dumps(word_labels)]})
    df = pd.concat([df, df_temp], ignore_index=True)
df.to_csv('crawl_results.csv', index=False)
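
To run it, list one URL per line in urls.txt next to the script and start it with python3 gpu-webcrawler.py; the extracted entities are written to crawl_results.csv, one row per page.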

Recursive GPU web crawler

recursive-gpu-webcrawler.py
#(C)Tsubasa Kato 2023 
# Created with the help of ChatGPT (GPT-3.5 & GPT-4)
from bs4 import BeautifulSoup
from transformers import BertTokenizer, BertForTokenClassification
import torch
import pandas as pd
import json
import asyncio
import aiohttp
from urllib.parse import urljoin
import signal
import atexit

# Handle SIGALRM so the 10-second prediction timeout below raises an exception
# (caught in crawl_and_process) instead of terminating the whole process
def alarm_handler(signum, frame):
    raise TimeoutError("Token prediction timed out")

signal.signal(signal.SIGALRM, alarm_handler)

visited_urls = set()
results = []  # Define the results list
append_interval = 10  # Append to CSV every 10 sites
csv_filename = 'crawl_results.csv'  # CSV file name

# Function to fetch webpage and process text
async def crawl_and_process(session, url, depth=0):
    word_labels = []

    try:
        # Pause for 1 second
        await asyncio.sleep(1)
        print("Async fetch:" + url)

        # Fetch the URL
        async with session.get(url) as response:
            response.raise_for_status()
            html = await response.text()

        soup = BeautifulSoup(html, 'html.parser')

        text = soup.get_text()

        # Tokenize the text and move to GPU
        tokens = tokenizer(text, return_tensors='pt', truncation=True, padding='max_length', max_length=128)
        tokens = tokens.to(device)

        # Predict all tokens with a timeout of 10 seconds
        signal.alarm(10)
        print("Predicting tokens for " + url)
        with torch.no_grad():
            predictions = model(tokens['input_ids'])

        signal.alarm(0)  # Reset the alarm

        predicted_index = torch.argmax(predictions[0], dim=2)
        predicted_index = predicted_index.to('cpu')

        current_word, current_label = "", "O"
        for token, prediction in zip(tokens['input_ids'][0], predicted_index[0]):
            decoded_token = tokenizer.decode([token.item()]).strip()

            if decoded_token.startswith("##"):
                # This token is a subtoken of a larger word
                current_word += decoded_token[2:]
            else:
                # This token is a new word; save the old word (if it's not an 'O' entity)
                if current_label != 'O':
                    word_labels.append({current_word: current_label})
                current_word = decoded_token
                current_label = labels[prediction]

        # Save the last word (if it's not an 'O' entity)
        if current_label != 'O':
            word_labels.append({current_word: current_label})

        results.append((url, word_labels))  # Append results

        # Extract all links if depth is less than or equal to 1
        if depth <= 1:
            for link in soup.find_all('a'):
                new_url = link.get('href')
                if new_url:
                    new_url = urljoin(url, new_url)
                    if new_url not in visited_urls:
                        visited_urls.add(new_url)
                        await crawl_and_process(session, new_url, depth + 1)  # Recursive call

    except Exception as e:
        print(f"Failed to fetch and process {url}. Error: {e}")

# Create a function to run the asyncio event loop
async def main(seed_urls):
    # Create a session
    async with aiohttp.ClientSession() as session:
        # Assign tasks
        tasks = [crawl_and_process(session, url) for url in seed_urls]
        await asyncio.gather(*tasks)  # Await all tasks

# Load pre-trained BERT tokenizer and model for token classification
tokenizer = BertTokenizer.from_pretrained('bert-base-cased')
model = BertForTokenClassification.from_pretrained('dbmdz/bert-large-cased-finetuned-conll03-english')

# Check if a GPU is available and if not, use a CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Move the model to GPU if available
model = model.to(device)

# Define the labels
labels = ['O', 'B-MISC', 'I-MISC', 'B-PER', 'I-PER', 'B-ORG', 'I-ORG', 'B-LOC', 'I-LOC']

# Read list of URLs from text file
with open('seeds.txt', 'r') as f:
    seed_urls = [line.strip() for line in f]

# Register the cleanup function to handle saving the data
@atexit.register
def save_data_on_exit():
    # Process the results and save them to a CSV file
    df = pd.DataFrame(columns=['URL', 'Words'])
    for res in results:
        url, word_labels = res
        df_temp = pd.DataFrame({'URL': [url], 'Words': [json.dumps(word_labels)]})
        df = pd.concat([df, df_temp], ignore_index=True)

    # Save the data to the CSV file
    df.to_csv(csv_filename, index=False)
    print(f"Data saved to {csv_filename}")

# Run the asyncio event loop
asyncio.run(main(seed_urls))
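
This version reads its start pages from seeds.txt (one URL per line), follows links up to two levels deep from each seed, and relies on the atexit handler to write crawl_results.csv when the run finishes or is interrupted.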

Video of the crawler running on a GeForce RTX 2080 Ti with NVIDIA CUDA:

Video of the crawler running on a MacBook Pro with an M1 Pro chip:

Tsubasa Kato

Inspire Search Corporation
Representative Director (CEO)
https://www.inspiresarch.io

LinkedIn: https://www.linkedin.com/in/tsubasakato/
Twitter: https://twitter.com/_stingraze
