はじめに
Pythonの軽量webフレームワークFlaskとGAE(GoogleAppEngine)を使って気象庁防災情報XML(以下JMX)を受信してみた話です。 「気象庁防災情報XMLフォーマット形式電文の公開(PUSH型)」にJMXの概要が書かれています。 他言語で書かれたものはいくつかあったのですがFlaskをつかったものがなかったので記事にしてみました。FlaskですがPythonのwebフレームワークは有名どころではDjangoがあげられますが、今回の目的物は「XMLを受信するだけ」で機能的に過剰だったのでFlaskを採用しました。
なにつくるの?
JMXはWebSub(旧pubsubhubbub)というプロトコルを用いて防災情報の更新をAtomフィードで通知します。
WebSubは、出版者(publishers)、購読者(subscribers)、ハブ(hubs)の三者で構成される。
最初に購読者はウェブサーバからHTTPリソース(URL)を要求して、そのコンテンツを取得する。次に購読者はレスポンスの内容を調べ、もしそれがハブを参照しているならば、購読者はそのハブ上にあるそのリソースのURL(これは仕様上は「トピック(topic)」と呼ばれる)を購読できる。購読したトピックのうちのいずれかが更新された場合にwebhookの仕組みを使ってハブからの通知を直接受けられるように、購読者はウェブ接続可能なサーバを走らせておく必要がある。出版者は自身のコンテンツをHTTPヘッダ内にハブ参照を含めて公開する。出版者が何かを出版するときは必ずそれらの参照されたハブに通知を流す。つまり、出版イベントが発生した場合、出版者はそのハブに連絡し、そのハブが購読者たちに連絡する。-- wikipedia引用
今回はパブリッシャー(気象庁)からのAtomフィードをハブを介して受け取るためのサブスクライバを作ります。
なぜWebSubなのか
JMXはサブスクライバを構築しなくても簡単に取得できるようになっています。「気象庁防災情報XMLフォーマット形式電文の公開(PULL型)」 例えば、一分間間隔で定期的にこのURLに対してクローリングするスクリプトを走らせていたとします。このサイトが一分間に一回以上の更新があった場合その間のデータの取りこぼしが生じます。いわゆるクライアントからサーバーに対して働きかけるPULL型の通信方式です。
これに対してWebSubはパブリッシャーからの通知を受けて初めてサイトにリクエストするため、速報性が高いことと無駄なリクエストを避けることができます。これがサーバーからクライアントに対して働きかけるPUSH型です。
ちなみにJMXそのものがパブリッシャーから配信される訳ではなく、あくまで情報が更新されたよ〜という atomフィードが来るだけで、サブスクライバにはこのatomフィードの中にあるXMLのURLに自分でリクエストする処理を実装する必要があります。
サブスクライバを構築する
サブスクライバ構築の概要が書いてあるので一読しといたほうがいいと思います。
XML 電文公開(PUSH 型)に係る仕様と Subscriber の構築について
実際に以下の図のようなものを作ります。今回はサブスクライバを構築し、JMXを受信するところまでがゴールです。
-
気象庁からHubへ、HubからサブスクライバへatomフィードがPushされます。atomフィードは以下のようになっています。各情報はentryタグ内にあり、この中のtitleタグを見ればなんの情報なのかを確認することができます。※一つのatomフィードに複数のentryタグが存在する場合があります。
-
JMXはentryタグの中のlinkタグを辿るとURLがあり、これをgetすると取得することができます。取得したJMXをBeautifulSoup等でパースし必要な情報を抜き出します。
-
取り出した情報をTwitter等のapiを使って外部に流します。
atomフィード例
<?xml version="1.0" encoding="utf-8"?>
<feed xmlns="http://www.w3.org/2005/Atom" xml:lang="ja">
<title>JMAXML publishing feed</title>
<subtitle>this feed is published by JMA</subtitle>
<updated>2019-06-11T17:28:05+09:00</updated>
<id>urn:uuid:d38e0e80-12ba-3236-b10f-256b78a08995</id>
<link href="http://www.jma.go.jp/" rel="related"/>
<link href="http://xml.kishou.go.jp/feed/other.xml" rel="self"/>
<link href="http://alert-hub.appspot.com/" rel="hub"/>
<rights>Published by Japan Meteorological Agency</rights>
<entry>
<title>全般海上警報(定時)</title>
<id>urn:uuid:6610695c-e3ea-3332-a289-c4f0ce2766e3</id>
<updated>2019-06-11T08:27:44Z</updated>
<author><name>気象庁予報部</name></author>
<link href="http://xml.kishou.go.jp/data/6610695c-e3ea-3332-a289-c4f0ce2766e3.xml" type="application/xml"/>
<content type="text">【全般海上警報】</content>
</entry>
</feed>
実装する最低限の機能+α
- ハブから登録確認及び登録後5日間隔で送られてくるチャレンジコードのgetに対してのレスポンス
- 登録後にpushされるatomフィードに対してのレスポンス
- 受け取ったatomフィードに記載されているJMXのURLの取り出し及びそれに対してのgetリクエスト
- JMXのパース
ハブからサブスクライバへのリクエスト
GETリクエストパラメータ(一部)
パラメーター名 | 内容 |
---|---|
hub.verify_token | 登録申請時に指定した場合はこれに値が入る。セキュリティ対策のために指定したほうが良い |
hub.challenge | ランダムな文字列。サブスクライバ生存確認のためのチャレンジコード。これをそっくりそのままレスポンスとして返す。一番大事。 |
hub.topic | これが送られてきた。 http://xml.kishou.go.jp/feed/extra.xml |
hub.mode | 登録時はsubscribe。登録解除時はunsubscribeが入る。 |
hub.lease_seconds | 次回のGETリクエストまでの秒数(432000秒 = 5日後) |
POSTリクエストヘッダー(一部)
ヘッダー名 | 内容 |
---|---|
X-Hub-Signature | 指定したverify_tokenがキーとして生成されたSHA-1ハッシュ関数。真に気象庁からの情報なのかを判断するのに使えそう。使い方がわからないので誰か教えて... |
コード
import os
import requests
from bs4 import BeautifulSoup
from flask import Flask, request, Response
app = Flask(__name__)
VERIFY_TOKEN = os.environ.get('VERIFY_TOKEN')
# -- ルーティン処理 -----------------------------------------------------------------------------------------
# GETリクエスト(5日間隔で飛んでくるチャレンジコードを返す)
# hub.modeがsubscribeかunsubscribeだったらで条件分岐
# hub.verify_tokenが自分のVERIFY_TOKENと一致したらで条件分岐
# hub.challengeに改行コードが付いていると登録できないのでもし付いていたら消す処理をする
# レスポンスヘッダーのContent-Typeをtext/plainにしてhub.challengeを返す
@app.route('/sub', methods=['GET'])
def get():
verify_token = request.args.get('hub.verify_token')
challenge = request.args.get('hub.challenge')
mode = request.args.get('hub.mode')
if mode == 'subscribe' or mode == 'unsubscribe':
if verify_token == VERIFY_TOKEN:
result = None if challenge == None else challenge.replace('\n', '') if '\n' in challenge else challenge
r = Response(response=result, status=200)
r.headers['Content-Type'] = 'text/plain'
return r
else:
return Response('Bad request!', 404)
else:
return Response('Bad request!', 404)
# POSTリクエスト(申請が受理されると飛んでくるAtomフィードに対しての処理)
# AtomフィードをBeautifulSoupに入れXMLをパース、中に入っているJMXのURLを取り出す
# urlsに取りだ出したJMXのURLが格納される
@app.route('/sub', methods=['POST'])
def post():
sha1 = request.headers.get('X-Hub-Signature')
if sha1 != None:
data = request.get_data(as_text=True)
sig = 'sha1=' + hmac.new(bytes(VERIFY_TOKEN, 'UTF-8'), bytes(data, 'UTF-8'), hashlib.sha1).hexdigest()
if sig == sha1:
#処理を書く
#Atomはdataに入っているのでBeautifulSoupなのでパースして遊ぶ
soup = BeautifulSoup(data, 'lxml')
entry = soup.find_all('entry')
urls = i.find('link').get('href')
return Response(response='ok', status=200)
else:
return Response(response='Bad request!', status=404)
else:
return Response(response='Bad request!', status=404)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8080, threaded=True)
重要なのはGETリクエストに対してのレスポンスです。これはリクエストヘッダーのhub.challengeの内容をそのまま返すことで完了します。購読意思確認のためなので応答がないと登録できません。これは5日間隔で送られてきます。それ以外のGETリクエストに対しては404を返します。
登録申請が受理されると実際に以下のようなatomフィードがPOSTされます。atomフィードはリクエストボディの中に入っています。JMXのURLはAtomフィードの<entry>
内にある<link>
をたどると見つけることができます。このatomフィードをBeautifulSoupで解析、JMXがあるURLを抽出します。
本来はX-Hub-Signatureを解析して自分のVERIFY_TOKENと照らし合わせて真に気象庁からのPOSTなのかを判断すべきですが、実装が思いつかなかったのでどなたかわかるかたがいらしたらプルリクください! 2019-02-02 解決済み
JMXのURLを取得するところまでで今回の目標は達成ということで、あとは煮るなり焼くなり。。。。
デプロイから申請まで
今回はgithubからクローンしてそのままデプロイ、登録申請を行えるテンプレートパッケージをつくってみました。jmx_subscriber_template
1. パッケージをcloneする
githubからcloneします。
$ git clone https://github.com/0x0u/jmx_subscriber_template
$ cd jmx_subscriber_template
中身は以下のようになっています。GAEにデプロイするために必要なファイル群です。
jmx_subscriber_template
├── app.yaml
├── main.py
├── requirements.txt
├── secret.yaml
├── .gitignore
└── .gcloudignore
2. VERIFY_TOKENを設定する
jmx_subscriber_templateディレクトリ内でsecret.yamlのhogehoge部を自分で書き換えます。気象庁に登録申請する際に届け出るものと同じものを用意します。main.pyのVERIFY_TOKENが拾います。環境変数は全てこのファイルに記述します。
env_variables:
VERIFY_TOKEN: "hogehoge"
3. GAEにデプロイする
GAEとgcloudコマンドが実行できるようにしておくのが前提条件です。以下のコマンドでデプロイします。
$ gcloud app deploy app.yaml
4. 申請する
ユーザー登録についてに従い申請します。登録様式をダウンロードして、2で設定したVERIFY_TOKENと3でデプロイしたサブスクライバURL(https://プロジェクトID.appspot.com/sub)、その他を記述してメールに添付して送信すれば申請完了です。
実際に受信してみた
jmx_subscriber_templateと同一のソースコードで受信したものです。一応無料枠に収めることができました。アプリケーション及びサーバーエラーはありませんでした。
追記
2019-02-02 編集
POSTリクエストに対しての処理でVERIFY_TOKENを秘密鍵、リクエストボディをメッセージとしてHMACを発行しリクエストヘッダーのX-Hub-Signatureと一致したら...という条件分岐を追加。
参考文献
- httpのしくみ
- HTTPとPOSTとGET
- 他言語でのサブスクライバー構築
- 気象庁の防災情報XMLを受信するぞ(Perl)
- Ruby on Rails - PubSubHubbub Subscriber 実装!
- 気象庁の防災情報をリアルタイムで受け取る(Python Django)
- Slackに気象庁の情報(災害情報)を投下するHubotを作った(JavaScript)
- Flask関連
- FlaskでシンプルにContent-Typeをチェックする(@content_type)
- GAE関連
- Google App Engineを無料で運用する方法(2018年版)
- GoogleAppEngineスタンダード環境のPython3サポートはもう少し話題になっていいと思う