1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

InfluxDB v2: MQTT で受信したデータを書き込む

Last updated at Posted at 2022-12-05

プログラム

MQTT サーバーから、次のような JSON を受信するとします。

{"temperature": 20.4}
subscribe_influx.py
#! /usr/bin/python
#
#	subscribe_influx.py
#
#						Dec/05/2022
#
# ------------------------------------------------------------------
import sys
import json
from time import sleep
import paho.mqtt.client as mqtt
import time
import influxdb_client
from influxdb_client.client.write_api import SYNCHRONOUS

# ------------------------------------------------------------------
def influxdb_write_proc(json_str):
#	print(json_str)
	unit_aa = json.loads(json_str)
	value = unit_aa["temperature"]
	print(value)
#
	pp = influxdb_client.Point("data") \
		.field("temperature", value)
	write_api.write(bucket=bucket, org=org, record=pp)

# ------------------------------------------------------------------
def on_connect(client, userdata, flags, respons_code):
#	print('status {0}'.format(respons_code))
	client.subscribe(topic)

# ------------------------------------------------------------------
def on_message(client, userdata, msg):
#	print(msg.topic + ' ' + str(msg.payload,'utf-8'))
	influxdb_write_proc(str(msg.payload,'utf-8'))

# ------------------------------------------------------------------
sys.stderr.write("*** 開始 ***\n")

host = 'example.com'
port = 1883
topic = 'example/test'

bucket = "tochigi"
org = "ekzemplaro"
token = '****VpyyJg6nskHEwcl6bMsjc1vyzY9b11hSyGiQMOg=='
#
url="http://localhost:8086"

db_client = influxdb_client.InfluxDBClient(
   url=url,
   token=token,
   org=org
)

write_api = db_client.write_api(write_options=SYNCHRONOUS)

client = mqtt.Client(protocol=mqtt.MQTTv311)

client.on_connect = on_connect
client.on_message = on_message

client.connect(host, port=port, keepalive=60)

client.loop_forever()

sys.stderr.write("*** 終了 ***\n")
# ------------------------------------------------------------------

テスト用プログラム

MQTT サーバーに 1秒毎にデータをパブリッシュします。

publish.py
#! /usr/bin/python
#
#	publish.py
#
#					Dec/05/2012
#
# ------------------------------------------------------------------
import  sys
import  json
from time import sleep
import paho.mqtt.client as mqtt

# ------------------------------------------------------------------
sys.stderr.write("*** 開始 ***\n")

host = 'example.com'
port = 1883
topic = 'example/test'

client = mqtt.Client(protocol=mqtt.MQTTv311)

client.connect(host, port=port, keepalive=60)

ttx = [20.4,21.3,22.7,23.5,24.5,23.6,22.2,20.1]
for it in range(len(ttx)):
	unit_aa = {}
	unit_aa['temperature'] = ttx[it]
	json_str = json.dumps(unit_aa)
	print(json_str)
	client.publish(topic, json_str)
	sleep(1.0)
#
client.disconnect()

sys.stderr.write("*** 終了 ***\n")
# ------------------------------------------------------------------

書き込まれたデータの確認

InfluxDB に書き込まれたデータのプログラムは、
InfluxDB v2: 時系列データの取り扱い例
にある
query_temperature.py

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?