Python
GPS
mqtt
Raspberrypi3
MarkLogic
MarkLogicDay 23

MarkLogicでRaspberryPi3のセンサー情報を取り込んでみよう(7)Raspberry PiからMarkLogicにGPSやセンサーデータを送信する

この記事はMarkLogic Advent Calendar 2017の23日目です。

はじめに

これまでMarkLogicとRaspberry Piを連携させるための下準備を行ってきました。今回から実際にRaspberry Pi3を使い始めます。
今回はRaspberry Pi3にGPSモジュールとSense HATをつなげて、位置情報とセンサー情報をMQTT+TLSでパブリッシュし、MarkLogicに登録してみます。

環境

サーバサイドは以下の環境を使用します。

環境 バージョン
CentOS7 7.4.1708
Node.js v8.9.1
npm 5.5.1
Mosca 2.7.0
MQTT.js 2.14.0
MarkLogic9 9.0-3
MarkLogic Node.js Client API 2.0.3

Raspberry Pi3側は以下の環境を使用します。

環境 バージョン
Raspberry Pi3 Model B
RASPBIAN STRETCH WITH DESKTOP November 2017
Python3 3.6.3
paho-mqtt 1.3.1
GLOBALSAT BU-353S4 USB GPSレシーバー -
Sense HAT 2.2.0

Sense HATとは

Sense HATはRaspberryPiのアドオンボードです。
Astro-piという、子供達が作成したRaspberryPiのコードを宇宙で走らせよう、というプロジェクトで使用されたボードです。以下のセンサー等を搭載しています。

  • 8x8 RGBドットマトリクスLED
  • 5ボタンジョイスティック
  • ジャイロスコープ
  • 加速度センサー
  • 磁気センサー
  • 温度計
  • 気圧計
  • 湿度計

今回はこれらのうち、温度、気圧、湿度の情報を取得して、GPSの座標情報と共にMarkLogicに送信しようと思います。

Python3の最新バージョンを導入する

Raspberry Pi3のPython3を最新化します。pyenvを使用します。

$ sudo apt-get install libssh-dev
$ git clone git://github.com/yyuu/pyenv.git ~/.pyenv
$ sudo vi ~/.bash_profile

export PYENV_ROOT="$HOME/.pyenv"
export PATH="$PYENV_ROOT/bin:$PATH"
eval "$(pyenv init -)"

$ sudo source ~/.pyenv
$ pyenv install 3.6.3
$ pyenv global 3.6.3

$ python3.6 -V
Python 3.6.3

Python用のMQTTクライアントをインストールする

Python用のMQTTクライアントであるpaho-mqttをインストールします。pipを使います。

$ pip install paho-mqtt

Raspberry Pi3でGPSモジュールを扱えるようにする

Raspberry Pi3にGPSモジュールを扱う方法はこちらをご覧下さい。

GPSで位置情報を取得し、MQTT+TLSでパブリッシュする

PythonでGPSの情報を取得し、MQTTでブローカーに送信する処理を作成します。1秒間隔で送信し続けます。

前回の記事を参考に、接続先のブローカーやMarkLogicのサブスクライバーを実行しておいてください。

import paho.mqtt.client as mqtt
from datetime import datetime
from gps3 import gps3
from time import sleep
import json

# GPSへの接続定義
gps_socket = gps3.GPSDSocket()
data_stream = gps3.DataStream()
gps_socket.connect()
gps_socket.watch()

# MQTTブローカーの情報
host = 'my-broker'
port = 8443
topic = 'raspi/topic1'
username = 'publisher'
password = 'publisher'

# MQTTブローカーへの接続処理
def on_connect(client, userdata, flags, result):
  print("Connected result : " + str(result))

# GPSの情報を取得してディクショナリで返却する。
def get_gps_data():
  for new_data in gps_socket:
    if new_data:
      data_stream.unpack(new_data)
      time = data_stream.TPV['time']
      lat = data_stream.TPV['lat']
      long = data_stream.TPV['lon']
      alt = data_stream.TPV['alt']
      speed = data_stream.TPV['speed']
      gpsDict = {
        'time': time,
        'lat' : lat,
        'long' : long,
        'alt' : alt,
        'speed': speed
      }
      return gpsDict

if __name__ == '__main__':
  # MQTTブローカーに接続する。TLSでユーザ認証あり。
  client = mqtt.Client(protocol=mqtt.MQTTv311)
  client.username_pw_set(username, password)
  client.on_connect = on_connect
  client.tls_set("my_ca.crt")
  client.connect(host, port=port, keepalive=60)

  # 送信するメッセージ
  dateTime = str(datetime.now())
  dictMessage = {
    "username": username,
    "date": dateTime
  }

  # 1秒間隔でMQTTにGPS情報を送信する。
  while True:
    gpsDict = get_gps_data()
    dictMessage.update({"gps":gpsDict})
    print(dictMessage)
    client.publish(topic, json.dumps(dictMessage))
    sleep(1)

MarkLogicで確認する

上記のコードを実行すると、MQTTブローカーにGPSのデータを送信し続けます。以下のようなデータになります。
MarkLogic用のサブスクライバーが動作していれば、これらを保存してくれます。

{'username': 'publisher',
 'date': '2017-12-13 22:49:41.537968',
 'gps': {
   'time': '2017-12-13T13:49:46.000Z',
   'lat': 35.XXXXXXXXX,
   'long': 139.XXXXXXXX,
   'alt': 17.355,
   'speed': 0.513}}

MarkLogicで確認してみます。WebブラウザでQueryConsole(8000番ポート)にアクセスし、以下のXQueryを実行します。

xquery version "1.0-ml";

