LoginSignup
3
0

More than 1 year has passed since last update.

【Dr.Sum Data Funnel】pythonでDr.Sum Data FunnelにMQTTでデータを送ってみた

Last updated at Posted at 2021-11-19

やりたいこと

pythonでmqttプロトコルでDr.Sum Data Funnelにデータを送ってみます。
画像2.png

Dr.Sum Data Funnelとは

Dr.Sum Data Funnelは、センサデータなどをリアルタイムに収集、加工するためのツールです。
Message Brokerを内包しているため、python側はそこに向けてデータをpublishするだけでOKです。
subscribeはData Funnel Server側で行います。
詳しくはマニュアルを参照

データを送ってみる!

paho-mqttのインストール

pythonでmqttでのデータ送信を行うためにはpaho-mqttというライブラリが必要です。

$pip install paho-mqtt

コード

アクセス情報を設定して、publishしています。
デフォルトのポートは以下を参照
ポートについて:

import numpy as np
import paho.mqtt.client as mqtt
import json
from time import sleep
from datetime import datetime

#  --------------------------------------------------------

HOST = "localhost" # Data Funnelがインストールしてあるサーバのホスト名orIPアドレスを指定
PORT = 1883 #  Massage Brokerサーバのデフォルトのポート
USERNAME = "funnel" # ユーザ名
PASSWORD = "****" #パスワード
KEEP_ALIVE = 60 
TOPIC = "test"

data = {
    "sensorId" : 'A0001'
}
#  --------------------------------------------------------

def publish_count(client, topic):
  count = 1

  amp = 1
  numSampling = 16

  while(True):
    # データの生成
    value = amp * np.sin(np.pi*count/numSampling)
    value = round(value,2)

    data['value'] = value
    data['generateTime'] = datetime.now().strftime("%Y/%m/%d %H:%M:%S.%f")

    # データをパブリッシュ
    message = json.dumps(data)
    rc = client.publish(topic, message)

    # 正常にpublishできたかステータスで判断
    if( rc[0] == 0): 
      print (str(count).zfill(3) + ': ' + message)
    else: 
      error_msg = 'トピックのパブリッシュ時にエラーが発生しています。' + '\nstatus code ==> ' + str(rc[0])
      print(error_msg)
      raise Exception(error_msg)

    count = count + 1
    sleep(1)

if __name__ == '__main__':
  initFlag = True # mqtt.Clientインスタンス生成~connectまでは一回だけ行う。
  while(True):
    try:
      if(initFlag):
        client = mqtt.Client(protocol=mqtt.MQTTv311)
        client.username_pw_set(USERNAME, PASSWORD)
        client.connect(HOST, port=PORT, keepalive=KEEP_ALIVE)
        initFlag = False

      publish_count(client,TOPIC)
    except Exception as e:
      print(e)
      sleep(60) #60秒待機
      client.disconnect() # つなぎ直してエラーから復帰
      client.connect(HOST, port=PORT, keepalive=KEEP_ALIVE)

実行結果

生成したデータと時刻を出力しています。

001: {"sensorId": "A0001", "value": 0.2, "generateTime": "2021/11/16 17:18:54.409496"}
002: {"sensorId": "A0001", "value": 0.38, "generateTime": "2021/11/16 17:18:55.415888"}
003: {"sensorId": "A0001", "value": 0.56, "generateTime": "2021/11/16 17:18:56.423402"}
004: {"sensorId": "A0001", "value": 0.71, "generateTime": "2021/11/16 17:18:57.430556"}
005: {"sensorId": "A0001", "value": 0.83, "generateTime": "2021/11/16 17:18:58.439436"}
006: {"sensorId": "A0001", "value": 0.92, "generateTime": "2021/11/16 17:18:59.446920"}
007: {"sensorId": "A0001", "value": 0.98, "generateTime": "2021/11/16 17:19:00.455045"}
008: {"sensorId": "A0001", "value": 1.0, "generateTime": "2021/11/16 17:19:01.463581"}
009: {"sensorId": "A0001", "value": 0.98, "generateTime": "2021/11/16 17:19:02.476018"}
010: {"sensorId": "A0001", "value": 0.92, "generateTime": "2021/11/16 17:19:03.483368"}
011: {"sensorId": "A0001", "value": 0.83, "generateTime": "2021/11/16 17:19:04.488869"}
012: {"sensorId": "A0001", "value": 0.71, "generateTime": "2021/11/16 17:19:05.495719"}
013: {"sensorId": "A0001", "value": 0.56, "generateTime": "2021/11/16 17:19:06.503537"}
014: {"sensorId": "A0001", "value": 0.38, "generateTime": "2021/11/16 17:19:07.512592"}
015: {"sensorId": "A0001", "value": 0.2, "generateTime": "2021/11/16 17:19:08.521854"}
016: {"sensorId": "A0001", "value": 0.0, "generateTime": "2021/11/16 17:19:09.530849"}
017: {"sensorId": "A0001", "value": -0.2, "generateTime": "2021/11/16 17:19:10.542617"}
...

まとめ

pythonでデータをDr.Sum Data FunnelにMQTTで送ってみました。
次回はDr.Sum Data Funnel側でデータを受け取って、受け取ったデータをDr.Sumに溜めていこうと思います。

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0