LoginSignup
11
17

More than 5 years have passed since last update.

docker上でmosquittoで受信したjsonデータをdbとElasticsearchに保存する

Last updated at Posted at 2016-09-27

kibana.png

今回は個人的に使うのでお金かかるAWSIoTであれこれってのはなしで、owncloudとか入れて使ってるconohaのdockerマシン上で全て片付けます。
(手順は書きませんが一応外部に開けるのは特定のハイポートのみにしてます。mosquittoへはSSLで通信させる予定です)

今回、主に参考にしたとこを下記にまとめます。

mosquitto
mosquitto-ssl
paho-mqtt
pymongo
elasticseach
BulkAPI
Bulk helpers
how-to-use-bulk-api-to-store-the-keywords-in-es-by-using-python
elasticsearch-py

ちょっと説明

  • データの流れ raspberry pi ⇒ mosquitto ⇒ db、elasticseach⇒kibana
  • 今回はmysqlに放り込みましたがmongodbでも似たような手順で出来ます。
  • elasticseachにしてもkibanaにしてもほぼ初めて使うような物なのでとりあえず動く程度の物です。
  • kibana上で使うつもりはなかったのですが、GPS情報を取得して位置情報も送る前提なのでgeo_pointも使用します。
  • kibanaコンテナは(outofmemory?かも)いつの間にか止まっているので、何かしら処置する予定です。

コンテナ作成

検証しながらなので、docke-composeやスクリプトファイルは使わず、ほとんど手作業です。
簡単なのでコンテナをさくさく立てていきましょう。

busybox(mysql用)

任意のディレクトリ

コマンド.
docker pull busybox
docker run -i -t -v /var/lib/mysql --name busybox01 busybox /bin/sh

mysql

任意のディレクトリ

Dcokerfile.
FROM mysql
EXPOSE 3306
ENV container Docker
ENV MYSQL_ROOT_USER root
ENV MYSQL_ROOT_PASSWORD rootpass
ENV MYSQL_DATABASE log_db
ENV MYSQL_USER test
ENV MYSQL_PASSWORD testpass
コマンド.
docker build -f ./Dockerfile -t mysql:mosql --no-cache=true .
docker run -d --name mosdb01 --volumes-from busybox01 -e MYSQL_ROOT_PASSWORD=rootpass mysql:mosql

Elasticseach

任意のディレクトリ

コマンド.
docker pull elasticsearch
docker run --name moselasticsearch01 -d elasticsearch

Kibana

任意のディレクトリ

コマンド.
docker pull kibana
docker run --name moskibana01 --link moselasticsearch01:elasticsearch -d -p 5601:5601 kibana

mosquitto

任意のディレクトリ
このコンテナ上でいろいろやるのでたくさん入れてます。要らないものは削除してください。

Dockerfile.
FROM       ubuntu
MAINTAINER Ototo
ENV        container docker
RUN        apt-get -y update
RUN        apt-get install -y mosquitto
RUN        apt-get install -y openssl
RUN        apt-get install -y python
RUN        apt-get install -y python-pip
RUN        apt-get install -y mysql-client
RUN        apt-get install -y vim
RUN        pip install --upgrade pip
RUN        apt-get install -y curl
RUN        pip install elasticsearch
RUN        pip install pymongo
RUN        pip install paho-mqtt
RUN        pip install PyMySQL
EXPOSE     1883
ENTRYPOINT service mosquitto start && /bin/bash --login
コマンド.
docker build -f ./Dockerfile -t ubuntu:mosquitto --no-cache=true .
docker run --name mosquitto01 -t -i -d -v /root/mosquitto/mosquitto:/etc/mosquitto --link mosdb01 --link moselasticsearch01 -p 1883:1883  ubuntu:mosquitto /bin/bash

コンテナにはいります。

docker attach [conteinarID]

mysqlにログインします。

mysql -h mosdb01 -u root -p
Enter password: rootpass

とりあえずテーブルだけ作って出ます。

mysql>use log_db

mysql>CREATE TABLE `log_data` ( `id` int(11) NOT NULL AUTO_INCREMENT,
                                `date_time` datetime(1), 
                                `data` json NOT NULL,
                                PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;

mysqll>exit

コンフィグを編集します。

vim /etc/mosquitto/mosquitto.conf

パスワードを有効にするため下記を追記

allow_anonymous false
password_file /etc/mosquitto/passwd

パスワードファイルの作成とユーザ、パスワードの設定

mosquitto_passwd -c /etc/mosquitto/passwd mqtt
Password: mqttpass
Reenter password: mqttpass

サービスのリスタート(最後にコンテナのリスタートでも良い)

service mosquitto restart

スクリプトファイルの作成

vim /etc/mosquitto/sub.py

スクリプト
ここでdbとelasticseachにぶち込んでます。例外処理入れてないので、好みの形で良いので書いた方が無難です。

sub.py
#!/usr/bin/env python
import paho.mqtt.client as mqtt
import pymysql.cursors
from pymongo import MongoClient
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime


username = "mqtt"
userpass = "mqttpass"
host = "localhost"
topic = "car/log"
eshost = "http://moselasticsearch01:9200"
esindex = "log"
estype = "carlog"


def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    client.subscribe(topic, qos=0)

def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))
    set_mysql(msg.payload)
    set_es(msg.payload)

