2
2

More than 3 years have passed since last update.

近くで開催される勉強会を定期的にslackへ流す。

Last updated at Posted at 2019-11-16

Connpassで勉強会行ってますか?

自分はConnpassでそこそこ興味があって近い所があれば聞きに行きたいなくらいの人間です。

今回は、近くで開催されるイベントを定期的にslackに通知してみようと思います。

※connpassにはapiが公開されてたんですね、知らずに地道にスクレイピングしてしまった・・・
(まあ、スクレイピングを学ぶいい機会になったので良しとしよう。。)

StepFunctions

視覚的なワークフローを使用して、分散アプリケーションとマイクロサービスのコンポーネントを調整できるウェブサービスです。

公式より引用

StepFunctionsを使う背景

これまで連続する処理はlambdaを分けて、親lambdaが子lambdaをキックするような設計でエラー時のslack通知を実装してきましたが、以下の記事にもあるようにlambda間が密結合で、メンテナンス性に優れているとは言えないと思います。

2.複数の Lambda に分け、Lambda Function から Lambda function を呼ぶようにする

コードのメンテナンス性は多少上がるかもしれませんが、エラーが発生したときに「どこまで処理が完了したか」というステートを管理できておらず、再ラン性は低いままです。
また、それぞれの Lambda Function に「次に呼ぶ function の情報」を埋め込むことになるので、密結合なコードになってしまいます。

StepFunctionsを使ってみる。

使ってみるとめちゃくちゃ簡単にlambdaを実行できた。これなら色々やってみたいと思う。

今回は初歩の初歩でただlambda(getConnpassSchedule)がlambda(notifySlack)を呼ぶだけの構造を作る。

用意したjson
{
    "Comment": "Get and notify connpass schedule flow",
    "StartAt": "getConnpassSchedule",
    "States": {
        "getConnpassSchedule": {
            "Comment": "Get connpass schedule ",
            "Type": "Task",
            "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:getConnpassSchedule",
            "InputPath": "$",
            "ResultPath": "$",
            "OutputPath": "$",
            "Next": "notifySlack"
        },
        "notifySlack": {
            "Comment": "Notify connpass schedule to slack",
            "Type": "Task",
            "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:notifySlack",
            "InputPath": "$",
            "ResultPath": "$",
            "OutputPath": "$",
            "End": true
        }
    }
}

TypeTaskを指定したことでlambdaが設定される。
そのほかのパラメータは以下。

項目 説明
InputPath 入力値。引数eventで受け取れる。
ReturnPath 返却値。lambdaがreturnした値。
OutputPath 出力値。次の関数の入力値として渡す値。
Next 次のステートを指定。
End trueで終了を意味する。

$はStepFunctionの各ステートの実行に際して渡すjsonであり、$.keyとして特定のキーに対応した値のみを渡すことも可能。

なお、stepfunctions作成後すぐだとIAMロールが準備されていないことがあり、この間にテストしても直後に失敗するため、その場合は少し待つ。(新規に作成したIAMロールがIAMコンソール上で見つからない場合はまだできてない。)

Connpassから新着情報を拾ってくる。

構成図

image.png

定期的にサイトを訪問し、新着情報に"大阪"のキーワードがあればslackへ流します。
何度も読み込むとご迷惑なので、一度読んだ情報に到達したらそれ以上は読み込まないように中止するため、S3に最新情報を保存しておきます。

Dockerを使ってライブラリやソースコードを丸ごとs3に乗せ、lambdaから実行します。

Dockerはビルドする際にカレントディレクトリのファイルを丸ごとパッケージングするため、作業用ディレクトリを作成してから以下の手順に入る。

インストールする各種モジュールは互換性の関係上、最新版ではなく指定のものを利用するらしい。

headless-chromiumのダウンロード
$ mkdir -p bin/
$ curl -SL https://github.com/adieuadieu/serverless-chrome/releases/download/v1.0.0-37/stable-headless-chromium-amazonlinux-2017-03.zip > headless-chromium.zip
$ unzip headless-chromium.zip -d bin/
$ rm headless-chromium.zip
chromedriverのダウンロード
$ brew tap homebrew/cask
$ brew cask install chromedriver

追記(2019/12/26)

以下のエラー出た場合、ChromeDriverのアップデートが必要です。

selenium.common.exceptions.WebDriverException: Message: session not created: This version of ChromeDriver only supports Chrome version 77

$ brew tap homebrew/cask
$ brew cask reinstall chromedriver

requirements.txt

pipで入れるパッケージを箇条書きにするだけで、好みのパッケージを入れた版のpython実行環境が作れるテキストファイル。

