はじめに
Azure を使って、センサ(Mosquitto)からのデータを PostgreSQL まで持っていくようなデータパイプラインをつくりました1
ここでは、EventHub
> Functions
> PostgreSQL
のところを設定するにあたってやったことをつらつらと書きます
ちなみに Mosquitto
> IoT Hub
周りは、Azure IoT Hub に Mosquitto™ から MQTT なげてみる に書いてあります
やったこと
やったことは、こんな感じの流れになります
-
EventHub
はできていて、データが流れてきている状態からスタート2 -
PostgreSQL
を設定する -
Functions
を設定する
EventHub から流れてくるデータ
こんな感じの電流値センサからのデータです
{
"id": "11253",
"mid": "M_180331",
"name": "60Min_W-PDP-I4B-A1 出力回路23 電流 計測",
"time": "2020/05/05 08:00:00",
"value": "0",
"unit": "A",
"EventProcessedUtcTime": "2020-07-25T05:06:10.8586288Z",
"PartitionId": 0,
"EventEnqueuedUtcTime": "2020-07-25T05:06:10.777Z",
"IoTHub": {
"MessageId": null,
"CorrelationId": null,
"ConnectionDeviceId": "iot-gw-tk10",
"ConnectionDeviceGenerationId": "637294479848408560",
"EnqueuedTime": "2020-07-25T05:06:10"
}
}
PostgresSQL を設定する
ここはあまり書くことがありません。以下のドキュメントに従って進めるだけです
Quickstart: Create an Azure Database for PostgreSQL server in the Azure portal
- PostgreSQLの名前やパスワードだけ控えておきます
- 途中、データベースに外部からアクセスできるように、IPアドレスのフィルタを設定します。Azure の ネットワークセキュリティとは独立した設定になるようなので注意が必要です(ネットワークセキュリティの方では穴を開けてなくても、こちらを開けると穴が開いてしまいます)
psql
をインストールして接続
(PostgreSQL全体ではなく) psql
だけをインストール(Mac の場合)
brew install libpq
インストールしたところにパスをとおす。私の場合は以下
/usr/local/Cellar/libpq/12.3/bin
psql
で接続する
psql --host=*****.postgres.database.azure.com --port=5432 --username=*****@***** --dbname=postgres
データを受け取るテーブルを作成
CREATE TABLE public.iotdata
(
deviceid integer NOT NULL,
createdate timestamp without time zone NOT NULL DEFAULT now(),
data jsonb
);
データをカラムに分けた方がいいんですが、まずは動かすことに注力して、データは JSON
のまま jsonb
のカラムに放り込む方向性
postgres=> CREATE DATABASE iotdemo;
postgres=> \c iotdemo
iotdemo=> \c
psql (12.3, server 10.11)
SSL connection (protocol: TLSv1.2, cipher: ECDHE-RSA-AES256-GCM-SHA384, bits: 256, compression: off)
You are now connected to database "iotdemo" as user "*****@*****".
iotdemo=> CREATE TABLE public.iotdata
iotdemo-> (
iotdemo(> deviceid integer NOT NULL,
iotdemo(> createdate timestamp without time zone NOT NULL DEFAULT now(),
iotdemo(> data jsonb
iotdemo(> );
CREATE TABLE
iotdemo=>
iotdemo=>
iotdemo=> \dt
List of relations
Schema | Name | Type | Owner
--------+---------+-------+---------
public | iotdata | table | *****
(1 row)
Azure Functions を設定する
Azure Functions の理解
他のクラウドで Functions を触ったことがあればそれほど違和感はない3 。初めてであれば、いきなり EventHub
をターゲットにせずに、ファンクション自体の理解のために、最初に クイック スタート:HTTP 要求に応答する関数を Azure で作成する をやっておくと流れが掴めるのでおすすめ
開発環境
以下から選ぶ感じ4
- Webポータル上でコード書く(なにかとつらい)
- ローカルの場合 (おすすめ)
前提条件
ファンクションを書く前に、このあたりを事前に準備しておく
-
Azure Functions Core Tools バージョン
2.7.1846
以降の2.x
バージョン -
Azure CLI バージョン
2.4
以降5 -
node
はアクティブ LTS およびメンテナンス LTS バージョン (8.11.1
および10.14.1
を推奨)6
なんか、準備だけで、だいぶ、面倒くさくなってきてるが.... 進める
ローカルで書く場合のプロジェクト構成
ファンクションは FunctionApp
> Function
という階層になっているようだ
- 開発言語や実行環境(Win/Linux)などは
FunctionApp
単位で定義 -
FunctionApp
に複数のFunctions
が存在可能 - クラウドへのデプロイ(
publish
)はFunctionApp
単位 -
node
の場合はnode_modules
ごとデプロイすることになる7
├── FunctionApp
│ ├── EventHubTrigger # ファンクション#1
│ │ ├── function.json
│ │ └── index.js
│ ├── HTTPTrigger # ファンクション#2
│ │ ├── function.json
│ │ └── index.js
:
:
├── host.json
├── local.settings.json
├── node_modules
│ ├── buffer-writer
│ │ ├── LICENSE
:
:
トリガーとバインドの概念
ここで、Azure Functions でのトリガーとバインドの概念 を読む。今回は、トリガーとしての設定になる。EventHub
にデータがきたらそれをトリガーにファンクションが動いて、DBに書き込む
ファンクションの設定
準備が整ったので、いよいよはじめるよ
ストレージ作成
まず、作ったファンクションをおく場所であるストレージを作る
$ az storage account create --name ***** --location japaneast --resource-group *****-poc --sku Standard_LRS
$ az storage account show --name ***** -o table
AccessTier CreationTime EnableHttpsTrafficOnly Kind Location Name PrimaryLocation ProvisioningState ResourceGroup StatusOfPrimary
------------ -------------------------------- ------------------------ --------- ---------- ------- ----------------- ------------------- --------------- -----------------
Hot 2020-07-23T09:07:56.927438+00:00 True StorageV2 japaneast ***** japaneast Succeeded *****-poc available
FunctionApp をつくる
クラウド側に FunctionApp をつくる
$ az functionapp create --resource-group *****-poc --consumption-plan-location japaneast --runtime node --runtime-version 10 --functions-version 2 --name *****-func-e2p --storage-account *****
$ az functionapp list -o table
Name Location State ResourceGroup DefaultHostName AppServicePlan
------------------- ---------- ------- ------------------- ------------------------------------- --------------------------
*****-func-e2p Japan East Running *****-poc *****-func-e2p.azurewebsites.net JapanEastPlan
$ az functionapp config appsettings list --name *****-func-e2p --resource-group *****-poc
$ func azure functionapp list-functions *****-func-e2p --show-keys
ローカル側で FunctionApp を初期化
func init e2p --javascript
├── e2p
│ ├── host.json
│ ├── local.settings.json
│ └── package.json
host.json
の version
を 3
はファンクションランタイムバージョンではない。単純にhost.json
のスキーマのバージョンなので、2
のままで良い
ここで、e2p
におりて、ファンクションをつくる
今回は、Azure Event Hub trigger
で作成
$ func new
Select a number for template:
1. Azure Blob Storage trigger
2. Azure Cosmos DB trigger
3. Durable Functions activity
4. Durable Functions HTTP starter
5. Durable Functions orchestrator
6. Azure Event Grid trigger
7. Azure Event Hub trigger
8. HTTP trigger
9. IoT Hub (Event Hub)
10. Azure Queue Storage trigger
11. SendGrid
12. Azure Service Bus Queue trigger
13. Azure Service Bus Topic trigger
14. SignalR negotiate HTTP trigger
15. Timer trigger
Choose option: 7
Azure Event Hub trigger
Function name: [EventHubTrigger]
Writing /Users/*****/azure/e2p/EventHubTrigger/index.js
Writing /Users/*****/azure/e2p/EventHubTrigger/function.json
The function "EventHubTrigger" was created successfully from the "Azure Event Hub trigger" template.
function.json
と index.js
のテンプレが作成されるので、function.json
のパラメータを 接続するEventHub
に合わせて書き換える
{
"bindings": [
{
"type": "eventHubTrigger",
"name": "eventHubMessages",
"direction": "in",
"eventHubName": "*****",
"connection": "*****_RootManageSharedAccessKey_EVENTHUB",
"cardinality": "many",
"consumerGroup": "$Default"
}
]
}
ここで、よくわからなかったのが、connection
の値8。結論としては、これは Event Hubs 名前空間
> 共有アクセス ポリシー
にある主キー
の値のこと。実際には 主キー
をそのまま書くのではなく、ファンクションの 構成
> アプリケーション設定
に環境変数のような感じで書いておくのがベストプラクティスのようだ(以下参照)9
ファンクションを書く
やっと、index.js
に中身を書く... 今回書いたのは超シンプルにこんな感じ
module.exports = async function (context, eventHubMessages) {
var pg = require('pg');
const config = {
host: '*****.postgres.database.azure.com',
user: '*****@*****',
password: '*****',
database: 'iotdemo',
port: 5432,
ssl: true
};
var client = new pg.Client(config);
eventHubMessages.forEach((message, index) => {
context.log("Processed message: " + JSON.stringify(message));
context.log("id: " + message.id)
const sql = 'insert into iotdata(deviceid, data) values(' + message.id + ',' + '\'' + JSON.stringify(message) + '\');';
console.log("SQL: " + sql);
client.connect()
client.query(sql, (err, res) => {
if (err) {
console.error(err.stack)
} else {
console.log(res)
console.log('insert completed successfully!');
}
client.end()
})
});
context.log("eventHubMessages: " + JSON.stringify(eventHubMessages));
};
pg
パッケージをインストールする
$ npm install pg
とやっておいて、node_modules
ができるのを確認する。このフォルダも一緒にクラウドにあげることになる
クラウドにアップロードする
publish
は host.json
のあるところで叩く10
$ func azure functionapp publish *****-func-e2p
$ func azure functionapp publish *****-func-e2p
You're trying to use v3 tooling to publish to a non-v3 function app (FUNCTIONS_EXTENSION_VERSION is set to ~2).
You can pass --force to force update the app to v3, or downgrade to v1 or v2 tooling for publishing.
*****:e2p *****$ func azure functionapp publish *****-func-e2p --force
Getting site publishing info...
Creating archive for current directory...
Uploading 1.21 KB [###############################################################################]
Upload completed successfully.
Deployment completed successfully.
Syncing triggers...
Functions in *****-func-e2p:
EventHubTrigger - [eventHubTrigger]
これで、ひとまず動くはず。
確認する
ポータルの App Service
> 関数
> モニタ
> ログ
でログが確認できる
2020-07-25T05:06:11Z [Information] Executing 'Functions.EventHubTrigger' (Reason='', Id=e1066a47-60af-4d8b-a9df-8d1c5a31a76e)
2020-07-25T05:06:11Z [Information] Processed message: {"id":"11253","mid":"M_180331","name":"60Min_W-PDP-I4B-A1 出力回路23 電流 計測","time":"2020/05/05 08:00:00","value":"0","unit":"A","EventProcessedUtcTime":"2020-07-25T05:06:10.8586288Z","PartitionId":0,"EventEnqueuedUtcTime":"2020-07-25T05:06:10.777Z","IoTHub":{"MessageId":null,"CorrelationId":null,"ConnectionDeviceId":"iot-gw-tk10","ConnectionDeviceGenerationId":"637294479848408560","EnqueuedTime":"2020-07-25T05:06:10"}}
2020-07-25T05:06:11Z [Information] id: 11253
2020-07-25T05:06:11Z [Information] eventHubMessages: [{"id":"11253","mid":"M_180331","name":"60Min_W-PDP-I4B-A1 出力回路23 電流 計測","time":"2020/05/05 08:00:00","value":"0","unit":"A","EventProcessedUtcTime":"2020-07-25T05:06:10.8586288Z","PartitionId":0,"EventEnqueuedUtcTime":"2020-07-25T05:06:10.777Z","IoTHub":{"MessageId":null,"CorrelationId":null,"ConnectionDeviceId":"iot-gw-tk10","ConnectionDeviceGenerationId":"637294479848408560","EnqueuedTime":"2020-07-25T05:06:10"}}]
2020-07-25T05:06:11Z [Information] Executed 'Functions.EventHubTrigger' (Succeeded, Id=e1066a47-60af-4d8b-a9df-8d1c5a31a76e)
あとは、PostgreSQL に SQL なげてみればデータ入っているはず
今後
-
Mosquitto
のところを、IoT Edge
にして、コンテナ化して遊ぶ -
Streaming Analytics
でいろいろアノマリーを検知して遊ぶ -
PowerBI
で可視化して遊ぶ - センサの数を増やし、データをよりリアルタイムにして遊ぶ
参考にしたサイト
- Creating IoT applications with Azure Database for PostgreSQL
- [Send telemetry from a device to an IoT hub to Azure Database for PostgreSQL]([https://github.com/Azure/azure-postgresql/tree/master/samples/IoT%20Demo](https://github.com/Azure/azure-postgresql/tree/master/samples/IoT Demo))
- クイック スタート:Node.js を使用して Azure Database for PostgreSQL - Single Server に接続し、データにクエリを実行する
- Azure functions integration with Postgres in Node.js
-
こんなリッチな構成が本当に必要なのかと思わなくはない ↩
-
IoT Hub
>Streaming Analytics
>EventHub
はポータル上でマウスでコチコチやれば比較的簡単につながりますのでやってみてください ↩ -
Firebase のファンクションの方が使いやすいな... という違和感を除く ↩
-
ただし、今回は言語を JavaScript にしてるので、PostgreSQL と繋げるためにパッケージを入れる必要があることから、ローカルでやることになるのであった ↩
-
さあ、
brew upgrade azure-cli
をどうそ ↩ -
この記述は古いと思われる。Function のバージョンが
3.x
であればnode 12
が使えるはず 。私はここで、homebrew
で入れた最新の14
を消して、nvm
を入れ、12
を入れる羽目になる。Happy Yac Shaving?! ↩ -
今回 PostgreSQLと接続するために
pg
パッケージを使うので これは必須 ↩ -
Webポータルからファンクションをつくると、GUIで選択できるのでそれほど困らないが、ローカルでファンクション作ると何を書けばいいのかわかりにくい。コンポーネントごとに毎回キーを指定しないといけないのは、Azure の ダウンサイドだな... がんばれ Azure IAM... ↩
-
CLIでは
az functionapp config appsettings list -g *****-poc --name *****-func-e2p
みたいな感じで確認・設定できる ↩ -
さもなくば、
Unable to find project root. Expecting to find one of host.json in project root.
と怒られる ↩