InfluxDBv2のFluxクエリ、わかりにくいですね! 書き込み時、フィールドに複数のカラムを書き込んだものが、InfluxQLでは1レコードとして扱えていたのに、Fluxクエリではフィールド毎にバラバラのレコードになるし、出力件数を制限しようとlimit()ファンクションを使っても、なかなかうまく扱えなくて最初は困りました。
ただ、公式ドキュメントは、例示が少ないだけで、説明はきちんと書かれているので、理解の参考になるよう実例とともに補足説明をします。
アノテーテッドCSV(annotated CSV)について
公式ドキュメントはこちら → https://docs.influxdata.com/influxdb/v2.2/reference/syntax/annotated-csv/
アノテーテッドCSVは、アノテーション行、ヘッダ行、データ行からなる CSVデータで、以下のように構成されている。
- 行の最初の文字が「#」の行は、アノテーション行(API呼出時に、dialectオプションを指定した場合)(「#」の文字は、API呼出時のオプションで変更できる)。
- 最初のカラムが、「#group」のアノテーション行は、グループ・キー(group key)を示す。trueのカラムのキーは、グループ・キーのメンバー。
- 最初のカラムが、「#datatype」のアノテーション行は、各カラムのデータ型を示す。
- アノテーション行でない最初の行は、ヘッダ行(API呼出時に、headerオプションをつけた場合)。各カラムのキー名を示す。
- ヘッダ行以降の行は、レコード行。各カラムの値を出力する。
- 各行の最初のカラムには、アノテーション名が出力される。アノテーション行以外の行では、空文字列が出力される。
- レコード行の2番目のカラムは、リザルト・カラム(result column)。yield()ファンクションのパラメータで指定された名前が出力される。暗黙に指定される名前は「_result」。
- レコード行の3番目のカラムは、テーブル・カラム(table column)。各テーブルに与えられたユニークな番号が出力される。
- カラムの構成が異なるテーブルを続けて出力する場合は、空行を1行出力して、次のCSVデータを出力する。
以下のアノテーテッドCSVのサンプルの構成を説明する。
- 1行目は、groupアノテーション行。trueのカラムに対応するヘッダ行のカラム名から、グループ・キーは、_start, _stop, _field, _measurement, sensor_id であることがわかる。
- 2行目は、datatypeアノテーション行。各カラムのデータ型がわかる。
- 3行目は、ヘッダ行。各カラムのカラム名(キー名)がわかる。
- 4行目以降は、データ行。テーブルのリザルト名(the name of the result)は、「_result」。
- もし、続けて別のCSVデータが出力される場合は、1行空けて次のアノテーション行から出力される。
#group,false,false,true,true,false,false,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string
,result,table,_start,_stop,_time,_value,_field,_measurement,sensor_id
,_result,0,1970-01-01T00:00:00Z,2022-06-11T14:47:51.525239812Z,2022-06-10T14:30:11Z,0.39909161878977406,co,airSensors,TLM0100
,_result,0,1970-01-01T00:00:00Z,2022-06-11T14:47:51.525239812Z,2022-06-10T14:30:21Z,0.380589982808886,co,airSensors,TLM0100
,_result,0,1970-01-01T00:00:00Z,2022-06-11T14:47:51.525239812Z,2022-06-10T14:30:31Z,0.3871473017805706,co,airSensors,TLM0100
,_result,0,1970-01-01T00:00:00Z,2022-06-11T14:47:51.525239812Z,2022-06-10T14:30:41Z,0.39797793163318385,co,airSensors,TLM0100
,_result,1,1970-01-01T00:00:00Z,2022-06-11T14:47:51.525239812Z,2022-06-10T14:30:11Z,34.65666051320767,humidity,airSensors,TLM0100
,_result,1,1970-01-01T00:00:00Z,2022-06-11T14:47:51.525239812Z,2022-06-10T14:30:21Z,34.64979119357441,humidity,airSensors,TLM0100
.
(略)
.
(このサンプルは、 https://docs.influxdata.com/influxdb/v2.2/reference/sample-data/ から取得した)
Flux独特の特性で注意が必要なのが以下の点。
- グループ・キーのカラムの値が同じレコードには、同じテーブル番号が割り当てられる。CSVでは、テーブル番号は3番目のカラムに出力される。
- カラムの構成が異なる場合や、カラム名が同じであってもカラムのデータ型が異なる場合、リザルト名が同じだが別のCSVデータとして出力される。
last()ファンクションやlimit()ファンクションは、上記のテーブル(table)の各々を対象として処理する。
クエリサンプル
サンプルに使っているデータは、 https://docs.influxdata.com/influxdb/v2.2/reference/sample-data/ からコピーした。
from(bucket: "airSensor")
|> range(start: -5m)
bucketから読み出したデータは、カラム _start, _stop, _field, _measurement, sensor_idをグループ・キーとしてグループ化されている。_timeカラムがグループ・キーに含まれないので、同じセンサーの全ての時系列のデータが、一つのテーブルにまとめられている。_fieldがグループキーに含まれるので、同じセンサーでも、各々のセンサーデータ毎に、別のテーブルにまとめられている。
以下のサンプルデータは、一つのセンサー(TLM0100)の、四つの取得時刻(14:30:11, 14:30:21, 14:30:31, 14:30:41)の、三つのセンサーデータ(co, humidity, temperature)の部分で、三つのテーブル(0, 1, 2)にまとめられている。
#group,false,false,true,true,false,false,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string
,result,table,_start,_stop,_time,_value,_field,_measurement,sensor_id
,_result,0,2022-06-10T14:30:11Z,2022-06-10T14:45:11Z,2022-06-10T14:30:11Z,0.39909161878977406,co,airSensors,TLM0100
,_result,0,2022-06-10T14:30:11Z,2022-06-10T14:45:11Z,2022-06-10T14:30:21Z,0.380589982808886,co,airSensors,TLM0100
,_result,0,2022-06-10T14:30:11Z,2022-06-10T14:45:11Z,2022-06-10T14:30:31Z,0.3871473017805706,co,airSensors,TLM0100
,_result,0,2022-06-10T14:30:11Z,2022-06-10T14:45:11Z,2022-06-10T14:30:41Z,0.39797793163318385,co,airSensors,TLM0100
,_result,1,2022-06-10T14:30:11Z,2022-06-10T14:45:11Z,2022-06-10T14:30:11Z,34.65666051320767,humidity,airSensors,TLM0100
,_result,1,2022-06-10T14:30:11Z,2022-06-10T14:45:11Z,2022-06-10T14:30:21Z,34.64979119357441,humidity,airSensors,TLM0100
,_result,1,2022-06-10T14:30:11Z,2022-06-10T14:45:11Z,2022-06-10T14:30:31Z,34.69014639878911,humidity,airSensors,TLM0100
,_result,1,2022-06-10T14:30:11Z,2022-06-10T14:45:11Z,2022-06-10T14:30:41Z,34.7300776495192,humidity,airSensors,TLM0100
,_result,2,2022-06-10T14:30:11Z,2022-06-10T14:45:11Z,2022-06-10T14:30:11Z,71.66802506762716,temperature,airSensors,TLM0100
,_result,2,2022-06-10T14:30:11Z,2022-06-10T14:45:11Z,2022-06-10T14:30:21Z,71.70553232217618,temperature,airSensors,TLM0100
,_result,2,2022-06-10T14:30:11Z,2022-06-10T14:45:11Z,2022-06-10T14:30:31Z,71.70177006806514,temperature,airSensors,TLM0100
,_result,2,2022-06-10T14:30:11Z,2022-06-10T14:45:11Z,2022-06-10T14:30:41Z,71.71693530586926,temperature,airSensors,TLM0100
.
(略)
.
一つのバケットに複数のメジャーメントを保存している場合は、filter()ファンクションで選択する。
|> filter(fn: (r) => r._measurement == "airSensors")
各フィールドをまとめて一つのレコードにするには、pivot()ファンクションを使う。一つのレコードにまとめられるレコードを選択するキーになるカラムは、メジャーメント、タグ・キー、時刻だから、rowKeyパラメータに _measurement, sensor_id, _time を指定する。
|> pivot(rowKey: ["_measurement", "sensor_id","_time"], columnKey: ["_field"], valueColumn: "_value")
#group,false,false,true,true,false,true,true,false,false,false
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double,double,double
,result,table,_start,_stop,_time,_measurement,sensor_id,co,humidity,temperature
,_result,0,2022-06-10T14:30:11Z,2022-06-10T14:45:11Z,2022-06-10T14:30:11Z,airSensors,TLM0100,0.39909161878977406,34.65666051320767,71.66802506762716
,_result,0,2022-06-10T14:30:11Z,2022-06-10T14:45:11Z,2022-06-10T14:30:21Z,airSensors,TLM0100,0.380589982808886,34.64979119357441,71.70553232217618
,_result,0,2022-06-10T14:30:11Z,2022-06-10T14:45:11Z,2022-06-10T14:30:31Z,airSensors,TLM0100,0.3871473017805706,34.69014639878911,71.70177006806514
,_result,0,2022-06-10T14:30:11Z,2022-06-10T14:45:11Z,2022-06-10T14:30:41Z,airSensors,TLM0100,0.39797793163318385,34.7300776495192,71.71693530586926
,_result,1,2022-06-10T14:30:11Z,2022-06-10T14:45:11Z,2022-06-10T14:30:11Z,airSensors,TLM0101,0.6005811452576335,34.313877023009965,71.10510825962483
,_result,1,2022-06-10T14:30:11Z,2022-06-10T14:45:11Z,2022-06-10T14:30:21Z,airSensors,TLM0101,0.5850873789566544,34.28479422621333,71.05713846359872
,_result,1,2022-06-10T14:30:11Z,2022-06-10T14:45:11Z,2022-06-10T14:30:31Z,airSensors,TLM0101,0.5724686137815466,34.24395154472709,71.08119197367347
,_result,1,2022-06-10T14:30:11Z,2022-06-10T14:45:11Z,2022-06-10T14:30:41Z,airSensors,TLM0101,0.5562425635901556,34.206671025500086,71.12994812729201
.
(略)
.
フィールド co, humidity, temperature がカラムとして追加される。
pivot()ファンクションの仕様で、出力テーブルのグループ・キーは、入力テーブルのグループ・キーからcolumnKey, valueColumnパラメータに指定したカラムを取り除いたものになる。
時系列データから最新のデータのみ取り出すには、last()ファンクションを使うが、このファンクションは、プッシュダウン・ファンクション(pushdown function)だから、filter()ファンクションに続けた方が、メモリ効率が良い。
ついでに、_start, _stopカラムも取り除く。
from(bucket: "airSensor")
|> range(start: -5m)
|> filter(fn: (r) => r._measurement == "airSensors")
|> last()
|> drop(columns: ["_start", "_stop"])
|> pivot(rowKey: ["_measurement", "sensor_id","_time"], columnKey: ["_field"], valueColumn: "_value")
#group,false,false,false,true,true,false,false,false
#datatype,string,long,dateTime:RFC3339,string,string,double,double,double
,result,table,_time,_measurement,sensor_id,co,humidity,temperature
,_result,0,2022-06-10T14:30:41Z,airSensors,TLM0100,0.39797793163318385,34.7300776495192,71.71693530586926
,_result,1,2022-06-10T14:30:41Z,airSensors,TLM0101,0.5562425635901556,34.206671025500086,71.12994812729201
,_result,2,2022-06-10T14:30:41Z,airSensors,TLM0102,0.6882972746068912,35.15974683713586,71.90311595614837
,_result,3,2022-06-10T14:30:41Z,airSensors,TLM0103,0.36760471429652314,35.21884065715872,70.88429394086795
,_result,4,2022-06-10T14:30:41Z,airSensors,TLM0200,4.3135107701450774,35.993381839606094,73.60988368223512
,_result,5,2022-06-10T14:30:41Z,airSensors,TLM0201,0.8438587667266314,34.89097390677374,74.22954054024804
,_result,6,2022-06-10T14:30:41Z,airSensors,TLM0202,0.6294125353288226,35.88090731439027,75.48257881877048
,_result,7,2022-06-10T14:30:41Z,airSensors,TLM0203,0.3844136234548983,35.805296663082515,74.30799107202368
前述の通り、グループ・キーが _measurement, sensor_id になるので、センサー毎に別のテーブル(テーブル番号 0〜7)に分かれている。一つのテーブルにまとめるには、group()ファンクションを使う。
|> group()
#group,false,false,false,false,false,false,false,false
#datatype,string,long,dateTime:RFC3339,string,string,double,double,double
,result,table,_time,_measurement,sensor_id,co,humidity,temperature
,_result,0,2022-06-10T14:30:41Z,airSensors,TLM0100,0.39797793163318385,34.7300776495192,71.71693530586926
,_result,0,2022-06-10T14:30:41Z,airSensors,TLM0101,0.5562425635901556,34.206671025500086,71.12994812729201
,_result,0,2022-06-10T14:30:41Z,airSensors,TLM0102,0.6882972746068912,35.15974683713586,71.90311595614837
,_result,0,2022-06-10T14:30:41Z,airSensors,TLM0103,0.36760471429652314,35.21884065715872,70.88429394086795
,_result,0,2022-06-10T14:30:41Z,airSensors,TLM0200,4.3135107701450774,35.993381839606094,73.60988368223512
,_result,0,2022-06-10T14:30:41Z,airSensors,TLM0201,0.8438587667266314,34.89097390677374,74.22954054024804
,_result,0,2022-06-10T14:30:41Z,airSensors,TLM0202,0.6294125353288226,35.88090731439027,75.48257881877048
,_result,0,2022-06-10T14:30:41Z,airSensors,TLM0203,0.3844136234548983,35.805296663082515,74.30799107202368
おわり
Fluxにおける「テーブル」の扱いがお分かりいただけただろうか。