Connpassで勉強会行ってますか?
自分はConnpassでそこそこ興味があって近い所があれば聞きに行きたいなくらいの人間です。
今回は、近くで開催されるイベントを定期的にslackに通知してみようと思います。
※connpassにはapiが公開されてたんですね、知らずに地道にスクレイピングしてしまった・・・
(まあ、スクレイピングを学ぶいい機会になったので良しとしよう。。)
StepFunctions
視覚的なワークフローを使用して、分散アプリケーションとマイクロサービスのコンポーネントを調整できるウェブサービスです。
公式より引用
StepFunctionsを使う背景
これまで連続する処理はlambdaを分けて、親lambdaが子lambdaをキックするような設計でエラー時のslack通知を実装してきましたが、以下の記事にもあるようにlambda間が密結合で、メンテナンス性に優れているとは言えないと思います。
2.複数の Lambda に分け、Lambda Function から Lambda function を呼ぶようにする
コードのメンテナンス性は多少上がるかもしれませんが、エラーが発生したときに「どこまで処理が完了したか」というステートを管理できておらず、再ラン性は低いままです。
また、それぞれの Lambda Function に「次に呼ぶ function の情報」を埋め込むことになるので、密結合なコードになってしまいます。
StepFunctionsを使ってみる。
使ってみるとめちゃくちゃ簡単にlambdaを実行できた。これなら色々やってみたいと思う。
今回は初歩の初歩でただlambda(getConnpassSchedule)がlambda(notifySlack)を呼ぶだけの構造を作る。
{
"Comment": "Get and notify connpass schedule flow",
"StartAt": "getConnpassSchedule",
"States": {
"getConnpassSchedule": {
"Comment": "Get connpass schedule ",
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:getConnpassSchedule",
"InputPath": "$",
"ResultPath": "$",
"OutputPath": "$",
"Next": "notifySlack"
},
"notifySlack": {
"Comment": "Notify connpass schedule to slack",
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:notifySlack",
"InputPath": "$",
"ResultPath": "$",
"OutputPath": "$",
"End": true
}
}
}
Type
にTask
を指定したことでlambdaが設定される。
そのほかのパラメータは以下。
項目 | 説明 |
---|---|
InputPath | 入力値。引数eventで受け取れる。 |
ReturnPath | 返却値。lambdaがreturnした値。 |
OutputPath | 出力値。次の関数の入力値として渡す値。 |
Next | 次のステートを指定。 |
End | trueで終了を意味する。 |
$
はStepFunctionの各ステートの実行に際して渡すjsonであり、$.key
として特定のキーに対応した値のみを渡すことも可能。
なお、stepfunctions作成後すぐだとIAMロールが準備されていないことがあり、この間にテストしても直後に失敗するため、その場合は少し待つ。(新規に作成したIAMロールがIAMコンソール上で見つからない場合はまだできてない。)
Connpassから新着情報を拾ってくる。
構成図
定期的にサイトを訪問し、新着情報に"大阪"のキーワードがあればslackへ流します。
何度も読み込むとご迷惑なので、一度読んだ情報に到達したらそれ以上は読み込まないように中止するため、S3に最新情報を保存しておきます。
Dockerを使ってライブラリやソースコードを丸ごとs3に乗せ、lambdaから実行します。
Dockerはビルドする際にカレントディレクトリのファイルを丸ごとパッケージングするため、作業用ディレクトリを作成してから以下の手順に入る。
インストールする各種モジュールは互換性の関係上、最新版ではなく指定のものを利用するらしい。
$ mkdir -p bin/
$ curl -SL https://github.com/adieuadieu/serverless-chrome/releases/download/v1.0.0-37/stable-headless-chromium-amazonlinux-2017-03.zip > headless-chromium.zip
$ unzip headless-chromium.zip -d bin/
$ rm headless-chromium.zip
$ brew tap homebrew/cask
$ brew cask install chromedriver
追記(2019/12/26)
以下のエラー出た場合、ChromeDriverのアップデートが必要です。
selenium.common.exceptions.WebDriverException: Message: session not created: This version of ChromeDriver only supports Chrome version 77
$ brew tap homebrew/cask
$ brew cask reinstall chromedriver
requirements.txt
pipで入れるパッケージを箇条書きにするだけで、好みのパッケージを入れた版のpython実行環境が作れるテキストファイル。
selenium==2.53.6
slackweb==1.0.5
Dockerfile
拡張子なしのただのテキストとして記述すればいい。
FROM lambci/lambda:build-python3.6
ENV AWS_DEFAULT_REGION ap-northeast-1
ENV APP_DIR /var/task
ADD . .
CMD pip install -r requirements.txt -t $APP_DIR && \
zip -9 deploy_package.zip index.py && \
zip -r9 deploy_package.zip *
スクレイピングするソースコードを用意。名前はindex.pyとした(変える場合はlambdaのハンドラも変えること)。
//コードはページの一番下
作業用ディレクトリの完成形は以下。
getConnpassSchedule
├── bin
│ ├── chromedriver
│ └── headless-chromium
├── Dockerfile
├── index.py
└── requirements.txt
dockerを実行するとさらに、deploy_package.zipやseleniumなどのファイルも勝手に生成される。
dockerビルドして実行。
$ docker build -t イメージ名 .
$ docker run -v $(PWD):/var/task イメージ名
参考サイトでは${PWD}
としていたがエラーした。ディレクトリ内に”deploy_pacakge.zip”が作成されるのでこれを事前に用意したS3バケットにアップする。
$ aws s3 cp deploy_package.zip s3://バケット名
$ aws lambda create-function --region ap-northeast-1 --function-name getConnpassSchedule --runtime python3.6 --role IAMロール名(arn) --code S3Bucket=バケット名,S3Key=deploy_package.zip --handler index.lambda_handler --memory-size 512 --timeout 300
!注意!
GUIからlambdaを用意して作成してもよいが、メモリサイズを512にしないて実行するとえらい時間がかかって軒並みタイムアウトする。(デフォルトは128とかで足りない。)
Slackへ通知する
引数としてchannelとメッセージ内容textを受け取って通知するだけの関数を用意。
console.log("start lambda");
const https = require('https');
const url = require('url');
const slack_url = process.env.SLACK_WEBHOOK_URL;
const slack_req_ops = url.parse(slack_url);
slack_req_ops.method = 'POST';
slack_req_ops.header = {'Content-Type':'application/json'};
exports.handler = function(event, context){
console.log(event.text);
if(event.text){
var req = https.request(slack_req_ops, function(res){
if(res.statusCode == 200){
context.succeed('post to slack');
}else{
context.fail('[ERROR] status code : ' + res.statusCode);
}
});
req.on('error', function(e){
console.log('[ERROR] problem with request', e.message);
context.fail(e.message);
});
var channel = event.channel;
var message = event.text;
req.write(JSON.stringify({"channel":channel,"text":message}));
req.end();
}
};
StepFunctionsで連携させる。
上記のjsonで完了済み。
あとはstepfunctionsから実行するのみ。
今後やってみようと思うこと。
- そもそも今の通知内容はタイトルと場所だけなので開催日時情報とか定員情報も欲しいところ。
- slack上で申し込みまで完結させたい。
- 出張で東京に行くこともあるのでカレンダーから日程ごとにフィルタ対象の開催地を変更する。
蛇足
何回かコードを書き直して、docker実行して、s3あげて。。。の作業をいちいちやるのも疲れるので全部連結したシェルスクリプトを用意して実行するだけでもだいぶマシになりました。
そのうちデプロイの自動化とかにも手を出していこうと思いつつなかなか手が出せていない。。。
#!/bun/bash
docker build -t イメージ名 . \
&& docker run -v $(PWD):/var/task lambda_connpass \
&& aws s3 cp deploy_package.zip s3://バケット名 \
#&& aws s3 cp /Users/kept1994/Downloads/next.txt s3://バケット名 \
&& aws lambda update-function-code --function-name getConnpassSchedule --s3-bucket バケット名 --s3-key deploy_package.zip \
&& aws stepfunctions start-execution --state-machine-arn arn:aws:states:ap-northeast-1:123456789012:stateMachine:getConnpassSchedule
# StepFunctions連携前のテストは以下で差し替え
#&& aws lambda invoke --function-name getConnpassSchedule --payload '{ "test": "1" }' response.json
追記 (2019/12/26)
SAM+CodeDeployでlambdaへデプロイする。
参考
スクレイピング〜S3へのアップ : https://torahack.com/python-scraping-docker-for-lambda/
ソースコード
import json
import os
from urllib.parse import parse_qs
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import Select
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import NoSuchElementException
import re
import boto3
import time
Lambda = boto3.client("lambda")
S3 = boto3.client("s3")
S3_r = boto3.resource('s3')
def readS3():
result = S3_r.meta.client.download_file('バケット名', 'next.txt', '/tmp/next.txt')
with open('/tmp/next.txt', 'r') as f:
title = f.read()
return title
def updateS3(event_title):
with open('/tmp/tmp.txt', 'w') as f:
f.write(event_title)
with open('/tmp/tmp.txt', 'r') as f:
data = f.read()
result = S3.put_object(
ACL='private',
Body=data,
Bucket="バケット名",
Key="next.txt"
)
class Event:
def __init__(self, title, place):
self.title = title
self.place = place
def getTitle(self):
return self.title
def getSlackMessage(self):
return "title: " + self.title + "\nplace: " + self.place + "\n"
eventList = list()
def readOnePage(driver, roopCount, end_title):
try:
title = ""
place = ""
print("LOOP : " + str(roopCount))
for i in range(1,5):
if 'event_title' == driver.find_element_by_xpath("//*[@id=\"main\"]/div/div[" + str(roopCount) + "]/div/p[" + str(i) + "]").get_attribute('class'):
print(" " + str(i) + " : title")
title = driver.find_element_by_xpath("//*[@id=\"main\"]/div/div[" + str(roopCount) + "]/div/p[" + str(i) + "]/a").text
print(" " + title)
elif '' == driver.find_element_by_xpath("//*[@id=\"main\"]/div/div[" + str(roopCount) + "]/div/p[" + str(i) + "]").get_attribute('class'):
print(" " + str(i) + " : place")
place = driver.find_element_by_xpath("//*[@id=\"main\"]/div/div[" + str(roopCount) + "]/div/p[" + str(i) + "]/span[2]/span").text
print(" " + place)
break
else:
print(" " + str(i) + " : not target element.")
pass
if end_title == title:
print("new event list is end.")
return False
if '大阪' in place:
print("find key word!")
eventList.append(Event(title, place))
else:
pass
roopCount += 1
return True
except NoSuchElementException:
print("explore end")
slack_message = '[ERROR] NoSuchElementException error occured.'
return False
except:
import traceback
traceback.print_exc()
slack_message = '[ERROR] Unexpected error occurred'
return False
return False
def getNextButton(num):
if num == 1:
return 7
elif num < 5:
return num + 7
else:
return 12
def lambda_handler(event,context):
# Headless Chromeを使うための設定を追加
options = Options()
options.add_argument('--headless')
options.add_argument('--disable-gpu')
options.add_argument("--disable-application-cache")
options.add_argument("--disable-infobars")
options.add_argument("--no-sandbox")
options.add_argument("--hide-scrollbars")
options.add_argument("--enable-logging")
options.add_argument("--log-level=0")
options.add_argument("--v=99")
options.add_argument("--single-process")
options.add_argument("--ignore-certificate-errors")
# Headless Chromeを起動
options.binary_location = "./bin/headless-chromium"
#--------------------
# ローカルでの検証環境用
#driver = webdriver.Chrome()
# awsにあげる本番環境用
driver = webdriver.Chrome(executable_path="./bin/chromedriver", chrome_options=options)
#--------------------
driver.get("https://connpass.com/explore/")
roopCount = 1
page = 1
MAX_PAGES = 3
end_title = readS3()
while page <= MAX_PAGES:
isContinue = readOnePage(driver, roopCount, end_title)
if isContinue:
try:
if roopCount >= 20:
print("--------*--------*" + str(page) + "*--------*--------")
roopCount = 1
driver.find_element_by_xpath("//*[@id=\"main\"]/div/div[21]/ul/li[" + str(getNextButton(page)) + "]/a").click()
time.sleep(3)
page += 1
else:
roopCount += 1
except NoSuchElementException:
print("explore end!")
slack_message = '[NOTICE] New event has NOT held in Osaka.'
break
else:
print("loop end")
break
driver.close()
driver.quit()
#Slack通知メッセージ整形
slack_payloads = ""
if not len(eventList) == 0:
for event_obj in eventList:
payload = event_obj.getSlackMessage()
slack_payloads += payload
updateS3(eventList[0].getTitle())
else:
pass
print("lambda end")
return {
"channel": "#チャンネル名",
"text": slack_payloads
}
#--------------------
# ローカルでの検証環境用
#lambda_handler(0,0)
#--------------------