0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

MQTT で受信したデータを InfluxDB v1.8 に入れる

Posted at

プログラム

subscribe_temperature.py
#! /usr/bin/python
#
#	subscribe_temperature.py
#
#						Dec/08/2022
#
# ------------------------------------------------------------------
import sys
import json
import requests
import paho.mqtt.client as mqtt

# ------------------------------------------------------------------
def influxdb_write_proc(temp_in):
	print(temp_in)
#
	url="http://localhost:8086/write?db=example"
	raw_data='temp,IDX=1 tempValue=%f' % temp_in
	rr=requests.post(url,data=raw_data)
	print(rr)
# ------------------------------------------------------------------
def to_influxdb_proc(json_str):
#	print(json_str)
	unit_aa = json.loads(json_str)
	if "t" in unit_aa:
		value = unit_aa["t"]
		print(value)
		influxdb_write_proc(value)
	else:
		sys.stderr.write("*** error *** to_influxdb_proc ***\n")
#
#
# ------------------------------------------------------------------
def on_connect(client, userdata, flags, respons_code):
#	print('status {0}'.format(respons_code))
	client.subscribe(topic)
sy
# ------------------------------------------------------------------
def on_message(client, userdata, msg):
#	print(msg.topic + ' ' + str(msg.payload,'utf-8'))
	to_influxdb_proc(str(msg.payload,'utf-8'))

# ------------------------------------------------------------------
sys.stderr.write("*** 開始 ***\n")

host_mqtt = 'localhost'
port = 1883
topic = 'esp32/temperature'
#
client = mqtt.Client(protocol=mqtt.MQTTv311)
client.on_connect = on_connect
client.on_message = on_message

client.connect(host_mqtt, port=port, keepalive=60)

client.loop_forever()

sys.stderr.write("*** 終了 ***\n")
# ------------------------------------------------------------------

Systemd で動かす

プログラムを
/usr/local/subscribe に置く

/etc/systemd/system/subscribe_temperature.service
[Unit]
Description = start up mqtt_client subscribe_temperature

[Service]
WorkingDirectory=/var/tmp/subscribe_temperature
ExecStart=/usr/local/subscribe/subscribe_temperature.py
Type=simple

[Install]
WantedBy=default.target

作業用の Directory を作成

mkdir /var/tmp/subscribe_temperature

起動

sudo systemctl start subscribe_temperature

起動したことを確認

sudo systemctl status subscribe_temperature

テスト

パブリッシュ

HOST="localhost"
#
mosquitto_pub -d -t orz -m '{"t": 21.6}' -h $HOST --topic esp32/temperature
sleep 20
#
mosquitto_pub -d -t orz -m '{"t": 22.4}' -h $HOST --topic esp32/temperature

InfluxDB で確認

influx -precision rfc3339
0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?