オープンデータカタログをポーリングして変換したデータを蓄積したり通知したりする仕掛け
はじめに
以前、静岡県オープンデータカタログ(csv)からCOVID-19対策サイトのデータ(json)に変換して取得するAPI という記事を書きました。
ここで作ったAPIをLambdaでポーリングして、データの変更があったらS3に蓄積して、情報はDynamoDBにも保存して、変更があったことをSlackに通知する、みたいな仕掛けを作りました。ETLごっこです。
なお、浜松市だけでなく、静岡市のデータも監視対象としました。
※この仕掛けは結局日の目を見ることはありませんでしたが、詳細はあとがきに書こうと思います。
Lambdaの関数コード(Python)はこちらのリポジトリで公開しています。
https://github.com/ww2or3ww/covid19_shizuoka-opendata_notifier
Lambdaで定期実行する
トリガーを追加する。
トリガーの設定をする。
EventBridge (ClowdWatch Event)を選択し、ルールを作成する。
スケジュール式の書き方は、ぐぐってください。
これだけです。
環境変数を利用する
PythonのソースコードをGitHubなどに公開するにあたって、公開したくない情報は環境変数に設定しておきましょう。
Lambdaの関数コードの下にある「環境変数」の編集を選択。
環境変数のキーと値を追加してゆく。
追加される。
Pythonのコードから取得するときは以下のような感じです。
api_address = os.environ["API_ADDRESS_CSV2JSON"]
API経由でJSONデータを取得する
このAPI をコールして、JSONデータを取得します。
コードはこんな感じです。
import json
import requests
class CityInfo:
def __init__(self, city, queryParam):
self.city = city
self.queryParam = queryParam
CityInfo(
"hamamatsu",
"main_summary:5ab47071-3651-457c-ae2b-bfb8fdbe1af1,patients:5ab47071-3651-457c-ae2b-bfb8fdbe1af1,patients_summary:5ab47071-3651-457c-ae2b-bfb8fdbe1af1,inspection_persons:d4827176-d887-412a-9344-f84f161786a2,contacts:1b57f2c0-081e-4664-ba28-9cce56d0b314"
)
apiResponse = requests.get("{0}?type={1}".format(API_ADDRESS_CSV2JSON, queryParam), auth=AUTH, headers={"x-api-key": API_KEY_CSV2JSON})
retJson = json.loads(apiResponse.text)
DynamoDBで最終更新情報を管理する
city(PK) | 都市種別(hamamatsu/shizuoka-shi) |
type(SK) | データ種別(陽性患者数や、検査実施人数などのタイプ) |
id | 静岡県オープンデータカタログのID |
name | typeの名称 |
path | 最新データが保存されているS3のパス(キー) |
update | そのデータが最後に更新された日時 |
JSONの各データには最終更新日がありますので、その日時とDynamoDBのupdateとを比較して、更新されていればレコードもUpdateします。(新規データの場合はInsertします。)
import boto3
from boto3.dynamodb.conditions import Key
DYNAMODB_NAME = os.environ["DYNAMODB_NAME"]
DYNAMO_TABLE = boto3.resource("dynamodb").Table(DYNAMODB_NAME)
def processType(cityInfo, retJson, typeId):
date = retJson[type]["date"]
listItem = typeId.split(":")
type = listItem[0]
id = listItem[1]
record = selectItem(cityInfo.city, type)
if(record["Count"] is 0):
path = uploadFile(cityInfo.city, type, id, date, retJson[type])
insertItem(cityInfo.city, type, id, date, TYPE_NAME[type], path)
elif record["Items"][0]["update"] != date:
path = uploadFile(cityInfo.city, type, id, date, retJson[type])
updateItem(cityInfo.city, type, id, date, path)
def selectItem(city, type):
return DYNAMO_TABLE.query(
KeyConditionExpression=Key("city").eq(city) & Key("type").eq(type)
)
def insertItem(city, type, id, date, name, path):
DYNAMO_TABLE.put_item(
Item = {
"city": city,
"type": type,
"id": id,
"update" : date,
"name" : name,
"path" : path
}
)
def updateItem(city, type, id, date, path):
DYNAMO_TABLE.update_item(
Key={
"city": city,
"type": type,
},
UpdateExpression="set #update = :update, #path = :path",
ExpressionAttributeNames={
"#update": "update",
"#path": "path"
},
ExpressionAttributeValues={
":update": date,
":path": path
}
)
最新情報だけではなく、履歴情報についても別テーブルに蓄積するようにしてみたのですが、それについては割愛します。
S3にアップロードする
更新されたデータはS3にアップロードします。
import boto3
S3 = boto3.resource('s3')
S3_BUCKET_NAME = os.environ["S3_BUCKET_NAME"]
def uploadFile(city, type, id, date, jsonData):
dt = datetime.strptime(date, '%Y/%m/%d %H:%M')
path = "data/{0}/{1}/{2}/{3}/{4}/{5}-{6}".format(city, type, dt.year, str(dt.month).zfill(2), str(dt.day).zfill(2), str(dt.hour).zfill(2), str(dt.minute).zfill(2))
objJson = S3.Object(S3_BUCKET_NAME, "{0}.json".format(path))
objJson.put(Body = json.dumps(jsonData, ensure_ascii=False, indent=2))
return path
JSONだけではなく、元データのCSVも取得しにいってJSONとセットで保存していたりもするのですが、割愛します。
S3にはこんな感じで溜まっていってます。
パーティショニングについてや、CSVとJSONが同じ場所にないほうが良かったとか、今改めて見るとちょっと微妙な感じなのですが、、。
Slackに通知する
更新があった場合、Slackにその事を通知します。
Pythonのコードは以下のような感じ。
import slackweb
SLACK_WEBHOOK_HAMAMATSU = os.environ["SLACK_WEBHOOK_HAMAMATSU"]
notifyText = "【{0}】\n{1}".format(cityInfo.city, notifyText)
notifyToSlack(SLACK_WEBHOOK_HAMAMATSU, notifyText)
def notifyToSlack(url, text):
slack = slackweb.Slack(url=url)
slack.notify(text=text)
slackwebに渡すWebHook URLはどのようにして取得するか?
Incoming WebHooksにアクセス。
通知する先のChannelを選択して「Add Incoming WebHooks integration」ボタンを押下すると、Webhook URLを入手することができます。
アイコンや名前もここで変えることができます。
あとがき
最後の投稿(4/15)から2ヶ月経とうとしていることに気付いて、急いで記事を書いてます。
本日6/14、ぎりぎりセーフですね。
この仕掛け自体は4/20頃に出来上がってて、それ以来個人的に稼働させ続けてます。
本当はこの仕掛け、code4hamamatsuのSlackチャンネルに通知して、データ更新を皆さんに伝えられたらと思って作り始めたんです。4月の前半くらいから。
しかし、GitHub Actionsでポーリングして変更があればコミットされて、それでNetlifyのビルドが走って、その結果がSlackチャンネルに通知される、みたいな環境が先に整いましたので、使ってもらう機会はありませんでした。
作ってる途中に「あ、これ要らないじゃん。」ってなって少しテンション落ちたのですが、優先度下げながらもとりあえず動くところまで作った、そして個人的に稼働させてる、みたいな感じです。
せめて皆さんに存在くらいは知ってもらおうと4月末のJAWS-UG浜松 AWS勉強会 2020#4でLTのネタにしようと準備してアジェンダにも上げておいたのですが、時間なくやりませんでした。
Qiitaの記事くらいにはしようしようと思っていたのですが、2ヶ月経って今日に至る、みたいな感じです。
ああ!かわいそうな子!!