0
0

スクレイピングでのデータ収集を並列処理で高速化する!

Last updated at Posted at 2024-06-11

はじめに

スクレイピングを用いて、データ収集するコードを作成する機会があり
(これも別途記載できればと思います…)
そのデータ収集を、並列処理を用いて高速化することを試みました。
今回は、「スクレイピングでの認証に関する問題」と「並列処理について調べたこと」の2点をまとめていきます。
どちらの内容とも初学者ですので、誤植などあればアドバイスいただけますと幸いです。

スクレイピングでの認証に関する問題

データ収集についてですが、
SSLエラーに対する対応まとめ を参考に修正しました。

認証エラーが発生したコード
get_data
import pandas as pd
import requests, ssl, urllib3
from retrying import retry

df = pd.read_excel("ここは該当ファイル名を入れてみて")

@retry(stop_max_attempt_number=5, wait_fixed=2000)
def get_data_reverse('引数も良い感じに'):
    url = '〇〇適宜必要なものを〇〇'  # APIエンドポイント
    params = {
        'このあたりも適宜'
    }
    response = requests.get(url, params=params, timeout=(10,10)) 
    print(params,response.status_code) #テストするときにあると便利
    if response.status_code == 200:  # HTTPステータスコードが200(成功)なら
        data = response.json()  # レスポンスをJSONとして解析
        return data
    else:
        return None

試したこと

  • pipやその他のアップデート
pip install --upgrade pip
pip install --upgrade requests #これはimportした分全て実施

原因

他の方のPCでも同一のpythonを実行してもらったらできていたのですが、自分のPCではエラーしていたりで苦労していたのですが、
もしかしたら、協力してくれた彼と自分のPCの差はセキュリティーソフトの有無とかだったのでそれかもしれません。

修正点

修正したコード
get_data
import pandas as pd
import requests, ssl, urllib3
from retrying import retry

df = pd.read_excel("ここは該当ファイル名を入れてみて")

class CustomHttpAdapter (requests.adapters.HTTPAdapter):
    def __init__(self, ssl_context=None, **kwargs):
        self.ssl_context = ssl_context
        super().__init__(**kwargs)
 
    def init_poolmanager(self, connections, maxsize, block=False):
        self.poolmanager = urllib3.poolmanager.PoolManager(
            num_pools=connections, maxsize=maxsize,
            block=block, ssl_context=self.ssl_context)

@retry(stop_max_attempt_number=5, wait_fixed=2000)
def get_data_reverse('引数も良い感じに'):
    url = '〇〇適宜必要なものを〇〇'  # APIエンドポイント
    params = {
        'このあたりも適宜'
    }
    session = requests.session()
    ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
    ctx.options |= 0x4
    session.mount('https://', CustomHttpAdapter(ctx))
    response = session.get(url, params=params, timeout=(10,10))
    #response = requests.get(url, params=params, timeout=(10,10)) 
    
    print(params,response.status_code) 
    if response.status_code == 200:  # HTTPステータスコードが200(成功)なら
        data = response.json()  # レスポンスをJSONとして解析
        return data
    else:
        return None

上記のように、class以下を追加して、responseを修正すると、うまくいきました。

並列処理について

並列処理まとめ
今回は、ThreadPoolExecutorを用いて、実装しました。
ThreadPoolExecutornの特徴は下記に記載。(詳細はリンク先から)

スレッドを使って並列タスクを実行します。
ネットワークアクセスなどCPUに負荷がかからない処理の並列実行に適しています。

並列処理のコード
get_data
# 並列処理の実行
with concurrent.futures.ThreadPoolExecutor() as executor:
    future_to_index_row = {executor.submit(process_row, index, row): (index, row) for index, row in df[start_index-1:].iterrows()}
    # 非同期処理の結果を取得
    for future in concurrent.futures.as_completed(future_to_index_row):
        index, row = future_to_index_row[future]
        try:
            index, polygon_data = future.result()  # 処理結果を取得
            df.at[index, "polygon"] = polygon_data  # DataFrameの対応する行に結果を書き込む
            print(index,"行目の処理が完了")
        except Exception as exc:
            print(f'{index} generated an exception: {exc}')  # エラーが発生した場合、インデックスとエラー内容を表示
# 処理結果の表示
df.to_csv('output.csv', index=False, encoding='utf-8-sig')  # CSVに書き出し

副産物(エンコーディング)

WindowsとMACでの文字化け問題ですが、encoding='utf-8-sig' で解決することを初めて知りました。これは便利

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0