#insert mysql
def set_mysql(data):
        connection = pymysql.connect(host= 'mosdb01',
                                     user='root',
                                     password='rootpass',
                                     db='log_db',
                                     charset='utf8',
                                     cursorclass=pymysql.cursors.DictCursor)
        with connection.cursor() as cursor:
            sql = "INSERT INTO log_data(date_time,data) VALUES ( %s, %s)"
            r = cursor.execute(sql, ( datetime.now().strftime( '%Y-%m-%d %H:%M:%S' ), data))
            print(r) # -> 1
            connection.commit()

#insert elasticseach
def set_es(data):
        es = Elasticsearch([eshost])
        actions = []
        action = {"_index": esindex,"_type": estype,"_source": data}
        actions.append(action)
        log = helpers.bulk(es, actions)

#main
if __name__ == '__main__':
        client = mqtt.Client()
        client.on_connect = on_connect
        client.on_message = on_message
        client.username_pw_set(username, password=userpass)
        client.connect(host,1883)
        client.loop_forever()

サービズの起動はbashログイン時にやらせるやっつけ仕事

vim ~/.bashrc

下記を追記

nohup python /etc/mosquitto/sub.py &

elasticseachにインデックスを登録するので下記ファイルを作成

index.json
{
  "mappings" : {
    "carlog" : {
      "properties" : {
        "battery_level" : {
          "type" : "float",
          "index" : "not_analyzed"
        },
        "gps_gga" : {
          "type" : "string",
          "index" : "not_analyzed"
        },
        "gps_gsa" : {
          "type" : "string",
          "index" : "not_analyzed"
        },
        "gps_gsv" : {
          "type" : "string",
          "index" : "not_analyzed"
        },
        "gps_rmc" : {
          "type" : "string",
          "index" : "not_analyzed"
        },
        "location" : {
          "type" : "geo_point"
        },
        "oil_press" : {
          "type" : "float",
          "index" : "not_analyzed"
        },
        "oil_temp" : {
          "type" : "integer",
          "index" : "not_analyzed"
        },
        "timestamp" : {
          "type" : "date",
          "format": "YYYY-MM-dd HH:mm:ss"
        },
        "water_temp" : {
          "type" : "integer",
          "index" : "not_analyzed"
        }
      }
    }
  }
}

インデックスの登録

登録コマンド.
curl -XPUT moselasticsearch01:9200/log --data-binary @index.json

インデックスの登録が終わったらコンテナを出たらコンテナの再起動を行ってください。
ここまでで、docker上の作業は完了となります。

もし直にBulkAPIで登録するのであれば、コンテナ上で下記のファイル作成し、コマンドを実行することでで可能です。

log.json
{ "index" : {"_index" : "log","_type" : "carlog"}
{"timestamp":"2016-09-16 19:50:00" ,"battery_level":12.0, "location":{"lon":"144.1","lat":"43.5"}, "water_temp": 90, "oil_temp":80, "oil_press":2.0,  "gps_gsa":"$GPGSA", "gps_rmc":"$GPRMC", "gps_gga":"$GPGGA", "gps_gsv":"$GPGSV"}
コマンド.
curl -XPOST moselasticsearch01:9200/_bulk --data-binary @log.json

raspberry pi

piからコンテナにデータを投げます。
細かいことは抜きにして検証はpip install paho-mqtt実行後に下記スクリプトの実行します。

pub.py
#!/usr/bin/env python
import json
import paho.mqtt.client as mqtt
from datetime import datetime

#test_datqa
data = {"timestamp":"" ,"battery_level":0, "location":{"lon":"","lat":""}, "water_temp": 0, "oil_temp":0, "oil_press":0,  "gps_gsa":"", "gps_rmc":"", "gps_gga":"", "gps_gsv":""}

WTMP = 80
OTMP = 90
OPLS = 2.0
BLVL = 12.0
GSA = "$GPGSA"
GGA = "$GPGGA"
RMC ="$GPRMC"
GSV = "$GPGSV"
data["timestamp"]=datetime.now().strftime( '%Y-%m-%d %H:%M:%S' )
data["battery_level"] = BLVL
data["water_temp"] = WTMP
data["oil_temp"] = OTMP
data["oil_press"] = OPLS
data["gps_gsa"] = GSA
data["gps_gga"] = GGA
data["gps_rmc"] = RMC
data["gps_gsv"] = GSV
data["location"]["lon"] = "139.79"
data["location"]["lat"] = "35.67"
json_data = json.dumps(data)

username = "mqtt"
userpass = "mqttpass"
host = "mosquittoサーバのIP"
topic = "car/log"

#main
if __name__ == '__main__':
    client = mqtt.Client()
    client.username_pw_set(username, password=userpass)
    client.connect(host,1883, 60)
    client.publish(topic, payload=json_data, qos=1, retain=False)
    client.disconnect()

あとはkibanaにアクセスしてデータが来てるか確認しましょう。
discover.png

11
17
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
17