やりたいこと
pythonでmqttプロトコルでDr.Sum Data Funnelにデータを送ってみます。
Dr.Sum Data Funnelとは
Dr.Sum Data Funnelは、センサデータなどをリアルタイムに収集、加工するためのツールです。
Message Brokerを内包しているため、python側はそこに向けてデータをpublishするだけでOKです。
subscribeはData Funnel Server側で行います。
詳しくはマニュアルを参照
データを送ってみる!
paho-mqttのインストール
pythonでmqttでのデータ送信を行うためにはpaho-mqttというライブラリが必要です。
$pip install paho-mqtt
コード
アクセス情報を設定して、publishしています。
デフォルトのポートは以下を参照
ポートについて:
import numpy as np
import paho.mqtt.client as mqtt
import json
from time import sleep
from datetime import datetime
# --------------------------------------------------------
HOST = "localhost" # Data Funnelがインストールしてあるサーバのホスト名orIPアドレスを指定
PORT = 1883 # Massage Brokerサーバのデフォルトのポート
USERNAME = "funnel" # ユーザ名
PASSWORD = "****" #パスワード
KEEP_ALIVE = 60
TOPIC = "test"
data = {
"sensorId" : 'A0001'
}
# --------------------------------------------------------
def publish_count(client, topic):
count = 1
amp = 1
numSampling = 16
while(True):
# データの生成
value = amp * np.sin(np.pi*count/numSampling)
value = round(value,2)
data['value'] = value
data['generateTime'] = datetime.now().strftime("%Y/%m/%d %H:%M:%S.%f")
# データをパブリッシュ
message = json.dumps(data)
rc = client.publish(topic, message)
# 正常にpublishできたかステータスで判断
if( rc[0] == 0):
print (str(count).zfill(3) + ': ' + message)
else:
error_msg = 'トピックのパブリッシュ時にエラーが発生しています。' + '\nstatus code ==> ' + str(rc[0])
print(error_msg)
raise Exception(error_msg)
count = count + 1
sleep(1)
if __name__ == '__main__':
initFlag = True # mqtt.Clientインスタンス生成~connectまでは一回だけ行う。
while(True):
try:
if(initFlag):
client = mqtt.Client(protocol=mqtt.MQTTv311)
client.username_pw_set(USERNAME, PASSWORD)
client.connect(HOST, port=PORT, keepalive=KEEP_ALIVE)
initFlag = False
publish_count(client,TOPIC)
except Exception as e:
print(e)
sleep(60) #60秒待機
client.disconnect() # つなぎ直してエラーから復帰
client.connect(HOST, port=PORT, keepalive=KEEP_ALIVE)
実行結果
生成したデータと時刻を出力しています。
001: {"sensorId": "A0001", "value": 0.2, "generateTime": "2021/11/16 17:18:54.409496"}
002: {"sensorId": "A0001", "value": 0.38, "generateTime": "2021/11/16 17:18:55.415888"}
003: {"sensorId": "A0001", "value": 0.56, "generateTime": "2021/11/16 17:18:56.423402"}
004: {"sensorId": "A0001", "value": 0.71, "generateTime": "2021/11/16 17:18:57.430556"}
005: {"sensorId": "A0001", "value": 0.83, "generateTime": "2021/11/16 17:18:58.439436"}
006: {"sensorId": "A0001", "value": 0.92, "generateTime": "2021/11/16 17:18:59.446920"}
007: {"sensorId": "A0001", "value": 0.98, "generateTime": "2021/11/16 17:19:00.455045"}
008: {"sensorId": "A0001", "value": 1.0, "generateTime": "2021/11/16 17:19:01.463581"}
009: {"sensorId": "A0001", "value": 0.98, "generateTime": "2021/11/16 17:19:02.476018"}
010: {"sensorId": "A0001", "value": 0.92, "generateTime": "2021/11/16 17:19:03.483368"}
011: {"sensorId": "A0001", "value": 0.83, "generateTime": "2021/11/16 17:19:04.488869"}
012: {"sensorId": "A0001", "value": 0.71, "generateTime": "2021/11/16 17:19:05.495719"}
013: {"sensorId": "A0001", "value": 0.56, "generateTime": "2021/11/16 17:19:06.503537"}
014: {"sensorId": "A0001", "value": 0.38, "generateTime": "2021/11/16 17:19:07.512592"}
015: {"sensorId": "A0001", "value": 0.2, "generateTime": "2021/11/16 17:19:08.521854"}
016: {"sensorId": "A0001", "value": 0.0, "generateTime": "2021/11/16 17:19:09.530849"}
017: {"sensorId": "A0001", "value": -0.2, "generateTime": "2021/11/16 17:19:10.542617"}
...
まとめ
pythonでデータをDr.Sum Data FunnelにMQTTで送ってみました。
次回はDr.Sum Data Funnel側でデータを受け取って、受け取ったデータをDr.Sumに溜めていこうと思います。