for $i in cts:uris("", (), cts:directory-query("/mqtt/","infinity"))
  let $mqttDoc := fn:doc($i)  
  let $topic := $mqttDoc/topic

  return if($topic eq 'raspi/topic1') then($mqttDoc) else()

結果は以下のようになります。

{
  "receiveDate": 1513113452231, 
  "topic": "raspi/topic1", 
  "message": {
    "username": "publisher", 
    "date": "2017-12-13 22:49:41.537968", 
    "gps": {
      "time": "2017-12-13T13:49:46.000Z", 
      "lat": 35.XXXXXXXXX, 
      "long": 139.XXXXXXXXX, 
      "alt": 62.406, 
      "speed": 0.106
    }
  }
}

Sense-HATのソフトウェアをインストール

続いてSense-HATのデータを取得してみます。
Sense-HATを扱うために必要なソフトウェアをインストールします。

$ pip install sense-hat

なお、RTIMUが見つからない等のエラーが発生した場合、以下の手順でインストールします。
以下の記事を参考にさせて頂きました。ありがとうございます。
らずぱいで、MPU-9250 - 9軸センサモジュール(3軸加速度+3軸ジャイロ+3軸コンパス)

$ sudo apt-get install -y cmake libqt4-dev
$ git clone https://github.com/RPi-Distro/RTIMULib.git
$ mkdir RTIMULib/Linux/build
$ cd RTIMULib/Linux/build
$ cmake ..
$ make -j4
$ cd ~/RTIMULib2/Linux/python
$ python setup.py build
$ sudo python setup.py install

動作確認

以下のような簡単なコードを作成し実行します。
Sense HATのLEDに、"Hello world!"と表示されれば成功です。

from sense_hat import SenseHat

sense = SenseHat()

sense.show_message("Hello world!")

温度、湿度、気圧を計測してみる

さっそく、温度、湿度、気圧を計測してみます。
Sense HATのAPIリファレンスはこちらです。

from sense_hat import SenseHat
sense = SenseHat()

t = sense.get_temperature()
p = sense.get_pressure()
h = sense.get_humidity()

msg = "Temperature = %s, Pressure=%s, Humidity=%s" % (t,p,h)
print(msg)

以下のように表示されます。

Temperature = 26.89080810546875, Pressure=1018.50341796875, Humidity=37.405784606933594

GPSの情報と共にMarkLogicに送信する

先ほど作成したGPSの情報をパブリッシュするコードに、温度、湿度、気圧も追加してみます。

from time import sleep
import paho.mqtt.client as mqtt
from datetime import datetime
from gps3 import gps3
import json
from sense_hat import SenseHat
sense = SenseHat()

#GPSへの接続定義
gps_socket = gps3.GPSDSocket()
data_stream = gps3.DataStream()
gps_socket.connect()
gps_socket.watch()

# MQTTブローカーの情報
host = 'my-broker'
port = 8443
topic = 'raspi/topic1'
username = 'publisher'
password = 'publisher'

# MQTTブローカーへの接続処理
def on_connect(client, userdata, flags, result):
  print("Connected result : " + str(result))

# GPSの情報を取得してディクショナリで返却する。
def get_gps_data():
  for new_data in gps_socket:
    if new_data:
      data_stream.unpack(new_data)
      time = data_stream.TPV['time']
      lat = data_stream.TPV['lat']
      long = data_stream.TPV['lon']
      alt = data_stream.TPV['alt']
      speed = data_stream.TPV['speed']
      gpsDict = {
        'time': time,
        'lat' : lat,
        'long' : long,
        'alt' : alt,
        'speed': speed
      }
      return gpsDict

# Sense HATの温度、気圧、湿度を取得してディクショナリで返却する。
def get_sense_hat():
  t = sense.get_temperature()
  p = sense.get_pressure()
  h = sense.get_humidity()
  senseHatDict = {
    'temperature': t,
    'pressure': p,
    'humidity': h
  }
  return senseHatDict

if __name__ == '__main__':
  # MQTTブローカーに接続する。TLSでユーザ認証あり。
  client = mqtt.Client(protocol=mqtt.MQTTv311)
  client.username_pw_set(username, password)
  client.on_connect = on_connect
  client.tls_set("my_ca.crt")
  client.connect(host, port=port, keepalive=60)

  # 送信するメッセージ
  dateTime = str(datetime.now())
  dictMessage = {
    "username": username,
    "date": dateTime
  }

  # 1秒間隔でMQTTにGPS情報を送信する。
  while True:
    gpsDict = get_gps_data()
    senseHatDict = get_sense_hat()
    dictMessage.update({"gps":gpsDict})
    dictMessage.update({"senseHat":senseHatDict})
    print(dictMessage)
    client.publish(topic, json.dumps(dictMessage))
    sleep(1)

MarkLogicには以下のように格納されました。

{
 "receiveDate": 1513200853913, 
 "topic": "raspi/topic1", 
 "message": {
  "username": "publisher", 
  "date": "2017-12-14 06:34:13.893559", 
 "gps": {
  "time": "2017-12-13T21:34:20.330Z", 
  "lat": 35.XXXXXXXXX, 
  "long": 139.XXXXXXXXX, 
  "alt": 62.406, 
  "speed": 0.106
 }, 
 "senseHat": {
  "temperature": 27.6654415130615, 
  "pressure": 1018.63134765625, 
  "humidity": 37.4350891113281
  }
 }
}

おわりに

ようやくRaspberry Pi3からMarkLogicにデータを送れるようになりました。
GPSとセンサー情報をMarkLogicに蓄積することで、データの活用の幅が広がりそうです。
次回は、MarkLogicのリアルタイムアラート機能を活用して、センサー情報をリアルタイムに監視する方法を紹介してみます。