はじめに
AXIS(アクシス)は、日本国内の気象、災害、交通といった情報をAPIで提供しています。
APIの使用に掛かる料金は無料(!)ですが、利用には会員登録が必要です。
利用者が設定した契約情報はトークンで管理されるのですが、「発行月最終日の23時59分59秒」という有効期限が設けられています(例:3/6発行だと3/31 23:59:59)。
そこでAXISから、現トークンの残り有効日数が7日未満となった場合に限り、新トークンを発行できるAPI(Token Refresh API)が提供されています。
ページの注意書きにもありますが、毎秒、毎分のポーリングはアカウント停止の対象となるようなので、できるだけ負荷を掛けないように新トークンを入手するコードを書いてみました。
前提
pip
で必要なパッケージをインストール
pip install requests
pip install threading
pip install schedule
今回は省略していますが、Websocket用の関数も書くはずなので、マルチスレッドで実行します。
コード
python token_refresh.py
import requests
from threading import Thread
import time
import datetime
import calendar
import schedule
token = ["YOUR_TOKEN"] # 発行したトークンを入力
class ref_input: # 起動時点ですでにトークン更新済みか確認
while True:
ref_check = input("Is your token already refreshed?(y/N)")
if ref_check == "y": # 有効期限が来月末
break
elif ref_check == "N": # 有効期限が今月末
break
else:
print('You can only input "y" or "N"!')
def get_lastdate(dt): # 月の最終日に変換
return dt.replace(day=calendar.monthrange(dt.year, dt.month)[1])
def token_ref():
dt_now = datetime.datetime.now()
if ref_input.ref_check == "y":
dt_format = datetime.datetime(dt_now.year, dt_now.month, dt_now.day+7, 23, 59, 59)
ref_input.ref_check = "N" # 更新完了後は補正を無効化
else:
dt_format = datetime.datetime(dt_now.year, dt_now.month, dt_now.day, 23, 59, 59)
dt_ex = get_lastdate(dt_format) # 現トークンの有効期限
dt_rem = dt_ex - dt_now # 有効期限までの残り日数を計算
if dt_rem.days < 7: # 有効期限7日未満
url = "https://axis.prioris.jp/api/token/refresh/"
jsondata = requests.get(url=url, headers=["Authorization: Bearer %s" % token[0]]).json()
status = jsondata["status"]
if status == "generate a new token":
token.clear()
new_token = jsondata["token"]
token.append(new_token)
dt_ex = dt_ex + datetime.timedelta(days=1) # 1日足して来月へ
dt_ex = get_lastdate(dt_ex) # 新トークンの有効期限
print(f"[axis] Got New Token! {dt_now}")
print(f"Expiration Date: {dt_ex}")
with open("token.txt", "a") as f: # 発行されたトークンをtxtに書き込む
f.write(f"[Generated at {dt_now.strftime("%Y/%m/%d %H:%M:%S")}] {new_token}\n")
else: # 現トークンが有効期限切れ
print(f"[axis] contract has expired. Please check axis website. {dt_now}")
else: # 有効期限7日以上
print(f"[axis] I don't have to refresh now token. {dt_now}")
def schedule_checker():
schedule.every().monday.do(token_ref) # 毎週月曜日にtoken_refを実行
while True:
schedule.run_pending()
time.sleep(5)
if __name__ == "__main__":
ref_input()
Thread(target=schedule_checker).start()
補足
- 上記の
schedule
は起動時期によっては、更新できないこともあるので、必要に応じて書き換えること - Websocketのトークンを自動更新するコードを書く必要がある
おわりに
記事初めて書いてみました! なんだかエンジニア気分笑 ꉂꉂ(˃▿˂๑)