requirements.txt
selenium==2.53.6
slackweb==1.0.5

Dockerfile

拡張子なしのただのテキストとして記述すればいい。

Dockerfile
FROM lambci/lambda:build-python3.6

ENV AWS_DEFAULT_REGION ap-northeast-1
ENV APP_DIR /var/task

ADD . .

CMD pip install -r requirements.txt -t $APP_DIR && \
  zip -9 deploy_package.zip index.py && \
  zip -r9 deploy_package.zip *

スクレイピングするソースコードを用意。名前はindex.pyとした(変える場合はlambdaのハンドラも変えること)。
//コードはページの一番下

作業用ディレクトリの完成形は以下。

ディレクトリ構成図
getConnpassSchedule
├── bin
│   ├── chromedriver
│   └── headless-chromium
├── Dockerfile
├── index.py
└── requirements.txt

dockerを実行するとさらに、deploy_package.zipやseleniumなどのファイルも勝手に生成される。
dockerビルドして実行。

$ docker build -t イメージ名 .
$ docker run -v $(PWD):/var/task イメージ名

参考サイトでは${PWD}としていたがエラーした。ディレクトリ内に”deploy_pacakge.zip”が作成されるのでこれを事前に用意したS3バケットにアップする。

$ aws s3 cp deploy_package.zip s3://バケット名
$ aws lambda create-function --region ap-northeast-1 --function-name getConnpassSchedule --runtime python3.6 --role IAMロール名(arn) --code S3Bucket=バケット名,S3Key=deploy_package.zip --handler index.lambda_handler --memory-size 512 --timeout 300

!注意!
GUIからlambdaを用意して作成してもよいが、メモリサイズを512にしないて実行するとえらい時間がかかって軒並みタイムアウトする。(デフォルトは128とかで足りない。)

Slackへ通知する

引数としてchannelとメッセージ内容textを受け取って通知するだけの関数を用意。

notifySlack
console.log("start lambda");

const https = require('https');
const url = require('url');
const slack_url = process.env.SLACK_WEBHOOK_URL;
const slack_req_ops = url.parse(slack_url);
slack_req_ops.method = 'POST';
slack_req_ops.header = {'Content-Type':'application/json'};

exports.handler = function(event, context){
    console.log(event.text);

    if(event.text){
        var req = https.request(slack_req_ops, function(res){
            if(res.statusCode == 200){
                context.succeed('post to slack');
            }else{
                context.fail('[ERROR] status code : ' + res.statusCode);
            }
        });

        req.on('error', function(e){
            console.log('[ERROR] problem with request', e.message);
            context.fail(e.message);
        });

        var channel = event.channel;
        var message = event.text;

        req.write(JSON.stringify({"channel":channel,"text":message}));
        req.end();

    }
};

StepFunctionsで連携させる。

上記のjsonで完了済み。
あとはstepfunctionsから実行するのみ。

今後やってみようと思うこと。

  • そもそも今の通知内容はタイトルと場所だけなので開催日時情報とか定員情報も欲しいところ。
  • slack上で申し込みまで完結させたい。
  • 出張で東京に行くこともあるのでカレンダーから日程ごとにフィルタ対象の開催地を変更する。

蛇足

何回かコードを書き直して、docker実行して、s3あげて。。。の作業をいちいちやるのも疲れるので全部連結したシェルスクリプトを用意して実行するだけでもだいぶマシになりました。

そのうちデプロイの自動化とかにも手を出していこうと思いつつなかなか手が出せていない。。。

#!/bun/bash

docker build -t イメージ名 . \
&& docker run -v $(PWD):/var/task lambda_connpass \
&& aws s3 cp deploy_package.zip s3://バケット名 \
#&& aws s3 cp /Users/kept1994/Downloads/next.txt s3://バケット名 \
&& aws lambda update-function-code --function-name getConnpassSchedule --s3-bucket バケット名 --s3-key deploy_package.zip \
&& aws stepfunctions start-execution --state-machine-arn arn:aws:states:ap-northeast-1:123456789012:stateMachine:getConnpassSchedule

# StepFunctions連携前のテストは以下で差し替え
#&& aws lambda invoke --function-name getConnpassSchedule --payload '{ "test": "1" }' response.json

追記 (2019/12/26)
SAM+CodeDeployでlambdaへデプロイする。

参考

スクレイピング〜S3へのアップ : https://torahack.com/python-scraping-docker-for-lambda/

ソースコード

index.py
import json
import os
from urllib.parse import parse_qs
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import Select
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import NoSuchElementException
import re
import boto3
import time

Lambda = boto3.client("lambda")
S3 = boto3.client("s3")
S3_r = boto3.resource('s3')

