2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Qiita全国学生対抗戦Advent Calendar 2024

Day 23

緊急地震速報を受信した瞬間にNHK総合の録画を開始する(EPGStation)

Last updated at Posted at 2024-12-22

この記事は Qiita全国学生対抗戦 Advent Calendar 2024 の23日目になります。よろしくお願いします!!

導入

以前の記事で、RaspberryPi 4を使った地上波のテレビサーバを構築しました。

(宣伝)

実に快適な録画溜め生活を送っていたのですが、ふと、こう思いました。

「緊急地震速報の録画、自動でできるんじゃないか?」

実は、緊急地震速報やその他ニュース速報の瞬間などの動画はYoutube上で一定の人気があるジャンルで、最近は地震の後すぐに投稿されるほど盛り上がっています(体感)。
EPGStationという録画システムには、外部から指定したチャンネルの録画を開始できるAPIが存在しており、それを使えば「地震発生→速報→録画」の流れが自動化できるのではと思った次第です。

とりあえずAPI叩いてみる

EPGStationのAPIドキュメント(Swagger UI製で便利)によると、http://localhost:8888/api/reservesから予約を追加できるようですが、
前提として必要なのが、チャンネルごとに決まったchannelIdです。/api/channelsで取得できます。

programIdを使うことで、録画する番組を指定することも可能ですが、緊急時の編成変更などを考慮し、時間指定で予約を追加しています。programIdは番組ごとに異なるので、録画開始時に/api/schedules/broadcastingから取得する必要があります。

/api/channels(一部)
[
  {
    "id": 3273601024,
    "serviceId": 1024,
    "networkId": 32736,
    "name": "NHK総合1・東京",
    "halfWidthName": "NHK総合1・東京",
    "hasLogoData": true,
    "channelType": "GR",
    "channel": "27",
    "type": 1,
    "remoteControlKeyId": 1
  },

今回はNHK総合1・東京(JOAK-DTV)のみを録画対象とするので、channelId3273601024をハードコーディングします。
ついでに前述のprogramIdも指定してPOSTしてみたのが以下の通り。

curl -X 'POST' \
  'http://localhost:8888/api/reserves' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "allowEndLack": true,
  "tags": [
    0
  ],
  "encodeOption": {
    "mode1": "H.264",
    "isDeleteOriginalAfterEncode": false
  },
  "programId": 327360102407384,
  "timeSpecifiedOption": {
    "name": "string",
    "channelId": 3273601024,
    "startAt": 0,
    "endAt": 0
  }
}'

idDeletedOriginamAfterEncodeは文字通り、エンコード後にm2tsファイルを削除するかどうかの指定です。
成功するとreserveIdが返ってきます。

Pythonでまとめて書く

IDとってReserveにぶち込むだけです。なお、残念ながら予算の関係で1チューナーしか用意できませんでしたので、最初にすでに録画中かどうかを確認し、そうでなければ録画を開始するというフローにしました。2番組同時録画ができないわけですからね。
録画時間については、発生から1時間の指定としました(が、正常に動作するか確認中です)。startAt endAtはUnixタイムスタンプのミリ秒になります。

rec.py
import time
import requests

IS_RECORDING_URL = 'http://192.168.2.136:8888/api/recording?offset=0&limit=20&isHalfWidth=false'
RESERVE_URL = 'http://192.168.2.136:8888/api/reserves'
HEADERS = {
    "accept": "application/json",
    "Content-Type": "application/json"
}

def is_recording():
    # 録画中か確認
    is_recording = requests.get(IS_RECORDING_URL).json()
    if is_recording['total'] == 0:
        return False
    else:
        return True

def start_recording():
    #録画を開始
    start_at = int(time.time() * 1000)
    end_at = start_at + (60 * 60 * 1000)

    data = {
        "allowEndLack": True,
        "tags": [0],
        "encodeOption": {
            "mode1": "H.264",
            "isDeleteOriginalAfterEncode": False
        },
        "timeSpecifiedOption": {
            "name": "地震情報受信",
            "channelId": 3273601024,
            "startAt": start_at,
            "endAt": end_at
        }
    }

    response = requests.post(RESERVE_URL, headers=HEADERS, json=data)

    response_data = response.json()
    reserve_id = response_data['reserveId']
    print(f"ReserveId: {reserve_id}")

    return reserve_id

def main():
    if  is_recording():
        print('Recording already')
    else:
        start_recording()
        print('Recording start')

if __name__ == "__main__":
    main()

リクエストのdataで指定しているnameの立ち位置がよく分かりません。EPGStation上での番組名は、録画開始時に放送されている番組名、録画ファイルの名前も別に指定したもので保存されています。
唯一このnameを表示するのが、私が下の記事で参考にさせていたDiscord通知用Botのみです。

EEW配信サービスの選定

「WebSocketでの配信」かつ「無料」という条件で、今回はP2P地震速報を採用しました。登録なしですぐに受信を開始できたのも助かりました。

