概要
オムロンの環境センサ2JCIE-BU01とRaspberry Pi、InfluxDB、Grafanaを組み合わせて各種環境データや加速度データ(地震波形)を取得するシステムを構築してみました。2JCIE-BU01の環境情報(温度・湿度・気圧・CO2濃度等)を取得するプログラムの記事はいくつかあるのですが、加速度データについては通信プロトコルが別扱いになっており、探した範囲では解説が見当たらなかったため、デバイスのマニュアルを参考に、環境データと一緒に加速度データも取得するプログラムをPythonで書きました。ついでに震度も計算します。
Grafanaによる表示例
地震発生時における2JCIE-BU01の挙動
2JCIE-BU01は、加速度データを常に記録しているわけではありません。地震が発生した時のみ、以下のように動作します。
- 一定の揺れを検知したら、100Hzで加速度記録を開始。
- 地震か否かの判定を開始。vibration_informationを 0 → 1 に変更。
- 地震だと判定したらvibration_informationを 2 に変更しそのまま記録継続。地震でないと判定した場合は 0 に戻し、加速度データは破棄。
- 2分間加速度を取得しデバイス内のリングブァッファ(地震10回分)に保存。
- 記録が終了したらvibration_informationを 2 → 0 に戻す。
加速度データは、5の時点で初めて取得可能になります。したがって本プログラムは、vibration_informationが 2 からそれ以外に変化するのをトリガーとして、リングブァッファの1番目(最新版)の地震加速度データ(2分間分)を取得し、InfluxDBに受け渡します。
プログラム内容
プログラムは、m.yukiさまのRaspberry Pi + 2JCIE-BU01 + Grafanaでデータ取得・可視化してみるに載っている2つのPythonコードを改変したもの(とは言うものの原型はほとんど残っていませんが)になっています。センサーやInfluxDB、Grafana等についてはほぼ同じ環境を想定しているので、プログラムを差し替えるだけで動くはずです。Raspberry Piでなくとも、Linux系OSならだいたい動作するのではないでしょうか。ただ一つ、本プログラムではnumpyを使用しているため、入っていない場合はpip install numpy
によりインストールする必要があります。
import serial
import numpy as np
from struct import pack, unpack
from datetime import datetime, timezone
class Sensor:
class SensorError(Exception):
pass
def __init__(self):
# Open the serial port.
self._ser = serial.serial_for_url("hwgrep:///dev/ttyUSB0",
115200, serial.EIGHTBITS,
serial.PARITY_NONE, timeout=1)
self._ser.reset_input_buffer()
self._ser.reset_output_buffer()
# To record the start time of earthquake, set the timecounter;
# otherwise, it will remain at 0.
current_timecounter = int.from_bytes(
self.read(b'\x01\x02\x52'), 'little')
if current_timecounter == 0:
self.read(b'\x02\x02\x52' + pack('<Q', 1))
# To protect the flash memory, set the interval for writing sensor
# data to 3600sec.
if int.from_bytes(self.read(b'\x01\x03\x52'), 'little') < 3600:
self.read(b'\x02\x03\x52' + pack('<H', 3600))
time.sleep(150) # This process takes about 2min.
#self.read(b'\x02\x16\x51\x02') # Erase acceleration data.
#time.sleep(150) # This process takes about 2min.
# Set the LED to ...
self.read(b'\x02\x11\x51\x00\x00\x00\x00\x00') # off.
#self.read(b'\x02\x11\x51\x08\x00\x00\x00\x00') # according to the acceleration.
def __del__(self):
if self._ser and self._ser.is_open:
self._ser.close()
def read(self, command_payload=None) -> bytearray:
# Send out the command_payload and return the data
# included in the response payload.
if command_payload:
self.write(command_payload)
# header + len(payload + crc)
header_payloadlen = self._ser.read(4)
if header_payloadlen[0:2] != b'RB':
raise self.SensorError("Invalid Header")
payload_len = int.from_bytes(header_payloadlen[2:4],'little') - 2
rest = self._ser.read(payload_len + 2)
payload = rest[0:payload_len]
frame_crc = rest[payload_len:]
if self._calc_crc(header_payloadlen + payload) != frame_crc :
raise self.SensorError("CRC mismatch")
return payload[3:] # Extract data from the payload.
def write(self, payload) -> None:
# Add a header and other elements to the payload and send it.
command = b'RB' \
+ pack('<H', len(payload) + 2) \
+ payload
command = command + self._calc_crc(command)
self._ser.write(command)
self._ser.flush()
def _calc_crc(self, data) -> bytearray:
# Calculate the value of CRC-16/MODBUS
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
crc = (crc >> 1) ^ 0xA001 if crc & 0x01 else crc >> 1
return pack('<H', crc)
class DataProcessor:
def __init__(self, sensor: Sensor):
self.sensor = sensor
def get_current_data(self) -> dict:
# Read the latest sensor data.
data = self.sensor.read(b'\x01\x21\x50')
keys = ["temperature", "relative_humidity", "ambient_light",
"barometric_pressure", "sound_noise", "eTVOC", "eCO2",
"discomfort_index", "heat_stroke", "vibration_information",
"si_value", "pga", "seismic_intensity"]
values = unpack('<hHHLHHHHhBHHH', data[1:28])
units = [0.01, 0.01, 1, 0.001, 0.01, 1, 1, 0.01, 0.01, 1, 0.1, 0.1, 0.001]
retval = dict([ [k, v * u] for k, v, u in zip(keys, values, units)])
retval["time_measured"] = datetime.now(timezone.utc)
return retval
def get_earthquake_data(self) -> list:
# Read the acceleration data from device memory.
# The earthquake time data may have an error margin of
# approximately 1 second.
# Read the current timecounter
current_timecounter = int.from_bytes(self.sensor.read(b'\x01\x01\x52'),
'little')
# Read the accelleration memory header.
data = self.sensor.read(b'\x01\x3e\x50\x00\x01')
# The timecounter of earthquake end.
data_timecounter = int.from_bytes(data[6:14], 'little')
# Calculate the UTC time of earthquake end.
earthquake_end_time = datetime.now(timezone.utc).timestamp() \
- (current_timecounter - data_timecounter)
# Number of pages of the acceleration data,
# including the header as page 0.
pages = int.from_bytes(data[0:2], 'little') # May be 376.
# Send a command to transmit acceleration data.
self.sensor.write(b'\x01\x3f\x50\x00\x01\x01\x00'
+ pack('<H', pages - 1))
# Get all of the acceleration data pages.
rawdata_all = [self.sensor.read() for _ in range(pages - 1)]
# The data recording frequency is assumed to be 100 Hz.
# [utctime, accX, accY, accZ] x 32 data/page x 375 pages
acceleration_data = [
[earthquake_end_time
+ ((int.from_bytes(data[0:2],'little')-pages)*32+j)*0.01]
+ [x*0.1 for x in unpack('<hhh', data[36+j*6 : 36+j*6+6])]
for data in rawdata_all for j in range(32)]
return acceleration_data
@staticmethod
def calc_shindo(acceleration_data) -> float:
# Caluculate the "measured shindo" index.
f = 100 # The frequncy of the data.
sample_size = len(acceleration_data)
freqs = np.fft.rfftfreq(sample_size, 1.0 / f) #The frequencies of FFT.
# Low cut filter
lo_cut_filter = np.sqrt(1.0 - np.exp(-8.0 * freqs * freqs * freqs))
# High cut filter
x = freqs * 0.1
x2 = x * x
hi_cut_filter = 1.0 / np.sqrt((((((0.000155*x2 + 0.00134)*x2
+ 0.009664)*x2 + 0.0557)*x2
+ 0.241)*x2 + 0.694)*x2 + 1.0)
# Periodic effect filter
periodic_filter = np.sqrt(np.reciprocal(freqs
,where = freqs != 0.0))
# Combine filters
combined_filter = lo_cut_filter * hi_cut_filter * periodic_filter
# Extract 3-axes data and apply FFT.
accXf = np.fft.rfft([row[0] for row in acceleration_data])
accYf = np.fft.rfft([row[1] for row in acceleration_data])
accZf = np.fft.rfft([row[2] for row in acceleration_data])
# Apply the filter and restore data.
filteredX = np.fft.irfft(accXf * combined_filter)
filteredY = np.fft.irfft(accYf * combined_filter)
filteredZ = np.fft.irfft(accZf * combined_filter)
# Norm of the filtered acceleration vector.
filteredAcc = np.sqrt(filteredX * filteredX
+ filteredY * filteredY
+ filteredZ * filteredZ)
# Find the value of 'a' such that the total time when the absolute
# value of the acceleration exceeds 'a' is 0.3 seconds.
a = sorted(filteredAcc)[-int(0.3 * f)]
return max(0.0, 2.0 * np.log10(a) + 0.94) if a > 0.1 else 0.0
from get_2jciebu_data import *
import influxdb_client, time
from influxdb_client import Point
from influxdb_client.client.write_api import WriteOptions
#InfluxDB settings.
token = "Your token here"
org = "Your organization here"
url = "http://localhost:8086"
bucket="Your backet name here"
class Database:
def __init__(self, url, token, org, bucket):
self.client = influxdb_client.InfluxDBClient(url=url, token=token, org=org)
self.write_api = self.client.write_api(WriteOptions(batch_size=7000,
flush_interval=5000))
self.bucket = bucket
self.org = org
def write(self, data) -> None:
self.write_api.write(bucket=self.bucket, org=self.org, record=data)
def close(self) -> None:
self.write_api.close()
self.client.close()
def main():
# To detect the end of an earthquake, store the previous value of vibration.
vibration_prev = 0
time.sleep(2) # Wait until the device times out.
sensor = Sensor()
data_processor = DataProcessor(sensor)
database = Database(url, token, org, bucket)
while True:
try:
data = data_processor.get_current_data()
fields = ["temperature", "relative_humidity", "ambient_light",
"barometric_pressure", "sound_noise", "eTVOC", "eCO2",
"discomfort_index", "heat_stroke", "vibration_information",
"si_value", "pga", "seismic_intensity"]
point = Point("2jciebu").tag("location", "home").time(data["time_measured"])
for field in fields:
if field in data:
point.field(field, data[field])
database.write(point)
if vibration_prev == 2 and data["vibration_information"] < 2:
# Earthquake end is detected.
acceleration_data = data_processor.get_earthquake_data()
# Write acceleration data to InfluxDB.
points = [
Point("2jciebu").tag("location", "home").time(int(row[0]*1e9))
.field("acceleration_X", row[1])
.field("acceleration_Y", row[2])
.field("acceleration_Z", row[3])
for row in acceleration_data]
database.write(points)
# Write measured shindo value to InfluxDB.
# t * 0.01 sec is the time point to calculate measured shindo.
# (Must be 30 <= t < 12000 )
points = [
Point("2jciebu").tag("location", "home")
.time(int(acceleration_data[t][0]*1e9))
.field("measured_shindo",
data_processor.calc_shindo(acceleration_data[:t+1][1:]))
for t in [30, 50, 100, 200, 300, 500, 700,
1000, 2000, 3000, 6000, 11999]]
database.write(points)
# Processing earthquake data takes time, so do not sleep.
time.sleep(0)
else:
time.sleep(3) # Wait for 3 seconds before next measurement.
vibration_prev = data["vibration_information"]
except KeyboardInterrupt:
database.close()
del sensor
exit(0)
except Sensor.SensorError:
# If an error occurs in communication, terminate the program.
database.close()
del sensor
exit(1)
except ValueError:
pass
if __name__ == "__main__":
main()
一応テストでは上手く動作していますが、例外などの扱いは少し甘いかもしれません。
注意点
まず、本プログラムを最初に起動すると、150秒間はデータ取得が始まりません。この間は待っていて下さい。これは、地震の開始時刻を記録するために、デバイス内のタイマーを有効にする必要があり、これを有効にすると、環境データのデバイスへの毎秒の書き込みが有効にされてしまうため、フラッシュメモリを保護するために、1時間ごとに変更するためです。この変更に2分間かかるとされています。次の起動からは、これはパスします。
また地震データのテストについて、他の環境データは自動的に流れてくるのに対し、加速度データは揺れを検知しないと記録が始まりません。センサーを人工的に揺らして地震を作れば良いのですが、注意点がいくつかあります。
- 揺れの開始から2分間経たないと加速度データは取得できません。
- 小刻みに周波数の高い揺れを数秒間与えると上手く行くようです。大きく振る必要はありません。むしろ大きすぎると地震ではないと判定されるようです。
- もちろん小さすぎる揺れも、地震とは判定されません。
ちなみに元日の能登半島の地震は、筆者の環境では震度2でしたが、地震とは判定されず記録できませんでした。おおよそ震度3から4程度の揺れは必要なようです。
また、加速度データは量が多いため、地震検知2分後に始まるデバイスからの転送には、10秒以上かかります。その間、他の環境データは、データの取得間隔を何秒に設定していても欠測となることに注意してください。これを防止する方法もないではないのですが、労多い割に欠測値が数個拾えるだけなので、とりあえずは実装していません。
そしてプロトコルの仕様上、加速度データの時刻情報には1秒以上の精度がありません。つまり、最終的に表示される地震の波形は、最大で1秒程度全体が左右にずれていると考えてください。1つの地震波形の中ならば、相対的な時刻は合っています。
震度データについて
震度データも取得または計算します。地震の大きさについては、4つのデータが出力されます。
- shindo 加速度データから気象庁の計算法を使って求められた計測震度。正確には、リンク先の用語でいえば「計測震度Ⅰ」が求められます。この「計測震度Ⅰ」を「計測震度」を経由し表1の階級表で換算したものが「震度」となります(次項も参照)。
- si_value 建物への被害を重視した地震の大きさの指標(SI値)
- pga 最大加速度の値
- seismic_intensity 3のSI値から、回帰分析により求めたと思われる計測震度値
このうちsi_value, pga, seismic_intensityはデバイス内で、shindoが本Pythonプログラム内で計算されます。seismic_intensityが一度SI値を経由するのに対して、shindoは加速度値から直接正式な計算方法で計算されるため、気象庁発表の震度値に近くなっているのではないかと期待できます。ただし、前述のように加速度データは地震検知から2分経たないと提供されないため、計測震度の計算・表示もそのタイミングになります。si_value, pga, seismic_intensityは、リアルタイムで更新されます。
shindoは、記録開始から0.3, 0.5, 1, 2, 3, 5, 7, 10, 20, 30, 60, 120秒後(プログラム内で指定可能)までに累積されたデータをもとに計算されます。計測震度は、加速度値をフィルターにかけてソートし、上から0.3秒分のところの値から計算するとされているため、データが多いほど増加、または横ばいになります。気象庁で発表される震度は60秒分のデータですが、震度が増えていく様子を見たいという需要、また後半で地震が大きくなった場合などを考慮し、このような仕様としました。
計測震度Ⅰと震度の関係
前述のように、本プログラムで出力されるshindoは気象庁の計算法で言う計測震度Ⅰです。この値の少数第3位を四捨五入し少数第2位を切り捨てたのが本来の計測震度、さらにそれをリンク先の表1で換算したものが震度となります。この表を見ると、一見計測震度Ⅰを四捨五入したものが(震度5,6の強弱は別として)震度となるように思えますが、計測震度を経由しているため、正確には次のように換算しなければならないことに注意が必要です。
計測震度Ⅰ(shindo) | 計測震度 | 震度 |
---|---|---|
0.495未満 | 0.5未満 | 震度0 |
0.495以上1.495未満 | 0.5以上1.5未満 | 震度1 |
1.495以上2.495未満 | 1.5以上2.5未満 | 震度2 |
2.495以上3.495未満 | 2.5以上3.5未満 | 震度3 |
3.495以上4.495未満 | 3.5以上4.5未満 | 震度4 |
4.495以上4.995未満 | 4.5以上5.0未満 | 震度5弱 |
4.995以上5.495未満 | 5.0以上5.5未満 | 震度5強 |
5.495以上5.995未満 | 5.5以上6.0未満 | 震度6弱 |
5.995以上6.495未満 | 6.0以上6.5未満 | 震度6強 |
6.495以上 | 6.5以上 | 震度7 |
と、理屈上はそうなのですが、この違いが問題になるほどセンサーの精度が高くないので、四捨五入したら震度になるとしてもさほど問題は無いかもしれません。
最後に
2JCIE-BU01は2025年1月に生産を停止するようです。必要な方は早めに手に入れておきましょう。後継製品が出るのなら良いのですが…。