LoginSignup
5
5

Raspberry Piに現在時刻+時報ボイス、天気予報、ニュースを喋らせる(2)

Last updated at Posted at 2023-06-10

初めに

普段ラズパイをサーバーとして利用しているのですが、OSをbuster(32bit)からbullseye(64bit)に再構築した際せっかくだし前回作った奴を改良し天気に関する情報はすべて気象庁からデータを取得することにしました。(ついでにВерныйの時報ボイスも再生させることにします。)
時報ボイスは毎時間再生し、天気予報とかは6-22時の間喋らせることにしました。

使用環境

・Raspberry pi 3B+
・OS:Raspberry pi OS(bullseye 64bit)

事前準備

・下記の記事を参考にAquasTalkを利用した"atalk"コマンドを使えるようにしてください。
raspberry piの音周り
・mpg321をインストールしてください(参考)
・時報ボイスをそれぞれ0~23.mp3として保存
・市町村区のコード,アメダスNo.を以下を参考に取得してください。

市町村区のコードの取得

このページから地方エリアのコードと市町村区のコードを取得しておきます。
例えば東京都千代田区の場合、関東甲信地方の東京都東京地方であり、そのあと千代田区を選択した際URLのclass20s&area_code=""の部分が市町村区のコードとなります。東京都千代田区は1310100になります。

アメダスNo.の取得

ここからもっとも近い場所のアメダスを選択してください。選択した際のURLのamdno=""がアメダスNo.となります。東京都千代田区の場合,44132です。
https://www.jma.go.jp/bosai/amedas/const/amedastable.json から探すこともできます。

実装

・適当な場所に以下のファイルを作ってください。

transweather.py
#!/usr/bin/env python3
# -*- coding:utf-8 -*-

def WDR_INFO(code):
        WDR_INFO = {
                "0":"静穏",
                "1":"北北東",
                "2":"北東",
                "3":"東北東",
                "4":"",
                "5":"東南東",
                "6":"南東",
                "7":"南南東",
                "8":"",
                "9":"南南西",
                "10":"南西",
                "11":"西南西",
                "12":"西",
                "13":"西北西",
                "14":"北西",
                "15":"北北西",
                "16":""}
        return WDR_INFO[f"{code}"]

def wspeed(ws):
        if 0 <= ws <= 0.3:
                return (0, u"静穏")
        elif 0.3 <= ws < 1.6:
                return (1, u"至軽風")
        elif 1.6 <= ws < 3.4:
                return (2, u"軽風")
        elif 3.4 <= ws < 5.5:
                return (3, u"軟風")
        elif 5.5 <= ws < 8.0:
                return (4, u"和風")
        elif 8.0 <= ws < 10.8:
                return (5, u"疾風")
        elif 10.8 <= ws < 13.9:
                return (6, u"雄風")
        elif 13.9 <= ws < 17.2:
                return (7, u"強風")
        elif 17.2 <= ws < 20.8:
                return (8, u"疾強風")
        elif 20.8 <= ws < 24.5:
                return (9, u"大強風")
        elif 24.5 <= ws < 28.5:
                return (10, u"暴風")
        elif 28.5 <= ws < 32.7:
                return (11, u"烈風")
        elif 32.7 <= ws:
                return (12, u"颶風")

上記のファイルと同じディレクトリに以下のファイルを作ってください。

jma_talkweather.py
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import shlex
import subprocess
from datetime import datetime
from time import time
import urllib.request
import feedparser
import os.path
import json
import threading
import sys
import re
from bs4 import BeautifulSoup as bs
import transweather

LOAD_MP3 = "mpg321 -q"
CMD_SAY = "atalk"

#東京都千代田区
CLASS_AREA_CODE = "1310100" # 市町村区のコード
AMDNO = "44132" #アメダスNo.

da = datetime.now()
area_data = urllib.request.urlopen('https://www.jma.go.jp/bosai/common/const/area.json')
area_data = json.loads(area_data.read())
area = area_data["class20s"][CLASS_AREA_CODE]["name"]
class15s_area_code = area_data['class20s'][CLASS_AREA_CODE]['parent']
class10s_area_code = area_data['class15s'][class15s_area_code]['parent']
offices_area_code = area_data['class10s'][class10s_area_code]['parent']

def main():
        load_mp3()
        say_datetime()
        say_weather()
        say_news()
        say_datetime2()
        return

