5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

気象分野でPython&機械学習Advent Calendar 2023

Day 9

【気象観測】Windows&PythonでWxBeacon2からデータ取得

Last updated at Posted at 2023-12-11

概要

数年前にウェザーニュースのリポート特典で,WxBeacon2という簡易気象観測機器をもらいました.

観測データは,ウェザーニュースアプリの画面上で閲覧することができるようになっています.

しかし観測データは14日前までしか見ることができず,エクスポート機能もありません.

そこで,色々調べながらPythonを使ってWxBeacon2の観測データを取得することに成功したので,本記事はその解説を行います.

WxBeacon2の仕様

WxBeacon2の仕様書はWeb上で手に入ります.

ページ数は,上記PDFの右下に付されている番号で数えます.

何かいっぱい書いてありますが,全部を読む必要はありません.必要な箇所だけをつまみ食いしていきましょう.

動作フロー(書込み)

まずは動作フローを確認して概要をつかみます.4ページ目にフローチャートが掲載されています.

どうやらモードが2つあって「データ保存有」「データ保存無」があるようです.デフォルトでは保存有になっているかと思いますので,今回は「データ保存有」だけを見ればよいでしょう.

image.png

まず,測定フラグがTrueなら測定がなされ,Latest dataというものを更新するようです.

9ページ「2.1.1 Latest data」によると,

測定間隔ごとにデータを更新し Latest data に反映する.

とあります.最新の観測データがLatest dataに格納されると考えておけば良いでしょう.

次に,Time informationが書込済みかどうかを判定しています.

24ページ「2.3.1 Time information」によると,

フラッシュメモリに記録されるデータの時刻合わせ用に対向機から UNIX 時間を設定する.
本設定に基づく時刻情報はフラッシュメモリのページ毎に記録される.
*注意:測定データのメモリ記録は本 Characteristic へ時刻セットを行わないと開始されない.

だそうです.つまり,時刻がBeaconにセットされてないと保存させませんよーということですね.

時刻がセットされていれば,次の「データ保存→行番号更新」の処理に移ります.

その次に,行番号が12かそれ未満かの条件分岐があります.
行番号が12に達すると,その後の処理で「ページ更新」がされるので,1ページあたりのデータ上限は13であることがわかります(行番号は下図のように0スタート).

image.png

動作フロー(読み出し)

観測データが記録されるしくみは分かりましたが,一番知りたいのは,記録されたデータをどう読み出すかです.

下図は読み出し時の動作フローです.

image.png

まず,最新ページを読み込みます(Read Latest page).Latest pageの内容は以下のようになっていて,最新値が保存されているページ番号・行の情報が入っています.この情報をもとに,所望のページ番号や行を特定します.

image.png

次の処理に「Write Request Page」とありますが,これは何ページの何行目を読み出したいかを指定する工程です.Request Pageに書き込むことで,後述するResponse Dataに所望のデータが格納されます.

image.png

次の処理「Read Response flag」は,読み出しが完了したかを確認する工程です.更新完了フラグが立ったら,次の「Read Response data」に進みます.

↓で書かれているように,Request pageで指定された行を先頭にして(行番号12から0の方向に)データが格納されています.
つまり,5を行指定すると「5,4,3,2,1,0」の順番でデータが格納されるということです.なのでページ全体の値を取得したいならRequest pageの段階では行=12としておけばよいことになりますね.

image.png

なお,ページをまたいで値を取得したいときは,最初に戻ってRequest pageからやり直す必要があります.

実装

BeaconとBLEで接続する

WxBeacon2は,Bluetoothで接続できます.

Windows環境で使えるライブラリとしては,bleakというものがあります.
pip install bleakでインストールできます.

接続部分のプログラムを以下に示します.データ取得等の処理は# PROCESS....で示した部分で実装することになります.

'Env'はBeaconのデバイス名で,Envが見つかるまでリトライするような実装としています.

PCのBluetooth接続をオンにするのを忘れずに!

get.py
import asyncio
import bleak

async def connect(device_name, loop):
    # Discover devices
    devices = await bleak.BleakScanner.discover()
    device = next((d for d in devices if d.name == device_name), None)
    if not device:
        raise bleak.exc.BleakDeviceNotFoundError(f'Device "{device_name}" not found.')

    # Connect to the device
    async with bleak.BleakClient(device, loop=loop) as client:
        print('Connected!')

        # PROCESS....
        #############
        
if __name__ == '__main__':
    while True:
        try:
            loop = asyncio.get_event_loop()
            loop.run_until_complete(connect('Env', loop))
            break
        except asyncio.exceptions.TimeoutError:
            print('Timeout. Retrying...')
        except bleak.exc.BleakDeviceNotFoundError:
            print('Device not found. Retrying...')

Read Latest page

動作フロー(読み出し)で説明したように,まずは最新ページを取得します.