緊急地震速報の仕様について
緊急地震速報には「予報」と「警報」という2つの種類があります。

  • 予報 : 専門機関や特定の利用者向けに提供される情報
  • 警報 : 一般の人向けの情報、震度5弱以上の揺れが予測される場合に発表

(参考 : 気象庁ホームページより)
https://www.data.jma.go.jp/eew/data/nc/shikumi/shousai.html

一般的に、テレビでチャイム音とともに緊急地震速報が放送されるのは、この「警報」が発表された際です。今回採用したサービスは「警報」のみの提供となります。
本記事ではテレビでの速報が録画できれば良いため、このサービスがぴったりというわけです。

(「予報」も受信できるサービスはありますが、当然、ほとんどは有料となっています。例: DM-D.D.S)

受信用コードはこちらの方のものを参考にさせていただきました。クライアント側のみを構築しています。

このようにDiscordへ地震発生を通知することもできるようです。今回はこの機能も流用させていただきました。(画像の内容はダミーデータ)

image.png

デバッグについて
当たり前ですが、このシステムのデバッグは地震発生時にしかできません。それでは困ります。
ありがたいことに過去の地震情報を配信してくれるサーバが用意されていますので、これをテスト用に使うのが便利です。

wss://api-realtime-sandbox.p2pquake.net/v2/history

(Discordなどへ再配信を行う際は、ユーザを混乱させないよう注意してください。テストであることを明示するか、Webhook.siteのような受信サービスを利用することをおすすめします。)

色々書き換えて結果的に完成したのが以下です。WebSocketから受け取ったデータをとりあえずDiscordへ通知しています。速報で震度が4以上であった場合、このコードから録画開始用のコードを呼び出しています。

eew.py
import rec # 同ディレクトリのrec.pyを読み込み
import asyncio
import json
import aiohttp
from websockets import client
from websockets.exceptions import ConnectionClosed
from datetime import datetime

GATEWAY_URL = 'wss://api.p2pquake.net/v2/ws'
WEBHOOK_URL = 'https://discord.com/api/webhooks/xxxxx/xxxxx'

MT = {
    "-1": "Unknown",
    "10": "震度1",
    "20": "震度2",
    "30": "震度3",
    "40": "震度4",
    "50": "震度5弱",
    "55": "震度5強",
    "60": "震度6弱",
    "65": "震度6強",
    "70": "震度7",
}

COLOR_CODES = {
    "-1": 16777215,
    "10": 3955330,
    "20": 1999590,
    "30": 7923420,
    "40": 16777110,
    "50": 16765440,
    "55": 16750080,
    "60": 15741440,
    "65": 12451840,
    "70": 9175080,
}

async def connect():
    print("P2P地震情報APIサーバに接続中...")
    while True:
        try:
            async for ws in client.connect(GATEWAY_URL):
                print("サーバに接続しました")
                try:
                    while True:
                        recv = await ws.recv()
                        data = json.loads(recv)

                        if data["code"] == 551:  # Earthquake data
                            await handle_earthquake_data(data)
                        elif data["code"] == 556:  # Warning data
                            print("Warning data received:", data)
                except ConnectionClosed:
                    print("接続が切断されました。再接続します...")
                    break
        except Exception as e:
            print("Error:", e)
            await asyncio.sleep(5)

async def handle_earthquake_data(data):
    try:
        earthquake = data["earthquake"]
        max_scale = str(earthquake["maxScale"])
        magnitude = earthquake["hypocenter"].get("magnitude", "Unknown")
        depth = earthquake["hypocenter"].get("depth", "Unknown")
        location = earthquake["hypocenter"].get("name", "Unknown")
        time_str = earthquake["time"]
        time = datetime.strptime(time_str, '%Y/%m/%d %H:%M:%S').strftime('%Y年%m月%d日 %H時%M分')

        if int(max_scale) >= 40:
            rec.main()

        description = (
            f"{time}頃、マグニチュード{magnitude}、最大{MT[max_scale]}の地震が発生しました。"
            f"深さは{depth}km、発生場所は{location}です。"
        )

        await send_discord_notification(description, COLOR_CODES[max_scale])
    except Exception as e:
        print("Error processing earthquake data:", e)

async def send_discord_notification(description, color):
    payload = {
        "username": "QuakeWatch",
        "embeds": [
            {
                "title": "地震発生",
                "description": description,
                "color": color,
                "footer": {"text": "ソース: 気象庁 / P2P地震情報API"},
            }
        ]
    }

    async with aiohttp.ClientSession() as session:
        async with session.post(WEBHOOK_URL, json=payload) as resp:
            if resp.status == 204:
                print("Discordへの通知に成功しました")
            else:
                print(f"Discordへの通知に失敗しました: {resp.status}")

if __name__ == "__main__":
    asyncio.run(connect())

このコードをsystemedなどで自動常時起動しておけばよいでしょう。そのあたりはお好みで〜

終わりに

まだ実際の地震を観測しておらず、正常な動作は保証できません。その点はご了承ください。
運用中に気づいたことがあれば追記等していきますので、お待ちいただけたら幸いです。

お読みいただきありがとうございました!他の方のアドカレ記事もおもしろいので是非!!

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?