今回は個人的に使うのでお金かかるAWSIoTであれこれってのはなしで、owncloudとか入れて使ってるconohaのdockerマシン上で全て片付けます。
(手順は書きませんが一応外部に開けるのは特定のハイポートのみにしてます。mosquittoへはSSLで通信させる予定です)
#今回、主に参考にしたとこを下記にまとめます。
mosquitto
mosquitto-ssl
paho-mqtt
pymongo
elasticseach
BulkAPI
Bulk helpers
how-to-use-bulk-api-to-store-the-keywords-in-es-by-using-python
elasticsearch-py
#ちょっと説明
- データの流れ raspberry pi ⇒ mosquitto ⇒ db、elasticseach⇒kibana
- 今回はmysqlに放り込みましたがmongodbでも似たような手順で出来ます。
- elasticseachにしてもkibanaにしてもほぼ初めて使うような物なのでとりあえず動く程度の物です。
- kibana上で使うつもりはなかったのですが、GPS情報を取得して位置情報も送る前提なのでgeo_pointも使用します。
- kibanaコンテナは(outofmemory?かも)いつの間にか止まっているので、何かしら処置する予定です。
#コンテナ作成
検証しながらなので、docke-composeやスクリプトファイルは使わず、ほとんど手作業です。
簡単なのでコンテナをさくさく立てていきましょう。
##busybox(mysql用)
任意のディレクトリ
docker pull busybox
docker run -i -t -v /var/lib/mysql --name busybox01 busybox /bin/sh
##mysql
任意のディレクトリ
FROM mysql
EXPOSE 3306
ENV container Docker
ENV MYSQL_ROOT_USER root
ENV MYSQL_ROOT_PASSWORD rootpass
ENV MYSQL_DATABASE log_db
ENV MYSQL_USER test
ENV MYSQL_PASSWORD testpass
docker build -f ./Dockerfile -t mysql:mosql --no-cache=true .
docker run -d --name mosdb01 --volumes-from busybox01 -e MYSQL_ROOT_PASSWORD=rootpass mysql:mosql
##Elasticseach
任意のディレクトリ
docker pull elasticsearch
docker run --name moselasticsearch01 -d elasticsearch
##Kibana
任意のディレクトリ
docker pull kibana
docker run --name moskibana01 --link moselasticsearch01:elasticsearch -d -p 5601:5601 kibana
##mosquitto
任意のディレクトリ
このコンテナ上でいろいろやるのでたくさん入れてます。要らないものは削除してください。
FROM ubuntu
MAINTAINER Ototo
ENV container docker
RUN apt-get -y update
RUN apt-get install -y mosquitto
RUN apt-get install -y openssl
RUN apt-get install -y python
RUN apt-get install -y python-pip
RUN apt-get install -y mysql-client
RUN apt-get install -y vim
RUN pip install --upgrade pip
RUN apt-get install -y curl
RUN pip install elasticsearch
RUN pip install pymongo
RUN pip install paho-mqtt
RUN pip install PyMySQL
EXPOSE 1883
ENTRYPOINT service mosquitto start && /bin/bash --login
docker build -f ./Dockerfile -t ubuntu:mosquitto --no-cache=true .
docker run --name mosquitto01 -t -i -d -v /root/mosquitto/mosquitto:/etc/mosquitto --link mosdb01 --link moselasticsearch01 -p 1883:1883 ubuntu:mosquitto /bin/bash
コンテナにはいります。
docker attach [conteinarID]
mysqlにログインします。
mysql -h mosdb01 -u root -p
Enter password: rootpass
とりあえずテーブルだけ作って出ます。
mysql>use log_db
mysql>CREATE TABLE `log_data` ( `id` int(11) NOT NULL AUTO_INCREMENT,
`date_time` datetime(1),
`data` json NOT NULL,
PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
mysqll>exit
コンフィグを編集します。
vim /etc/mosquitto/mosquitto.conf
パスワードを有効にするため下記を追記
allow_anonymous false
password_file /etc/mosquitto/passwd
パスワードファイルの作成とユーザ、パスワードの設定
mosquitto_passwd -c /etc/mosquitto/passwd mqtt
Password: mqttpass
Reenter password: mqttpass
サービスのリスタート(最後にコンテナのリスタートでも良い)
service mosquitto restart
スクリプトファイルの作成
vim /etc/mosquitto/sub.py
スクリプト
ここでdbとelasticseachにぶち込んでます。例外処理入れてないので、好みの形で良いので書いた方が無難です。
#!/usr/bin/env python
import paho.mqtt.client as mqtt
import pymysql.cursors
from pymongo import MongoClient
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime
username = "mqtt"
userpass = "mqttpass"
host = "localhost"
topic = "car/log"
eshost = "http://moselasticsearch01:9200"
esindex = "log"
estype = "carlog"
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe(topic, qos=0)
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload))
set_mysql(msg.payload)
set_es(msg.payload)
#insert mysql
def set_mysql(data):
connection = pymysql.connect(host= 'mosdb01',
user='root',
password='rootpass',
db='log_db',
charset='utf8',
cursorclass=pymysql.cursors.DictCursor)
with connection.cursor() as cursor:
sql = "INSERT INTO log_data(date_time,data) VALUES ( %s, %s)"
r = cursor.execute(sql, ( datetime.now().strftime( '%Y-%m-%d %H:%M:%S' ), data))
print(r) # -> 1
connection.commit()
#insert elasticseach
def set_es(data):
es = Elasticsearch([eshost])
actions = []
action = {"_index": esindex,"_type": estype,"_source": data}
actions.append(action)
log = helpers.bulk(es, actions)
#main
if __name__ == '__main__':
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.username_pw_set(username, password=userpass)
client.connect(host,1883)
client.loop_forever()
サービズの起動はbashログイン時にやらせるやっつけ仕事
vim ~/.bashrc
下記を追記
nohup python /etc/mosquitto/sub.py &
elasticseachにインデックスを登録するので下記ファイルを作成
{
"mappings" : {
"carlog" : {
"properties" : {
"battery_level" : {
"type" : "float",
"index" : "not_analyzed"
},
"gps_gga" : {
"type" : "string",
"index" : "not_analyzed"
},
"gps_gsa" : {
"type" : "string",
"index" : "not_analyzed"
},
"gps_gsv" : {
"type" : "string",
"index" : "not_analyzed"
},
"gps_rmc" : {
"type" : "string",
"index" : "not_analyzed"
},
"location" : {
"type" : "geo_point"
},
"oil_press" : {
"type" : "float",
"index" : "not_analyzed"
},
"oil_temp" : {
"type" : "integer",
"index" : "not_analyzed"
},
"timestamp" : {
"type" : "date",
"format": "YYYY-MM-dd HH:mm:ss"
},
"water_temp" : {
"type" : "integer",
"index" : "not_analyzed"
}
}
}
}
}
インデックスの登録
curl -XPUT moselasticsearch01:9200/log --data-binary @index.json
インデックスの登録が終わったらコンテナを出たらコンテナの再起動を行ってください。
ここまでで、docker上の作業は完了となります。
もし直にBulkAPIで登録するのであれば、コンテナ上で下記のファイル作成し、コマンドを実行することでで可能です。
{ "index" : {"_index" : "log","_type" : "carlog"}
{"timestamp":"2016-09-16 19:50:00" ,"battery_level":12.0, "location":{"lon":"144.1","lat":"43.5"}, "water_temp": 90, "oil_temp":80, "oil_press":2.0, "gps_gsa":"$GPGSA", "gps_rmc":"$GPRMC", "gps_gga":"$GPGGA", "gps_gsv":"$GPGSV"}
curl -XPOST moselasticsearch01:9200/_bulk --data-binary @log.json
#raspberry pi
piからコンテナにデータを投げます。
細かいことは抜きにして検証はpip install paho-mqtt
実行後に下記スクリプトの実行します。
#!/usr/bin/env python
import json
import paho.mqtt.client as mqtt
from datetime import datetime
#test_datqa
data = {"timestamp":"" ,"battery_level":0, "location":{"lon":"","lat":""}, "water_temp": 0, "oil_temp":0, "oil_press":0, "gps_gsa":"", "gps_rmc":"", "gps_gga":"", "gps_gsv":""}
WTMP = 80
OTMP = 90
OPLS = 2.0
BLVL = 12.0
GSA = "$GPGSA"
GGA = "$GPGGA"
RMC ="$GPRMC"
GSV = "$GPGSV"
data["timestamp"]=datetime.now().strftime( '%Y-%m-%d %H:%M:%S' )
data["battery_level"] = BLVL
data["water_temp"] = WTMP
data["oil_temp"] = OTMP
data["oil_press"] = OPLS
data["gps_gsa"] = GSA
data["gps_gga"] = GGA
data["gps_rmc"] = RMC
data["gps_gsv"] = GSV
data["location"]["lon"] = "139.79"
data["location"]["lat"] = "35.67"
json_data = json.dumps(data)
username = "mqtt"
userpass = "mqttpass"
host = "mosquittoサーバのIP"
topic = "car/log"
#main
if __name__ == '__main__':
client = mqtt.Client()
client.username_pw_set(username, password=userpass)
client.connect(host,1883, 60)
client.publish(topic, payload=json_data, qos=1, retain=False)
client.disconnect()