前回の記事ではinfluxdbにマイグレーションした気象データをpandasで読み込む方法を紹介しました。
InfluxDB v2 (OSS版) PythonクライアントでInfluxdbの気象データを取得する (Qiita@pipito-yukio
今回は influxdata社 が提供する python ライブラリを使って、気象センサーから内部ネットワークにブロードキャストされるUDPパケットを受信して登録する方法を解説します。
参考ドキュメント
-
influxdata 社 Documentation
Python client library -
influxdata 社 python クライアントのソースコード
influxdata/influxdb-client-python
検証環境
- OS: Ubuntu 22-04
※ 本番環境は Raspberry Pi 4 Model B (Python3.9) - influxdbサービス(基本的に dockerコンテナ) が稼働していること
- python仮想環境を作成ずみで下記のライブラリがインストール済みであること
- influxdb-client
- pandas ※UDPデータ登録後の確認
- センサーがブロードキャストする UDPのポート(ここでは2222番) の受信が許可されていること
UDPパケットモニターの実装
(A) Raspberry Pi 4 のUDPパケットモニターサービス
(A)-1. PostgreSQLデータベース接続取得クラス
import json
import psycopg2
class PgDatabase(object):
def __init__(self, configfile, hostname, readonly=False, autocommit=False, logger=None):
self.logger = logger
with open(configfile, 'r') as fp:
dbconf = json.load(fp)
# replace real lower hostname in dbconf["host"] = {hostname}.local
dbconf["host"] = dbconf["host"].format(hostname=hostname)
if self.logger is not None:
self.logger.debug(f"dbconf: {dbconf}")
# default connection is itarable curosr
self.conn = psycopg2.connect(**dbconf)
self.conn.set_session(readonly=readonly, autocommit=autocommit)
if self.logger is not None:
self.logger.info(self.conn)
def get_connection(self):
return self.conn
def close(self):
if self.conn is not None:
self.logger.info("Close {} ".format(self.conn))
try:
self.conn.close()
except:
pass
A-2. PostgreSQL登録処理モジュール
- データ登録関数 (insert) のみ抜粋しています
get_did() は デバイス名からデバイスIDを取得する関数 ※割愛します
import logging
from psycopg2 import DatabaseError
INSERT_WEATHER = """
INSERT INTO weather.t_weather(did, measurement_time, temp_out, temp_in, humid, pressure) VALUES (
%(did)s,
%(measurement_time)s,
%(temp_out)s,
%(temp_in)s,
%(humid)s,
%(pressure)s
)
"""
def insert(device_name, temp_out, temp_in, humid, pressure,
measurement_time=None, conn=None, logger=None):
"""
Insert weather sensor data to t_weather
"""
isLogLevelDebug = logger.getEffectiveLevel() <= logging.DEBUG
did = get_did(conn, device_name, logger=logger, isLogLevelDebug=isLogLevelDebug)
rec = (did,
measurement_time,
to_float(temp_out),
to_float(temp_in),
to_float(humid),
to_float(pressure)
)
if isLogLevelDebug:
logger.debug(rec)
try:
with conn.cursor() as cursor:
cursor.execute(INSERT_WEATHER,
{
'did': rec[0],
'measurement_time': rec[1],
'temp_out': rec[2],
'temp_in': rec[3],
'humid': rec[4],
'pressure': rec[5],
})
except DatabaseError as err:
if logger is not None:
logger.warning("rec: {}\nerror:{}".format(rec, err))```
(A)-3. UDPモニターメインスクリプト
- ライブラリのインポート
- 各種定数の定義
import os
import logging
import signal
import socket
from datetime import datetime
import db.weatherdb as wdb
from log import logsetting
from database.pgdatabase import PgDatabase
"""
raspi-4 UDP packet Monitor from ESP Weather sensors With Insert sensors_pgdb on PostgreSQL
[UDP port] 2222
"""
# args option default
WEATHER_UDP_PORT = 2222
BUFF_SIZE = 1024
PATH_CONF = os.path.join(os.environ.get("PATH_LOGGER_CONF", os.path.expanduser("~/bin/pigpio/conf")))
PATH_DBCONN_FILE = os.path.join(PATH_CONF, "dbconf.json")
isLogLevelDebug = False
- シャットダウンフック時に実行する関数
- PostgreSQLデータベース接続オブジェクトのクローズ
- UDPソケットリソースのクローズ
def detect_signal(signum, frame):
"""
Detect shutdown, and execute cleanup.
:param signum: Signal number
:param frame: frame
:return:
"""
logger.info("signum: {}, frame: {}".format(signum, frame))
if signum == signal.SIGTERM or signum == signal.SIGSTOP:
# signal shutdown
cleanup()
# Current process terminate
exit(0)
def cleanup():
pgdb.close()
udp_client.close()
- UDPパケット受信とデータの登録処理 (到着時刻はローカル時刻)
- UDPパケット(バイトデータ、カンマ区切り)をUTF-8でデコードし文字列に変換する
- カンマで分割して個別データリスト取得
- パケットの到着時のローカル時刻を登録用の文字列に変換する
- データ登録モジュールの登録関数を呼び出してデータを登録する
def loop(client, conn):
server_ip = ''
while True:
data, addr = client.recvfrom(BUFF_SIZE)
if server_ip != addr:
server_ip = addr
logger.info("server ip: {}".format(server_ip))
# from ESP output: device_name, temp_out, temp_in, humid, pressure
line = data.decode("utf-8")
record = line.split(",")
# Insert weather DB with local time
if isLogLevelDebug:
logger.debug(line)
# PostgreSQL timestamp.
now_timestamp = datetime.now()
s_timestamp = now_timestamp.strftime("%Y-%m-%d %H:%M:%S")
wdb.insert(*record, measurement_time=s_timestamp, conn=conn, logger=logger)
- メイン処理
-
シャットダウンフックを設定する
【契機】UDPモニターサービスの再起動、端末シャットダウン - UDPソケットオブジェクトを生成する
- PostgreSQL接続オブジェクトを生成する
- UDPモニターの無限ループ関数を呼び出す
-
シャットダウンフックを設定する
※サービスとして実行されるので finally 句が実行されることはありません。
if __name__ == '__main__':
# PostgreSQL database
global pgdb
logger = logsetting.create_logger("service_weather") # only fileHandler
isLogLevelDebug = logger.getEffectiveLevel() <= logging.DEBUG
signal.signal(signal.SIGTERM, detect_signal)
hostname = socket.gethostname()
# Receive broadcast.
broad_address = ("", WEATHER_UDP_PORT)
logger.info("{}: {}".format(hostname, broad_address))
# UDP client
udp_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_client.bind(broad_address)
# Insert immediately commit.
pgdb = PgDatabase(PATH_DBCONN_FILE, hostname, readonly=False, autocommit=True, logger=logger);
conn = pgdb.get_connection()
try:
# load device cache
wdb.load_device_cache(conn=conn, logger=logger)
loop(udp_client, conn)
except KeyboardInterrupt:
pass
finally:
cleanup()
pythonのシャットダウンフックの詳細については下記記事をご覧ください。
Raspberry Pi zero pythonでOSシャットダウン時にクリーンアップ処理を実行するには (Qiita@pipito-yukio)
(B) 検証環境のパケットモニタースクリプト
(B)-1. ライブラリのインポート、DB接続定義
import socket
from datetime import datetime
from typing import Dict, List, Optional
from influxdb_client import WritePrecision, InfluxDBClient, Point
from influxdb_client.client.exceptions import InfluxDBError
from influxdb_client.client.write_api import SYNCHRONOUS
DB_CONF: Dict = {
"url": "http://localhost:8086",
"token": "0123456789abcdef",
"org": "raspi-influxdb"
}
- UDPパケット受信とデータの登録処理 (到着時刻はUTC時刻)
- UDPパケット(バイトデータ、カンマ区切り)をUTF-8でデコードし文字列に変換する
- カンマで分割して個別データリスト取得
- 登録用辞書オブジェクトの "time" キーに 到着時のUTC 時刻を設定する
- 登録用辞書オブジェクトのスキーマ名、タグ、観測データをそれぞれのキーに設定する
- 登録用辞書オブジェクトから Point オブジェクトを生成する
- Point オブジェクトを influxdb に登録する
def loop(udp_client: socket.socket, client: InfluxDBClient):
server_ip = ''
while True:
data, addr = udp_client.recvfrom(1024)
if server_ip != addr:
server_ip = addr
print(f"server ip: {server_ip}")
# from ESP output: device_name, temp_out, temp_in, humid, pressure
# コンソールにはローカル時刻を出力
print(datetime.now())
line: str = data.decode("utf-8")
rec: List[str] = line.split(",")
print(line)
with client.write_api(write_options=SYNCHRONOUS) as write_api:
data_dict: Dict = {
"measurement": "weather",
"tags": {"deviceName": rec[0]},
"fields": {
"temp_out": float(rec[1]), "temp_in": float(rec[2]), "humid": float(rec[3]),
"pressure": float(rec[4])
},
"time": datetime.utcnow()
}
# データの登録
try:
# 測定時刻は秒まで
point: Point = Point.from_dict(data_dict, WritePrecision.S)
print(point)
write_api.write(bucket="sensor-bucket", record=point)
except InfluxDBError as err:
# ひとまず終了させずエラー出力
print(err)
Point.from_dict()は下記ソースコードのPyDocを参考に実装
influxdb_client/client/write/point.py の PyDoc から抜粋
@staticmethod
def from_dict(dictionary: dict, write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, **kwargs):
"""
Initialize point from 'dict' structure.
The expected dict structure is:
- measurement
- tags
- fields
- time
Example:
.. code-block:: python
# Use default dictionary structure
dict_structure = {
"measurement": "h2o_feet",
"tags": {"location": "coyote_creek"},
"fields": {"water_level": 1.0},
"time": 1
}
point = Point.from_dict(dict_structure, WritePrecision.NS)
# 以下長いので割愛
- メイン処理
- UDPソケットオブジェクトを生成する
- influxdb クライアントオブジェクトを生成する
- UDPモニターの無限ループ関数を呼び出す
※ CTRL + C で処理を終了し、UDPソケットオブジェクトとinfluxdbオブジェクトをクローズする
if __name__ == '__main__':
db_client: Optional[InfluxDBClient] = None
hostname = socket.gethostname()
# Receive broadcast.
broad_address = ("", 2222)
print(f"{hostname}: {broad_address}")
# UDP client
udp_monitor: socket.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_monitor.bind(broad_address)
try:
db_client = InfluxDBClient(**DB_CONF)
loop(udp_monitor, db_client)
except KeyboardInterrupt:
print("KeyboardInterrupted!")
finally:
if db_client is not None:
db_client.close()
udp_monitor.close()
スクリプトの実行
事前にdockerコンテナを起動
$ cd docker/compose/influxdb/v2/
$ docker-compose up -d
Creating network "v2_default" with the default driver
Creating influxdb ... done
UDPモニタースクリプト実行
(py39_influxdb) $ python UdpWeatherMonitor.py
Dell-T7500: ('', 2222)
server ip: ('192.168.0.31', 2222)
2024-01-31 12:38:50.934559
esp8266_1,3.1,16.9,48.5,1016.1
weather,deviceName=esp8266_1 humid=48.5,pressure=1016.1,temp_in=16.9,temp_out=3.1 1706672330
2024-01-31 12:48:35.052433
esp8266_1,3.2,17.0,53.9,1015.9
weather,deviceName=esp8266_1 humid=53.9,pressure=1015.9,temp_in=17,temp_out=3.2 1706672915
2024-01-31 12:58:19.695667
esp8266_1,2.9,17.1,56.1,1015.6
weather,deviceName=esp8266_1 humid=56.1,pressure=1015.6,temp_in=17.1,temp_out=2.9 1706673499
2024-01-31 13:08:04.199797
esp8266_1,2.9,17.2,56.6,1015.6
weather,deviceName=esp8266_1 humid=56.6,pressure=1015.6,temp_in=17.2,temp_out=2.9 1706674084
2024-01-31 13:17:48.500107
esp8266_1,2.6,17.2,57.0,1015.3
weather,deviceName=esp8266_1 humid=57,pressure=1015.3,temp_in=17.2,temp_out=2.6 1706674668
2024-01-31 13:27:32.678929
esp8266_1,2.8,17.0,55.9,1015.1
weather,deviceName=esp8266_1 humid=55.9,pressure=1015.1,temp_in=17,temp_out=2.8 1706675252
2024-01-31 13:37:16.405391
esp8266_1,2.7,16.9,54.9,1014.9
weather,deviceName=esp8266_1 humid=54.9,pressure=1014.9,temp_in=16.9,temp_out=2.7 1706675836
^CKeyboardInterrupted!
登録データの確認スクリプト実行
pandas のインデックス列("measurement_time") はローカル時刻
(py39_influxdb) $ python ReadWeather_panda.py
from(bucket: "sensor-bucket")
|> range(start: 2024-01-30T15:00:00Z, stop: 2024-01-31T14:59:59Z)
|> filter(fn:(r) => r._measurement == "weather" and r.deviceName == "esp8266_1")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> keep(columns: ["_time", "temp_out", "temp_in", "humid", "pressure"])
|> rename(columns: {_time: "measurement_time"})
tables.len: 1
measurement_time temp_out temp_in humid pressure
measurement_time
2024-01-31 12:38:50 2024-01-31 12:38:50 3.1 16.9 48.5 1016.1
2024-01-31 12:48:35 2024-01-31 12:48:35 3.2 17.0 53.9 1015.9
2024-01-31 12:58:19 2024-01-31 12:58:19 2.9 17.1 56.1 1015.6
2024-01-31 13:08:04 2024-01-31 13:08:04 2.9 17.2 56.6 1015.6
2024-01-31 13:17:48 2024-01-31 13:17:48 2.6 17.2 57.0 1015.3
2024-01-31 13:27:32 2024-01-31 13:27:32 2.8 17.0 55.9 1015.1
2024-01-31 13:37:16 2024-01-31 13:37:16 2.7 16.9 54.9 1014.9
【参考】登録データ確認用のpythonスクリプト
from datetime import datetime, timezone
from io import StringIO
from typing import Dict, List, Tuple
# Require python 3.9
from zoneinfo import ZoneInfo
import pandas as pd
from pandas.core.frame import DataFrame
from influxdb_client import InfluxDBClient
from influxdb_client.client.query_api import QueryApi
from influxdb_client.client.flux_table import FluxTable, FluxRecord, TableList
"""
気象データDB for InfluxDB v2
influxdb into pandas DataFrame
"""
DB_CONF: Dict = {
"url": "http://localhost:8086",
"token": "0123456789abcdef",
"org": "raspi-influxdb"
}
QRY_BODY: str = """from(bucket: "sensor-bucket")
|> range(start: {from_time}, stop: {to_time})
|> filter(fn:(r) => r._measurement == "weather" and r.deviceName == "esp8266_1")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> keep(columns: ["_time", "temp_out", "temp_in", "humid", "pressure"])
"""
# pandas DataFrame: ["measurement_time","temp_out","temp_in","humid","pressure"]
# "measurement_time" is pandas DataFrame index
QRY_RENAME: str = """|> rename(columns: {_time: "measurement_time"})
"""
def _tuple_list_to_stringIO(
tuple_list: List[Tuple[str, float, float, float, float]]) -> StringIO:
str_buffer = StringIO()
str_buffer.write('"measurement_time","temp_out","temp_in","humid","pressure"\n')
for (m_time, temp_in, temp_out, humid, pressure) in tuple_list:
line = f'"{m_time}",{temp_in},{temp_out},{humid},{pressure}\n'
str_buffer.write(line)
# StringIO need Set first position
str_buffer.seek(0)
return str_buffer
def date_to_utc_timezone(iso_date: str, dt_format: str=None) -> str:
dt_local: datetime
if dt_format is None:
dt_local = datetime.strptime(iso_date, "%Y-%m-%d")
else:
dt_local = datetime.strptime(iso_date, dt_format)
# Change timestamp with timezone UTC
dt_utc: datetime = dt_local.astimezone(timezone.utc)
return dt_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
def utc_datetime_to_local(dt_utc: datetime) -> str:
# To JST Time
dt = dt_utc.astimezone(ZoneInfo('Asia/Tokyo'))
return dt.strftime("%Y-%m-%d %H:%M:%S")
if __name__ == '__main__':
client: InfluxDBClient = InfluxDBClient(**DB_CONF)
query_api: QueryApi = client.query_api()
params: Dict = {
"from_time": date_to_utc_timezone("2024-01-31 00:00:00", "%Y-%m-%d %H:%M:%S"),
"to_time": date_to_utc_timezone("2024-01-31 23:59:59", "%Y-%m-%d %H:%M:%S"),
}
query: str = QRY_BODY.format(**params)
# Rename _time to "measurement_time"
query += QRY_RENAME
print(query)
# TableList is Generator (0|1)
tables: TableList = query_api.query(query=query)
print(f"tables.len: {len(tables)}")
if len(tables) > 0:
record: FluxRecord
table: FluxTable
data_list: List[Tuple[str, float, float, float, float]] = []
for table in tables:
for rec in table.records:
# UTC Timestamp: _time
# dt: datetime = rec.get_time()
# Rename: _time -> "measurement_time"
local_time: str = utc_datetime_to_local(rec["measurement_time"])
data_list.append(
(local_time, rec["temp_out"], rec["temp_in"], rec["humid"], rec["pressure"])
)
# pandas dataframe
_strIO: StringIO = _tuple_list_to_stringIO(data_list)
df: DataFrame = pd.read_csv(_strIO, parse_dates=["measurement_time"])
df.index = df["measurement_time"]
print(df)
else:
print("Record not found!")
client.close()
結論
UDPパケットデータ登録処理の influxdb へのマイグレーション(既存はPostgreSQL)は、比較的容易であることがとがわかりました。
次回は influxdb からデータを取得して pandas のDataFrameを生成し、matplotlib で可視化するモジュールを作成する方法を解説する予定です。
Raspberry Pi 4 で稼働している UDPモニターサービスの実装については下記GitHubリポジトリで公開しています。
UDP Weather Sensor packet monitor for Raspberry pi 4