この記事はMarkLogic Advent Calendar 2017の23日目です。
はじめに
これまでMarkLogicとRaspberry Piを連携させるための下準備を行ってきました。今回から実際にRaspberry Pi3を使い始めます。
今回はRaspberry Pi3にGPSモジュールとSense HATをつなげて、位置情報とセンサー情報をMQTT+TLSでパブリッシュし、MarkLogicに登録してみます。
環境
サーバサイドは以下の環境を使用します。
環境 | バージョン |
---|---|
CentOS7 | 7.4.1708 |
Node.js | v8.9.1 |
npm | 5.5.1 |
Mosca | 2.7.0 |
MQTT.js | 2.14.0 |
MarkLogic9 | 9.0-3 |
MarkLogic Node.js Client API | 2.0.3 |
Raspberry Pi3側は以下の環境を使用します。
環境 | バージョン |
---|---|
Raspberry Pi3 Model B | |
RASPBIAN STRETCH WITH DESKTOP | November 2017 |
Python3 | 3.6.3 |
paho-mqtt | 1.3.1 |
GLOBALSAT BU-353S4 USB GPSレシーバー | - |
Sense HAT | 2.2.0 |
Sense HATとは
Sense HATはRaspberryPiのアドオンボードです。
Astro-piという、子供達が作成したRaspberryPiのコードを宇宙で走らせよう、というプロジェクトで使用されたボードです。以下のセンサー等を搭載しています。
- 8x8 RGBドットマトリクスLED
- 5ボタンジョイスティック
- ジャイロスコープ
- 加速度センサー
- 磁気センサー
- 温度計
- 気圧計
- 湿度計
今回はこれらのうち、温度、気圧、湿度の情報を取得して、GPSの座標情報と共にMarkLogicに送信しようと思います。
Python3の最新バージョンを導入する
Raspberry Pi3のPython3を最新化します。pyenvを使用します。
$ sudo apt-get install libssh-dev
$ git clone git://github.com/yyuu/pyenv.git ~/.pyenv
$ sudo vi ~/.bash_profile
export PYENV_ROOT="$HOME/.pyenv"
export PATH="$PYENV_ROOT/bin:$PATH"
eval "$(pyenv init -)"
$ sudo source ~/.pyenv
$ pyenv install 3.6.3
$ pyenv global 3.6.3
$ python3.6 -V
Python 3.6.3
Python用のMQTTクライアントをインストールする
Python用のMQTTクライアントであるpaho-mqttをインストールします。pipを使います。
$ pip install paho-mqtt
Raspberry Pi3でGPSモジュールを扱えるようにする
Raspberry Pi3にGPSモジュールを扱う方法はこちらをご覧下さい。
GPSで位置情報を取得し、MQTT+TLSでパブリッシュする
PythonでGPSの情報を取得し、MQTTでブローカーに送信する処理を作成します。1秒間隔で送信し続けます。
前回の記事を参考に、接続先のブローカーやMarkLogicのサブスクライバーを実行しておいてください。
import paho.mqtt.client as mqtt
from datetime import datetime
from gps3 import gps3
from time import sleep
import json
# GPSへの接続定義
gps_socket = gps3.GPSDSocket()
data_stream = gps3.DataStream()
gps_socket.connect()
gps_socket.watch()
# MQTTブローカーの情報
host = 'my-broker'
port = 8443
topic = 'raspi/topic1'
username = 'publisher'
password = 'publisher'
# MQTTブローカーへの接続処理
def on_connect(client, userdata, flags, result):
print("Connected result : " + str(result))
# GPSの情報を取得してディクショナリで返却する。
def get_gps_data():
for new_data in gps_socket:
if new_data:
data_stream.unpack(new_data)
time = data_stream.TPV['time']
lat = data_stream.TPV['lat']
long = data_stream.TPV['lon']
alt = data_stream.TPV['alt']
speed = data_stream.TPV['speed']
gpsDict = {
'time': time,
'lat' : lat,
'long' : long,
'alt' : alt,
'speed': speed
}
return gpsDict
if __name__ == '__main__':
# MQTTブローカーに接続する。TLSでユーザ認証あり。
client = mqtt.Client(protocol=mqtt.MQTTv311)
client.username_pw_set(username, password)
client.on_connect = on_connect
client.tls_set("my_ca.crt")
client.connect(host, port=port, keepalive=60)
# 送信するメッセージ
dateTime = str(datetime.now())
dictMessage = {
"username": username,
"date": dateTime
}
# 1秒間隔でMQTTにGPS情報を送信する。
while True:
gpsDict = get_gps_data()
dictMessage.update({"gps":gpsDict})
print(dictMessage)
client.publish(topic, json.dumps(dictMessage))
sleep(1)
MarkLogicで確認する
上記のコードを実行すると、MQTTブローカーにGPSのデータを送信し続けます。以下のようなデータになります。
MarkLogic用のサブスクライバーが動作していれば、これらを保存してくれます。
{'username': 'publisher',
'date': '2017-12-13 22:49:41.537968',
'gps': {
'time': '2017-12-13T13:49:46.000Z',
'lat': 35.XXXXXXXXX,
'long': 139.XXXXXXXX,
'alt': 17.355,
'speed': 0.513}}
MarkLogicで確認してみます。WebブラウザでQueryConsole(8000番ポート)にアクセスし、以下のXQueryを実行します。
xquery version "1.0-ml";
for $i in cts:uris("", (), cts:directory-query("/mqtt/","infinity"))
let $mqttDoc := fn:doc($i)
let $topic := $mqttDoc/topic
return if($topic eq 'raspi/topic1') then($mqttDoc) else()
結果は以下のようになります。
{
"receiveDate": 1513113452231,
"topic": "raspi/topic1",
"message": {
"username": "publisher",
"date": "2017-12-13 22:49:41.537968",
"gps": {
"time": "2017-12-13T13:49:46.000Z",
"lat": 35.XXXXXXXXX,
"long": 139.XXXXXXXXX,
"alt": 62.406,
"speed": 0.106
}
}
}
Sense-HATのソフトウェアをインストール
続いてSense-HATのデータを取得してみます。
Sense-HATを扱うために必要なソフトウェアをインストールします。
$ pip install sense-hat
なお、RTIMUが見つからない等のエラーが発生した場合、以下の手順でインストールします。
以下の記事を参考にさせて頂きました。ありがとうございます。
らずぱいで、MPU-9250 - 9軸センサモジュール(3軸加速度+3軸ジャイロ+3軸コンパス)
$ sudo apt-get install -y cmake libqt4-dev
$ git clone https://github.com/RPi-Distro/RTIMULib.git
$ mkdir RTIMULib/Linux/build
$ cd RTIMULib/Linux/build
$ cmake ..
$ make -j4
$ cd ~/RTIMULib2/Linux/python
$ python setup.py build
$ sudo python setup.py install
動作確認
以下のような簡単なコードを作成し実行します。
Sense HATのLEDに、"Hello world!"と表示されれば成功です。
from sense_hat import SenseHat
sense = SenseHat()
sense.show_message("Hello world!")
温度、湿度、気圧を計測してみる
さっそく、温度、湿度、気圧を計測してみます。
Sense HATのAPIリファレンスはこちらです。
from sense_hat import SenseHat
sense = SenseHat()
t = sense.get_temperature()
p = sense.get_pressure()
h = sense.get_humidity()
msg = "Temperature = %s, Pressure=%s, Humidity=%s" % (t,p,h)
print(msg)
以下のように表示されます。
Temperature = 26.89080810546875, Pressure=1018.50341796875, Humidity=37.405784606933594
GPSの情報と共にMarkLogicに送信する
先ほど作成したGPSの情報をパブリッシュするコードに、温度、湿度、気圧も追加してみます。
from time import sleep
import paho.mqtt.client as mqtt
from datetime import datetime
from gps3 import gps3
import json
from sense_hat import SenseHat
sense = SenseHat()
#GPSへの接続定義
gps_socket = gps3.GPSDSocket()
data_stream = gps3.DataStream()
gps_socket.connect()
gps_socket.watch()
# MQTTブローカーの情報
host = 'my-broker'
port = 8443
topic = 'raspi/topic1'
username = 'publisher'
password = 'publisher'
# MQTTブローカーへの接続処理
def on_connect(client, userdata, flags, result):
print("Connected result : " + str(result))
# GPSの情報を取得してディクショナリで返却する。
def get_gps_data():
for new_data in gps_socket:
if new_data:
data_stream.unpack(new_data)
time = data_stream.TPV['time']
lat = data_stream.TPV['lat']
long = data_stream.TPV['lon']
alt = data_stream.TPV['alt']
speed = data_stream.TPV['speed']
gpsDict = {
'time': time,
'lat' : lat,
'long' : long,
'alt' : alt,
'speed': speed
}
return gpsDict
# Sense HATの温度、気圧、湿度を取得してディクショナリで返却する。
def get_sense_hat():
t = sense.get_temperature()
p = sense.get_pressure()
h = sense.get_humidity()
senseHatDict = {
'temperature': t,
'pressure': p,
'humidity': h
}
return senseHatDict
if __name__ == '__main__':
# MQTTブローカーに接続する。TLSでユーザ認証あり。
client = mqtt.Client(protocol=mqtt.MQTTv311)
client.username_pw_set(username, password)
client.on_connect = on_connect
client.tls_set("my_ca.crt")
client.connect(host, port=port, keepalive=60)
# 送信するメッセージ
dateTime = str(datetime.now())
dictMessage = {
"username": username,
"date": dateTime
}
# 1秒間隔でMQTTにGPS情報を送信する。
while True:
gpsDict = get_gps_data()
senseHatDict = get_sense_hat()
dictMessage.update({"gps":gpsDict})
dictMessage.update({"senseHat":senseHatDict})
print(dictMessage)
client.publish(topic, json.dumps(dictMessage))
sleep(1)
MarkLogicには以下のように格納されました。
{
"receiveDate": 1513200853913,
"topic": "raspi/topic1",
"message": {
"username": "publisher",
"date": "2017-12-14 06:34:13.893559",
"gps": {
"time": "2017-12-13T21:34:20.330Z",
"lat": 35.XXXXXXXXX,
"long": 139.XXXXXXXXX,
"alt": 62.406,
"speed": 0.106
},
"senseHat": {
"temperature": 27.6654415130615,
"pressure": 1018.63134765625,
"humidity": 37.4350891113281
}
}
}
おわりに
ようやくRaspberry Pi3からMarkLogicにデータを送れるようになりました。
GPSとセンサー情報をMarkLogicに蓄積することで、データの活用の幅が広がりそうです。
次回は、MarkLogicのリアルタイムアラート機能を活用して、センサー情報をリアルタイムに監視する方法を紹介してみます。