AWS SAM(AWS サーバーレスアプリケーションモデル)を使ってサーバーレスなクローラーを作り、定期実行させて結果をS3に保存することができたので、やり方をまとめておきます。
なお、Lambdaは最大でも15分の処理しか実行できないため、小さめなクローラーを想定しています。
以下の3ステップで実装します。
- SAMのHello Worldチュートリアルをやる
-
template.yaml
,requirements.txt
などを修正する -
app.py
にクローラーを実装する
SAMのHello Worldチュートリアルをやる
ここは公式チュートリアルで詳細に説明されているので省略します。
これをやると、API Gateway経由でHello World
を返すLambda関数ができるはずです。
template.yaml
, requirements.txt
などを修正する
Hello Worldチュートリアルからの変更点として以下の3つがあります。
- クローラーの処理が終わるまでタイムアウトさせないようにする必要がある
- 定期実行する
- クロールした結果をS3に書き込む
- API Gatewayは不要
これらをtemplate.yaml
に反映すると下のようになります。
AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Description: >
python3.9
Sample SAM Template for hello-world
Globals:
Function:
Timeout: 900 # クローラーの処理が終わるまでタイムアウトさせない
MemorySize: 300 # 一応メモリサイズも増やす
Parameters:
S3BucketName:
Type: String
Default: "結果を保存したいバケット名" # バケット名をパラメータに書く
Resources:
HelloWorldFunction:
Type: AWS::Serverless::Function
Properties:
PackageType: Image
Architectures:
- x86_64
Policies: # S3への書き込み権限をつける
- S3WritePolicy:
BucketName: !Ref S3BucketName
Events:
HelloWorld:
Type: Schedule # スケジューラーを設定
Properties:
Schedule: rate(1 day) # 1日おきに実行する
Metadata:
Dockerfile: Dockerfile
DockerContext: ./hello_world
DockerTag: python3.9-v1
ApplicationResourceGroup:
Type: AWS::ResourceGroups::Group
Properties:
Name:
Fn::Join:
- ""
- - ApplicationInsights-SAM-
- Ref: AWS::StackName
ResourceQuery:
Type: CLOUDFORMATION_STACK_1_0
ApplicationInsightsMonitoring:
Type: AWS::ApplicationInsights::Application
Properties:
ResourceGroupName:
Fn::Join:
- ""
- - ApplicationInsights-SAM-
- Ref: AWS::StackName
AutoConfigurationEnabled: "true"
DependsOn: ApplicationResourceGroup
Outputs:
HelloWorldFunction:
Description: Hello World Lambda Function ARN
Value: !GetAtt HelloWorldFunction.Arn
HelloWorldFunctionIamRole:
Description: Implicit IAM Role created for Hello World function
Value: !GetAtt HelloWorldFunctionRole.Arn
なお、今回は定期実行のスケジュールを一日おき(rate(1 day)
)にしてますが、他の指定の仕方はこちらに詳しく書いてあります。
requirements.txt
ではscrapy
をインストールするようにします。
scrapy==2.7.1
app.py
にクローラーを実装する
scrapyを実行する際は下のようにするのが一般的かと思いますが、
- クローラー本体だけでなく、
settings.py
,items.py
などの関連ファイルが必要になる -
bash
などからscrapy crawl ...
で実行する
今回はLambdaで実行するため、下のようにします。
- クローラー本体、設定などはすべて
app.py
に書く- 別ファイルに書いてapp.pyから参照するのもおそらく可能
-
CrawlerProcess
を使ってapp.py
から実行させる
それらの処理をapp.py
に書くと下のようになります(start_urls
, rules
などは適当です)。
from datetime import datetime
from zoneinfo import ZoneInfo
from scrapy.crawler import CrawlerProcess
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule
JST = ZoneInfo("Asia/Tokyo")
# spiderの処理を書く
class SampleSpider(CrawlSpider):
name = "sample_spider"
allowed_domains = ["sample.com"]
start_urls = [
...
]
rules = (
Rule(
LinkExtractor(allow=r"..."), callback="parse"
),
)
def parse(self, response):
try:
item = {}
item["url"] = response.url
item["crawled_at"] = f"{datetime.now(JST):%Y-%m-%d %H:%M:%S}"
yield item
except Exception as e:
print(f"!!!crawl failed!!!: {response.url}\n{e}")
def lambda_handler(event, context):
# settings.pyに書いていたものを転記
settings = {
"FEED_FORMAT": "json",
"FEED_EXPORT_ENCODING": "utf-8",
# 出力するS3パス
"FEED_URI": f"s3://バケット名/items_{datetime.now(JST):%Y%m%d}.json",
"ROBOTSTXT_OBEY": True,
"CONCURRENT_REQUESTS": 2,
"DOWNLOAD_DELAY": 1,
"CONCURRENT_REQUESTS_PER_DOMAIN": 2,
"CONCURRENT_REQUESTS_PER_IP": 0,
"HTTPCACHE_ENABLED": False,
}
# Pythonからクローラーを実行
process = CrawlerProcess(settings=settings)
crawler = process.create_crawler(SamplrSpider)
process.crawl(crawler)
process.start()
stats = crawler.stats.get_stats()
# 取得したitem数, 処理時間などを返す
return {
"statusCode": 200,
"body": json.dumps(
{
"message": "success",
"item_scraped_count": stats["item_scraped_count"],
"elapsed_time_seconds": stats["elapsed_time_seconds"],
"start_time": f"{stats['start_time']:%Y-%m-%d %H:%M:%S}",
"finish_time": f"{stats['finish_time']:%Y-%m-%d %H:%M:%S}",
}
),
}
あとはsam build && sam deploy
すれば、クローラーが定期実行されて、その結果が指定したS3に保存されると思います。