概要
毎朝、天気をチェックしたりニュースを見たり...といろいろ大変なので、朝に知りたいことを自動でLINEで教えてくれるプログラムを作ります。
具体的には、1.天気と2.経済指標を通知します。
- 天気に関しては、私の場合はすみかも職場も東京なので東京の天気を知らせます。
- 経済指標に関しては、私たち日本人が眠っている間に取引が行われる米国市場の代表的な経済指標、ダウ経済指標を通知します。
実装方法に関しては、AWSのLambdaにプログラムをアップロードし、CloudWatchでそれを定期的に実行させることにします。
プログラムはPythonで書きます。
Messaging APIの利用
LINEでメッセージを送るには、Messaging APIというAPIが必要になります。
こちらのLINE公式のドキュメントに、利用の仕方が書いてあります。
プログラムの作成
このプログラムでは、①天気情報と②経済指標の情報を外部から取ってくる必要があります。
- 天気情報は、Open Weather Mapから取ってきます。
- 経済指標は、Alpha Vantageから取ってきます。
それぞれ、リンク先に行けば無料でAPIキーが取得できます。
これらを利用したプログラムは、以下の通りです。
LineBot.py
# -*- coding: utf-8 -*-
from linebot import LineBotApi
from linebot.models import TextSendMessage
from linebot.exceptions import LineBotApiError
import requests
import os
#LINEでメッセージを送るためのキー
line_bot_key = os.environ["line_bot_key"]
#送信先のLINEのID
line_user_key = os.environ["line_user_key"]
#Open Weather MapのAPIキー
weather_api_key = os.environ["weather_api_key"]
#Alpha VantageのAPIキー
finance_api_key = os.environ["finance_api_key"]
def push_info(json_input, context):
"""
Lambdaから呼ばれて実行される関数
"""
line_bot_api = LineBotApi(line_bot_key)
text_message = get_text_message()
#一応送信内容はログに出しておきます
print(text_message)
try:
line_bot_api.push_message(line_user_key, TextSendMessage(text=text_message))
except LineBotApiError as e:
#エラーが起こり送信できなかった場合
print(e)
def get_text_message():
"""
送信するテキストを作成する関数
"""
weather = get_weather()
NY_Dow_chg, NY_Dow_price = get_NY_Dow()
text_message = "おはようございます。\n"
#もし天気情報が取得できなかったら、謝る
text_message += "今日の天気は%sです。\n" % weather if (weather != None) else "ごめんなさい、天気の情報が取得できませんでした。\n"
#もし金融情報が取得できなかったら、謝る
text_message += "昨日のNYダウは%sドルの%sドルです。" % (NY_Dow_chg, NY_Dow_price) if (NY_Dow_chg != None and NY_Dow_price != None) else "ごめんなさい、金融情報が取得できませんでした。"
return text_message
def get_weather():
"""
天気を取得する関数
"""
#Open Weather MapのAPIを利用するためのurl(latとlonに、それぞれ緯度と経度を入れる)
#以下の例だと、東京都中央区の天気を取得します
url = "http://api.openweathermap.org/data/2.5/weather?lat=35.67&lon=139.77&appid=" + weather_api_key
json_data = get_api(url)
try:
weather_data = json_data["weather"][0]["main"]
except KeyError:
return None
#天気の英語と日本語を、辞書型で保持
weather_dict = {"Clouds":"曇り","Clear":"晴れ","Rain":"雨"}
try:
return weather_dict[weather_data]
except KeyError:
#天気を翻訳できなかった場合
print("New weather detected: %s" % wether_data)
return weather_data
def get_NY_Dow():
"""
金融情報を取得する関数
"""
#Alpha VantageのAPIを利用するためのurl
url = "https://www.alphavantage.co/query?function=GLOBAL_QUOTE&symbol=DJI&apikey=" + finance_api_key
json_data = get_api(url)
try:
#前日比を、小数点第3位を四捨五入
price_chg = round(float(json_data["Global Quote"]["09. change"]) * 100) / 100
#前日比を文字列に変え、正であれば「プラス」、負であれば「マイナス」を付け足す
price_chg = "プラス" + str(price_chg) if price_chg > 0 else "マイナス" + str(price_chg)[1:]
#終値を、小数点第3位で四捨五入
price = round(float(json_data["Global Quote"]["05. price"]) * 100 / 100)
#終値を文字列に変換
price = str(price)
return price_chg, price
except KeyError:
print("An error occured while decoding NY Dow data")
return None, None
def get_api(url):
"""
APIを利用するための関数
"""
#リクエストを送信
header = {"content-type": "application/json"}
r = requests.get(url,headers=header)
data = r.json()
return data
毎朝定期実行する
上記のプログラムをLambdaにアップロードしたら、それを毎朝定期的に実行するようにします。
Cloud Watchを開き、以下の「ルール」をクリックします。
開いた画面で「ルールの作成」をクリックしたら、以下の画面のように設定していきます。
- 「スケジュール」を選択し、定期的に実行をするようにします
- Cron式で実行する時間を指定します
- 定期的に実行する、Lambda関数(ここでは、上記のコードをアップロードしたもの)を指定します
これで、毎朝自動で天気と経済指標を教えてくれるようになります。