def jma():
                global jma_data
                #気象警報・注意報
                #コードと名称の取得
                warning_info_url = "https://www.jma.go.jp/bosai/warning/#area_type=class20s&area_code=%s&lang=ja" % (CLASS_AREA_CODE)
                warnings_soup = bs(urllib.request.urlopen(warning_info_url).read(), 'html.parser')
                warnings_contents = warnings_soup.find_all('script')[10]
                warnings_text_list = str(warnings_contents).split("},")
                warnings_list = [re.findall(r'\w+', warning) for warning in warnings_text_list if ':{name1:"' in warning]
                trans_warning = {}
                for warning_datas in warnings_list:
                        warning_text = ''
                        for warning_data in warning_datas:
                                if warning_data in 'elem':
                                        break
                                if warning_data in ['c', 'name1', 'name2']:
                                        continue
                                if warning_data.isdecimal():
                                        warning_code = '\\' + warning_data
                                        continue
                                warning_text += warning_data.encode('ascii').decode('unicode-escape')
                        trans_warning[warning_code] = warning_text
                #情報の取得
                warning_url = "https://www.jma.go.jp/bosai/warning/data/warning/%s.json" % (offices_area_code)
                warning_info = urllib.request.urlopen(url=warning_url)
                warning_info = json.loads(warning_info.read())
                warning_codes = [warning["code"]
                        for class_area in warning_info["areaTypes"][1]["areas"]
                        if class_area["code"] == CLASS_AREA_CODE
                        for warning in class_area["warnings"]
                        if warning["status"] != "解除" and warning["status"] != "発表警報・注意報はなし"]
                warning_texts = [trans_warning[code] for code in warning_codes]

                #現在の気温・湿度
                amedas_da = da.timestamp() - 600
                amedas_da = datetime.fromtimestamp(amedas_da)
                amedas_time = f"{(amedas_da.hour//3)*3:02}"
                temp_url = "https://www.jma.go.jp/bosai/amedas/data/point/%s/%s_%s.json" % (AMDNO, amedas_da.strftime("%Y%m%d"), amedas_time)
                temp_data = urllib.request.urlopen(url=temp_url)
                temp_data = json.loads(temp_data.read())
                last_time = [timelist for timelist in temp_data][-1]
                now_temp = temp_data[last_time]["temp"][0]
                now_humidity = temp_data[last_time]["humidity"][0]
                if temp_data[last_time]["windDirection"][0] == 0:
                        now_windDirection = []
                else:
                        now_windDirection = transweather.WDR_INFO(temp_data[last_time]["windDirection"][0]) + "の風"
                now_wind_speed = temp_data[last_time]["wind"][0]

                #明日の風速
                tommorow_wind_speed_url = "https://www.jma.go.jp/bosai/jmatile/data/wdist/VPFD/%s.json" % (class10s_area_code)
                tommorow_wind_speed_data = urllib.request.urlopen(url=tommorow_wind_speed_url)
                tommorow_wind_speed_data = json.loads(tommorow_wind_speed_data.read())
                for counter,weather_time in enumerate(tommorow_wind_speed_data["areaTimeSeries"]["timeDefines"]):
                        if  da.strftime("%Y-%m-%d") not in weather_time["dateTime"]:
                                weather_time = counter
                                break
                tommorow_wind_speed = 0
                for counter,tommorow_wind_speeds in enumerate(tommorow_wind_speed_data["areaTimeSeries"]["wind"][weather_time::]):
                        idx = tommorow_wind_speeds["range"].find(" ")
                        tommorow_wind_speed = tommorow_wind_speed + int(tommorow_wind_speeds["range"][idx+1:])
                tommorow_wind_speed = round(tommorow_wind_speed/counter + 1)

                #天気&明日の風向き
                weather_url = "https://www.jma.go.jp/bosai/forecast/data/forecast/%s.json" % (offices_area_code)
                weather_data =urllib.request.urlopen(url=weather_url)
                weather_data = json.loads(weather_data.read())
                if da.strftime("%Y-%m-%d") in weather_data[0]["timeSeries"][0]["timeDefines"][0]:
                        today = 0
                        tommorow = today + 1
                else:
                        today = 1
                        tommorow = today + 1
                weathers = [(area_number,weather) for area_number,weather in enumerate(weather_data[0]["timeSeries"][0]["areas"])
                        if weather["area"]["code"] == class10s_area_code]
                today_weather = weathers[0][1]["weathers"][today]
                tommorow_weather = weathers[0][1]["weathers"][tommorow]
                tommorow_windDirection = weathers[0][1]["winds"][tommorow]
                today = ""
                for counter,i in enumerate(weather_data[0]["timeSeries"][2]["timeDefines"]):
                        if da.strftime("%Y-%m-%d") in i and today == "":
                                today = counter
                                tommorow = today + 2
                if today == "":
                        tommorow = 0
                if tommorow > counter:
                        tommorow = []
                temps = [area["temps"] for area in weather_data[0]["timeSeries"][2]["areas"]
                        if area["area"]["code"] == AMDNO]
                if tommorow != 0:
                        today_min = temps[0][today]
                        today_max = temps[0][today + 1]
                        if today_min == today_max:
                                today_min = []
                else:
                        today_min = today_max = []
                if tommorow !=[]:
                        tommorow_min = temps[0][tommorow]
                        tommorow_max = temps[0][tommorow + 1]
                else:
                        tommorow_min = tommorow_max = []

                #降水確率
                last_da = da.timestamp() - 21600
                last_da = datetime.fromtimestamp(last_da)
                last_time = f"{((last_da.hour)//6)*6:02}"
                now_da = da.timestamp() + 21600
                now_da = datetime.fromtimestamp(now_da)
                now_time = f"{((now_da.hour)//6)*6:02}"
                now_pop = 0
                tommorow_pop = 0
                for counter,timeDegines in enumerate(weather_data[0]["timeSeries"][1]["timeDefines"]):
                        if now_da.strftime("%Y-%m-%d") + "T" + now_time in timeDegines:
                                now_pop = weather_data[0]["timeSeries"][1]["areas"][weathers[0][0]]["pops"][counter]
                        if now_da.strftime("%Y-%m-%d") not in timeDegines and last_da.strftime("%Y-%m-%d") + "T" + last_time not in timeDegines:
                                break
                if counter + 1 == len(weather_data[0]["timeSeries"][1]["timeDefines"]):
                        tommorow_pop = []
                else:
                        tommorow_pop = max(weather_data[0]["timeSeries"][1]["areas"][weathers[0][0]]["pops"][counter::])

                #出力
                jma_data = {
                        "warning_texts":warning_texts,
                        "now_temp":now_temp,
                        "now_humidity":now_humidity,
                        "now_windDirection":now_windDirection,
                        "now_wind_speed":now_wind_speed,
                        "tommorow_windDirection":tommorow_windDirection,
                        "tommorow_wind_speed":tommorow_wind_speed,
                        "today_weather":today_weather,
                        "tommorow_weather":tommorow_weather,
                        "today_min":today_min,
                        "today_max":today_max,
                        "tommorow_min":tommorow_min,
                        "tommorow_max":tommorow_max,
                        "now_pop":now_pop,
                        "tommorow_pop":tommorow_pop
                }
                return jma_data