get.py
#...
import struct

def unix_to_jst(unix_time, return_str=False):
    '''Convert UNIX time to JST time'''
    jst_time = datetime.datetime.fromtimestamp(unix_time)
    if return_str:
        return jst_time.strftime("%Y/%m/%d %H:%M:%S")
    else:
        return jst_time
#.........
#.........
       # Get the latest page
        data = await client.read_gatt_char('0c4c3002-7700-46f4-aa96-d5e974e32a54')
        (latest_page_time, interval, latest_page, latest_row) = struct.unpack('<IHHB', data)
        latest_page_time = unix_to_jst(latest_page_time + interval * latest_row)
#.........
#.........

最新ページがある場所にアクセスするには,.read_gatt_char()を使います.引数にはUUIDというものを入れます.ベースとなるUUIDは7ページに記載されています.

image.png

例えば今回のように最新ページを読みたい場合は,このXXXXの部分に3002を入れればいいわけです(3002Characteristics UUID:に続けて書いてある数字).

プログラムに戻りますが,UUIDを指定してread_gatt_charしてあげると,変数dataにバイナリデータが格納されます.

しかしバイナリデータのままでは読めないので,structというライブラリのunpack(format, buffer)を使ってパースします.formatにはデータ仕様に沿った文字列を入れ,bufferにはバイナリデータを入れます.

パースできると,仕様通り

  • latest_page_time:最新ページを開始したUNIX時間
  • interval:測定間隔
  • latest_page:最新ページ
  • latest_row:最新行

が取得できるはずです.

Write Request page

次は動作フロー(読み出し)の「Write Request page」で,何ページ目の何行目まで取得するかを指定します.

Request pageのCharacteristic UUIDは3003です.

get.py
#.........
        # Get the page data
        for page in reversed(range(latest_page + 1)):
            # Request the page
            set_value = struct.pack('<HB', page, latest_row if page == latest_page else 12)
            await client.write_gatt_char('0c4c3003-7700-46f4-aa96-d5e974e32a54', set_value)
#.........

最新ページのときは(if page == latest_page)最新行を指定し,それ以外のときは最終行12を指定しています.

Read Response flag

次は動作フロー(読み出し)の「Read Response flag」で,レスポンスページの更新が完了したかを確認します.

Read Response flagのCharacteristic UUIDは3004です.

get.py
#.........
            # Check for updates
            flag, retry = 0, 0
            while flag != 1:
                response_flag = await client.read_gatt_char('0c4c3004-7700-46f4-aa96-d5e974e32a54')
                (flag, start_time) = struct.unpack('<BI', response_flag)
                retry += 1
                if retry > 1:
                    print(f'Retrying request page (Page:{page}, Status:{flag})')
#.........

flag1になるまでリトライするようにしています.

Read Response data

次は動作フロー(読み出し)の「Read Response data」で,観測値を読み出します.

Read Response dataのCharacteristic UUIDは3005です.

get.py
#.........
            # Get the page data
            for row in reversed(range(latest_row+1 if page == latest_page else 12+1)):
                if unix_to_jst(start_time + interval * row) < datefrom:  # Do not retrieve data before the specified date and time
                    break_flag = True
                    break
                page_value = await client.read_gatt_char('0c4c3005-7700-46f4-aa96-d5e974e32a54')
                (row, temp, hum, light, uv, press, noise, discom, heat, batt) = struct.unpack('<BhhhhhhhhH', page_value)
                time = unix_to_jst(start_time + interval * row, return_str=True)
                temp /= 100
                hum /= 100
                uv /= 100
                press /= 10
                noise /= 100
                discom /= 100
                heat /= 100
                batt /= 1000
                # Write to file
                mode = 'a' if os.path.exists(output) else 'w'
                with open(output, mode) as f:
                    f.write(f'{time},{temp},{hum},{light},{uv},{press},{noise},{discom},{heat},{batt}\n')
            if break_flag:
                break
#.........

ページ内の値をまとめて読み出せるわけではなく,client.read_gatt_char('0c4c3005-7700-46f4-aa96-d5e974e32a54')を繰り返し行うことで1行ずつ順番に読み出せます(逆順であることに注意).仕様書の2.1.5節を参照.

取得したデータは,外部ファイルに1行ずつ保存しています.

取得したデータを見てみる

このような感じで,2週間以上前のデータも取得することができました.

image.png

ソースコード

Qiitaアドベントカレンダーで使ったコードはこちらのレポジトリにまとめています.

本記事のコードは09-beacon2の中に入っています.

なお,GitHubのコードでは.pyを実行時に

$ python get.py -d "2023/12/12 00:00:00" -o "output.txt"

のように-d-oオプションで,それぞれ取得開始時刻と出力先を指定すればいいようになっています.

5
3
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?