概要
数年前にウェザーニュースのリポート特典で,WxBeacon2という簡易気象観測機器をもらいました.
観測データは,ウェザーニュースアプリの画面上で閲覧することができるようになっています.
しかし観測データは14日前までしか見ることができず,エクスポート機能もありません.
そこで,色々調べながらPythonを使ってWxBeacon2の観測データを取得することに成功したので,本記事はその解説を行います.
WxBeacon2の仕様
WxBeacon2の仕様書はWeb上で手に入ります.
ページ数は,上記PDFの右下に付されている番号で数えます.
何かいっぱい書いてありますが,全部を読む必要はありません.必要な箇所だけをつまみ食いしていきましょう.
動作フロー(書込み)
まずは動作フローを確認して概要をつかみます.4ページ目にフローチャートが掲載されています.
どうやらモードが2つあって「データ保存有」「データ保存無」があるようです.デフォルトでは保存有になっているかと思いますので,今回は「データ保存有」だけを見ればよいでしょう.
まず,測定フラグがTrueなら測定がなされ,Latest dataというものを更新するようです.
9ページ「2.1.1 Latest data」によると,
測定間隔ごとにデータを更新し Latest data に反映する.
とあります.最新の観測データがLatest dataに格納されると考えておけば良いでしょう.
次に,Time informationが書込済みかどうかを判定しています.
24ページ「2.3.1 Time information」によると,
フラッシュメモリに記録されるデータの時刻合わせ用に対向機から UNIX 時間を設定する.
本設定に基づく時刻情報はフラッシュメモリのページ毎に記録される.
*注意:測定データのメモリ記録は本 Characteristic へ時刻セットを行わないと開始されない.
だそうです.つまり,時刻がBeaconにセットされてないと保存させませんよーということですね.
時刻がセットされていれば,次の「データ保存→行番号更新」の処理に移ります.
その次に,行番号が12かそれ未満かの条件分岐があります.
行番号が12に達すると,その後の処理で「ページ更新」がされるので,1ページあたりのデータ上限は13であることがわかります(行番号は下図のように0スタート).
動作フロー(読み出し)
観測データが記録されるしくみは分かりましたが,一番知りたいのは,記録されたデータをどう読み出すかです.
下図は読み出し時の動作フローです.
まず,最新ページを読み込みます(Read Latest page).Latest pageの内容は以下のようになっていて,最新値が保存されているページ番号・行の情報が入っています.この情報をもとに,所望のページ番号や行を特定します.
次の処理に「Write Request Page」とありますが,これは何ページの何行目を読み出したいかを指定する工程です.Request Pageに書き込むことで,後述するResponse Dataに所望のデータが格納されます.
次の処理「Read Response flag」は,読み出しが完了したかを確認する工程です.更新完了フラグが立ったら,次の「Read Response data」に進みます.
↓で書かれているように,Request pageで指定された行を先頭にして(行番号12から0の方向に)データが格納されています.
つまり,5を行指定すると「5,4,3,2,1,0」の順番でデータが格納されるということです.なのでページ全体の値を取得したいならRequest pageの段階では行=12としておけばよいことになりますね.
なお,ページをまたいで値を取得したいときは,最初に戻ってRequest pageからやり直す必要があります.
実装
BeaconとBLEで接続する
WxBeacon2は,Bluetoothで接続できます.
Windows環境で使えるライブラリとしては,bleak
というものがあります.
pip install bleak
でインストールできます.
接続部分のプログラムを以下に示します.データ取得等の処理は# PROCESS....
で示した部分で実装することになります.
'Env'
はBeaconのデバイス名で,Envが見つかるまでリトライするような実装としています.
PCのBluetooth接続をオンにするのを忘れずに!
import asyncio
import bleak
async def connect(device_name, loop):
# Discover devices
devices = await bleak.BleakScanner.discover()
device = next((d for d in devices if d.name == device_name), None)
if not device:
raise bleak.exc.BleakDeviceNotFoundError(f'Device "{device_name}" not found.')
# Connect to the device
async with bleak.BleakClient(device, loop=loop) as client:
print('Connected!')
# PROCESS....
#############
if __name__ == '__main__':
while True:
try:
loop = asyncio.get_event_loop()
loop.run_until_complete(connect('Env', loop))
break
except asyncio.exceptions.TimeoutError:
print('Timeout. Retrying...')
except bleak.exc.BleakDeviceNotFoundError:
print('Device not found. Retrying...')
Read Latest page
動作フロー(読み出し)で説明したように,まずは最新ページを取得します.
#...
import struct
def unix_to_jst(unix_time, return_str=False):
'''Convert UNIX time to JST time'''
jst_time = datetime.datetime.fromtimestamp(unix_time)
if return_str:
return jst_time.strftime("%Y/%m/%d %H:%M:%S")
else:
return jst_time
#.........
#.........
# Get the latest page
data = await client.read_gatt_char('0c4c3002-7700-46f4-aa96-d5e974e32a54')
(latest_page_time, interval, latest_page, latest_row) = struct.unpack('<IHHB', data)
latest_page_time = unix_to_jst(latest_page_time + interval * latest_row)
#.........
#.........
最新ページがある場所にアクセスするには,.read_gatt_char()
を使います.引数にはUUIDというものを入れます.ベースとなるUUIDは7ページに記載されています.
例えば今回のように最新ページを読みたい場合は,このXXXX
の部分に3002
を入れればいいわけです(3002
はCharacteristics UUID:
に続けて書いてある数字).
プログラムに戻りますが,UUIDを指定してread_gatt_charしてあげると,変数data
にバイナリデータが格納されます.
しかしバイナリデータのままでは読めないので,struct
というライブラリのunpack(format, buffer)
を使ってパースします.format
にはデータ仕様に沿った文字列を入れ,buffer
にはバイナリデータを入れます.
パースできると,仕様通り
- latest_page_time:最新ページを開始したUNIX時間
- interval:測定間隔
- latest_page:最新ページ
- latest_row:最新行
が取得できるはずです.
Write Request page
次は動作フロー(読み出し)の「Write Request page」で,何ページ目の何行目まで取得するかを指定します.
Request pageのCharacteristic UUIDは3003
です.
#.........
# Get the page data
for page in reversed(range(latest_page + 1)):
# Request the page
set_value = struct.pack('<HB', page, latest_row if page == latest_page else 12)
await client.write_gatt_char('0c4c3003-7700-46f4-aa96-d5e974e32a54', set_value)
#.........
最新ページのときは(if page == latest_page
)最新行を指定し,それ以外のときは最終行12を指定しています.
Read Response flag
次は動作フロー(読み出し)の「Read Response flag」で,レスポンスページの更新が完了したかを確認します.
Read Response flagのCharacteristic UUIDは3004
です.
#.........
# Check for updates
flag, retry = 0, 0
while flag != 1:
response_flag = await client.read_gatt_char('0c4c3004-7700-46f4-aa96-d5e974e32a54')
(flag, start_time) = struct.unpack('<BI', response_flag)
retry += 1
if retry > 1:
print(f'Retrying request page (Page:{page}, Status:{flag})')
#.........
flag
が1
になるまでリトライするようにしています.
Read Response data
次は動作フロー(読み出し)の「Read Response data」で,観測値を読み出します.
Read Response dataのCharacteristic UUIDは3005
です.
#.........
# Get the page data
for row in reversed(range(latest_row+1 if page == latest_page else 12+1)):
if unix_to_jst(start_time + interval * row) < datefrom: # Do not retrieve data before the specified date and time
break_flag = True
break
page_value = await client.read_gatt_char('0c4c3005-7700-46f4-aa96-d5e974e32a54')
(row, temp, hum, light, uv, press, noise, discom, heat, batt) = struct.unpack('<BhhhhhhhhH', page_value)
time = unix_to_jst(start_time + interval * row, return_str=True)
temp /= 100
hum /= 100
uv /= 100
press /= 10
noise /= 100
discom /= 100
heat /= 100
batt /= 1000
# Write to file
mode = 'a' if os.path.exists(output) else 'w'
with open(output, mode) as f:
f.write(f'{time},{temp},{hum},{light},{uv},{press},{noise},{discom},{heat},{batt}\n')
if break_flag:
break
#.........
ページ内の値をまとめて読み出せるわけではなく,client.read_gatt_char('0c4c3005-7700-46f4-aa96-d5e974e32a54')
を繰り返し行うことで1行ずつ順番に読み出せます(逆順であることに注意).仕様書の2.1.5節を参照.
取得したデータは,外部ファイルに1行ずつ保存しています.
取得したデータを見てみる
このような感じで,2週間以上前のデータも取得することができました.
ソースコード
Qiitaアドベントカレンダーで使ったコードはこちらのレポジトリにまとめています.
本記事のコードは09-beacon2
の中に入っています.
なお,GitHubのコードでは.pyを実行時に
$ python get.py -d "2023/12/12 00:00:00" -o "output.txt"
のように-d
,-o
オプションで,それぞれ取得開始時刻と出力先を指定すればいいようになっています.