プログラム
MQTT サーバーから、次のような JSON を受信するとします。
{"temperature": 20.4}
subscribe_influx.py
#! /usr/bin/python
#
# subscribe_influx.py
#
# Dec/05/2022
#
# ------------------------------------------------------------------
import sys
import json
from time import sleep
import paho.mqtt.client as mqtt
import time
import influxdb_client
from influxdb_client.client.write_api import SYNCHRONOUS
# ------------------------------------------------------------------
def influxdb_write_proc(json_str):
    """Parse one MQTT payload and write its temperature to InfluxDB.

    json_str is expected to be a JSON object such as {"temperature": 20.4}.
    The value is printed and then written as the "temperature" field of a
    "data" point, using the module-level write_api, bucket and org.

    A payload that is not valid JSON, is not a JSON object, or has no
    "temperature" key is reported on stderr and skipped instead of raising.
    """
    try:
        value = json.loads(json_str)["temperature"]
    except (ValueError, KeyError, TypeError) as err:
        # ValueError: malformed JSON; KeyError: key is missing;
        # TypeError: the JSON value is not an object (e.g. a list or number).
        sys.stderr.write("skipped payload %r: %s\n" % (json_str, err))
        return
    print(value)
    point = influxdb_client.Point("data").field("temperature", value)
    write_api.write(bucket=bucket, org=org, record=point)
# ------------------------------------------------------------------
def on_connect(client, userdata, flags, respons_code):
    """Connection callback: subscribe the client to the module-level topic.

    userdata, flags and respons_code are accepted to match the callback
    signature but are not used.
    """
    client.subscribe(topic)
# ------------------------------------------------------------------
def on_message(client, userdata, msg):
    """Message callback: decode the payload and hand it to influxdb_write_proc.

    msg.payload is decoded as UTF-8. A payload that is not valid UTF-8 is
    reported on stderr and skipped instead of raising out of the callback.
    client and userdata are not used.
    """
    try:
        json_str = str(msg.payload, 'utf-8')
    except UnicodeDecodeError as err:
        sys.stderr.write("skipped payload that is not valid UTF-8: %s\n" % err)
        return
    influxdb_write_proc(json_str)
# ------------------------------------------------------------------
# Main script: connect to InfluxDB and the MQTT broker, then forward every
# message received on `topic` to InfluxDB until the process is interrupted.
sys.stderr.write("*** 開始 ***\n")
host = 'example.com'
port = 1883
topic = 'example/test'
bucket = "tochigi"
org = "ekzemplaro"
# NOTE(review): the API token is hard-coded; consider loading it from the
# environment or a config file kept out of version control.
token = '****VpyyJg6nskHEwcl6bMsjc1vyzY9b11hSyGiQMOg=='
#
url = "http://localhost:8086"
db_client = influxdb_client.InfluxDBClient(
    url=url,
    token=token,
    org=org
)
write_api = db_client.write_api(write_options=SYNCHRONOUS)
client = mqtt.Client(protocol=mqtt.MQTTv311)
client.on_connect = on_connect
client.on_message = on_message
try:
    client.connect(host, port=port, keepalive=60)
    # Blocks here; nothing in this script asks the loop to stop, so Ctrl-C
    # is the expected way to end the program.
    client.loop_forever()
except KeyboardInterrupt:
    # Treat Ctrl-C as a normal shutdown request rather than a crash.
    pass
finally:
    # Release both connections whether we stop normally or on an error.
    client.disconnect()
    db_client.close()
sys.stderr.write("*** 終了 ***\n")
# ------------------------------------------------------------------
テスト用プログラム
MQTT サーバーに 1秒毎にデータをパブリッシュします。
publish.py
#! /usr/bin/python
#
# publish.py
#
# Dec/05/2022
#
# ------------------------------------------------------------------
import sys
import json
from time import sleep
import paho.mqtt.client as mqtt
# ------------------------------------------------------------------
# Main script: publish each sample temperature to the MQTT broker as a JSON
# object, one message per second, then disconnect.
sys.stderr.write("*** 開始 ***\n")
host = 'example.com'
port = 1883
topic = 'example/test'
client = mqtt.Client(protocol=mqtt.MQTTv311)
client.connect(host, port=port, keepalive=60)
ttx = [20.4, 21.3, 22.7, 23.5, 24.5, 23.6, 22.2, 20.1]
for temperature in ttx:
    # Each message has the form {"temperature": <value>}.
    json_str = json.dumps({'temperature': temperature})
    print(json_str)
    client.publish(topic, json_str)
    sleep(1.0)
#
client.disconnect()
sys.stderr.write("*** 終了 ***\n")
# ------------------------------------------------------------------
書き込まれたデータの確認
InfluxDB に書き込まれたデータを確認するプログラムは、
InfluxDB v2: 時系列データの取り扱い例
にある
query_temperature.py