はじめに
本記事では、弊社の新製品であるラズパイ対応センサシリーズの加速度センサを用い、ラズパイ上で加速度データの取得とFFT解析を行います。
さらに、4GPiによるLTE通信を用いて、ラズパイから解析結果をクラウド(本記事ではSORACOM Harvest)に送信し、可視化するIoT事例を紹介します。
ラズパイ対応センサシリーズを用いることで、ラズパイを用いて容易に加速度データを取得することができます。さらに、ラズパイ上での解析処理やデータ送信を組み合わせることで、手軽に遠隔の設備の状態監視などに活用できます。
また、ラズパイ対応センサシリーズは、屋外や工場などの厳しい環境下でも使用できるよう、センサ素子を堅牢な専用筐体に収めた業務用センサユニットとなっています。
本記事で使用する加速度センサにはLSM6DSRが搭載されており、筐体には防水・防塵対策が施されています。
使うもの
本記事で使用する機材は以下の通りです。
- Raspberry Pi(今回はRaspberry Pi 4 Model Bを使用)
- 4GPi
-
SORACOM IoT SIM(標準サイズ)
- SORACOM Harvestが有効化されたSIMを使用してください。
- センサ用4chインターフェース基板
- 加速度センサ
セットアップ
Raspberry Piと4GPiの初期設定および加速度センサを使用するための準備を行います。
ラズパイを起動し、こちら等を参考に4GPiのセットアップを実施してください。なお、セットアップ済みのOSイメージを使うことでこの手順を省略できます。
次に、SORACOMのAPNを登録します。
sudo nmcli con add type gsm apn soracom.io user sora password sora
続いて、加速度センサを使用するための設定を行います。/boot/firmware/config.txtの末尾に下記を追記してください。
dtoverlay=i2c-mux,pca9545,addr=0x73
dtoverlay=pca953x,pca9574,addr=0x21
dtparam=i2c_arm_baudrate=400000
I2Cを用い、センサデータを取得します。I2Cが有効化されていない場合は、sudo raspi-config を実行し、Interface Options からI2Cを有効化してください。
ラズパイを再起動してドライバをロードしてください。
再起動後、下記のようにI2Cのバスが生成されていることを確認してください。
mtx@raspberrypi:~ $ ls /dev/i2c*
/dev/i2c-1 /dev/i2c-21 /dev/i2c-23 /dev/i2c-25
/dev/i2c-20 /dev/i2c-22 /dev/i2c-24
プログラムの実装
ここでは、加速度センサからZ軸方向のデータを取得し、FFT解析によってピーク周波数とその大きさを算出し、SORACOM Harvestへ送信するPythonプログラムを作成します。
任意のディレクトリに移動し、以下を作成してください。
import requests,time,struct,numpy as np
from smbus2 import SMBus,i2c_msg
BUS,ADDR=22,0x6A
FFT_SIZE,FFT_REPEAT,ODR_HZ,CHUNK=1024,10,3333,64
WHO_AM_I,CTRL1_XL,CTRL3_C,CTRL10_C=0x0F,0x10,0x12,0x19
FIFO_CTRL1,FIFO_CTRL2,FIFO_CTRL3,FIFO_CTRL4=0x07,0x08,0x09,0x0A
FIFO_STATUS1,FIFO_STATUS2,FIFO_DATA_OUT_TAG=0x3A,0x3B,0x78
INTERNAL_FREQ_FINE,ACCEL_TAG=0x63,0x02
ODR_BITS={12.5:1,26:2,52:3,104:4,208:5,416:6,833:7,1667:8,3333:9,6667:10}
bus=SMBus(BUS)
def read_register(reg):
return bus.read_byte_data(ADDR,reg)
def write_register(reg,val):
bus.write_byte_data(ADDR,reg,val)
def bytes_to_int16(l,h):
return struct.unpack("<h",bytes([l,h]))[0]
def to_signed_int8(v):
return v-256 if v>127 else v
def get_fifo_level():
s1=read_register(FIFO_STATUS1)
s2=read_register(FIFO_STATUS2)
return ((s2&3)<<8)|s1
def read_fifo_samples(n):
write=i2c_msg.write(ADDR,[FIFO_DATA_OUT_TAG])
read=i2c_msg.read(ADDR,n*7)
bus.i2c_rdwr(write,read)
raw=list(read)
data=[]
for i in range(0,len(raw),7):
tag=(raw[i]>>3)&0x1F
x=bytes_to_int16(raw[i+1],raw[i+2])
y=bytes_to_int16(raw[i+3],raw[i+4])
z=bytes_to_int16(raw[i+5],raw[i+6])
data.append((tag,x,y,z))
return data
def collect_accel_z_samples(n):
write_register(FIFO_CTRL4,0)
time.sleep(0.01)
write_register(FIFO_CTRL4,6)
accel_z_samples=[]
while len(accel_z_samples)<n:
level=get_fifo_level()
if level==0:
continue
read_count=min(level,CHUNK,n-len(accel_z_samples))
for tag,_,_,z in read_fifo_samples(read_count):
if tag==ACCEL_TAG:
accel_z_samples.append(z)
return accel_z_samples
def calculate_fft_spectrum(samples):
samples=np.array(samples)
samples=(samples-np.mean(samples))*np.hanning(len(samples))
spectrum=np.abs(np.fft.rfft(samples))
spectrum[0]=0
return spectrum
def send_to_soracom(data):
requests.post("http://harvest.soracom.io",json=data).raise_for_status()
try:
if read_register(WHO_AM_I)!=0x6B:
raise RuntimeError("sensor NG")
write_register(CTRL3_C,0x44)
write_register(CTRL10_C,read_register(CTRL10_C)|0x20)
write_register(FIFO_CTRL4,0)
time.sleep(0.01)
write_register(FIFO_CTRL1,0)
write_register(FIFO_CTRL2,0)
odr_bits=ODR_BITS[ODR_HZ]
write_register(CTRL1_XL,odr_bits<<4)
write_register(FIFO_CTRL3,odr_bits)
sampling_freq_hz=ODR_HZ*(1+0.0015*to_signed_int8(read_register(INTERNAL_FREQ_FINE)))
fft_spectra=[]
for _ in range(FFT_REPEAT):
accel_z_samples=collect_accel_z_samples(FFT_SIZE)
fft_spectra.append(calculate_fft_spectrum(accel_z_samples))
average_spectrum=np.mean(fft_spectra,axis=0)
freqs=np.fft.rfftfreq(FFT_SIZE,1/sampling_freq_hz)
peak_index=int(np.argmax(average_spectrum))
peak_freq_z=float(freqs[peak_index])
peak_mag_z=float(average_spectrum[peak_index])
peak_mag_z_db = float(20*np.log10(max(peak_mag_z,1e-12)))
result={
"peak_freq_z_hz":round(peak_freq_z),
"peak_mag_z_db":round(peak_mag_z_db,2)
}
send_to_soracom(result)
print(result)
finally:
bus.close()
このプログラムでは、加速度センサから取得したZ軸データに対してFFT解析を行い、複数回のスペクトルを平均化します。その平均スペクトルからピーク周波数とピークの大きさを算出し、クラウドへ送信します。
本記事では、クラウドへ送信するデータ量を抑えつつ振動の特徴を把握するため、FFT結果全体ではなく、ピーク周波数とその大きさのみを送信対象としています。
ここでは、センサ素子 LSM6DSR をサンプリングレート3333Hzで動作させ、内蔵FIFO(First In First Out)を用いてデータをまとめて取得しています。
取得した加速度データは、1024点ごとにFFT解析し、この処理を10回繰り返して得られたスペクトルを平均化することで、測定ごとのバラつきを抑えています。
その後、平均化したスペクトルからピーク周波数とその大きさ(dB値に変換)を算出し、クラウドに送信しています。
これらの処理条件は以下のパラメータで調整できます。
ODR_HZ:サンプリングレート(12.5, 26, 52, 104, 208, 416, 833, 1667, 3333, 6667から選択できます。)
CHUNK:FIFOから一度に読み出すデータ数
FFT_SIZE:1回のFFTに使用するデータ数
FFT_REPEAT:FFTの実行回数(スペクトル平均回数)
最後に、Pythonモジュールを実行するための仮想環境を作成し、必要なパッケージをインストールします。
python3 -m venv venv
source venv/bin/activate
pip3 install requests numpy smbus2
インストールが完了したら、以下のコマンドを実行してスクリプトを起動します(仮想環境内で実行してください)。
python3 main.py
作成したスクリプトは仮想環境で実行してください。仮想環境を有効にするにはディレクトリを移動後、source venv/bin/activateを実行します。仮想環境を終了するにはdeactivateを実行します。
クラウドでのデータ確認
動作確認として、加速度センサをファンの上に置き、FFT解析結果をSORACOM Harvestへ送信して可視化しました。

15秒ごとに実行し、クラウド(SORACOM Harvest)上で下記のようにデータが確認できました。

送信したピーク周波数とピークの大きさをグラフ化すると、以下のようになります。赤がピーク周波数、青がピークの大きさです。

最初の2点はファン駆動中、3,4点目はファン停止中、5,6点目はファン再駆動時のデータです。
ファンの状態変化に応じて、ピーク周波数とピークの大きさが変化しており、振動の様子が変化していることを確認できました。
最後に
本記事では、ラズパイ対応センサシリーズの加速度センサを用いて、加速度データの取得からFFT解析、クラウドへの送信までを行う方法をご紹介しました。
ラズパイ対応センサシリーズを活用することで、現場設置を想定したシステムを手軽に構築できます。ぜひお試しください。

