※ こちらは「 ヤプリアドベントカレンダー 」の2日目,「 Aidemy Advent Calendar 2022 」の3日目にクロスエントリさせていただいた記事です.この後も続々と記事が投稿されていきますので,是非ご覧ください
はじめに
ニュースサイトを定期的にスクレイピングし,記事の内容を Google ドライブへ保存するデータパイプラインを作ってみました( そして途中で詰まりました…(後述) ).この記事では,作る際に考えたことやつまづいたことなどをまとめています.
やりたいこと
- ニュースサイトを定期的にスクレイピングして記事本文から必要なデータを抽出&蓄積し,分析等で使えるようにすること
- 複数のニュースサイトでも対応可能にすること
作りたかったもの
上記やりたいことを実現するために,下図のパイプラインを考えました.イメージとしては,Google Cloud Storage (GCS) をデータレイク, BigQuery を データウェアハウス (DWH) としたパイプラインを Google ドライブで再現した形です.
パイプラインの機能は以下の通りです.なお,このパイプラインは途中で実装が止まってしまったので,どこまで実装できたのかも併せて記します:
- スクレイピングスクリプトを GitHub Actions 上で定期実行(実装済 )
- スクレイピング結果の HTML データを Google ドライブへ保存(ここで詰まりました… )
- 保存した HTML データから必要な情報を抽出し, Google ドライブ上の DB ファイルへ保存する(未実装 )
- 実行ログをGoogleドライブへ保存(実装済 )
- 重要なログ(エラー発生,スクレイピング進捗など)をSlackへ通知(実装済 )
主な使用技術は次の通りです:
- スクレイピング … Python (Beautiful Soup)
- Googleドライブの操作 … Python (PyDrive2)
- 定期実行 … GitHub Actions
- Slackへの通知 … Slack App
- 開発環境 … Docker
コードは次のリポジトリに掲載しています(なお,元々プライベートリポジトリで作っていたものから公開して問題ない部分を切り取って載せているため,一部でファイル等が欠けています.ご了承ください )
どうやって作ったのか?
① 設計,② 実装の順に説明します.
① 設計
①-1 技術選定・パイプライン設計
満たすべき要件
今回は次の要件が満たせるように技術選定・パイプライン設計を行いました:
- 定期的にスクレイピングして自動でデータが貯まっていくこと
- 後で分析しやすい形でスクレイピング結果を保存すること
主に悩んだ点を以下に示します.
Q1. 実行環境や保存先をどこにするか?
選択肢として,ローカル or クラウドの2択を検討し,最終的に次の理由でクラウドを採用しました:
- プログラムの定期実行がやりやすい(ローカルだと PC 再起動などによって定期実行が止まりやすい)
- スクレイピング結果をローカルに保存しても,バックアップ等でどのみちクラウドにはアップロードすることになる(→ならば,最初からクラウドに保存したほうが手間が少ない)
Q2. どこのクラウドを利用するか?
諸々の事情で請求先設定が面倒そうだったため,請求先設定が不要な実行環境が必要でした(そのため,クラウドと言いつつ AWS, GCP, Azure 等は採用しづらい状況でした).
最終的に,実行環境は GitHub Actions, 保存先は Google ドライブを採用しました.
GitHub Actions を採用した理由は次の通りです:
- GitHub Education のおかげで無料で使えたから
Google ドライブを採用した理由は次の通りです:
- 他の類似無料サービスと比べて大容量だったから
- API が充実しており,外部サービスから扱いやすそうだったから
今思えば,請求先設定が面倒だからと早々あきらめず, GCS や BQ 等の無料枠をもっと検討すべきだったかなと思います…
GitHub Actions のみだとワークフローの不具合を検知しづらいので,次のようなロギング& Slack 通知の仕組みを導入しました:
- 実行ログを Google ドライブへ保存
- ワークフロー失敗等やスクレイピング進捗といった重要なログは Slack へ通知
Q3. パイプライン設計どうする?
パイプラインは ELT 方式を採用し,スクレイピング結果の HTML データを保存してから欲しい情報を抽出するようにしました.理由は,後から欲しい情報が増えた際に再スクレイピングなしで元の HTML データを解析できるからです.スクレイピングして集めたデータを分析していると,後から欲しい情報が増えることが度々あります.
DWH へのデータロードはテーブル洗い替えで行うことにしました.理由は以下の通りです:
- 実装がシンプル(前回どこまでやったか?等を管理する必要が無い)
- リトライが容易
上記を図にするとこのようになります:
Q4. 保存形式どうする?
今回は次の理由であえてスプレッドシートではなく DB ファイルで保存するようにしました:
- 利用方法が基本的に Python なので,そこで扱いやすくするため
- データ件数の増加に強いため(スプレッドシートだと1シート当たりの行数制限がある)
今思えば,下記の理由でスプレッドシートでも良かったな…という気がしています…
- スプレッドシートの行数制限に達するようなデータ件数ならテーブル洗い替えが厳しい
- 洗い替え処理の時間がサービスアカウントの有効時間を超える(後述)
- Google ドライブ上で直接中身が見れないのが不便
Q5. スクレイピング,データロードのプログラムをどうやって作るか?
ここは使いなれた Python + Beautiful Soup の組み合わせを採用しました.理由は, GitHub Actions や Google ドライブあたりの連携で手間取る可能性が高く,そこ以外は手間取らずに作れるようにしておきたかったからです.
①-2 スクレイピング処理のフロー構築
ニュースサイトをスクレイピングするにあたって,アクセスするページは次の2種類です:
- 記事一覧ページ
- 記事本文ページ
したがって,スクレイピングの流れはおおよそ次の 2 ステップになります:
- 記事一覧ページから記事本文ページへのリンクを抽出
- 抽出したリンクをもとに,記事本文ページの HTML データを取得
ただし,ステップ 1 は次の理由で少し工夫が必要です:
- 記事一覧ページは基本的にページングが施されているため,ページ送り処理が必要
- スクレイピングの度に全ての記事本文ページにアクセスすると効率的もサイトへの負荷的にも良くないため,保存済みの記事はスキップする処理が必要
最終的に次の処理フローになりました:
-
記事一覧ページから記事本文ページへのリンクを抽出
- 記事一覧ページから記事本文ページへの URL を抽出する
- Google ドライブから保存済み記事の URL 一覧を取得
- 先ほど抽出した URL から保存済みのものを除外する
- 除外して残った URL が 0 件 or 記事一覧ページの最終ページに到達したら処理終了
-
抽出したリンクをもとに,記事本文ページの HTML データを取得
- 記事本文ページの HTML データを取得する
- 取得した HTML データから必要な要素(例:
<main></main>
で囲まれているところ)だけ抽出する - 抽出した HTML データを Google ドライブへ保存する
①-3 ディレクトリ設計
スクレイピングプログラムのディレクトリ構成
下記のようにしました:
.
├── services/
│ ├── site_1_scraper.py
│ ├── site_2_scraper.py
│ └── ...
├── shared/
│ ├── repository/
│ │ ├── abstract_storage_repository.py
│ │ └── gdrive_storage_repository.py
│ ├── gdrive_client.py
│ ├── gdrive_logger.py
│ ├── scraping_tools.py
│ └── slack_client.py
└── main.py
main.py
を実行することでスクレイピングを行います.
shared/
には共通化した処理( Google ドライブの操作など)が書かれたソースファイルを配置しています.
Google ドライブ操作に関する処理は gdrive_client.py
に書かれています.
保存処理自体は abstract_storage_repository.py
で定義されている AbstractStorageRepository
クラスを継承して実装するようにしています.これにより,保存場所が Google ドライブ以外になっても対応しやすくなります.
services/
には実際に特定のサイトへスクレイピングを行う処理が書かれたソースファイルを配置しています.ここに配置したファイルは main.py
で呼び出して使います.
このディレクトリ構成にした理由は,以前,別のタスクでこのディレクトリ構成に触れたところ,タスク固有のロジックと一般的な処理とが分離されていて体験が良かったからです.
Google ドライブのフォルダ構成
下記のようにしました:
.
├── html/
│ ├── https://site-1.com/
│ │ ├── https://site-1.com/article/1
│ │ ├── https://site-1.com/article/2
│ │ └── ...
│ ├── https://site-2.com/
│ │ ├── https://site-2.com/article/1
│ │ ├── https://site-2.com/article/2
│ │ └── ...
│ └── ...
└── log/
├── scraping/
│ ├── yyyy-mm-dd-HH-MM-SS-ffffff.log
│ ├── yyyy-mm-dd-HH-MM-SS-ffffff.log
│ └── ...
└── load/
├── yyyy-mm-dd-HH-MM-SS-ffffff.log
├── yyyy-mm-dd-HH-MM-SS-ffffff.log
└── ...
html/
には記事の HTML データを保存します.
保存はニュースサイトごとにフォルダを分けて行うようにしました.フォルダ名はニュースサイトのドメインをそのまま使うようにしました.理由はサイトの名前衝突を回避できるからです.新しくサイトを追加したい場合は html/
配下にニュースサイトのドメインでフォルダを作ればOKです.
ファイル名は記事の URL をそのまま使うようにしました.これにより,フォルダ内のファイル名一覧を取得するだけで保存済み記事の URL 一覧を取得することができます.
log/
には実行ログを保存します.ファイル名は (実行時のタイムスタンプ).log
にしました.
② 実装
②-1 Google ドライブの操作
必要な操作
必要な Google ドライブの操作は以下の通りです:
- ファイル一覧の取得(保存済み記事一覧の取得に必要)
- ファイルの保存(記事の保存に必要)
- ファイル内容の更新(ログファイルの更新に必要)
今回は実装したGoogleドライブ操作処理のテストも実装したので,次の操作も実装しました:
- ファイルのアップロード
- ファイルのダウンロード
PyDrive2
Google ドライブ操作処理の実装は PyDrive2 を用いて行いました. PyDrive2 は Google Drive API の Python Wrapper です.
PyDrive2 の環境構築については次の記事をご覧ください:
ただし,先の記事の方法だと 7 日で認証が切れてしまい,定期実行ができません.7 日で認証が切れる理由は以下の通りです:
外部ユーザータイプ用に OAuth 同意画面が構成され、公開ステータスが「テスト中」である Google Cloud Platform プロジェクトは、7 日後に期限切れになる更新トークンが発行されます。
引用: OAuth 2.0 を使用した Google API へのアクセス | Authorization | Google Developers
今回作成した OAuth 認証は PyDrive2 から Google ドライブを操作できるようにするために作った認証のため,公開ステータスは当然「テスト中」となります.
そこで,今回はサービスアカウントを利用することにしました.利用手順は以下の通りです:
- サービスアカウントを作成する
- サービスアカウントの鍵を作成し,その JSON ファイルをリポジトリ内に配置する
- サービスアカウントに Google ドライブのアクセスしたいフォルダへのアクセス権を付与する
PyDrive2 による認証とインスタンス作成は次のコードで行えます:
from pydrive2.auth import GoogleAuth
from pydrive2.drive import GoogleDrive
from oauth2client.service_account import ServiceAccountCredentials
# 認証
gauth = GoogleAuth()
scope = ["https://www.googleapis.com/auth/drive"]
gauth.credentials = ServiceAccountCredentials.from_json_keyfile_name(os.environ["SA_KEY_FILE"], scope)
# インスタンスを作成
drive = GoogleDrive(gauth)
先ほど挙げた必要操作を PyDrive2 でどのように実装したかについては,次の記事に詳細を記しましたのでご覧いただければと思います:
②-2 スクレイピング処理
スクレイピング処理の実装は,ニュースサイトごとに前述の処理フローを実装したクラスを作ることで行いました.
クラスの構造は,実際にスクレイピング処理を実行するパブリックメソッド .execute()
,記事の保存等細かい処理を実装したプライベートメソッドの 2 つで構成されています.
②-3 ロギング
ログレベルの設定やログフォーマット等の環境設定は,JSONファイルに記述してそれを読み込む方法を採用しました.環境設定のJSONファイルの書き方は次の記事を参考にしました.
参考:
何をロギングするか?どれをSlackへ通知するか?等は実際にログの出力を見てちゃんと処理状況を理解できるか検証しながら決めていきました.その結果,次のようになりました:
ログの内容 | ログレベル | Slackへ通知するか? |
---|---|---|
サイトAでスクレイピングを開始する | INFO | o |
記事一覧のnページ目をスクレイピングする | INFO | o |
URLのリクエストが失敗する | ERROR | o |
そのページでm個の記事本文ページへのURLを取得した | INFO | |
l個の保存済み記事のURLを取得した | INFO | |
k件の記事が既に保存済みだったので除外した | INFO | |
このページでは新たに(m-k)件の記事を保存する | x | o |
記事本文ページのスクレイピングを開始する | INFO | |
{URL}の記事をスクレイピングする | INFO | |
記事を保存した | INFO | |
スクレイピングが完了した | INFO | o |
例外が発生した | ERROR | o |
②-4 Slack への通知
Slack への通知は、 Slack App を作成することで行いました.実装手順は次の通りです:
- 通知用 Slack App を作成し、通知したいチャンネルへ追加
- 通知処理を実装(
slack_client.py
) - スクレイピング処理で通知処理を呼び出し、Slack へ通知を送信
参考:
②-5 GitHub Actions ワークフローの構築
定期実行を実現するために、GitHub Actions では次のワークフローを作成しました.
ワークフロー作成で工夫した点は次の3つです:
- 環境変数の利用
- ローカルと GitHub Actions 上でマウント先が変わる問題の解消
- 定期実行の実現
環境変数の利用
通常の環境変数はリポジトリの Secrets
に登録して、ワークフローの env:
で呼び出すことで利用しました.
PyDrive2
の settings.yml
や認証情報 JSON
ファイル、ロギング環境設定 JSON
ファイルなどの構造化データは、 Base64
でエンコードして Secrets
に登録してワークフロー内でデコードすることで利用しました.
ただ、構造化データの利用方法はもう少し安全で適切な方法がありそうです…
ローカルと GitHub Actions 上でマウント先が変わる問題の解消
今回, Docker
を用いて開発をするにあたって作業ディレクトリを環境変数として WORKING_DIR = /work
のように設定し,そこにプロジェクトのディレクトリをマウントしていました.そのため,プログラム内でファイルを読み込む際は f'os.environ["WORKING_DIR"]/hoge/bar
のように環境変数 WORKING_DIR
利用していました.
しかし、 GitHub Actions 上では /home/runner
という異なる場所へマウントされてしまうため,そのままだと f'os.environ["WORKING_DIR"]/hoge/bar
は Not Found Error
となってしまいます.
そこで、ワークフロー内で export WORKING_DIR=$PWD
を実行して WORKING_DIR
の値をカレントディレクトリのパスで更新することでこの問題を解決しました.
図にすると下記のようになります:
定期実行
GitHub 上での定期実行は、次の記事を参考に行いました.
注意点は以下の通りです:
- ワークフローのタイムゾーンは UTC なため、 JST の場合は 9 時間引いた値でスケジューリングする必要がある
- スケジュールした時刻ぴったりにワークフローが実行されるわけではなく、何分かズレる可能性がある
特に後者は公式ドキュメントで注釈が付いています:
ノート: scheduleイベントは、GitHub Actionsのワークフローの実行による高負荷の間、遅延させられることがあります。 高負荷の時間帯には、毎時の開始時点が含まれます。 遅延の可能性を減らすために、Ⅰ時間の中の別の時間帯に実行されるようワークフローをスケジューリングしてください。
詰まっているところ
PyDrive2 で利用している サービスアカウントの有効時間内に次の処理が終わらない 点で詰まっています:
- 新しく追加したサイトのスクレイピング処理
- 保存した HTML データを DB ファイルへロードする処理
原因は次の 2 点です:
- PyDrive2 で利用しているサービスアカウントの有効時間はデフォルト 1 時間
- PyDrive2 で 1 ファイル操作するのに 1 秒程かかる
これにより, 1 ファイル/秒 * 1 時間 = 3600 ファイル
以上は扱えない状況となっています.
恒久的な解決策としては次の 2 つを考えています.ただ,正直「そこまでするなら素直に GCS + BQ の構成するなどして, Google Cloud サービスで統一したほうがやりやすいのでは…?」という感があります…
- 有効時間を延長する
- どこまで取ってこれたかを記録して,次回実行時に続きから再開できるようにする
とはいえ分析のためのパイプラインであり,また 1 から作り直すと分析で使える状態になるまでさらに時間がかかってしまって本末転倒です.そこで現在は暫定的な解決策として次のフローを取ることで,辛うじて分析可能な状態にしています:
- 初めてのサイトはローカルでスクレイピングを実行
- スクレイピング結果の HTML データ を Google ドライブへアップロード
- 以降は作成したパイプラインで定期スクレイピング
- 分析の際は Google ドライブから HTML データをダウンロードする
最後に
この記事では,ニュースサイトを定期的にスクレイピングし,記事の内容を Google ドライブへ保存するデータパイプラインの作成を試み,その際に考えたことや詰まっているところなどについて記しました.
もし記事に記載した内容に誤りがあったり,もっと良い方法等がありましたら,コメントでご指摘いただけると幸いです.
ここまでお読みいただきありがとうございました