はじめに
Node-REDでよくElasticsearchへデータ投入するので、ノウハウを書きたいです。
追加ノードでも色々ありますが、Elasticsearchはバージョンの更新が早く、APIも変わる事があります。
そのため、標準のノードを組み合わせての利用が無難かと思ってます。
ソフトウェア環境
- Node-RED v1.0.3
- Elasticsearch 7.3.2
サブフロー化
まずは、Elasticsearchへ通信するhttpノードをサブフロー化します。これにより、BASIC認証などの情報をノードを設置する際に毎回設定しなくて済みます。あと、常にヘッダに"Content-Type: 'application/json'"を設定するようにしましょう。これで少し、Elasticsearchへの接続設定の手間が省けます。
URLやメソッドはリクエストにより変化するため、サブフロー内では設定しないようにしましょう。
書き出したコードは以下です。
[{"id":"3f7774f5.a6a20c","type":"subflow","name":"Elasticsearch","info":"","category":"","in":[{"x":50,"y":30,"wires":[{"id":"c5bb15c.4c10de8"}]}],"out":[{"x":580,"y":30,"wires":[{"id":"19d2a3b.d048a5c","port":0}]}],"env":[],"color":"#DDAA99"},{"id":"eeda88c1.f8b278","type":"http request","z":"3f7774f5.a6a20c","name":"Elasticsearch","method":"use","ret":"txt","paytoqs":false,"url":"","tls":"","persist":false,"proxy":"","authType":"basic","x":340,"y":30,"wires":[["19d2a3b.d048a5c"]]},{"id":"c5bb15c.4c10de8","type":"change","z":"3f7774f5.a6a20c","name":"msg.headers","rules":[{"t":"delete","p":"headers","pt":"msg"},{"t":"set","p":"headers","pt":"msg","to":"{\"Content-Type\":\"application/json\",\"Connection\":\"close\"}","tot":"json"}],"action":"","property":"","from":"","to":"","reg":false,"x":170,"y":30,"wires":[["eeda88c1.f8b278"]]},{"id":"19d2a3b.d048a5c","type":"json","z":"3f7774f5.a6a20c","name":"","property":"payload","action":"","pretty":false,"x":490,"y":30,"wires":[[]]}]
1. データ投入
Linkingデバイスという、環境センサーからのビーコンを受信し、Elasticsearchへデータ投入する例です。
linking-scannerノードからは以下のようなデータがペイロードとして渡されます。
このデータをElasticsearchへ投入するために、Changeノードで msg.method と msg.url を設定します。
msg.urlは前のノードの値を使って、動的なURLを生成します。
プログラムを組める方はfunctionノードに頼りがちですが、実は使わずに生成できます。
ついでに、JSONata式を使えば、現時刻もChangeノードで生成できます。
Elasticsearchへ投入するJSONデータをテンプレートノードで生成します。
Mustacheテンプレートを使えば、msgオブジェクト内のデータが使えます。
このフローを書き出したものがこちらです。(Elasticsearchサブフローも含まれます)
[{"id":"3f7774f5.a6a20c","type":"subflow","name":"Elasticsearch","info":"","category":"","in":[{"x":50,"y":30,"wires":[{"id":"c5bb15c.4c10de8"}]}],"out":[{"x":580,"y":30,"wires":[{"id":"19d2a3b.d048a5c","port":0}]}],"env":[],"color":"#DDAA99"},{"id":"eeda88c1.f8b278","type":"http request","z":"3f7774f5.a6a20c","name":"Elasticsearch","method":"use","ret":"txt","paytoqs":false,"url":"","tls":"","persist":false,"proxy":"","authType":"basic","x":340,"y":30,"wires":[["19d2a3b.d048a5c"]]},{"id":"c5bb15c.4c10de8","type":"change","z":"3f7774f5.a6a20c","name":"msg.headers","rules":[{"t":"delete","p":"headers","pt":"msg"},{"t":"set","p":"headers","pt":"msg","to":"{\"Content-Type\":\"application/json\",\"Connection\":\"close\"}","tot":"json"}],"action":"","property":"","from":"","to":"","reg":false,"x":170,"y":30,"wires":[["eeda88c1.f8b278"]]},{"id":"19d2a3b.d048a5c","type":"json","z":"3f7774f5.a6a20c","name":"","property":"payload","action":"","pretty":false,"x":490,"y":30,"wires":[[]]},{"id":"e6695b94.08cca8","type":"subflow:3f7774f5.a6a20c","z":"f4557e75.b0f3f","x":640,"y":140,"wires":[[]]},{"id":"7108220c.1b5d9c","type":"change","z":"f4557e75.b0f3f","name":"POST _doc","rules":[{"t":"set","p":"method","pt":"msg","to":"POST","tot":"str"},{"t":"set","p":"url","pt":"msg","to":"http://localhost:9200/sensor-","tot":"str"},{"t":"change","p":"url","pt":"msg","from":"$","fromt":"re","to":"payload.service","tot":"msg"},{"t":"change","p":"url","pt":"msg","from":"$","fromt":"re","to":"/_doc","tot":"str"},{"t":"set","p":"date","pt":"msg","to":"$now()","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":280,"y":140,"wires":[["69e18d2a.9e18d4"]]},{"id":"69e18d2a.9e18d4","type":"template","z":"f4557e75.b0f3f","name":"ES Query JSON","field":"payload","fieldType":"msg","format":"handlebars","syntax":"mustache","template":"{\n \"date\": \"{{{date}}}\",\n \"device\": \"{{{payload.device}}}\",\n \"{{payload.service}}\": {{{payload.data}}}\n}","output":"str","x":460,"y":140,"wires":[["e6695b94.08cca8"]]},{"id":"d541687c.a302d8","type":"linking-scanner","z":"f4557e75.b0f3f","name":"","autostart":true,"duration":"","interval":"30","x":100,"y":140,"wires":[["7108220c.1b5d9c"]]}]
2. 時系列クエリ
最新の温度を検索する場合のフローです。シンプルに、ChangeノードとElasticsearchサブフローのみです。
Changeノードは以下のように、msg.method と msg.url と、ms.payloadに検索クエリを設定します。
Elasticsearchの検索クエリは以下のようになってます。日付をDESCにして最初の1レコードのみを要求します。
結果は以下のようになります。 msg.payload.hits.hits[0]._source にデータが返ってきます。
直近10分間の平均は以下のような検索クエリにしています。
msg.payload.aggregations.temperature_avg.buckets[0].aggs.value に入ってますね。
3. 単語解析
Elasticsearchでは分かち書きのような事もできます。
日本語を単語分したい場合は、Elasticsearchのプラグインanalysis-kuromojiをインストールする必要があります。
結果は、msg.payload.tokens[]に入ってます。
単語が取れれば、単語についての分析などが実装できると思います。
settings , mappings もできますし、ファイルノードを組み合わせてユーザ辞書の更新もNode-REDでしています。
おわりに
個人的に、Node-REDとElasticsearchは非常に相性良く組み合わせて使う事ができると考えています。
実際に仕事ではオリジナルのデータに分析データを埋め込み、Kibanaで可視化をしています。