前回の記事でラズパイ4で稼働するPostgreSQL(dockerコンテナ)を InfluxDB v2(OSS版) にマイグレーションする方法を解説しました。※ただし検証環境はUbuntu 22-04
InfluxDB v2 OSS版 (dockerコンテナ) へのマイグレーションの検討 (Qiita@pipito-yukio)
今回はSQLiteデータベースからInfluxDB v2 用 の CSVファイルを生成し、InfluxDBにインポートする方法を解説します。
検証環境
- OS: Ubuntu 22-04
※本番環境は Raspberry Pi 4 Model B を想定 - docker-compose がインストール済み
- SQLite3データベースがインストール済み
1. SQLite3データベース
1-1. テーブル定義
-
観測機器テーブル: t_device
- デバイスID
- デバイス名
-
気象データテーブル: t_weather
- デバイスID
- 測定時刻: Unix epoch タイムスタンプ
- 外気温
- 室内気温
- 室内湿度
- 室内気圧
-- 観測機器テーブル
CREATE TABLE IF NOT EXISTS t_device(
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR UNIQUE NOT NULL
);
-- 気象データテーブル
CREATE TABLE IF NOT EXISTS t_weather(
did INTEGER NOT NULL,
measurement_time INTEGER NOT NULL, -- unixepoch
temp_out real,
temp_in real,
humid real,
pressure real,
PRIMARY KEY (did, measurement_time),
FOREIGN KEY (did) REFERENCES t_devices (id) ON DELETE CASCADE
);
【参考】UDPパケットから測定時刻を生成する python アプリの実装 ※抜粋
import time
# ...UDPパケット到着後の測定時刻の生成処理のみ抜粋
# From ESP output: device_name, temp_out, temp_in, humid, pressure
line = data.decode("utf-8")
record = line.split(",")
# Insert weather DB with local time
# unix_tmstmp = int(time.time()) # time.time() is UTC unix epoch
local_time = time.localtime()
unix_tmstmp = time.mktime(local_time)
wdb.insert(*record, measurement_time=unix_tmstmp, logger=logger)
1-2. 記録されているデータ
- 観測機器テーブル
"id","name"
1,esp8266_1
- 気象データテーブル
"did","measurement_time","temp_out","temp_in","humid","pressure"
1,1704034851,-2.9,14.0,51.7,1009.6
1,1704035435,-2.7,13.8,51.7,1009.8
1,1704036019,-2.8,13.6,51.8,1009.9
1-3. InfluxDB用の測定時刻(UNIX epoch) の2つの形式での出力比較
- InfluxDB(v2)の Timestamp 列の有効フォーマット
- A. Unix epoch: "measurement_time" 列の値
[出力例] 1704034851 - B. RFC3339 形式: "UTC datetime" 列の値
[出力例] 2023-12-31T15:00:51Z
- A. Unix epoch: "measurement_time" 列の値
"measurement_time","JST datetime","UTC datetime","temp_out"
1704034851,"2024-01-01 00:00:51",2023-12-31T15:00:51Z,-2.9
1704035435,"2024-01-01 00:10:35",2023-12-31T15:10:35Z,-2.7
1704036019,"2024-01-01 00:20:19",2023-12-31T15:20:19Z,-2.8
参考までに上記出力のSQLは下記のとおりです
※ JST ローカル時刻は測定時刻の確認用で実際の出力には使用しない
SELECT
measurement_time, -- UNIX epoch time
datetime(measurement_time, 'unixepoch', 'localtime'), --JST ローカル時刻
strftime('%Y-%m-%dT%H:%M:%SZ', datetime(measurement_time, 'unixepoch')),-- RFC3339
temp_out --, temp_in, humid, pressure
FROM
t_weather tw INNER JOIN t_device td ON tw.did = td.id
WHERE
td.name = 'センサーデバイス名' AND measurment_time_range
ORDER BY measurement_time;
2. InfluxDB (v2) へのマイグレーション
これから解説するデータベースのマイグレーションイメージは下記のとおりです。
2-1. InfluxDB v2 公式ドキュメント
- Get started writing data
- InfluxDB schema design
- Write data with the influx CLI
- Write CSV data to InfluxDB
2-2. CSVファイルのコメント行でテーブル名と列の型を定義する
- 気象データテーブルに該当するスキーマ名 (measurement): weather
※ コメント行の 1行目に定義 - 列の型定義 ※ コメント行の 2行目に定義
- tag: 観測機器のデバイス名 ※文字列型
- dateTime: UNIX epoch タイムスタンプ or RFC3339 形式のタイムスタンプ
- データ列: すべて double型
- 全ての列名定義
2-2-A. UNIX epoch タイムスタンプを使用
#constant measurement,weather
#datatype tag,dateTime:number,double,double,double,double
deviceName,time,temp_out,temp_in,humid,pressure
esp8266_1,1704034851,-2.9,14.0,51.7,1009.6
esp8266_1,1704035435,-2.7,13.8,51.7,1009.8
esp8266_1,1704036019,-2.8,13.6,51.8,1009.9
2-2-B. RFC3339 形式のタイムスタンプを使用
#constant measurement,weather
#datatype tag,dateTime:RFC3339,double,double,double,double
deviceName,time,temp_out,temp_in,humid,pressure
esp8266_1,2023-12-31T15:00:51Z,-2.9,14.0,51.7,1009.6
esp8266_1,2023-12-31T15:10:35Z,-2.7,13.8,51.7,1009.8
esp8266_1,2023-12-31T15:20:19Z,-2.8,13.6,51.8,1009.9
2-3. マイグレーション用CSV出力シェルスクリプト
- スクリプトの入力パラメータ
- 観測機器デバイス名
- 検索開始日
- 検索終了日
- 出力先ディレクトリ
#!/bin/bash
# 引数の次の日を取得: next_to_date "${to_date}"
next_to_date() {
retval=$(date -d "$1 1 days" +'%F');
echo "$retval"
}
# influxdb用CSV出力 timestamp: timezone (UTC with GMT location)
get_csv() {
device_name="$1"
where_range="$2";
cat<<-EOF | sqlite3 "$PATH_WEATHER_DB" -csv
SELECT
td.name,
-- UNIX epoch timestamp
measurement_time,
-- RFC3339 timestamp
-- strftime('%Y-%m-%dT%H:%M:%SZ', measurement_time, 'unixepoch'),
temp_out,temp_in,humid,pressure
FROM
t_weather tw INNER JOIN t_device td ON td.id = tw.did
WHERE
td.name = '${device_name}' AND ${where_range}
ORDER BY measurement_time;
EOF
}
# 検索日は JST 時刻とする (SQLiteデータベースは JSTのタイムスタンプで保存している)
# getcsv_weather_unixtime_to_influxdb.sh esp8266_1, 2024-01-01, 2024-01-01 ~/data/influxdb/csv
# All parameter required
from_date="$2"
to_date="$3"
eclude_to_date=$(next_to_date "$to_date");
echo "eclude_to_date: ${eclude_to_date}"
cond_from="(datetime(measurement_time, 'unixepoch', 'localtime') >= '"${from_date}"')"
cond_to_next="(datetime(measurement_time, 'unixepoch', 'localtime') < '"${eclude_to_date}"')"
where_range="(${cond_from} AND ${cond_to_next})"
csv_filepath="$4/t_weather_unixtime.csv"
# For influxdb headers
schema='#constant measurement,weather'
data_type='#datatype tag,dateTime:number,double,double,double,double'
header='deviceName,time,temp_out,temp_in,humid,pressure'
echo $schema > "${csv_filepath}"
echo $data_type >> "${csv_filepath}"
echo $header >> "${csv_filepath}"
get_csv "$1" "${where_range}" >> "${csv_filepath}"
if [ $? = 0 ]; then
echo "Output t_weather csv to ${csv_filepath}"
row_count=$(cat "${csv_filepath}" | wc -l)
# レコード件数: コメント行2行とヘッダー行を差し引く
row_count=$(( row_count - 3))
echo "Record count: ${row_count}"
else
echo "Output error"
fi
(A) タイムスタンプに UNIX epoch を出力する場合
measurement_time,
(B) タイムスタンプに RFC3339フォーマットを出力する場合
strftime('%Y-%m-%dT%H:%M:%SZ', measurement_time, 'unixepoch'),
2-4. InfluxDB用CSVをエクスポート
# ラズパイゼロから気象データベースを検証環境にコピー
$ scp pi@raspi-zero:~/db/weather.db .
weather.db 100% 7284KB 3.5MB/s 00:02
$ # UNIX タイムスタンプ出力スクリプト
$ ./getcsv_weather_unixtime_to_influxdb.sh esp8266_1 2022-01-01 2024-01-30 ~/data/influxdb/csv
eclude_to_date: 2024-01-31
Output t_weather csv to /home/yukio/data/influxdb/csv/t_weather_unixtime.csv
Record count: 111567
出力されたCSVの抜粋
#constant measurement,weather
#datatype tag,dateTime:number,double,double,double,double
deviceName,time,temp_out,temp_in,humid,pressure
esp8266_1,1640962928,-8.6,13.6,41.9,1001.3
esp8266_1,1640963512,-8.6,12.9,39.2,1001.4
esp8266_1,1640964095,-8.5,12.8,43.2,1001.4
# 以下省略
3. InfluxDB (v2) にインポート
3-0. docker-compose 用リソースファイル
$ tree -a
.
├── .env
├── docker-compose.yml
└── influxv2.env
検証環境(Ubuntu) と 本番環境 (Raspbrry Pi 4) の切り替え用環境設定
#HOST_DB_VOLUME=/home/pi/influxdb-v2
#HOST_HOME=/home/pi
HOST_DB_VOLUME=/home/yukio/influxdb-v2
HOST_HOME=/home/yukio
# Time zone
TZ=Asia/Tokyo
- カスタムボリューム指定
- ホスト側のデータベース用ディレクトリ
- マイグレーション用のCSVを参照するためのホスト側のデータディレクトリ
※コンテナではサービス名 (influxdb) のユーザが作成されます
version: '3'
services:
influxdb:
image: influxdb:2.7-alpine
container_name: influxdb
env_file:
- influxv2.env
- ./.env
volumes:
# Mount for influxdb data directory and configuration
- "${HOST_DB_VOLUME}:/var/lib/influxdb2:rw"
- "${HOST_HOME}/data/influxdb/:/home/influxdb"
ports:
- "8086:8086"
influxdb v2 用のdockerコンテナ環境設定ファイル
DOCKER_INFLUXDB_INIT_MODE=setup
DOCKER_INFLUXDB_INIT_USERNAME=developer
DOCKER_INFLUXDB_INIT_PASSWORD=[developer password]
DOCKER_INFLUXDB_INIT_ORG=raspi-influxdb
DOCKER_INFLUXDB_INIT_BUCKET=sensor-bucket
DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=0123456789abcdef
3-1. コンテナのビルドと起動
- docker-compose コマンド実行 ※コンソール出力は一部省略
2$ docker-compose up --build
Creating network "v2_default" with the default driver
Pulling influxdb (influxdb:2.7-alpine)...
2.7-alpine: Pulling from library/influxdb
c926b61bad3b: Pull complete
# ...一部省略...
c370875bed35: Pull complete
Digest: sha256:2ec0745cc2eed5444c599e274e59580162c840f43d917a493cab07ee9d41f746
Status: Downloaded newer image for influxdb:2.7-alpine
Creating influxdb ... done
Attaching to influxdb
influxdb | {
influxdb | "bolt-path": "/var/lib/influxdb2/influxd.bolt",
influxdb | "engine-path": "/var/lib/influxdb2/engine",
influxdb | "nats-port": 4222,
influxdb | "http-bind-address": ":9999"
influxdb | }
influxdb | 2024-01-25T08:23:42. info booting influxd server in the background {"system": "docker"}
# ...一部省略...
influxdb | ts=2024-01-25T08:23:45.853606Z lvl=info msg="Configuring InfluxQL statement executor (zeros indicate unlimited)." log_id=0mwwKiU0000 max_select_point=0 max_select_series=0 max_select_buckets=0
influxdb | ts=2024-01-25T08:23:45.910815Z lvl=info msg=Starting log_id=0mwwKiU0000 service=telemetry interval=8h
influxdb | ts=2024-01-25T08:23:45.911076Z lvl=info msg=Listening log_id=0mwwKiU0000 service=tcp-listener transport=http addr=:8086 port=8086
3-2. コンテナのInflux CLI でCSVファイルインポート
- 別の端末を開いて Influxdb コンテナに接続
$ docker exec -it influxdb bin/bash
9445fdd96a65:/# cd /home/influxdb/
9445fdd96a65:/home/influxdb# ls -l
total 20
drwxrwxr-x 2 influxdb influxdb 4096 Jan 24 05:53 csv
-rwxrwxr-x 1 influxdb influxdb 43 Jan 18 04:32 show_datetime.sh
9445fdd96a65:/home/influxdb# ls -l csv
total 10368
-rw-rw-r-- 1 influxdb influxdb 7981 Jan 24 03:05 t_weather_timezone.csv
-rw-rw-r-- 1 influxdb influxdb 4752022 Jan 25 07:59 t_weather_unixtime.csv
- マイグレーション用 CSVをインポート
※ UNIX タイムスタンプの精度を秒に設定: --precision s
※ 11万件近くあったので約40秒くらいかかりました
0a74547714e0:/home/influxdb# ./show_datetime.sh
2024-01-25 17:43:31 JST
0a74547714e0:/home/influxdb# influx write -o raspi-influxdb -b sensor-bucket \
> -t '0123456789abcdef' --precision s -f csv/t_weather_unixtime.csv
0a74547714e0:/home/influxdb# ./show_datetime.sh
2024-01-25 17:44:16 JST
- インポートしたデータの確認
0a74547714e0:/home/influxdb# influx v1 shell
InfluxQL Shell dev
Connected to InfluxDB OSS v2.7.5
> use "sensor-bucket"
> SELECT * FROM "weather"
こちらはインポートしたCSVファイルの抜粋
#constant measurement,weather
#datatype tag,dateTime:number,double,double,double,double
deviceName,time,temp_out,temp_in,humid,pressure
esp8266_1,1640962928,-8.6,13.6,41.9,1001.3
esp8266_1,1640963512,-8.6,12.9,39.2,1001.4
esp8266_1,1640964095,-8.5,12.8,43.2,1001.4
esp8266_1,1640964681,-8.4,13.0,41.7,1001.7
esp8266_1,1640965262,-8.6,12.4,39.5,1001.6
esp8266_1,1640965846,-8.6,12.0,43.1,1001.7
esp8266_1,1640966429,-8.8,11.6,41.8,1001.8
# 以下省略
UNIX epoch タイムスタンプで出力したCSVをインポートする場合、タイムスタンプの精度の指定を忘れないでください。
--precision s
公式サイトのドキュメント
Precision Description Example
ns | Nanoseconds 1577836800000000000
us | Microseconds 1577836800000000
ms | Milliseconds 1577836800000
s | Seconds 1577836800
精度の指定を忘れると検索に引っかからなくなります。
※おそらく数年〜数十年前のデータとして登録されてしまう
0a74547714e0:/home/influxdb# influx write -o raspi-influxdb -b sensor-bucket \
> -t '0123456789abcdef' -f csv/t_weather_unixtime.csv
※ この画面のように CSV の Unix epoch とtime列の整数部が同じならインポート失敗です。
これは失敗したときのCSVの抜粋
#constant measurement,weather
#datatype tag,dateTime:number,double,double,double,double
deviceName,time,temp_out,temp_in,humid,pressure
esp8266_1,1704034851,-2.9,14.0,51.7,1009.6
esp8266_1,1704035435,-2.7,13.8,51.7,1009.8
esp8266_1,1704036019,-2.8,13.6,51.8,1009.9
esp8266_1,1704036602,-3.0,13.4,51.4,1010.1
esp8266_1,1704037186,-3.0,13.2,50.9,1010.4
esp8266_1,1704037770,-3.2,13.0,51.4,1010.5
esp8266_1,1704038354,-3.3,12.9,51.9,1010.6
esp8266_1,1704038938,-3.6,12.7,51.4,1010.7
esp8266_1,1704039522,-3.5,12.6,51.8,1011.2
esp8266_1,1704040106,-3.6,12.4,51.8,1011.5
# ...以下省略...
3-3. RFC3339 タイムスタンプのCSVファイルインポート
RFC3339 タイムスタンプの場合、精度の指定は不要です
0a74547714e0:/home/influxdb# influx write -o raspi-influxdb -b sensor-bucket \
> -t '0123456789abcdef' -f csv/t_weather_timezone.csv
[CSVの内容の抜粋]
※1件目の観測データは "2024-01-01 00:00:51"からですが、UTCなので 9時間前 "2023-12-31T15:00:51Z"から始まっています。
#constant measurement,weather
#datatype tag,dateTime:RFC3339,double,double,double,double
deviceName,time,temp_out,temp_in,humid,pressure
esp8266_1,2023-12-31T15:00:51Z,-2.9,14.0,51.7,1009.6
esp8266_1,2023-12-31T15:10:35Z,-2.7,13.8,51.7,1009.8
esp8266_1,2023-12-31T15:20:19Z,-2.8,13.6,51.8,1009.9
esp8266_1,2023-12-31T15:30:02Z,-3.0,13.4,51.4,1010.1
esp8266_1,2023-12-31T15:39:46Z,-3.0,13.2,50.9,1010.4
esp8266_1,2023-12-31T15:49:30Z,-3.2,13.0,51.4,1010.5
esp8266_1,2023-12-31T15:59:14Z,-3.3,12.9,51.9,1010.6
esp8266_1,2023-12-31T16:08:58Z,-3.6,12.7,51.4,1010.7
esp8266_1,2023-12-31T16:18:42Z,-3.5,12.6,51.8,1011.2
esp8266_1,2023-12-31T16:28:26Z,-3.6,12.4,51.8,1011.5
4. 結論
CSVファイルによる過去データのマイグレーションで InfluxDBのタイムスタンプの設定は注意が必要です。 RFC3339形式タイムスタンプのCSVでインポートしたときにはOKなのに、Unix epochタイムスタンプCSVインポートではNG。タイムスタンプの精度指定に気がつくまで丸2日かかりました。
時間がかかりましたがこの記事で紹介している公式ドキュメントを丹念に読み解くことでようやく過去の気象データのマイグレーションが完了しました。
下記に influxdata社 が提供する python ライブラリを使って、今回インポートしたデータを取得する方法を公開しました。
InfluxDB v2 (OSS版) PythonクライアントでInfluxdbの気象データを取得する
■ この記事で紹介したスクリプトと気象データベースを下記 GitHubリポジトリで公開しました
- SQLite3気象データベース本体
pipito-yukio: databases/sqlie3/weather
README.md # 下記データベースの観測情報
weather.db # SQLite3気象データベース
weather_db.sql # DDL
migrateFromSQLiteWeatherDB/
├── getcsv_weather_timezone_to_influxdb.sh # RFC3339 タイムゾーンを出力するスクリプト
├── getcsv_weather_to_postgres.sh # PostgreSQL用のスクリプト
└── getcsv_weather_unixtime_to_influxdb.sh # UNIX EPOCH タイムスタンプをを出力するスクリプト
- docker-composeリソース
docker/
└── simple
├── .env
├── docker-compose.yml
└── influxv2.env```