1
3

More than 3 years have passed since last update.

オープンデータカタログをポーリングして変換したり蓄積したり通知したりする仕掛け

Last updated at Posted at 2020-06-14

オープンデータカタログをポーリングして変換したデータを蓄積したり通知したりする仕掛け

はじめに

以前、静岡県オープンデータカタログ(csv)からCOVID-19対策サイトのデータ(json)に変換して取得するAPI という記事を書きました。
ここで作ったAPIをLambdaでポーリングして、データの変更があったらS3に蓄積して、情報はDynamoDBにも保存して、変更があったことをSlackに通知する、みたいな仕掛けを作りました。ETLごっこです。
なお、浜松市だけでなく、静岡市のデータも監視対象としました。
※この仕掛けは結局日の目を見ることはありませんでしたが、詳細はあとがきに書こうと思います。

Lambdaの関数コード(Python)はこちらのリポジトリで公開しています。
https://github.com/ww2or3ww/covid19_shizuoka-opendata_notifier

Lambdaで定期実行する

トリガーを追加する。
ink (16).png
トリガーの設定をする。
EventBridge (ClowdWatch Event)を選択し、ルールを作成する。
Screenshot 2020-06-14 at 10.24.58.png
スケジュール式の書き方は、ぐぐってください。
Screenshot 2020-06-14 at 10.31.39.png
これだけです。
Screenshot 2020-06-14 at 10.33.34.png

環境変数を利用する

PythonのソースコードをGitHubなどに公開するにあたって、公開したくない情報は環境変数に設定しておきましょう。

Lambdaの関数コードの下にある「環境変数」の編集を選択。
ink (17).png
環境変数のキーと値を追加してゆく。
Screenshot 2020-06-14 at 10.45.22.png
追加される。
Screenshot 2020-06-14 at 10.47.36.png

Pythonのコードから取得するときは以下のような感じです。

api_address = os.environ["API_ADDRESS_CSV2JSON"]

API経由でJSONデータを取得する

このAPI をコールして、JSONデータを取得します。
コードはこんな感じです。

import json
import requests
class CityInfo:
    def __init__(self, city, queryParam):
        self.city = city
        self.queryParam = queryParam
CityInfo(
    "hamamatsu", 
    "main_summary:5ab47071-3651-457c-ae2b-bfb8fdbe1af1,patients:5ab47071-3651-457c-ae2b-bfb8fdbe1af1,patients_summary:5ab47071-3651-457c-ae2b-bfb8fdbe1af1,inspection_persons:d4827176-d887-412a-9344-f84f161786a2,contacts:1b57f2c0-081e-4664-ba28-9cce56d0b314"
)
apiResponse = requests.get("{0}?type={1}".format(API_ADDRESS_CSV2JSON, queryParam), auth=AUTH, headers={"x-api-key": API_KEY_CSV2JSON})
retJson = json.loads(apiResponse.text)

DynamoDBで最終更新情報を管理する

テーブル仕様はこのようにしました。
Screenshot 2020-06-14 at 12.31.13.png

city(PK) 都市種別(hamamatsu/shizuoka-shi)
type(SK) データ種別(陽性患者数や、検査実施人数などのタイプ)
id 静岡県オープンデータカタログのID
name typeの名称
path 最新データが保存されているS3のパス(キー)
update そのデータが最後に更新された日時

JSONの各データには最終更新日がありますので、その日時とDynamoDBのupdateとを比較して、更新されていればレコードもUpdateします。(新規データの場合はInsertします。)

import boto3
from boto3.dynamodb.conditions import Key
DYNAMODB_NAME = os.environ["DYNAMODB_NAME"]
DYNAMO_TABLE = boto3.resource("dynamodb").Table(DYNAMODB_NAME)

def processType(cityInfo, retJson, typeId):
    date = retJson[type]["date"]
    listItem = typeId.split(":")
    type = listItem[0]
    id = listItem[1]

    record = selectItem(cityInfo.city, type)
    if(record["Count"] is 0):
        path = uploadFile(cityInfo.city, type, id, date, retJson[type])
        insertItem(cityInfo.city, type, id, date, TYPE_NAME[type], path)
    elif record["Items"][0]["update"] != date:
        path = uploadFile(cityInfo.city, type, id, date, retJson[type])
        updateItem(cityInfo.city, type, id, date, path)

