0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

LambdaとCloudWatchEventsを使ってイベント情報を毎日通知してみる。

Posted at

こんにちは

業務でCloudWatchEventsとLambdaを触ったので、自分で何か作れないかと考えた挙句
いろいろな勉強を兼ねてConpass APIを叩いて新着イベント情報を毎日通知するものを作りました。

##やりたいこと
①Lambdaをスケジュールで動かしたい。
②広島県に関連する新着のイベント情報を毎日知らせてほしい。
③今後通知先を増やせるような処理にしたい。
④一度作ったらとりあえずずっと動いててほしい。

##今回使ったもの

###Amazon CloudWatch Events

Amazon CloudWatch Events は、Amazon Web Services (AWS) リソースの変更を示すシステムイベントのほぼリアルタイムのストリームを提供します。すぐに設定できる簡単なルールを使用して、ルールに一致したイベントを 1 つ以上のターゲット関数またはストリームに振り分けることができます。

決まった時間にLambdaを呼び出すために使います。

###AWS Lambda

AWS Lambda は、サーバーのプロビジョニングや管理の必要なしにコードを実行できるコンピューティングサービスです。

今回の主軸となる部分。
イベント情報の取得、LINEの通知、Twitterへのツイート、DynamoDBへの保存をすべて1つのLambda関数で実行します。

###LINE Notify

LINE Notifyとはどんなサービスですか?
Webサービスと連携すると、LINEが提供する公式アカウント「LINE Notify」から通知を受信する事ができるサービスです。

自分自身のLINアカウントにイベント情報を送信します。

###Twitter API

Twitter APIは、Twitterで行われている会話に貢献し、関与し、分析するために必要なツールを提供します。

自身のTwitterアカウントでイベント情報のツイートをおこないます。

##ざっくりな図
connpass_api.jpg

##実装
###Lambda

lambda_function.py
from datetime import datetime, timezone
import os

import boto3
import requests

from models import Connpass, NoticeTable, LineNotice, TwitterNotice

KEYWORD = os.environ.get('KEYWORD')

PARAMS = {
    # イベント開催年月
    'ym': format(datetime.now(timezone.utc), "%Y%m"),
    # 検索結果の表示順 1:更新日時準 2:開催日時準 3:新着順
    'oder': '2',
    # 取得件数
    'count': '100',
    # 検索キーワード
    'keyword': KEYWORD
}

def lambda_handler(event, context):
    connpass = Connpass()
    # イベント情報取得
    events = connpass.get_event_data(PARAMS)

    # イベント情報用テーブル
    table = NoticeTable()

    # 既に通知済みのイベントを削除する
    event_ids = table.get_event_ids()
    for id in event_ids:
        if events.get(id):
            del events[id]

    # LINE通知
    line_notice = LineNotice()
    results = line_notice.send(events)

    # Twitterツイート
    twitter_notice = TwitterNotice()
    twitter_notice.send(events)

    # 通知結果保存
    table.save_items(results)

    # 現在時以前の通知結果を削除
    table.delete_items()

models.py
from abc import ABCMeta, abstractmethod
from datetime import datetime, timezone
import os

import boto3
import requests
from twitter import Twitter, OAuth

today = datetime.now(timezone.utc)

TABLE_NAME = os.environ.get('TABLE_NAME')

# LINE Notify
LINE_TOKEN = os.environ.get('LINE_TOKEN')
LINE_API_URL = os.environ.get('LINE_API_URL')

# Twitter API
TWITTER_ACCESS_TOKEN = os.environ.get('TWITTER_ACCESS_TOKEN')
TWITTER_ACCESS_TOKEN_SECRET = os.environ.get('TWITTER_ACCESS_TOKEN_SECRET')
TWITTER_API_KEY = os.environ.get('TWITTER_API_KEY')
TWITTER_API_SECRET = os.environ.get('TWITTER_API_SECRET')

class Connpass:
    def __init__(self):
        self.url = 'https://connpass.com/api/v1/event?{}'

    def get_event_data(self, params: dict) -> dict:
        url = self.generate_request_url(params)
        # リクエスト
        res = requests.get(url).json()

        # key:イベントID value:イベント情報 の辞書を作成
        events_dict = {}
        events_dict = {str(event['event_id']):event for event in res['events']}

        return self.delete_past_event_ids(events_dict)

    def generate_request_url(self, params: dict) -> str:
        params_text = ''
        for k, v in params.items():
            # パラメータ=値& の形でつなげる
            params_text += f'{k}={v}&'
        # 最後の&を削除して戻す
        return self.url.format(params_text)
    
    def delete_past_event_ids(self, events: dict) -> dict:
        new_events = {}
        for id, event in events.items():
            event_dt = datetime.strptime(event['started_at'], '%Y-%m-%dT%H:%M:%S%z')
            if today <= event_dt:
                new_events[id] = event
        return new_events


class NoticeTable:
    def __init__(self):
        dynamo_db = boto3.resource('dynamodb')
        table = dynamo_db.Table(TABLE_NAME)
        self.table = table
        self.items = table.scan()['Items']

    def get_event_ids(self):
        return [item['Id'] for item in self.items]
 
    def save_items(self, events: dict):
        with self.table.batch_writer() as batch:
            for id, event in events.items():
                batch.put_item(
                    Item={
                        'Id': str(id),
                        'Start': event['started_at']
                    }
                )

    def delete_items(self):
        delete_ids = []
        for item in self.items:
            item_dt = datetime.strptime(item['Start'], '%Y-%m-%dT%H:%M:%S%z')
            if item_dt <= today:
                delete_ids.append(item['Id'])

        for id in delete_ids:
            response = table.delete_item(Key={'Id': id}
            )
            print(id, response['ResponseMetadata']['HTTPStatusCode'])