def readS3():  
    result = S3_r.meta.client.download_file('バケット名', 'next.txt', '/tmp/next.txt')

    with open('/tmp/next.txt', 'r') as f:
        title = f.read()
    return title


def updateS3(event_title):
    with open('/tmp/tmp.txt', 'w') as f:
        f.write(event_title)
    with open('/tmp/tmp.txt', 'r') as f:
        data = f.read()
        result = S3.put_object(
        ACL='private',
        Body=data,
        Bucket="バケット名",
        Key="next.txt"
        )


class Event:
    def __init__(self, title, place):
        self.title = title
        self.place = place
    def getTitle(self):
        return self.title
    def getSlackMessage(self):
        return "title: " + self.title + "\nplace: " + self.place + "\n"

eventList = list()

def readOnePage(driver, roopCount, end_title):
    try:
        title = ""
        place = ""
        print("LOOP : " + str(roopCount))
        for i in range(1,5):
            if 'event_title' == driver.find_element_by_xpath("//*[@id=\"main\"]/div/div[" + str(roopCount) + "]/div/p[" + str(i) + "]").get_attribute('class'):
                print("    " + str(i) + " : title")
                title = driver.find_element_by_xpath("//*[@id=\"main\"]/div/div[" + str(roopCount) + "]/div/p[" + str(i) + "]/a").text
                print("    " + title)
            elif '' == driver.find_element_by_xpath("//*[@id=\"main\"]/div/div[" + str(roopCount) + "]/div/p[" + str(i) + "]").get_attribute('class'):
                print("    " + str(i) + " : place")
                place = driver.find_element_by_xpath("//*[@id=\"main\"]/div/div[" + str(roopCount) + "]/div/p[" + str(i) + "]/span[2]/span").text
                print("    " + place)
                break
            else:
                print("    " + str(i) + " : not target element.")
                pass
        if end_title == title:
            print("new event list is end.")
            return False
        if '大阪' in place:
            print("find key word!")
            eventList.append(Event(title, place))
        else:
            pass
        roopCount += 1
        return True
    except NoSuchElementException:
        print("explore end")
        slack_message = '[ERROR] NoSuchElementException error occured.'
        return False
    except:
        import traceback
        traceback.print_exc()
        slack_message = '[ERROR] Unexpected error occurred'
        return False    
    return False

def getNextButton(num):
    if num == 1:
        return 7
    elif num < 5:
        return num + 7 
    else:
        return 12


def lambda_handler(event,context):

    # Headless Chromeを使うための設定を追加
    options = Options()
    options.add_argument('--headless')
    options.add_argument('--disable-gpu')
    options.add_argument("--disable-application-cache")
    options.add_argument("--disable-infobars")
    options.add_argument("--no-sandbox")
    options.add_argument("--hide-scrollbars")
    options.add_argument("--enable-logging")
    options.add_argument("--log-level=0")
    options.add_argument("--v=99")
    options.add_argument("--single-process")
    options.add_argument("--ignore-certificate-errors")
    # Headless Chromeを起動
    options.binary_location = "./bin/headless-chromium"

    #--------------------
    # ローカルでの検証環境用
    #driver = webdriver.Chrome()
    # awsにあげる本番環境用
    driver = webdriver.Chrome(executable_path="./bin/chromedriver", chrome_options=options)
    #--------------------

    driver.get("https://connpass.com/explore/")

    roopCount = 1
    page = 1
    MAX_PAGES = 3

    end_title = readS3()

    while page <= MAX_PAGES:
        isContinue = readOnePage(driver, roopCount, end_title)
        if isContinue:
            try:
                if roopCount >= 20:
                    print("--------*--------*" + str(page) + "*--------*--------")
                    roopCount = 1
                    driver.find_element_by_xpath("//*[@id=\"main\"]/div/div[21]/ul/li[" + str(getNextButton(page)) + "]/a").click()
                    time.sleep(3)
                    page += 1
                else:
                    roopCount += 1
            except NoSuchElementException:
                print("explore end!")
                slack_message = '[NOTICE] New event has NOT held in Osaka.'
                break
        else:
            print("loop end")
            break

    driver.close()
    driver.quit()

    #Slack通知メッセージ整形
    slack_payloads = ""
    if not len(eventList) == 0:
        for event_obj in eventList:
            payload = event_obj.getSlackMessage()
            slack_payloads += payload
        updateS3(eventList[0].getTitle())
    else:
        pass
    print("lambda end")    
    return {
        "channel": "#チャンネル名",
        "text": slack_payloads
    }

#--------------------
# ローカルでの検証環境用
#lambda_handler(0,0)
#--------------------
2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2