search
LoginSignup
1
Help us understand the problem. What are the problem?

posted at

updated at

ScrapyのItem Pipelineで複数Spiderの完了をSlackに通知する

できたもの

スクリーンショット 2022-01-24 0.47.12.png
スクリーンショット 2022-01-24 0.45.03.png

環境

  • Python: 3.7.10
  • Scrapy: 2.5.1

背景

個人的にいくつかのクローラーをScrapyで運用していて、クロールが終わったりエラーになったら通知してほしいなと思いました。
ぱっと思いついたやり方は以下の通りでした。

  • (クロール結果がS3に出力されるので)Lambdaで出力を検知して通知する
    • エラーでS3に何も出力されなかったら検知できない
  • Scrapyの各Spider(spiders/*.py)内に通知処理を書く
    • Signalsという機能を使えばできそう
    • エラー検知用のメソッドがある
    • 同じScrapyプロジェクト内に複数のSpiderを動かしていて、同じ処理を複数の場所で書きたくない
  • ScrapyのItem Pipelineを使う
    • ここに書けばすべてのSpiderに適用される
    • エラー検知用のメソッドはないが、自分で書けばよい

ちゃんとエラー検知したいなら専用のメソッドがあるSignalsがよいと思いますが、何ヶ月か運用してエラーは起きなかったので、今回は比較的簡単なItem Pipelineを使いました。

なお、公式リファレンスにもあるように、本来はアイテムごとのバリデーションやDBへの保存処理を行うものです。

また、PythonからSlackに通知するやり方はいろんな記事があるので割愛しますが、このあたりを参考にしました。

実装方法

pipelines.py に処理を記述

Scrapyのプロジェクト内にpipelines.pyがあると思うので、そこに通知処理を書きます。

pipelines.py
from itemadapter import ItemAdapter
import json
import requests
import traceback


def post_slack(message_json):
    webhook_url = "SLACK_WEBHOOK_URL"
    requests.post(webhook_url, json.dumps(message_json))

class [ProjectName]Pipeline:
    def process_item(self, item, spider):
        return item

    def close_spider(self, spider):
        ```spiderが閉じたら起動する処理```

        # クロールが完了したらSlackに通知する
        try:
            stats = spider.crawler.stats.get_stats()
            message_json = {
                "text": f"{spider.name}のクロールが完了しました!",
                "attachments": [
                    {
                        "text": f"レコード数:{stats['item_scraped_count']} 件"
                    }
                ]
            }

        # エラーなら内容を通知する
        except:
            message_json = {
                "text": f"*{spider.name}のクロールでエラーが発生しました!*",
                "attachments": [
                    {
                        "color": "danger",
                        "text": traceback.format_exc()
                    }
                ]
            }
        post_slack(message_json)
  • close_spiderはデフォルトで用意されているメソッドで、ここに処理を書くとSpiderが閉じたときに実行されます
  • spider.crawler.stats.get_stats()はクローラー内のいろんな統計情報が保存されている辞書です
  • traceback.format_exc()はエラー内容を表示してくれる関数です
  • 注意:上の書き方だと、最初のアイテムのスクレイピングが成功して途中で失敗するケースは拾えません
    • 主なエラーケースはKeyError: 'item_scraped_count'になります
    • item_scraped_countは最初のアイテムの処理が終わった段階でキーが作成されるので、最初が正常に処理できたらKeyErrorになりません
    • 今回は、サイト構造が全体的に変わって何も取得できないケースが拾えればよかったので、上のような実装にしました

settings.pyへの追記

上の処理を反映するために、クラス名を下のようにsettings.pyに追記します(デフォルトだとコメントアウトされているはず)。
300などの数字は優先度を表していて、0-1000の数字が小さい順に適用されるようです。
参考:Settings - Scrapy

settings.py
ITEM_PIPELINES = {
   "[project_name].pipelines.[ProjectName]Pipeline": 300,
}

感想

これで自分用のSlackに完了通知が来るようになり、ちゃんと動いてくれてるなとクローラーを労う気持ちが生まれました。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
1
Help us understand the problem. What are the problem?