def selectItem(city, type):
    return DYNAMO_TABLE.query(
        KeyConditionExpression=Key("city").eq(city) & Key("type").eq(type)
    )

def insertItem(city, type, id, date, name, path):
    DYNAMO_TABLE.put_item(
      Item = {
        "city": city, 
        "type": type, 
        "id": id, 
        "update" : date, 
        "name" : name, 
        "path" : path
      }
    )

def updateItem(city, type, id, date, path):
    DYNAMO_TABLE.update_item(
        Key={
            "city": city,
            "type": type,
        },
        UpdateExpression="set #update = :update, #path = :path",
        ExpressionAttributeNames={
            "#update": "update", 
            "#path": "path"
        },
        ExpressionAttributeValues={
            ":update": date, 
            ":path": path
        }
    )

最新情報だけではなく、履歴情報についても別テーブルに蓄積するようにしてみたのですが、それについては割愛します。

S3にアップロードする

更新されたデータはS3にアップロードします。

import boto3
S3 = boto3.resource('s3') 
S3_BUCKET_NAME = os.environ["S3_BUCKET_NAME"]

def uploadFile(city, type, id, date, jsonData):
    dt = datetime.strptime(date, '%Y/%m/%d %H:%M')
    path = "data/{0}/{1}/{2}/{3}/{4}/{5}-{6}".format(city, type, dt.year, str(dt.month).zfill(2), str(dt.day).zfill(2), str(dt.hour).zfill(2), str(dt.minute).zfill(2))
    objJson = S3.Object(S3_BUCKET_NAME, "{0}.json".format(path))
    objJson.put(Body = json.dumps(jsonData, ensure_ascii=False, indent=2))
    return path

JSONだけではなく、元データのCSVも取得しにいってJSONとセットで保存していたりもするのですが、割愛します。

S3にはこんな感じで溜まっていってます。
Screenshot 2020-06-14 at 13.52.15.png
パーティショニングについてや、CSVとJSONが同じ場所にないほうが良かったとか、今改めて見るとちょっと微妙な感じなのですが、、。

Slackに通知する

更新があった場合、Slackにその事を通知します。
Pythonのコードは以下のような感じ。

import slackweb
SLACK_WEBHOOK_HAMAMATSU = os.environ["SLACK_WEBHOOK_HAMAMATSU"]

notifyText = "【{0}】\n{1}".format(cityInfo.city, notifyText)
notifyToSlack(SLACK_WEBHOOK_HAMAMATSU, notifyText)
def notifyToSlack(url, text):
    slack = slackweb.Slack(url=url)
    slack.notify(text=text)

slackwebに渡すWebHook URLはどのようにして取得するか?
Incoming WebHooksにアクセス。
ink (19).png
通知する先のChannelを選択して「Add Incoming WebHooks integration」ボタンを押下すると、Webhook URLを入手することができます。
ink (20).png
アイコンや名前もここで変えることができます。

Slackに通知されている様子がこちら。
Screenshot 2020-06-14 at 13.56.26.png

あとがき

最後の投稿(4/15)から2ヶ月経とうとしていることに気付いて、急いで記事を書いてます。
本日6/14、ぎりぎりセーフですね。
この仕掛け自体は4/20頃に出来上がってて、それ以来個人的に稼働させ続けてます。

本当はこの仕掛け、code4hamamatsuのSlackチャンネルに通知して、データ更新を皆さんに伝えられたらと思って作り始めたんです。4月の前半くらいから。

しかし、GitHub Actionsでポーリングして変更があればコミットされて、それでNetlifyのビルドが走って、その結果がSlackチャンネルに通知される、みたいな環境が先に整いましたので、使ってもらう機会はありませんでした。

作ってる途中に「あ、これ要らないじゃん。」ってなって少しテンション落ちたのですが、優先度下げながらもとりあえず動くところまで作った、そして個人的に稼働させてる、みたいな感じです。

せめて皆さんに存在くらいは知ってもらおうと4月末のJAWS-UG浜松 AWS勉強会 2020#4でLTのネタにしようと準備してアジェンダにも上げておいたのですが、時間なくやりませんでした。
Screenshot 2020-06-14 at 09.51.49.png

Qiitaの記事くらいにはしようしようと思っていたのですが、2ヶ月経って今日に至る、みたいな感じです。
ああ!かわいそうな子!!

1
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
3