0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

InfluxDB v2 (OSS版) PythonクライアントでUDPパケットデータを登録する

Last updated at Posted at 2024-01-31

 前回の記事ではinfluxdbにマイグレーションした気象データをpandasで読み込む方法を紹介しました。

InfluxDB v2 (OSS版) PythonクライアントでInfluxdbの気象データを取得する (Qiita@pipito-yukio

 今回は influxdata社 が提供する python ライブラリを使って、気象センサーから内部ネットワークにブロードキャストされるUDPパケットを受信して登録する方法を解説します。

QiitaPostNo20_overview.jpg

参考ドキュメント

検証環境

  • OS: Ubuntu 22-04
    ※ 本番環境は Raspberry Pi 4 Model B (Python3.9)
  • influxdbサービス(基本的に dockerコンテナ) が稼働していること
  • python仮想環境を作成ずみで下記のライブラリがインストール済みであること
    • influxdb-client
    • pandas ※UDPデータ登録後の確認
  • センサーがブロードキャストする UDPのポート(ここでは2222番) の受信が許可されていること

UDPパケットモニターの実装

(A) Raspberry Pi 4 のUDPパケットモニターサービス

(A)-1. PostgreSQLデータベース接続取得クラス
database/pddatabase.py
import json
import psycopg2


class PgDatabase(object):
    def __init__(self, configfile, hostname, readonly=False, autocommit=False, logger=None):
        self.logger = logger
        with open(configfile, 'r') as fp:
            dbconf = json.load(fp)

        # replace real lower hostname in dbconf["host"] = {hostname}.local
        dbconf["host"] = dbconf["host"].format(hostname=hostname)
        if self.logger is not None:
            self.logger.debug(f"dbconf: {dbconf}")
        # default connection is itarable curosr
        self.conn = psycopg2.connect(**dbconf)
        self.conn.set_session(readonly=readonly, autocommit=autocommit)
        if self.logger is not None:
            self.logger.info(self.conn)

    def get_connection(self):
        return self.conn

    def close(self):
        if self.conn is not None:
            self.logger.info("Close {} ".format(self.conn))
            try:
                self.conn.close()
            except:
                pass
A-2. PostgreSQL登録処理モジュール
  • データ登録関数 (insert) のみ抜粋しています
    get_did() は デバイス名からデバイスIDを取得する関数 ※割愛します
db/weatherdb.py
import logging
from psycopg2 import DatabaseError

INSERT_WEATHER = """
INSERT INTO weather.t_weather(did, measurement_time, temp_out, temp_in, humid, pressure) VALUES (
 %(did)s,
 %(measurement_time)s,
 %(temp_out)s,
 %(temp_in)s,
 %(humid)s,
 %(pressure)s
 )
"""

def insert(device_name, temp_out, temp_in, humid, pressure,
           measurement_time=None, conn=None, logger=None):
    """
    Insert weather sensor data to t_weather
    """
    isLogLevelDebug = logger.getEffectiveLevel() <= logging.DEBUG
    did = get_did(conn, device_name, logger=logger, isLogLevelDebug=isLogLevelDebug)
    rec = (did,
           measurement_time,
           to_float(temp_out),
           to_float(temp_in),
           to_float(humid),
           to_float(pressure)
           )
    if isLogLevelDebug:
        logger.debug(rec)
    try:
        with conn.cursor() as cursor:
            cursor.execute(INSERT_WEATHER,
                           {
                               'did': rec[0],
                               'measurement_time': rec[1],
                               'temp_out': rec[2],
                               'temp_in': rec[3],
                               'humid': rec[4],
                               'pressure': rec[5],
                            })
    except DatabaseError as err:
        if logger is not None:
            logger.warning("rec: {}\nerror:{}".format(rec, err))```
(A)-3. UDPモニターメインスクリプト
  • ライブラリのインポート
  • 各種定数の定義
UdpMonitorFromWeatherSensor.py
import os
import logging
import signal
import socket
from datetime import datetime
import db.weatherdb as wdb
from log import logsetting
from database.pgdatabase import PgDatabase 

"""
raspi-4 UDP packet Monitor from ESP Weather sensors With Insert sensors_pgdb on PostgreSQL
[UDP port] 2222
"""

# args option default
WEATHER_UDP_PORT = 2222
BUFF_SIZE = 1024
PATH_CONF = os.path.join(os.environ.get("PATH_LOGGER_CONF", os.path.expanduser("~/bin/pigpio/conf")))
PATH_DBCONN_FILE = os.path.join(PATH_CONF, "dbconf.json")

isLogLevelDebug = False
  • シャットダウンフック時に実行する関数
    • PostgreSQLデータベース接続オブジェクトのクローズ
    • UDPソケットリソースのクローズ
def detect_signal(signum, frame):
    """
    Detect shutdown, and execute cleanup.
    :param signum: Signal number
    :param frame: frame
    :return:
    """
    logger.info("signum: {}, frame: {}".format(signum, frame))
    if signum == signal.SIGTERM or signum == signal.SIGSTOP:
        # signal shutdown
        cleanup()
        # Current process terminate
        exit(0)


def cleanup():
    pgdb.close()
    udp_client.close()
  • UDPパケット受信とデータの登録処理 (到着時刻はローカル時刻)
    • UDPパケット(バイトデータ、カンマ区切り)をUTF-8でデコードし文字列に変換する
    • カンマで分割して個別データリスト取得
    • パケットの到着時のローカル時刻を登録用の文字列に変換する
    • データ登録モジュールの登録関数を呼び出してデータを登録する
def loop(client, conn):
    server_ip = ''
    while True:
        data, addr = client.recvfrom(BUFF_SIZE)
        if server_ip != addr:
            server_ip = addr
            logger.info("server ip: {}".format(server_ip))

        # from ESP output: device_name, temp_out, temp_in, humid, pressure
        line = data.decode("utf-8")
        record = line.split(",")
        # Insert weather DB with local time
        if isLogLevelDebug:
            logger.debug(line)
        # PostgreSQL timestamp.
        now_timestamp = datetime.now()
        s_timestamp = now_timestamp.strftime("%Y-%m-%d %H:%M:%S")
        wdb.insert(*record, measurement_time=s_timestamp, conn=conn, logger=logger)
  • メイン処理
    • シャットダウンフックを設定する
      【契機】UDPモニターサービスの再起動、端末シャットダウン
    • UDPソケットオブジェクトを生成する
    • PostgreSQL接続オブジェクトを生成する
    • UDPモニターの無限ループ関数を呼び出す

サービスとして実行されるので finally 句が実行されることはありません。

if __name__ == '__main__':
    # PostgreSQL database
    global pgdb

    logger = logsetting.create_logger("service_weather") # only fileHandler
    isLogLevelDebug = logger.getEffectiveLevel() <= logging.DEBUG
    signal.signal(signal.SIGTERM, detect_signal)

    hostname = socket.gethostname()
    # Receive broadcast.
    broad_address = ("", WEATHER_UDP_PORT)
    logger.info("{}: {}".format(hostname, broad_address))
    # UDP client
    udp_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    udp_client.bind(broad_address)
    
    # Insert immediately commit.
    pgdb = PgDatabase(PATH_DBCONN_FILE, hostname, readonly=False, autocommit=True, logger=logger);
    conn = pgdb.get_connection()
    try:
        # load device cache
        wdb.load_device_cache(conn=conn, logger=logger)
        loop(udp_client, conn)
    except KeyboardInterrupt:
        pass
    finally:
        cleanup()

pythonのシャットダウンフックの詳細については下記記事をご覧ください。
Raspberry Pi zero pythonでOSシャットダウン時にクリーンアップ処理を実行するには (Qiita@pipito-yukio)

(B) 検証環境のパケットモニタースクリプト

(B)-1. ライブラリのインポート、DB接続定義
UdpWeatherMonitor.py
import socket

from datetime import datetime
from typing import Dict, List, Optional

from influxdb_client import WritePrecision, InfluxDBClient, Point
from influxdb_client.client.exceptions import InfluxDBError
from influxdb_client.client.write_api import SYNCHRONOUS

DB_CONF: Dict = {
    "url": "http://localhost:8086",
    "token": "0123456789abcdef",
    "org": "raspi-influxdb"
}
  • UDPパケット受信とデータの登録処理 (到着時刻はUTC時刻)
    • UDPパケット(バイトデータ、カンマ区切り)をUTF-8でデコードし文字列に変換する
    • カンマで分割して個別データリスト取得
    • 登録用辞書オブジェクトの "time" キーに 到着時のUTC 時刻を設定する
    • 登録用辞書オブジェクトのスキーマ名、タグ、観測データをそれぞれのキーに設定する
    • 登録用辞書オブジェクトから Point オブジェクトを生成する
    • Point オブジェクトを influxdb に登録する
def loop(udp_client: socket.socket, client: InfluxDBClient):
    server_ip = ''
    while True:
        data, addr = udp_client.recvfrom(1024)
        if server_ip != addr:
            server_ip = addr
            print(f"server ip: {server_ip}")

        # from ESP output: device_name, temp_out, temp_in, humid, pressure
        # コンソールにはローカル時刻を出力
        print(datetime.now())
        line: str = data.decode("utf-8")
        rec: List[str] = line.split(",")
        print(line)
        with client.write_api(write_options=SYNCHRONOUS) as write_api:
            data_dict: Dict = {
                "measurement": "weather",
                "tags": {"deviceName": rec[0]},
                "fields": {
                    "temp_out": float(rec[1]), "temp_in": float(rec[2]), "humid": float(rec[3]),
                    "pressure": float(rec[4])
                },
                "time": datetime.utcnow()
            }
            # データの登録
            try:
                # 測定時刻は秒まで
                point: Point = Point.from_dict(data_dict, WritePrecision.S)
                print(point)
                write_api.write(bucket="sensor-bucket", record=point)
            except InfluxDBError as err:
                # ひとまず終了させずエラー出力
                print(err)

Point.from_dict()は下記ソースコードのPyDocを参考に実装

influxdb_client/client/write/point.py の PyDoc から抜粋

influxdb_client/client/write/point.py
    @staticmethod
    def from_dict(dictionary: dict, write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, **kwargs):
        """
        Initialize point from 'dict' structure.

        The expected dict structure is:
            - measurement
            - tags
            - fields
            - time

        Example:
            .. code-block:: python

                # Use default dictionary structure
                dict_structure = {
                    "measurement": "h2o_feet",
                    "tags": {"location": "coyote_creek"},
                    "fields": {"water_level": 1.0},
                    "time": 1
                }
                point = Point.from_dict(dict_structure, WritePrecision.NS)
# 以下長いので割愛
  • メイン処理
    • UDPソケットオブジェクトを生成する
    • influxdb クライアントオブジェクトを生成する
    • UDPモニターの無限ループ関数を呼び出す

CTRL + C で処理を終了し、UDPソケットオブジェクトとinfluxdbオブジェクトをクローズする

if __name__ == '__main__':
    db_client: Optional[InfluxDBClient] = None
    hostname = socket.gethostname()
    # Receive broadcast.
    broad_address = ("", 2222)
    print(f"{hostname}: {broad_address}")
    # UDP client
    udp_monitor: socket.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    udp_monitor.bind(broad_address)
    try:
        db_client = InfluxDBClient(**DB_CONF)
        loop(udp_monitor, db_client)
    except KeyboardInterrupt:
        print("KeyboardInterrupted!")
    finally:
        if db_client is not None:
            db_client.close()
        udp_monitor.close()

スクリプトの実行

事前にdockerコンテナを起動

$ cd docker/compose/influxdb/v2/
$ docker-compose up -d
Creating network "v2_default" with the default driver
Creating influxdb ... done

UDPモニタースクリプト実行

(py39_influxdb) $ python UdpWeatherMonitor.py 
Dell-T7500: ('', 2222)
server ip: ('192.168.0.31', 2222)
2024-01-31 12:38:50.934559
esp8266_1,3.1,16.9,48.5,1016.1
weather,deviceName=esp8266_1 humid=48.5,pressure=1016.1,temp_in=16.9,temp_out=3.1 1706672330
2024-01-31 12:48:35.052433
esp8266_1,3.2,17.0,53.9,1015.9
weather,deviceName=esp8266_1 humid=53.9,pressure=1015.9,temp_in=17,temp_out=3.2 1706672915
2024-01-31 12:58:19.695667
esp8266_1,2.9,17.1,56.1,1015.6
weather,deviceName=esp8266_1 humid=56.1,pressure=1015.6,temp_in=17.1,temp_out=2.9 1706673499
2024-01-31 13:08:04.199797
esp8266_1,2.9,17.2,56.6,1015.6
weather,deviceName=esp8266_1 humid=56.6,pressure=1015.6,temp_in=17.2,temp_out=2.9 1706674084
2024-01-31 13:17:48.500107
esp8266_1,2.6,17.2,57.0,1015.3
weather,deviceName=esp8266_1 humid=57,pressure=1015.3,temp_in=17.2,temp_out=2.6 1706674668
2024-01-31 13:27:32.678929
esp8266_1,2.8,17.0,55.9,1015.1
weather,deviceName=esp8266_1 humid=55.9,pressure=1015.1,temp_in=17,temp_out=2.8 1706675252
2024-01-31 13:37:16.405391
esp8266_1,2.7,16.9,54.9,1014.9
weather,deviceName=esp8266_1 humid=54.9,pressure=1014.9,temp_in=16.9,temp_out=2.7 1706675836
^CKeyboardInterrupted!

登録データの確認スクリプト実行

pandas のインデックス列("measurement_time") はローカル時刻

(py39_influxdb) $ python ReadWeather_panda.py 
from(bucket: "sensor-bucket")
|> range(start: 2024-01-30T15:00:00Z, stop: 2024-01-31T14:59:59Z)
|> filter(fn:(r) => r._measurement == "weather" and r.deviceName == "esp8266_1")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> keep(columns: ["_time", "temp_out", "temp_in", "humid", "pressure"])
|> rename(columns: {_time: "measurement_time"})

tables.len: 1
                       measurement_time  temp_out  temp_in  humid  pressure
measurement_time                                                           
2024-01-31 12:38:50 2024-01-31 12:38:50       3.1     16.9   48.5    1016.1
2024-01-31 12:48:35 2024-01-31 12:48:35       3.2     17.0   53.9    1015.9
2024-01-31 12:58:19 2024-01-31 12:58:19       2.9     17.1   56.1    1015.6
2024-01-31 13:08:04 2024-01-31 13:08:04       2.9     17.2   56.6    1015.6
2024-01-31 13:17:48 2024-01-31 13:17:48       2.6     17.2   57.0    1015.3
2024-01-31 13:27:32 2024-01-31 13:27:32       2.8     17.0   55.9    1015.1
2024-01-31 13:37:16 2024-01-31 13:37:16       2.7     16.9   54.9    1014.9

【参考】登録データ確認用のpythonスクリプト

ReadWeather_panda.py
from datetime import datetime, timezone
from io import StringIO
from typing import Dict, List, Tuple
# Require python 3.9
from zoneinfo import ZoneInfo

import pandas as pd
from pandas.core.frame import DataFrame

from influxdb_client import InfluxDBClient
from influxdb_client.client.query_api import QueryApi
from influxdb_client.client.flux_table import FluxTable, FluxRecord, TableList

"""
気象データDB for InfluxDB v2

influxdb into pandas DataFrame
"""

DB_CONF: Dict = {
    "url": "http://localhost:8086",
    "token": "0123456789abcdef",
    "org": "raspi-influxdb"
}

QRY_BODY: str = """from(bucket: "sensor-bucket")
|> range(start: {from_time}, stop: {to_time})
|> filter(fn:(r) => r._measurement == "weather" and r.deviceName == "esp8266_1")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> keep(columns: ["_time", "temp_out", "temp_in", "humid", "pressure"])
"""
# pandas DataFrame: ["measurement_time","temp_out","temp_in","humid","pressure"]
# "measurement_time" is pandas DataFrame index
QRY_RENAME: str = """|> rename(columns: {_time: "measurement_time"})
"""


def _tuple_list_to_stringIO(
        tuple_list: List[Tuple[str, float, float, float, float]]) -> StringIO:
    str_buffer = StringIO()
    str_buffer.write('"measurement_time","temp_out","temp_in","humid","pressure"\n')

    for (m_time, temp_in, temp_out, humid, pressure) in tuple_list:
        line = f'"{m_time}",{temp_in},{temp_out},{humid},{pressure}\n'
        str_buffer.write(line)

    # StringIO need Set first position
    str_buffer.seek(0)
    return str_buffer


def date_to_utc_timezone(iso_date: str, dt_format: str=None) -> str:
    dt_local: datetime
    if dt_format is None:
        dt_local = datetime.strptime(iso_date, "%Y-%m-%d")
    else:
        dt_local = datetime.strptime(iso_date, dt_format)
    # Change timestamp with timezone UTC
    dt_utc: datetime = dt_local.astimezone(timezone.utc)
    return dt_utc.strftime("%Y-%m-%dT%H:%M:%SZ")


def utc_datetime_to_local(dt_utc: datetime) -> str:
    # To JST Time
    dt = dt_utc.astimezone(ZoneInfo('Asia/Tokyo'))
    return dt.strftime("%Y-%m-%d %H:%M:%S")


if __name__ == '__main__':
    client: InfluxDBClient = InfluxDBClient(**DB_CONF)
    query_api: QueryApi = client.query_api()
    params: Dict = {
        "from_time": date_to_utc_timezone("2024-01-31 00:00:00", "%Y-%m-%d %H:%M:%S"),
        "to_time": date_to_utc_timezone("2024-01-31 23:59:59", "%Y-%m-%d %H:%M:%S"),
    }
    query: str = QRY_BODY.format(**params)
    # Rename _time to "measurement_time"
    query += QRY_RENAME
    print(query)

    # TableList is Generator (0|1)
    tables: TableList = query_api.query(query=query)
    print(f"tables.len: {len(tables)}")
    if len(tables) > 0:
        record: FluxRecord
        table: FluxTable
        data_list: List[Tuple[str, float, float, float, float]] = []
        for table in tables:
            for rec in table.records:
                # UTC Timestamp: _time
                # dt: datetime = rec.get_time()
                # Rename: _time -> "measurement_time"
                local_time: str = utc_datetime_to_local(rec["measurement_time"])
                data_list.append(
                    (local_time, rec["temp_out"], rec["temp_in"], rec["humid"], rec["pressure"])
                )

        # pandas dataframe
        _strIO: StringIO = _tuple_list_to_stringIO(data_list)
        df: DataFrame = pd.read_csv(_strIO, parse_dates=["measurement_time"])
        df.index = df["measurement_time"]
        print(df)
    else:
        print("Record not found!")
    client.close()

結論

 UDPパケットデータ登録処理の influxdb へのマイグレーション(既存はPostgreSQL)は、比較的容易であることがとがわかりました。
 
 次回は influxdb からデータを取得して pandas のDataFrameを生成し、matplotlib で可視化するモジュールを作成する方法を解説する予定です。

 Raspberry Pi 4 で稼働している UDPモニターサービスの実装については下記GitHubリポジトリで公開しています。
UDP Weather Sensor packet monitor for Raspberry pi 4

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?