def load_mp3():
        text = "/時報ボイスのディレクトリのパス/%s.mp3" % (da.hour)
        if 1<= da.hour <=4:
                text = LOAD_MP3 + " -g 40 " + text
        else:
                text = LOAD_MP3 + " " + text
        print(text)
        proc = subprocess.Popen(shlex.split(text))
        proc.communicate()
        return

def say_datetime():
        if 6<= da.hour <=22:
                text = "天気予報です。%s年%s月%s日、%s時をお知らせします。" % (da.year, da.month, da.day, da.hour)
                text = CMD_SAY + ' ' + text
                print(text)
                proc = subprocess.Popen(shlex.split(text))
                proc.communicate()
                return
        else:
                exit()

def say_weather():
        try:
                title_text = u'%sの天気' % (area)
                weather_text = u'%sは%s、風力%s。%s。天気は%s。'
                pop_text = '降水確率は%s%です。'
                now_text = u'現在の気温は%s度、湿度%s%です。'
                temperature_text = u'%sの予想最高気温は、%s度。予想最低気温は、%s度です。'

                #気象警報
                if jma_data["warning_texts"] == []:
                        warning_text = "現在発表されている気象警報・注意報はありません。"
                else:
                        warning_text = "現在、%sが発表されています。" % ("".join(jma_data["warning_texts"]))

                #今日
                now_pop = jma_data["now_pop"]
                wind_deg_now = jma_data["now_windDirection"]
                (wind_speed_now,wind_speed_name) = transweather.wspeed(jma_data["now_wind_speed"])
                today_w = jma_data["today_weather"]
                today_pop_txt = pop_text % (now_pop)
                today_weather_txt = weather_text % (u"今日", jma_data["now_windDirection"], wind_speed_now, wind_speed_name, today_w)
                if jma_data["today_min"] == [] and jma_data["today_min"] != jma_data["today_max"]:
                        today_temp_txt = u'今日の予想最高気温は、%s度です。' % (jma_data["today_max"])
                elif jma_data["today_min"] == jma_data["today_max"]:
                        today_temp_txt = ""
                else:
                        today_temp_txt = temperature_text % (u"今日", jma_data["today_max"], jma_data["today_min"])
                today_now_txt = now_text % (jma_data["now_temp"], jma_data["now_humidity"])

                #明日
                tommorow_pop = jma_data["tommorow_pop"]
                wind_deg_tommorow = jma_data["tommorow_windDirection"]
                (wind_speed_tommorow,wind_speed_name) = transweather.wspeed(jma_data["tommorow_wind_speed"])
                tommorow_weather = jma_data["tommorow_weather"]
                if tommorow_pop == []:
                        tommorow_pop_txt = ""
                else:
                        tommorow_pop_txt = pop_text % (tommorow_pop)
                tommorow_weather_txt = weather_text % (u"明日", wind_deg_tommorow, wind_speed_tommorow, wind_speed_name, tommorow_weather)
                if jma_data["tommorow_max"] == jma_data["tommorow_min"]:
                        tommorow_temp_txt = ""
                else:
                        tommorow_temp_txt = temperature_text % (u"明日", jma_data["tommorow_max"], jma_data["tommorow_min"])

                #実行
                weather_text = (title_text + ' ' + today_weather_txt + ' ' + today_pop_txt + ' ' + warning_text + ' ' + today_now_txt + ' ' + today_temp_txt + ' ' + tommorow_weather_txt + ' ' + tommorow_pop_txt + ' ' + tommorow_temp_txt).replace("-", "マイナス")
                say_text = '''%s '%s' ''' % (CMD_SAY + " -b -s 90", weather_text)
        except:
                print("エラーが出ました。:", sys.exc_info()[0])
                say_text = "atalk エラー発生。ログを確認してください"
        finally:
                print(say_text)
                proc = subprocess.Popen(shlex.split(say_text))
                proc.communicate()
        return