class Notice(metaclass=ABCMeta):
    @abstractmethod
    def __init__(self):
        pass

    @abstractmethod
    def send(self):
        pass

    def generate_message(self, event: dict) -> str:
        # 文字列をdatetime型へ変換
        start = datetime.strptime(event['started_at'], '%Y-%m-%dT%H:%M:%S%z')
        end = datetime.strptime(event['ended_at'], '%Y-%m-%dT%H:%M:%S%z')

        # 曜日を表示用文字列へ変換
        week_day = {0: '', 1: '', 2: '', 3: '', 4: '', 5: '', 6: ''}
        start_weekday = week_day[start.weekday()]
        end_weekday = week_day[end.weekday()]

        # 日付を表示用文字列へ変換
        start_date = f'{start.year}{start.month}{start.day}日({start_weekday})'
        end_date = f'{end.year}{end.month}{end.day}日({end_weekday})'

        # 時間を表示用文字列へ変換
        end_time = format(end, '%H:%M')
        start_time = format(start, '%H:%M')

        # 開催日時の表示文字列をイベントの日程で変更
        if start.day == end.day:
            # 終了日が開催日と同じ日の場合
            dt = f'{start_date} {start_time} ~ {end_time}'
        else:
            # 終了日が開催日翌日以降の場合
            dt = (
                f'{start_date} {start_time} ~ \n'
                f'    {end_date} {end_time}\n'
            )

        return (
            '\n'
            '【タイトル】\n'
            f'{event["title"]}\n'
            '\n'
            '【日時】\n'
            f'{dt}\n'
            '\n'
            '【場所】\n'
            f'{event["address"]}\n'
            '\n'
            '【会場】\n'
            f'{event["place"]}\n'
            '\n'
            '【定員】\n'
            f'{event["limit"]}\n'
            '\n'
            '【ハッシュタグ】\n'
            f'#{event["hash_tag"]}'
            '\n'
            '【URL】\n'
            f'{event["event_url"]}'
        )


class LineNotice(Notice):
    def __init__(self):
        self.token = LINE_TOKEN
        self.url = LINE_API_URL

    def send(self, events: dict):
        results = {}
        if events:
            # イベントの数だけ内容を送信
            for id, event in events.items():
                send_contents = self.generate_message(event)
                token_dict = {'Authorization': 'Bearer' + ' ' + self.token}
                send_dict = {'message': send_contents}

                res = requests.post(self.url, headers=token_dict, data=send_dict)

                if res.status_code == 200:
                    results[id] = event
                else:
                    print(res.status_code, id)
        else:
            # イベント情報なしとだけ送信
            send_contents = '\n新着イベント情報なし'
            token_dict = {'Authorization': 'Bearer' + ' ' + self.token}
            send_dict = {'message': send_contents}

            res = requests.post(self.url, headers=token_dict, data=send_dict)
        return results


class TwitterNotice(Notice):
    def __init__(self):
        self.api_key = TWITTER_API_KEY
        self.api_secret = TWITTER_API_SECRET
        self.access_token = TWITTER_ACCESS_TOKEN
        self.access_token_secret = TWITTER_ACCESS_TOKEN_SECRET
    
    def send(self, events):
        twitter = Twitter(
            auth=OAuth(
                self.access_token,
                self.access_token_secret,
                self.api_key,
                self.api_secret
            )
        )

        if events:
            # イベントの数だけ内容を送信
            for id, event in events.items():
                send_contents = self.generate_message(event)
                status_update = twitter.statuses.update(status=send_contents)
        else:
            # イベント情報なしとだけ送信
            send_contents = (
                '\n'
                '新着イベント情報なし'
            )
            status_update = twitter.statuses.update(status=send_contents)

もともとLINE通知しか行っていませんでしたが、後にTwitterへのツイートも追加しました。
検索キーワードを広島県で検索し、保存された通知結果と重複しない場合にのみ通知する仕組みです。
検索結果のイベントが過去に通知済みの場合は、新着情報なしの旨を通知します。

Lambda関数は上記のソースファイルと使用している外部ライブラリをzipにまとめてアップロードするだけで作成完了です。(IAM等の設定は割愛
また、Lambdaで外部ライブラリを使う際に以下のコマンドで任意の場所にダウンロードしました。

pip install <ライブラリ名> -t <保存先パス>

lambda環境変数.jpg
各種APIのキー等は上記のように、Lambdaの環境変数に設定しました。
検索用のキーワードもこちらで指定しています。

###CloudWatchEvents
image.png
CLoudWatchEventsの設定はいたってシンプルです。
イベントソースをルールのスケジュール式を参考に、毎朝8時に設定します(GMTなので-9時間の値を設定
ターゲットはLambda関数を指定しただけです。

##参考にしたもの
connpass APIの使い方はこちらを

LINE Notifyは下記のYoutube動画を参考に

Twitterへのツイートは下記の記事を参考に

##LINE通知
image.png

##Twitterツイート
image.png

##最後に
毎朝1回の処理実行なので無料枠で余裕です。
勢いで作った感が否めませんが、自分が作ったものが多少なりとも実感できるのは良いことなのでぜひLambdaで何かを作ってみてはいかがでしょうか。

また、「こうしたほうが良いよ」とかありましたら遠慮なくコメントください!

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?