2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

ScrapyのItem Pipelineで複数Spiderの完了をSlackに通知する

Last updated at Posted at 2022-01-23

できたもの

スクリーンショット 2022-01-24 0.47.12.png
スクリーンショット 2022-01-24 0.45.03.png

環境

  • Python: 3.7.10
  • Scrapy: 2.5.1

背景

個人的にいくつかのクローラーをScrapyで運用していて、クロールが終わったりエラーになったら通知してほしいなと思いました。
ぱっと思いついたやり方は以下の通りでした。

  • (クロール結果がS3に出力されるので)Lambdaで出力を検知して通知する
    • エラーでS3に何も出力されなかったら検知できない
  • Scrapyの各Spider(spiders/*.py)内に通知処理を書く
    • Signalsという機能を使えばできそう
    • エラー検知用のメソッドがある
    • 同じScrapyプロジェクト内に複数のSpiderを動かしていて、同じ処理を複数の場所で書きたくない
  • ScrapyのItem Pipelineを使う
    • ここに書けばすべてのSpiderに適用される
    • エラー検知用のメソッドはないが、自分で書けばよい

ちゃんとエラー検知したいなら専用のメソッドがあるSignalsがよいと思いますが、何ヶ月か運用してエラーは起きなかったので、今回は比較的簡単なItem Pipelineを使いました。

なお、公式リファレンスにもあるように、本来はアイテムごとのバリデーションやDBへの保存処理を行うものです。

また、PythonからSlackに通知するやり方はいろんな記事があるので割愛しますが、このあたりを参考にしました。

実装方法

pipelines.py に処理を記述

Scrapyのプロジェクト内にpipelines.pyがあると思うので、そこに通知処理を書きます。

pipelines.py
from itemadapter import ItemAdapter
import json
import requests
import traceback


def post_slack(message_json):
    webhook_url = "SLACK_WEBHOOK_URL"
    requests.post(webhook_url, json.dumps(message_json))

class [ProjectName]Pipeline:
    def process_item(self, item, spider):
        return item

    def close_spider(self, spider):
        ```spiderが閉じたら起動する処理```

        # クロールが完了したらSlackに通知する
        try:
            stats = spider.crawler.stats.get_stats()
            message_json = {
                "text": f"{spider.name}のクロールが完了しました!",
                "attachments": [
                    {
                        "text": f"レコード数:{stats['item_scraped_count']}"
                    }
                ]
            }

        # エラーなら内容を通知する
        except:
            message_json = {
                "text": f"*{spider.name}のクロールでエラーが発生しました!*",
                "attachments": [
                    {
                        "color": "danger",
                        "text": traceback.format_exc()
                    }
                ]
            }
        post_slack(message_json)
  • close_spiderはデフォルトで用意されているメソッドで、ここに処理を書くとSpiderが閉じたときに実行されます
  • spider.crawler.stats.get_stats()はクローラー内のいろんな統計情報が保存されている辞書です
  • traceback.format_exc()はエラー内容を表示してくれる関数です
  • 注意:上の書き方だと、最初のアイテムのスクレイピングが成功して途中で失敗するケースは拾えません
    • 主なエラーケースはKeyError: 'item_scraped_count'になります
    • item_scraped_countは最初のアイテムの処理が終わった段階でキーが作成されるので、最初が正常に処理できたらKeyErrorになりません
    • 今回は、サイト構造が全体的に変わって何も取得できないケースが拾えればよかったので、上のような実装にしました

settings.pyへの追記

上の処理を反映するために、クラス名を下のようにsettings.pyに追記します(デフォルトだとコメントアウトされているはず)。
300などの数字は優先度を表していて、0-1000の数字が小さい順に適用されるようです。
参考:Settings - Scrapy

settings.py
ITEM_PIPELINES = {
   "[project_name].pipelines.[ProjectName]Pipeline": 300,
}

感想

これで自分用のSlackに完了通知が来るようになり、ちゃんと動いてくれてるなとクローラーを労う気持ちが生まれました。

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?