def say_news():
        news_text = ""
        news = ["0", "4"]
        for loop in news:
                RSS_URL = 'https://www.nhk.or.jp/rss/news/cat%s.xml' % (loop)
                news_data = feedparser.parse(RSS_URL)
                for i, entry in enumerate(news_data.entries):
                        newstime = ",".join(map(str, entry.published_parsed[:5]))
                        newstime = datetime.strptime(newstime, "%Y,%m,%d,%H,%M")
                        if i == 0 and loop == news[0]:
                                say_text = "続いてニュースです。" + entry.summary
                        else:
                                say_text = "次のニュースです。" +  entry.summary
                        say_text = '''%s '%s' ''' % (CMD_SAY, say_text)
                        if da.timestamp() - newstime.timestamp() + 32400 <= 86400 and entry.summary not in news_text:
                                print(say_text)
                                proc = subprocess.Popen(shlex.split(say_text))
                                proc.communicate()
                                news_text = news_text + entry.summary
        print(news_text)
        return

def say_datetime2():
        da = datetime.now()
        text = "%s時%s分です。" % (da.hour, da.minute)
        text = CMD_SAY + ' ' + text
        print(text)
        proc = subprocess.Popen(shlex.split(text))
        proc.communicate()
        return

### Execute
if __name__ == "__main__":
        th1 = threading.Thread(target=jma)
        th2 = threading.Thread(target=main)
        th1.start()
        th2.start()

実際に実行してみるとこんな感じになると思います。

python jma_talkweather.py

atalk 天気予報です。2023年6月10日、13時をお知らせします。
atalk -b -s 90 '千代田区の天気 今日は南南東の風、風力2。軽風。天気はくもり 所により 夕方 から 夜のはじめ頃 雨。 降水確率は20%です。 現在発表されている気象警報・注意報はありません。 現在の気温は24.2度、湿度80%です。 今日の予想最高 気温は、26度です。 明日は南の風、風力5。疾風。天気はくもり 時々 雨。 降水確率は70%です。 明日の予想最高気温は、22度。予想最低気温は、20度です。'

参考文献

Raspberry Piに現在時刻、天気予報、ニュースを喋らせる
現在出ている気象警報・注意報を取得する
新しい気象庁サイトからJSONデータが取得できる件

5
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
5