3
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Azure IoT Data Pipeline - EventHub から Functions で PostgreSQL へ

Last updated at Posted at 2020-07-25

はじめに

Screen Shot 2020-07-25 at 14.05.54.png

Azure を使って、センサ(Mosquitto)からのデータを PostgreSQL まで持っていくようなデータパイプラインをつくりました1

ここでは、EventHub > Functions > PostgreSQL のところを設定するにあたってやったことをつらつらと書きます

ちなみに Mosquitto > IoT Hub 周りは、Azure IoT Hub に Mosquitto™ から MQTT なげてみる に書いてあります

やったこと

やったことは、こんな感じの流れになります

  1. EventHub はできていて、データが流れてきている状態からスタート2
  2. PostgreSQL を設定する
  3. Functions を設定する

EventHub から流れてくるデータ

こんな感じの電流値センサからのデータです

{
  "id": "11253",
  "mid": "M_180331",
  "name": "60Min_W-PDP-I4B-A1 出力回路23 電流 計測",
  "time": "2020/05/05 08:00:00",
  "value": "0",
  "unit": "A",
  "EventProcessedUtcTime": "2020-07-25T05:06:10.8586288Z",
  "PartitionId": 0,
  "EventEnqueuedUtcTime": "2020-07-25T05:06:10.777Z",
  "IoTHub": {
    "MessageId": null,
    "CorrelationId": null,
    "ConnectionDeviceId": "iot-gw-tk10",
    "ConnectionDeviceGenerationId": "637294479848408560",
    "EnqueuedTime": "2020-07-25T05:06:10"
  }
}

PostgresSQL を設定する

ここはあまり書くことがありません。以下のドキュメントに従って進めるだけです

Quickstart: Create an Azure Database for PostgreSQL server in the Azure portal

  • PostgreSQLの名前やパスワードだけ控えておきます
  • 途中、データベースに外部からアクセスできるように、IPアドレスのフィルタを設定します。Azure の ネットワークセキュリティとは独立した設定になるようなので注意が必要です(ネットワークセキュリティの方では穴を開けてなくても、こちらを開けると穴が開いてしまいます)

psql をインストールして接続

(PostgreSQL全体ではなく) psql だけをインストール(Mac の場合)

brew install libpq

インストールしたところにパスをとおす。私の場合は以下

/usr/local/Cellar/libpq/12.3/bin

psql で接続する

psql --host=*****.postgres.database.azure.com --port=5432 --username=*****@***** --dbname=postgres

データを受け取るテーブルを作成

テーブル作成SQL

CREATE TABLE public.iotdata
(
    deviceid integer NOT NULL,
    createdate timestamp without time zone NOT NULL DEFAULT now(),
    data jsonb
);

データをカラムに分けた方がいいんですが、まずは動かすことに注力して、データは JSON のまま jsonb のカラムに放り込む方向性


postgres=> CREATE DATABASE iotdemo;

postgres=> \c iotdemo

iotdemo=> \c
psql (12.3, server 10.11)
SSL connection (protocol: TLSv1.2, cipher: ECDHE-RSA-AES256-GCM-SHA384, bits: 256, compression: off)
You are now connected to database "iotdemo" as user "*****@*****".
iotdemo=> CREATE TABLE public.iotdata
iotdemo-> (
iotdemo(>     deviceid integer NOT NULL,
iotdemo(>     createdate timestamp without time zone NOT NULL DEFAULT now(),
iotdemo(>     data jsonb
iotdemo(> );
CREATE TABLE
iotdemo=> 
iotdemo=> 
iotdemo=> \dt
         List of relations
 Schema |  Name   | Type  |  Owner  
--------+---------+-------+---------
 public | iotdata | table | *****
(1 row)

Azure Functions を設定する

Azure Functions の理解

他のクラウドで Functions を触ったことがあればそれほど違和感はない3 。初めてであれば、いきなり EventHub をターゲットにせずに、ファンクション自体の理解のために、最初に クイック スタート:HTTP 要求に応答する関数を Azure で作成する をやっておくと流れが掴めるのでおすすめ

開発環境

以下から選ぶ感じ4

前提条件

ファンクションを書く前に、このあたりを事前に準備しておく

  • Azure Functions Core Tools バージョン 2.7.1846 以降の 2.x バージョン
  • Azure CLI バージョン 2.4 以降5
  • node はアクティブ LTS およびメンテナンス LTS バージョン (8.11.1 および 10.14.1 を推奨)6

なんか、準備だけで、だいぶ、面倒くさくなってきてるが.... 進める

ローカルで書く場合のプロジェクト構成

ファンクションは FunctionApp > Function という階層になっているようだ

  • 開発言語や実行環境(Win/Linux)などは FunctionApp 単位で定義
  • FunctionApp に複数の Functions が存在可能
  • クラウドへのデプロイ(publish)は FunctionApp 単位
  • node の場合は node_modules ごとデプロイすることになる7
  ├── FunctionApp
  │   ├── EventHubTrigger # ファンクション#1
  │   │   ├── function.json
  │   │   └── index.js
  │   ├── HTTPTrigger # ファンクション#2
  │   │   ├── function.json
  │   │   └── index.js
  :
  :
  ├── host.json
  ├── local.settings.json
  ├── node_modules
  │   ├── buffer-writer
  │   │   ├── LICENSE
  :
  :

トリガーとバインドの概念

ここで、Azure Functions でのトリガーとバインドの概念 を読む。今回は、トリガーとしての設定になる。EventHub にデータがきたらそれをトリガーにファンクションが動いて、DBに書き込む

ファンクションの設定

準備が整ったので、いよいよはじめるよ

ストレージ作成

まず、作ったファンクションをおく場所であるストレージを作る

$ az storage account create --name ***** --location japaneast --resource-group *****-poc --sku Standard_LRS
$ az storage account show --name ***** -o table
AccessTier    CreationTime                      EnableHttpsTrafficOnly    Kind       Location    Name     PrimaryLocation    ProvisioningState    ResourceGroup    StatusOfPrimary
------------  --------------------------------  ------------------------  ---------  ----------  -------  -----------------  -------------------  ---------------  -----------------
Hot           2020-07-23T09:07:56.927438+00:00  True                      StorageV2  japaneast   *****  japaneast          Succeeded            *****-poc      available

FunctionApp をつくる

クラウド側に FunctionApp をつくる

$ az functionapp create --resource-group *****-poc --consumption-plan-location japaneast --runtime node --runtime-version 10 --functions-version 2 --name *****-func-e2p --storage-account *****
$ az functionapp list -o table
Name                 Location    State    ResourceGroup        DefaultHostName                        AppServicePlan
-------------------  ----------  -------  -------------------  -------------------------------------  --------------------------
*****-func-e2p     Japan East  Running  *****-poc          *****-func-e2p.azurewebsites.net     JapanEastPlan
$ az functionapp config appsettings list --name *****-func-e2p --resource-group *****-poc
$ func azure functionapp list-functions *****-func-e2p --show-keys

ローカル側で FunctionApp を初期化

func init e2p --javascript
├── e2p
│   ├── host.json
│   ├── local.settings.json
│   └── package.json

host.jsonversion3 はファンクションランタイムバージョンではない。単純にhost.json のスキーマのバージョンなので、2 のままで良い

ここで、e2pにおりて、ファンクションをつくる

今回は、Azure Event Hub trigger で作成

$ func new
Select a number for template:
1. Azure Blob Storage trigger
2. Azure Cosmos DB trigger
3. Durable Functions activity
4. Durable Functions HTTP starter
5. Durable Functions orchestrator
6. Azure Event Grid trigger
7. Azure Event Hub trigger
8. HTTP trigger
9. IoT Hub (Event Hub)
10. Azure Queue Storage trigger
11. SendGrid
12. Azure Service Bus Queue trigger
13. Azure Service Bus Topic trigger
14. SignalR negotiate HTTP trigger
15. Timer trigger
Choose option: 7
Azure Event Hub trigger
Function name: [EventHubTrigger] 
Writing /Users/*****/azure/e2p/EventHubTrigger/index.js
Writing /Users/*****/azure/e2p/EventHubTrigger/function.json
The function "EventHubTrigger" was created successfully from the "Azure Event Hub trigger" template.

function.jsonindex.js のテンプレが作成されるので、function.json のパラメータを 接続するEventHub に合わせて書き換える

function.json
{
  "bindings": [
    {
      "type": "eventHubTrigger",
      "name": "eventHubMessages",
      "direction": "in",
      "eventHubName": "*****",
      "connection": "*****_RootManageSharedAccessKey_EVENTHUB",
      "cardinality": "many",
      "consumerGroup": "$Default"
    }
  ]
}

ここで、よくわからなかったのが、connection の値8。結論としては、これは Event Hubs 名前空間 > 共有アクセス ポリシー にある主キー の値のこと。実際には 主キー をそのまま書くのではなく、ファンクションの 構成 > アプリケーション設定 に環境変数のような感じで書いておくのがベストプラクティスのようだ(以下参照)9

image-20200724135710900.png

ファンクションを書く

やっと、index.js に中身を書く... 今回書いたのは超シンプルにこんな感じ

index.js
module.exports = async function (context, eventHubMessages) {

    var pg = require('pg');
    
    const config = {
      host: '*****.postgres.database.azure.com',
      user: '*****@*****',     
      password: '*****',
      database: 'iotdemo',
      port: 5432,
      ssl: true
    };
    
    var client = new pg.Client(config);

    eventHubMessages.forEach((message, index) => {
      context.log("Processed message: " + JSON.stringify(message));
      context.log("id: " + message.id)

      const sql = 'insert into iotdata(deviceid, data) values(' + message.id + ',' + '\'' + JSON.stringify(message) + '\');';
      console.log("SQL: " + sql);

      client.connect()
      client.query(sql, (err, res) => {
        if (err) {
          console.error(err.stack)
        } else {
          console.log(res)
          console.log('insert completed successfully!');
        }
        client.end()
      })

    });

   context.log("eventHubMessages: " +  JSON.stringify(eventHubMessages));

};

pg パッケージをインストールする

$ npm install pg

とやっておいて、node_modules ができるのを確認する。このフォルダも一緒にクラウドにあげることになる

クラウドにアップロードする

publishhost.json のあるところで叩く10

$ func azure functionapp publish *****-func-e2p
$ func azure functionapp publish *****-func-e2p
You're trying to use v3 tooling to publish to a non-v3 function app (FUNCTIONS_EXTENSION_VERSION is set to ~2).
You can pass --force to force update the app to v3, or downgrade to v1 or v2 tooling for publishing.
*****:e2p *****$ func azure functionapp publish *****-func-e2p --force
Getting site publishing info...
Creating archive for current directory...
Uploading 1.21 KB [###############################################################################]
Upload completed successfully.
Deployment completed successfully.
Syncing triggers...
Functions in *****-func-e2p:
    EventHubTrigger - [eventHubTrigger]

これで、ひとまず動くはず。

確認する

ポータルの App Service > 関数 > モニタ > ログ でログが確認できる

2020-07-25T05:06:11Z   [Information]   Executing 'Functions.EventHubTrigger' (Reason='', Id=e1066a47-60af-4d8b-a9df-8d1c5a31a76e)
2020-07-25T05:06:11Z   [Information]   Processed message: {"id":"11253","mid":"M_180331","name":"60Min_W-PDP-I4B-A1 出力回路23 電流 計測","time":"2020/05/05 08:00:00","value":"0","unit":"A","EventProcessedUtcTime":"2020-07-25T05:06:10.8586288Z","PartitionId":0,"EventEnqueuedUtcTime":"2020-07-25T05:06:10.777Z","IoTHub":{"MessageId":null,"CorrelationId":null,"ConnectionDeviceId":"iot-gw-tk10","ConnectionDeviceGenerationId":"637294479848408560","EnqueuedTime":"2020-07-25T05:06:10"}}
2020-07-25T05:06:11Z   [Information]   id: 11253
2020-07-25T05:06:11Z   [Information]   eventHubMessages: [{"id":"11253","mid":"M_180331","name":"60Min_W-PDP-I4B-A1 出力回路23 電流 計測","time":"2020/05/05 08:00:00","value":"0","unit":"A","EventProcessedUtcTime":"2020-07-25T05:06:10.8586288Z","PartitionId":0,"EventEnqueuedUtcTime":"2020-07-25T05:06:10.777Z","IoTHub":{"MessageId":null,"CorrelationId":null,"ConnectionDeviceId":"iot-gw-tk10","ConnectionDeviceGenerationId":"637294479848408560","EnqueuedTime":"2020-07-25T05:06:10"}}]
2020-07-25T05:06:11Z   [Information]   Executed 'Functions.EventHubTrigger' (Succeeded, Id=e1066a47-60af-4d8b-a9df-8d1c5a31a76e)

あとは、PostgreSQL に SQL なげてみればデータ入っているはず

今後

  • Mosquitto のところを、IoT Edge にして、コンテナ化して遊ぶ
  • Streaming Analytics でいろいろアノマリーを検知して遊ぶ
  • PowerBI で可視化して遊ぶ
  • センサの数を増やし、データをよりリアルタイムにして遊ぶ

参考にしたサイト

  1. こんなリッチな構成が本当に必要なのかと思わなくはない

  2. IoT Hub > Streaming Analytics > EventHub はポータル上でマウスでコチコチやれば比較的簡単につながりますのでやってみてください

  3. Firebase のファンクションの方が使いやすいな... という違和感を除く

  4. ただし、今回は言語を JavaScript にしてるので、PostgreSQL と繋げるためにパッケージを入れる必要があることから、ローカルでやることになるのであった

  5. さあ、brew upgrade azure-cli をどうそ

  6. この記述は古いと思われる。Function のバージョンが 3.x であれば node 12 が使えるはず 。私はここで、homebrew で入れた最新の14 を消して、nvm を入れ、12 を入れる羽目になる。Happy Yac Shaving?!

  7. 今回 PostgreSQLと接続するために pg パッケージを使うので これは必須

  8. Webポータルからファンクションをつくると、GUIで選択できるのでそれほど困らないが、ローカルでファンクション作ると何を書けばいいのかわかりにくい。コンポーネントごとに毎回キーを指定しないといけないのは、Azure の ダウンサイドだな... がんばれ Azure IAM...

  9. CLIでは az functionapp config appsettings list -g *****-poc --name *****-func-e2p みたいな感じで確認・設定できる

  10. さもなくば、Unable to find project root. Expecting to find one of host.json in project root. と怒られる

3
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?