Help us understand the problem. What is going on with this article?

Azure Managed ServiceとFIWAREを連携させる

はじめに

IoTやSmartCityというコンテキストで、FIWAREという言葉を聞くようになりました。FIWAREはもともと欧州を中心に開発保守されているコンポーネント群ですが、Let's FIWARE等の日本語技術情報も増えてきたこともあり1、日本でも少しずつ注目されて始めているようです。

実際、FIWAREの公式チュートリアルLet's FIWAREの日本語訳)を参照すれば、docker-composeを用いてFIWAREの主要コンポーネントの動作を簡単に試してみることができます。しかしエンジニアの視点からすると、性能や可用性、運用面からdocker-composeのまま実システムとして動かすのは無理があるため、実運用に向けた次の一歩が気になるところです。特にIoTプラットフォームとして捉えた場合、デバイスから送信されるデータがバーストした場合、あるいは何らかのトラブルでFIWARE側が停止した場合でも耐えられるように、デバイスとの接続部分にはクラウドのManaged Message Queueを活用したくなります。

ということで今回は、AzureのManaged Serviceをできる限り活用し、なるべく運用しやすいFIWARE IoTプラットフォームを作ってみたいと思います。

今回のアーキテクチャ

今回はFIWAREをIoTプラットフォームとして活用する際に必要となる最低限のコンポーネントとして、FIWARE OrionFIWARE IoTAgent JSON、及びFIWARE CygnusAzure AKS上に構築します。またFIWARE OrionやFIWARE IoTAgent JSONが依存するMongoDBクラスタもAKS上に構築します。

加えて、IoTデバイスとメッセージを送受信するManaged QueueとしてAzure ServiceBusを、及びIoTデバイスから収集したデータを蓄積するDBとしてAzure Database for PostgreSQLを活用します。

なお図中にある amqp converter ですが、これはFIWAREとAzure ServiceBusを中継する役割を担っています。

azure_fiware.png

なお本当は、MongoDBクラスタを自力で運用するのではなく、Azure CosmosDBのMongoDB用APIでどうにかするつもりでした。しかしFIWARE OrionがMongoDBの細かい挙動に依存する実装になっていたため、多少のパッチ程度ではAzure CosmosDB上に実装されたMongoDB互換APIで動作させることができませんでした。残念。

またMongoDBのManaged Serviceとして、MongoDB Atlasを使う手もあります。しかし現時点でのFIWARE IoTAgent JSON(1.14.0)はMongoDBとTLS通信する設定ができない2ため、やはりこちらも動作しません。ただしIoTAgentにTLSオプション等を設定可能にするPull Requestはすでにmasterにmergeされているため、次回のリリース以降ではMongoDB Atlasを使えるようになるはずです。

検証環境

種類 コンポーネント バージョン
クライアント macOS Mojave 10.14.6
azure-cli 2.7.0
kubectl 1.18.3
helm 3.2.1
docker 19.03.8
envsubst 0.20.2
jq 1.6
Azure Azure AKS 1.16.9
Azure ServiceBus
Azure Database for PostgreSQL 11.6
FIWARE FIWARE Orion 2.4.0
FIWARE IoTAgent JSON 1.14.0
FIWARE Cygnus 2.1.0
その他 MongoDB 4.2.7
amqp converter 0.1.0

環境構築

では早速、Azure Managed ServiceとFIWAREを連携させたIoTプラットフォームの環境構築をしていきましょう。これから説明するスクリプト類は、githubの nmatsui/qiita_azure_fiware に公開しています。自分で試す場合は、このリポジトリをcloneしていただけば手間が省けると思います。

環境変数の設定

まずは今回のIoTプラットフォームでAzureやFIWAREが必要とする変数を設定します。

variables/env.azure.tempvariables/env.azureにコピーして、Azureのテナント名(TENANT)やAzure Database for PostgreSQLのパスワード(PG_ADMIN_PASSWORD)等を修正してください。


Azure用の環境変数の設定
variables/env.azure.temp
TENANT="<<your tenant>>"
LOCATION="japaneast"
RESOURCE_GROUP="qiita_azure_fiware"

AKS="qafaks"
NODE_VM_SIZE="Standard_B2S"
NODE_COUNT="3"
AKS_VERSION="1.16.9"

SERVICEBUS="qafsbus"

PG="qafpg"
PG_ADMIN="postgres"
PG_ADMIN_PASSWORD="<<your password>>"
PG_SKU="GP_Gen5_2"
PG_VERSION="11"
PG_DATABASE="postgres"

同様に、variables/env.fiware.tempvariables/env.fiwareにコピーします。テンプレートのENTITIESには、今回のIoTプラットフォームに接続するデバイスを定義します。


FIWARE用の環境変数の設定
variables/env.fiware.temp
FIWARE_SERVICE="demo"
FIWARE_SERVICEPATH="/demo/qiita"
ENTITIES="[{\"type\":\"device\",\"id\":\"device01\"}]"

Azure環境の構築

ではCLI経由でAzure AKS、Azure ServiceBus及びAzure Database for PostgreSQLを起動します。

CLIのログインとResourceGroupの作成

まずはazure-cliでログインして、ResourceGroupを作成します。


ログインとResourceGroupの作成
installation/azure/01_login.sh
#!/bin/bash
source $(cd $(dirname $0);pwd)/../../variables/env.azure

## login azure
az login --tenant ${TENANT}
installation/azure/02_create_resourcegroup.sh
#!/bin/bash
source $(cd $(dirname $0);pwd)/../../variables/env.azure

## create resource group
az group create --name ${RESOURCE_GROUP} --location ${LOCATION}

Azure AKSの起動

最初にAzure AKSを起動します。5~10分程度時間がかかります。

Azure AKSの起動
installation/azure/03_start_aks.sh
#!/bin/bash
source $(cd $(dirname $0);pwd)/../../variables/env.azure

## start aks
az aks create --resource-group ${RESOURCE_GROUP} --name ${AKS} --node-count ${NODE_COUNT} --node-vm-size ${NODE_VM_SIZE} --generate-ssh-keys --kubernetes-version ${AKS_VERSION}
sleep 30
az aks get-credentials --resource-group ${RESOURCE_GROUP} --name ${AKS} --overwrite-existing

起動後に、kubectlが正しくAKSに接続できていることを確認してください。

kubectlの接続確認
$ kubectl get nodes
NAME                                STATUS   ROLES   AGE     VERSION
aks-nodepool1-40227215-vmss000000   Ready    agent   9m50s   v1.16.9
aks-nodepool1-40227215-vmss000001   Ready    agent   9m22s   v1.16.9
aks-nodepool1-40227215-vmss000002   Ready    agent   9m47s   v1.16.9

Azure ServiceBusの起動

次にAzure ServiceBusを起動し、FIWARE側から接続するための「すべてのQueueを読み書きできるアクセスポリシー」を定義しておきます。また各IoTデバイスに対応した送信用Queueと受信用Queueも生成し、それぞれ送信用アクセスポリシーと読み取り用アクセスポリシーを定義しておきます。

Azure ServiceBusの起動
installation/azure/04_start_servicebus.sh
#!/bin/bash
source $(cd $(dirname $0);pwd)/../../variables/env.azure
source $(cd $(dirname $0);pwd)/../../variables/env.fiware

## start service bus
az servicebus namespace create --resource-group ${RESOURCE_GROUP} --name ${SERVICEBUS} --sku Standard
az servicebus namespace authorization-rule create --resource-group ${RESOURCE_GROUP} --namespace-name ${SERVICEBUS} --name service --rights Send Listen
len=$(echo ${ENTITIES} | jq '. | length')
for i in $( seq 0 $((${len} - 1)) ); do
  type=$(echo ${ENTITIES} | jq -r ".[$i].type")
  id=$(echo ${ENTITIES} | jq -r ".[$i].id")
  az servicebus queue create --resource-group ${RESOURCE_GROUP} --namespace-name ${SERVICEBUS} --name "${type}.${id}.up"
  az servicebus queue authorization-rule create --resource-group ${RESOURCE_GROUP} --namespace-name ${SERVICEBUS} --queue-name "${type}.${id}.up" --name device --rights Send
  az servicebus queue create --resource-group ${RESOURCE_GROUP} --namespace-name ${SERVICEBUS} --name "${type}.${id}.down"
  az servicebus queue authorization-rule create --resource-group ${RESOURCE_GROUP} --namespace-name ${SERVICEBUS} --queue-name "${type}.${id}.down" --name device --rights Listen
done

Azure Database for PostgreSQLの起動

最後にAzure Database for PostgreSQLを起動し、AKSのノードが所属するVNETからのアクセスのみ許可するようにService Endpointとアクセスルールを設定します3

Azure Database for PostgreSQLの起動
installation/azure/05_start_postgres.sh
#!/bin/bash
source $(cd $(dirname $0);pwd)/../../variables/env.azure

## start postgres
az postgres server create --resource-group ${RESOURCE_GROUP} --name ${PG}  --location ${LOCATION} --admin-user ${PG_ADMIN} --admin-password ${PG_ADMIN_PASSWORD} --sku-name ${PG_SKU} --version ${PG_VERSION}
VNET=$(az network vnet list --resource-group MC_${RESOURCE_GROUP}_${AKS}_${LOCATION} --output json)
VNET_NAME=$(echo ${VNET} | jq -r '.[0].name')
SUBNET_NAME=$(echo ${VNET} | jq -r '.[0].subnets[0].name')
SUBNET_ID=$(echo ${VNET} | jq -r '.[0].subnets[0].id')
az network vnet subnet update --resource-group MC_${RESOURCE_GROUP}_${AKS}_${LOCATION} --name ${SUBNET_NAME} --vnet-name ${VNET_NAME} --service-endpoints Microsoft.SQL
sleep 30
az postgres server vnet-rule create --resource-group ${RESOURCE_GROUP} --server-name ${PG} --name ${PG}_rule --subnet ${SUBNET_ID}

なお今回はPostgreSQLにしましたが、FIWAREがIoTデータをストアできるDatabaseは他にもMySQLやMongoDBなど豊富にあります。詳細はFIWARE Cygnusのドキュメントを確認してください。

FIWARE環境の構築

ここまでで必要なAzure Managed Serviceが準備できましたので、ここからAzure AKS上にMongoDBとFIWAREのコンポーネントを立ち上げに入ります。

MongoDBの起動

まずHelmを用いてMongoDBクラスタを起動します。5~10分ぐらい時間がかかります。

MongoDBの起動
installation/fiware/01_start_mongodb.sh
#!/bin/bash
source $(cd $(dirname $0);pwd)/../../variables/env.fiware
CWD=$(cd $(dirname $0);pwd)

## start mongodb
helm repo add stable https://kubernetes-charts.storage.googleapis.com
helm repo update
helm install stable/mongodb-replicaset --name-template mongodb -f ${CWD}/yaml/mongodb-replicaset-values-aks.yaml

MongoDBのPodがすべて起動した後、MongoDBクラスタの状態を確認してください。正しく起動していれば、PRIMARYのコンテナが一つSECONDARYのコンテナが二つ立ち上がっているはずです。

MongoDBクラスタの状態確認
$ kubectl get services -l release=mongodb -l app=mongodb-replicaset
NAME             TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)     AGE
mongodb          ClusterIP   None         <none>        27017/TCP   10m
mongodb-client   ClusterIP   None         <none>        27017/TCP   10m
$ kubectl get pods -l release=mongodb -l app=mongodb-replicaset
NAME        READY   STATUS    RESTARTS   AGE
mongodb-0   1/1     Running   0          9m52s
mongodb-1   1/1     Running   0          6m50s
mongodb-2   1/1     Running   0          4m6s
$ kubectl exec mongodb-0 -c mongodb-replicaset -- mongo --eval 'printjson(rs.status().members.map(function(e) {return {name: e.name, stateStr:e.stateStr};}))'
MongoDB shell version v4.2.3
connecting to: mongodb://127.0.0.1:27017/?compressors=disabled&gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("3fc54d94-485a-4e2c-8361-e54b3890e3e5") }
MongoDB server version: 4.2.3
[
    {
        "name" : "mongodb-0.mongodb.default.svc.cluster.local:27017",
        "stateStr" : "PRIMARY"
    },
    {
        "name" : "mongodb-1.mongodb.default.svc.cluster.local:27017",
        "stateStr" : "SECONDARY"
    },
    {
        "name" : "mongodb-2.mongodb.default.svc.cluster.local:27017",
        "stateStr" : "SECONDARY"
    }
]

FIWARE Orionの起動

次にFIWARE Orionを立ち上げます。

FIWARE Orionの起動
installation/fiware/02_start_orion.sh
#!/bin/bash
source $(cd $(dirname $0);pwd)/../../variables/env.fiware
CWD=$(cd $(dirname $0);pwd)

## start orion
kubectl apply -f ${CWD}/yaml/orion-service.yaml
kubectl apply -f ${CWD}/yaml/orion-deployment.yaml

FIWARE OrionのPodが二つ立ち上がっていることを確認してください。

FIWARE OrionのPod確認
$ kubectl get services -l app=orion
NAME    TYPE        CLUSTER-IP    EXTERNAL-IP   PORT(S)    AGE
orion   ClusterIP   10.0.229.82   <none>        1026/TCP   2m51s
$ kubectl get pods -l app=orion
NAME                     READY   STATUS    RESTARTS   AGE
orion-86cdc7c48c-44695   1/1     Running   0          3m17s
orion-86cdc7c48c-9dx74   1/1     Running   0          3m17s

FIWARE IoTAgent JSONの起動

次にFIWARE IoTAgent JSONを立ち上げます。ただし現時点(1.14.0)のFIWARE IoTAgent JSONは、Azure ServiceBusに直接接続することができません4
そのため、AMQP1.0とHTTPのプロトコル変換を行うコンポーネント(RoboticBase/amqp10-converter)をFIWARE IoTAgent JSONのSideCarとして立ち上げ、Azure ServiceBusとFIWARE IoTAgent JSONの仲立ちをさせます。

FIWARE IoTAgent JSONの起動
installation/fiware/03_start_iotagent-json.sh
#!/bin/bash
source $(cd $(dirname $0);pwd)/../../variables/env.azure
source $(cd $(dirname $0);pwd)/../../variables/env.fiware
CWD=$(cd $(dirname $0);pwd)

## start iotagent json
SERVICE_KEY=$(az servicebus namespace authorization-rule keys list --resource-group ${RESOURCE_GROUP} --namespace-name ${SERVICEBUS} --name service | jq -r '.primaryKey')
kubectl create secret generic servicebus-credentials --from-literal=primaryKey=$(echo ${SERVICE_KEY})
kubectl apply -f ${CWD}/yaml/iotagent-json-service.yaml
SERVICEBUS=${SERVICEBUS} ENTITIES=${ENTITIES} FIWARE_SERVICE=${FIWARE_SERVICE} FIWARE_SERVICEPATH=${FIWARE_SERVICEPATH} envsubst < ${CWD}/yaml/iotagent-json-deployment.yaml | kubectl apply -f -

FIWARE IoTAgent JSONのPodが二つ立ち上がり、それぞれ 2/2 Running であることを確認してください。

FIWARE IoTAgent JSONのPod確認
$ kubectl get services -l app=iotagent-json
NAME            TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)             AGE
iotagent-json   ClusterIP   10.0.162.149   <none>        4041/TCP,7896/TCP   76s
$ kubectl get pods -l app=iotagent-json
NAME                             READY   STATUS    RESTARTS   AGE
iotagent-json-576dd9f4f9-jxcbq   2/2     Running   0          93s
iotagent-json-576dd9f4f9-zs56f   2/2     Running   0          93s

FIWARE Cygnusの起動

最後にFIWARE Cygnusを立ち上げます。

FIWARE Cygnusの起動
installation/fiware/04_start_cygnus-postgresql.sh
#!/bin/bash
source $(cd $(dirname $0);pwd)/../../variables/env.azure
source $(cd $(dirname $0);pwd)/../../variables/env.fiware
CWD=$(cd $(dirname $0);pwd)

## start cygnus
kubectl create secret generic postgresql-credential --from-literal=password=${PG_ADMIN_PASSWORD}
PG_HOST=$(az postgres server show --resource-group ${RESOURCE_GROUP} --name ${PG} | jq -r '.fullyQualifiedDomainName')
kubectl apply -f ${CWD}/yaml/cygnus-postgresql-service.yaml
PG_HOST=${PG_HOST} PG_ADMIN=${PG_ADMIN} PG=${PG} PG_DATABASE=${PG_DATABASE} envsubst < ${CWD}/yaml/cygnus-postgresql-deployment.yaml | kubectl apply -f -

FIWARE CygnusのPodが二つ立ち上がっていることを確認してください。

FIWARE CygnusのPod確認
$ kubectl get services -l app=cygnus-postgresql
NAME                TYPE        CLUSTER-IP    EXTERNAL-IP   PORT(S)             AGE
cygnus-postgresql   ClusterIP   10.0.198.80   <none>        5055/TCP,5080/TCP   2m3s
$ kubectl get pods -l app=cygnus-postgresql
NAME                                 READY   STATUS    RESTARTS   AGE
cygnus-postgresql-5494f4c7d4-c65k2   1/1     Running   0          117s
cygnus-postgresql-5494f4c7d4-lqblk   1/1     Running   0          117s

Azure Database for PostgreSQLにテーブルを作成

今回のデモでは、FIWARE Cygnusを "columnモード" (CYGNUS_POSTGRESQL_ATTR_PERSISTENCE="column")で動作させています。このモードはIoTデバイスから送信されるデータがそのまま一つの行としてInsertされるのでわかりやすいのですが、必要なテーブルを自動作成してくれません。そのため最初に自分でCREATE SCHEMA及びCREATE TABLEを実行する必要があります。

schemaとtableの作成
installation/fiware/05_create_table.sh
#!/bin/bash

source $(cd $(dirname $0);pwd)/../../variables/env.azure
source $(cd $(dirname $0);pwd)/../../variables/env.fiware

PG_HOST=$(az postgres server show --resource-group ${RESOURCE_GROUP} --name ${PG} | jq -r '.fullyQualifiedDomainName')

TBL="dummy"
if [[ ${FIWARE_SERVICEPATH} =~ ^/?(.+)$ ]]; then
  TBL=${BASH_REMATCH[1]////_}
fi

## sql
SQL=$(cat << __EOS__
CREATE SCHEMA IF NOT EXISTS ${FIWARE_SERVICE};
DROP TABLE IF EXISTS ${FIWARE_SERVICE}.${TBL};
CREATE TABLE IF NOT EXISTS ${FIWARE_SERVICE}.${TBL} (
  id SERIAL NOT NULL,
  recvTime TIMESTAMP WITH TIME ZONE NOT NULL,
  fiwareServicePath TEXT NOT NULL,
  entityId TEXT NOT NULL,
  entityType TEXT NOT NULL,
  temperature NUMERIC,
  temperature_md JSON,
  humidity NUMERIC,
  humidity_md JSON,
  open_info TEXT,
  open_info_md JSON,
  open_status TEXT,
  open_status_md JSON,
  PRIMARY KEY (id)
);
DROP INDEX IF EXISTS ${TBL}_entityId_idx;
CREATE INDEX IF NOT EXISTS ${TBL}_entityId_idx ON ${FIWARE_SERVICE}.${TBL} (entityId);
DROP INDEX IF EXISTS ${TBL}_recvTime_idx;
CREATE INDEX IF NOT EXISTS ${TBL}_recvTime_idx ON ${FIWARE_SERVICE}.${TBL} (recvTime);
__EOS__
)
echo "SQL=${SQL}"

## create schema and table
kubectl run --rm -i --restart=OnFailure --image postgres:${PG_VERSION} psql-client -- /bin/bash -c \
  "PGPASSWORD=${PG_ADMIN_PASSWORD} psql --host=${PG_HOST} --port=5432 --dbname=${PG_DATABASE} --username=${PG_ADMIN}@${PG} --command '${SQL}'"

FIWAREの設定

IoTプラットフォームの環境が構築できましたので、FIWAREにIoTデバイスの設定を投入します。各FIWAREコンポーネントの詳細は、それぞれのドキュメントをご参照ください。

Service Groupの登録

まずFIWARE IoTAgent JSONへService Groupを登録します。

Service Groupの登録
settings/fiware/01_register_servicegroup.sh
#!/bin/bash
source $(cd $(dirname $0);pwd)/../../variables/env.fiware

## register service group
cnt=0
len=$(echo ${ENTITIES} | jq '[.[].type] | unique | length')
for i in $(seq 0 $((${len} - 1))); do
  type=$(echo ${ENTITIES} | jq -r "[.[].type] | unique | .[$i]")
  kubectl run -i --rm --restart=OnFailure --image curlimages/curl curl$((++cnt)) -- \
    -i "http://iotagent-json:4041/iot/services/" \
    -H "Fiware-Service: ${FIWARE_SERVICE}" \
    -H "Fiware-ServicePath: ${FIWARE_SERVICEPATH}" \
    -H "Content-Type: application/json" \
    -X POST -d @- <<__EOS__
{
  "services": [
    {
      "apikey": "${type}",
      "cbroker": "http://orion:1026",
      "resource": "/iot/json",
      "entity_type": "${type}"
    }
  ]
}
__EOS__
done

kubectl run -i --rm --restart=OnFailure --quiet=true --image curlimages/curl curl$((++cnt)) -- \
  -sS "http://iotagent-json:4041/iot/services/" \
  -H "Fiware-Service: ${FIWARE_SERVICE}" \
  -H "Fiware-ServicePath: ${FIWARE_SERVICEPATH}" \
| jq .

登録に成功すると、次のようなFIWARE IoTAgent JSONに登録されたService Group情報が表示されます。

Service Groupの登録情報
{
  "count": 1,
  "services": [
    {
      "commands": [],
      "lazy": [],
      "attributes": [],
      "_id": "5eda2776967c9e00061354d6",
      "resource": "/iot/json",
      "apikey": "device",
      "service": "demo",
      "subservice": "/demo/qiita",
      "__v": 0,
      "static_attributes": [],
      "internal_attributes": [],
      "entity_type": "device"
    }
  ]
}

Deviceの登録

次にFIWARE IoTAgent JSONへService Groupを登録します。

Deviceの登録
settings/fiware/02_register_devices.sh
#!/bin/bash
source $(cd $(dirname $0);pwd)/../../variables/env.fiware

## register devices
cnt=0
len=$(echo ${ENTITIES} | jq '. | length')
for i in $( seq 0 $((${len} - 1)) ); do
  type=$(echo ${ENTITIES} | jq -r ".[$i].type")
  id=$(echo ${ENTITIES} | jq -r ".[$i].id")

  kubectl run -i --rm --restart=OnFailure --image curlimages/curl curl$((++cnt)) -- \
    -i "http://iotagent-json:4041/iot/devices/" \
    -H "Fiware-Service: ${FIWARE_SERVICE}" \
    -H "Fiware-ServicePath: ${FIWARE_SERVICEPATH}" \
    -H "Content-Type: application/json" \
    -X POST -d @- <<__EOS__
{
  "devices": [
    {
      "device_id": "${id}",
      "entity_name": "${id}",
      "entity_type": "${type}",
      "timezone": "Asia/Tokyo",
      "protocol": "json",
      "attributes": [
        {
          "name": "temperature",
          "type": "number"
        },
        {
          "name": "humidity",
          "type": "number"
        }
      ],
      "commands": [
        {
          "name": "open",
          "type": "command"
        }
      ],
      "transport": "HTTP",
      "endpoint": "http://localhost:3000/amqp10/cmd/${type}/${id}"
    }
  ]
}
__EOS__
done

kubectl run -i --rm --restart=OnFailure --quiet=true --image curlimages/curl curl$((++cnt)) -- \
  -sS "http://iotagent-json:4041/iot/devices/" \
  -H "Fiware-Service: ${FIWARE_SERVICE}" \
  -H "Fiware-ServicePath: ${FIWARE_SERVICEPATH}" \
| jq .

kubectl run -i --rm --restart=OnFailure --quiet=true --image curlimages/curl curl$((++cnt)) -- \
  -sS "http://orion:1026/v2/entities/" \
  -H "Fiware-Service: ${FIWARE_SERVICE}" \
  -H "Fiware-ServicePath: ${FIWARE_SERVICEPATH}" \
| jq .

登録に成功すると、次のようなFIWARE IoTAgent JSONに登録されたDevice情報と、FIWARE IoTAgent JSONによって自動作成されたFIWARE OrionのEntity情報が表示されます。

Deviceの登録情報
{
  "count": 1,
  "devices": [
    {
      "device_id": "device01",
      "service": "demo",
      "service_path": "/demo/qiita",
      "entity_name": "device01",
      "entity_type": "device",
      "endpoint": "http://localhost:3000/amqp10/cmd/device/device01",
      "transport": "HTTP",
      "attributes": [
        {
          "object_id": "temperature",
          "name": "temperature",
          "type": "number"
        },
        {
          "object_id": "humidity",
          "name": "humidity",
          "type": "number"
        }
      ],
      "lazy": [],
      "commands": [
        {
          "object_id": "open",
          "name": "open",
          "type": "command"
        }
      ],
      "static_attributes": [],
      "protocol": "json"
    }
  ]
}

Entityの登録情報
[
  {
    "id": "device01",
    "type": "device",
    "TimeInstant": {
      "type": "DateTime",
      "value": "2020-06-05T22:37:38.00Z",
      "metadata": {}
    },
    "humidity": {
      "type": "number",
      "value": " ",
      "metadata": {}
    },
    "open_info": {
      "type": "commandResult",
      "value": " ",
      "metadata": {}
    },
    "open_status": {
      "type": "commandStatus",
      "value": "UNKNOWN",
      "metadata": {}
    },
    "temperature": {
      "type": "number",
      "value": " ",
      "metadata": {}
    },
    "open": {
      "type": "command",
      "value": "",
      "metadata": {}
    }
  }
]

Subscriptionの登録

最後に、FIWARE Orionのデータが更新された(=Deviceから新しいデータが送信されてきた)時に、FIWARE Cygnus経由でデータをPostgreSQLへ保存する設定を登録します。

Subscriptionの登録
settings/fiware/03_register_subscription.sh
#!/bin/bash
source $(cd $(dirname $0);pwd)/../../variables/env.fiware

## register subscription
cnt=0
len=$(echo ${ENTITIES} | jq '. | length')
for i in $( seq 0 $((${len} - 1)) ); do
  type=$(echo ${ENTITIES} | jq -r ".[$i].type")
  id=$(echo ${ENTITIES} | jq -r ".[$i].id")

  kubectl run -i --rm --restart=OnFailure --image curlimages/curl curl$((++cnt)) -- \
    -i "http://orion:1026/v2/subscriptions/" \
    -H "Fiware-Service: ${FIWARE_SERVICE}" \
    -H "Fiware-ServicePath: ${FIWARE_SERVICEPATH}" \
    -H "Content-Type: application/json" \
    -H "Content-Type: application/json" \
    -X POST -d @- <<__EOS__
{
  "subject": {
    "entities": [{
      "id": "${id}",
      "type": "${type}"
    }],
    "condition": {
      "attrs": ["temperature", "humidity", "open_status", "open_info"]
    }
  },
  "notification": {
    "http": {
      "url": "http://cygnus-postgresql:5055/notify"
    },
    "attrs": ["temperature", "humidity", "open_status", "open_info"],
    "attrsFormat": "legacy"
  }
}
__EOS__
  echo ""
done

kubectl run -i --rm --restart=OnFailure --quiet=true --image curlimages/curl curl$((++cnt)) -- \
  -sS "http://orion:1026/v2/subscriptions/" \
  -H "Fiware-Service: ${FIWARE_SERVICE}" \
  -H "Fiware-ServicePath: ${FIWARE_SERVICEPATH}" \
| jq .

登録に成功すると、FIWARE Orionに登録されたSubscription情報が表示されます。

Subscriptionの登録情報
[
  {
    "id": "5eda3857742e072c41a8ff45",
    "status": "active",
    "subject": {
      "entities": [
        {
          "id": "device01",
          "type": "device"
        }
      ],
      "condition": {
        "attrs": [
          "temperature",
          "humidity",
          "open_status",
          "open_info"
        ]
      }
    },
    "notification": {
      "timesSent": 1,
      "lastNotification": "2020-06-05T23:19:35.00Z",
      "attrs": [
        "temperature",
        "humidity",
        "open_status",
        "open_info"
      ],
      "onlyChangedAttrs": false,
      "attrsFormat": "legacy",
      "http": {
        "url": "http://cygnus-postgresql:5055/notify"
      },
      "lastSuccess": "2020-06-05T23:19:35.00Z",
      "lastSuccessCode": 200
    }
  }
]

動作確認

IoTプラットフォーム環境の準備が整いましたので、AMQP1.0で通信できるダミーのIoTデバイスとFIWAREをAzure ServiceBus経由で接続してみましょう。

ダミーデバイスの準備

ダミーのIoTデバイス用のDockerコンテナを作成します。

ダミーIoTデバイス用Dockerコンテナの作成
execution/01_create_dummyclient.sh
#!/bin/bash

CWD=$(cd $(dirname $0);pwd)

## create docker container
docker build -t my/dummy_client:0.0.1 ${CWD}/dummy_client

コンテナが作成されたことを確認してください。

ダミーIoTデバイス用Dockerコンテナの確認
$ docker image ls | grep my/dummy_client
my/dummy_client                0.0.1               f8a2e9348ad1        2 minutes ago       89.9MB

ダミーIoTデバイスから温度と湿度をFIWAREへ送信

では、ダミーIoTデバイスからAzure ServiceBusのQueueへ温度と湿度のデータを送ってみましょう。IoTプラットフォーム環境が正しく構築できていれば、Queueに到着したメッセージをFIWAREが取得し、そのデータをPostgreSQLに蓄積します。

ダミーIoTデバイスからデータを送信

先程作成したDockerコンテナを用い、AMQP1.0で温度と湿度のデータをAzure ServcieBusへ送信します。

温度と湿度のデータ送信
execution/02_send_attributes.sh
#!/bin/bash

source $(cd $(dirname $0);pwd)/../variables/env.azure
source $(cd $(dirname $0);pwd)/../variables/env.fiware

## send attributes
HOST="${SERVICEBUS}.servicebus.windows.net"
PORT="5671"
len=$(echo ${ENTITIES} | jq '. | length')
for i in $( seq 0 $((${len} - 1)) ); do
  type=$(echo ${ENTITIES} | jq -r ".[$i].type")
  id=$(echo ${ENTITIES} | jq -r ".[$i].id")
  SEND_QUEUE="${type}.${id}.up"
  SENDER_USERNAME="device"
  SENDER_PASSWORD=$(az servicebus queue authorization-rule keys list --resource-group ${RESOURCE_GROUP} --namespace-name ${SERVICEBUS} --queue-name ${SEND_QUEUE} --name ${SENDER_USERNAME} | jq -r '.primaryKey')
  RECEIVE_QUEUE="${type}.${id}.down"
  RECEIVER_USERNAME="device"
  RECEIVER_PASSWORD=$(az servicebus queue authorization-rule keys list --resource-group ${RESOURCE_GROUP} --namespace-name ${SERVICEBUS} --queue-name ${RECEIVE_QUEUE} --name ${RECEIVER_USERNAME} | jq -r '.primaryKey')

  docker run -e AMQP_HOST=${HOST} -e AMQP_PORT=${PORT} \
    -e AMQP_SEND_QUEUE=${SEND_QUEUE} -e AMQP_SENDER_USERNAME=${SENDER_USERNAME} -e AMQP_SENDER_PASSWORD=${SENDER_PASSWORD} \
    -e AMQP_RECEIVE_QUEUE=${RECEIVE_QUEUE} -e AMQP_RECEIVER_USERNAME=${RECEIVER_USERNAME} -e AMQP_RECEIVER_PASSWORD=${RECEIVER_PASSWORD} \
    my/dummy_client:0.0.1 attrs
done

送信したデータが次のように表示されます。

sending msg:  {
  body: '{"attrs":{"temperature":26.857522721066257,"humidity":64.19745805586517}}'
}
[connection-1] await sendMessage -> Delivery id: 0, settled: true
sent attributes successfully

FIWAREの状態を確認

Azure ServiceBusへ送信したデータがFIWAREに反映されていることを確認してください。

FIWAREの状態確認
execution/03_show_orion.sh
#!/bin/bash
source $(cd $(dirname $0);pwd)/../variables/env.fiware

## show entities
cnt=0
kubectl run -i --rm --restart=OnFailure --quiet=true --image curlimages/curl curl$((++cnt)) -- \
  -sS "http://orion:1026/v2/entities/" \
  -H "Fiware-Service: ${FIWARE_SERVICE}" \
  -H "Fiware-ServicePath: ${FIWARE_SERVICEPATH}" \
| jq .

[
  {
    "id": "device01",
    "type": "device",
    "TimeInstant": {
      "type": "DateTime",
      "value": "2020-06-06T01:05:49.00Z",
      "metadata": {}
    },
    "humidity": {
      "type": "number",
      "value": 64.197458056,
      "metadata": {
        "TimeInstant": {
          "type": "DateTime",
          "value": "2020-06-06T01:05:49.00Z"
        }
      }
    },
    "open_info": {
      "type": "commandResult",
      "value": " ",
      "metadata": {}
    },
    "open_status": {
      "type": "commandStatus",
      "value": "UNKNOWN",
      "metadata": {}
    },
    "temperature": {
      "type": "number",
      "value": 26.857522721,
      "metadata": {
        "TimeInstant": {
          "type": "DateTime",
          "value": "2020-06-06T01:05:49.00Z"
        }
      }
    },
    "open": {
      "type": "command",
      "value": "",
      "metadata": {}
    }
  }
]

PostgreSQLのテーブル確認

またAzure ServiceBusへ送信したデータがFIWAREを経由してPostgreSQLに蓄積されていることも確認できます。

PostgreSQLの状態確認
execution/04_show_postgres.sh
#!/bin/bash

source $(cd $(dirname $0);pwd)/../variables/env.azure
source $(cd $(dirname $0);pwd)/../variables/env.fiware

PG_HOST=$(az postgres server show --resource-group ${RESOURCE_GROUP} --name ${PG} | jq -r '.fullyQualifiedDomainName')

TBL="dummy"
if [[ ${FIWARE_SERVICEPATH} =~ ^/?(.+)$ ]]; then
  TBL=${BASH_REMATCH[1]////_}
fi

## sql
SQL=$(cat << __EOS__
SELECT * FROM ${FIWARE_SERVICE}.${TBL};
__EOS__
)
echo "SQL=${SQL}"

## show data
kubectl run --rm -i --restart=OnFailure --image postgres:${PG_VERSION} psql-client -- /bin/bash -c \
  "PGPASSWORD=${PG_ADMIN_PASSWORD} psql --host=${PG_HOST} --port=5432 --dbname=${PG_DATABASE} --username=${PG_ADMIN}@${PG} --command '${SQL}'"

SQL=SELECT * FROM demo.demo_qiita;
 id |          recvtime          | fiwareservicepath | entityid | entitytype | temperature  |                                temperature_md                                |   humidity   |                                 humidity_md                                  | open_info | open_info_md | open_status | open_status_md
----+----------------------------+-------------------+----------+------------+--------------+------------------------------------------------------------------------------+--------------+------------------------------------------------------------------------------+-----------+--------------+-------------+----------------
  1 | 2020-06-06 01:05:49.276+00 | /demo/qiita       | device01 | device     | 26.857522721 | [{"name":"TimeInstant","type":"DateTime","value":"2020-06-06T01:05:49.00Z"}] | 64.197458056 | [{"name":"TimeInstant","type":"DateTime","value":"2020-06-06T01:05:49.00Z"}] |           | []           | UNKNOWN     | []
(1 row)

FIWAREからダミーIoTデバイスへコマンドを送信

それでは最後に、FIWAREからダミーIoTデバイスへ窓を開ける命令を送ってみましょう。

FIWAREからコマンドを送信

FIWAREのREST APIを用いて、デバイスに "open" コマンドを送ります。

"open"コマンドの送信
execution/05_send_cmd.sh
#!/bin/bash
source $(cd $(dirname $0);pwd)/../variables/env.fiware

## send command
cnt=0
len=$(echo ${ENTITIES} | jq '. | length')
for i in $( seq 0 $((${len} - 1)) ); do
  type=$(echo ${ENTITIES} | jq -r ".[$i].type")
  id=$(echo ${ENTITIES} | jq -r ".[$i].id")

  kubectl run -i --rm --restart=OnFailure --image curlimages/curl curl$((++cnt)) -- \
    -i "http://orion:1026/v2/entities/${id}/attrs?type=${type}" \
    -H "Fiware-Service: ${FIWARE_SERVICE}" \
    -H "Fiware-ServicePath: ${FIWARE_SERVICEPATH}" \
    -H "Content-Type: application/json" \
    -X PATCH -d @- <<__EOS__
{
  "open": {
    "value": "window1"
  }
}
__EOS__
done

kubectl run -i --rm --restart=OnFailure --quiet=true --image curlimages/curl curl$((++cnt)) -- \
  -sS "http://orion:1026/v2/entities/" \
  -H "Fiware-Service: ${FIWARE_SERVICE}" \
  -H "Fiware-ServicePath: ${FIWARE_SERVICEPATH}" \
| jq .

ダミーIoTデバイスがまだコマンドを受信していたいため、FIWAREが管理しているコマンドの状態はPENDINGになっています。

[
  {
    "id": "device01",
    "type": "device",
    "TimeInstant": {
      "type": "DateTime",
      "value": "2020-06-06T01:08:28.00Z",
      "metadata": {}
    },
    "humidity": {
      "type": "number",
      "value": 64.197458056,
      "metadata": {
        "TimeInstant": {
          "type": "DateTime",
          "value": "2020-06-06T01:05:49.00Z"
        }
      }
    },
    "open_info": {
      "type": "commandResult",
      "value": " ",
      "metadata": {}
    },
    "open_status": {
      "type": "commandStatus",
      "value": "PENDING",
      "metadata": {
        "TimeInstant": {
          "type": "DateTime",
          "value": "2020-06-06T01:08:28.00Z"
        }
      }
    },
    "temperature": {
      "type": "number",
      "value": 26.857522721,
      "metadata": {
        "TimeInstant": {
          "type": "DateTime",
          "value": "2020-06-06T01:05:49.00Z"
        }
      }
    },
    "open": {
      "type": "command",
      "value": "",
      "metadata": {}
    }
  }
]

またdownstreamキューを確認すると、Active Messageが1つ溜まっていることがわかります。

"open"コマンドの送信
execution/06_show_queue.sh
#!/bin/bash
source $(cd $(dirname $0);pwd)/../variables/env.azure
source $(cd $(dirname $0);pwd)/../variables/env.fiware

## show service bus queue
len=$(echo ${ENTITIES} | jq '. | length')
for i in $( seq 0 $((${len} - 1)) ); do
  type=$(echo ${ENTITIES} | jq -r ".[$i].type")
  id=$(echo ${ENTITIES} | jq -r ".[$i].id")
  az servicebus queue show --resource-group ${RESOURCE_GROUP} --namespace-name ${SERVICEBUS} --name "${type}.${id}.down" | jq '{name: .name, count: .countDetails}'
done

{
  "name": "device.device01.down",
  "count": {
    "activeMessageCount": 1,
    "deadLetterMessageCount": 0,
    "scheduledMessageCount": 0,
    "transferDeadLetterMessageCount": 0,
    "transferMessageCount": 0
  }
}

ダミーIoTデバイスがコマンドを受信し、処理結果を送信

先程作成したDockerコンテナを用い、AMQP1.0でコマンドをAzure ServcieBusから受信し、処理結果をAzure ServcieBusへ送信します。

"open"コマンドの受信と処理結果の送信
execution/07_process_cmd.sh
#!/bin/bash

source $(cd $(dirname $0);pwd)/../variables/env.azure
source $(cd $(dirname $0);pwd)/../variables/env.fiware

## send attributes
HOST="${SERVICEBUS}.servicebus.windows.net"
PORT="5671"
len=$(echo ${ENTITIES} | jq '. | length')
for i in $( seq 0 $((${len} - 1)) ); do
  type=$(echo ${ENTITIES} | jq -r ".[$i].type")
  id=$(echo ${ENTITIES} | jq -r ".[$i].id")
  SEND_QUEUE="${type}.${id}.up"
  SENDER_USERNAME="device"
  SENDER_PASSWORD=$(az servicebus queue authorization-rule keys list --resource-group ${RESOURCE_GROUP} --namespace-name ${SERVICEBUS} --queue-name ${SEND_QUEUE} --name ${SENDER_USERNAME} | jq -r '.primaryKey')
  RECEIVE_QUEUE="${type}.${id}.down"
  RECEIVER_USERNAME="device"
  RECEIVER_PASSWORD=$(az servicebus queue authorization-rule keys list --resource-group ${RESOURCE_GROUP} --namespace-name ${SERVICEBUS} --queue-name ${RECEIVE_QUEUE} --name ${RECEIVER_USERNAME} | jq -r '.primaryKey')

  docker run -e AMQP_HOST=${HOST} -e AMQP_PORT=${PORT} \
    -e AMQP_SEND_QUEUE=${SEND_QUEUE} -e AMQP_SENDER_USERNAME=${SENDER_USERNAME} -e AMQP_SENDER_PASSWORD=${SENDER_PASSWORD} \
    -e AMQP_RECEIVE_QUEUE=${RECEIVE_QUEUE} -e AMQP_RECEIVER_USERNAME=${RECEIVER_USERNAME} -e AMQP_RECEIVER_PASSWORD=${RECEIVER_PASSWORD} \
    my/dummy_client:0.0.1 cmd
done

ダミーIoTデバイスが受信したコマンドを処理した結果が表示されます。

start consuming cmd
receiving msg { cmd: { open: 'window1' } }
sending msg:  {
  body: '{"cmdexe":{"open":"processed window1 at 2020-06-06T01:28:40.312Z"}}'
}
[connection-2] await sendMessage -> Delivery id: 0, settled: true

FIWAREの状態を確認

FIWAREが管理しているコマンドの状態がOKになり、ダミーIoTデバイスから送信された処理結果が反映されていることを確認してください。

FIWAREの状態確認
execution/03_show_orion.sh
#!/bin/bash
source $(cd $(dirname $0);pwd)/../variables/env.fiware

## show entities
cnt=0
kubectl run -i --rm --restart=OnFailure --quiet=true --image curlimages/curl curl$((++cnt)) -- \
  -sS "http://orion:1026/v2/entities/" \
  -H "Fiware-Service: ${FIWARE_SERVICE}" \
  -H "Fiware-ServicePath: ${FIWARE_SERVICEPATH}" \
| jq .

[
  {
    "id": "device01",
    "type": "device",
    "TimeInstant": {
      "type": "DateTime",
      "value": "2020-06-06T01:28:40.00Z",
      "metadata": {}
    },
    "humidity": {
      "type": "number",
      "value": 64.197458056,
      "metadata": {
        "TimeInstant": {
          "type": "DateTime",
          "value": "2020-06-06T01:05:49.00Z"
        }
      }
    },
    "open_info": {
      "type": "commandResult",
      "value": "processed window1 at 2020-06-06T01:28:40.312Z",
      "metadata": {
        "TimeInstant": {
          "type": "DateTime",
          "value": "2020-06-06T01:28:40.00Z"
        }
      }
    },
    "open_status": {
      "type": "commandStatus",
      "value": "OK",
      "metadata": {
        "TimeInstant": {
          "type": "DateTime",
          "value": "2020-06-06T01:28:40.00Z"
        }
      }
    },
    "temperature": {
      "type": "number",
      "value": 26.857522721,
      "metadata": {
        "TimeInstant": {
          "type": "DateTime",
          "value": "2020-06-06T01:05:49.00Z"
        }
      }
    },
    "open": {
      "type": "command",
      "value": "",
      "metadata": {}
    }
  }
]

PostgreSQLのテーブル確認

またコマンドの処理状況がPostgreSQLに蓄積されていることも確認できます。

PostgreSQLの状態確認
execution/04_show_postgres.sh
#!/bin/bash

source $(cd $(dirname $0);pwd)/../variables/env.azure
source $(cd $(dirname $0);pwd)/../variables/env.fiware

PG_HOST=$(az postgres server show --resource-group ${RESOURCE_GROUP} --name ${PG} | jq -r '.fullyQualifiedDomainName')

TBL="dummy"
if [[ ${FIWARE_SERVICEPATH} =~ ^/?(.+)$ ]]; then
  TBL=${BASH_REMATCH[1]////_}
fi

## sql
SQL=$(cat << __EOS__
SELECT * FROM ${FIWARE_SERVICE}.${TBL};
__EOS__
)
echo "SQL=${SQL}"

## show data
kubectl run --rm -i --restart=OnFailure --image postgres:${PG_VERSION} psql-client -- /bin/bash -c \
  "PGPASSWORD=${PG_ADMIN_PASSWORD} psql --host=${PG_HOST} --port=5432 --dbname=${PG_DATABASE} --username=${PG_ADMIN}@${PG} --command '${SQL}'"

SQL=SELECT * FROM demo.demo_qiita;
 id |          recvtime          | fiwareservicepath | entityid | entitytype | temperature  |                                temperature_md                                |   humidity   |                                 humidity_md                                  |                   open_info                   |                                 open_info_md                                 | open_status |                                open_status_md
----+----------------------------+-------------------+----------+------------+--------------+------------------------------------------------------------------------------+--------------+------------------------------------------------------------------------------+-----------------------------------------------+------------------------------------------------------------------------------+-------------+------------------------------------------------------------------------------
  1 | 2020-06-06 01:05:49.276+00 | /demo/qiita       | device01 | device     | 26.857522721 | [{"name":"TimeInstant","type":"DateTime","value":"2020-06-06T01:05:49.00Z"}] | 64.197458056 | [{"name":"TimeInstant","type":"DateTime","value":"2020-06-06T01:05:49.00Z"}] |                                               | []                                                                           | UNKNOWN     | []
  2 | 2020-06-06 01:08:28.534+00 | /demo/qiita       | device01 | device     | 26.857522721 | [{"name":"TimeInstant","type":"DateTime","value":"2020-06-06T01:05:49.00Z"}] | 64.197458056 | [{"name":"TimeInstant","type":"DateTime","value":"2020-06-06T01:05:49.00Z"}] |                                               | []                                                                           | PENDING     | [{"name":"TimeInstant","type":"DateTime","value":"2020-06-06T01:08:28.00Z"}]
  3 | 2020-06-06 01:28:40.559+00 | /demo/qiita       | device01 | device     | 26.857522721 | [{"name":"TimeInstant","type":"DateTime","value":"2020-06-06T01:05:49.00Z"}] | 64.197458056 | [{"name":"TimeInstant","type":"DateTime","value":"2020-06-06T01:05:49.00Z"}] | processed window1 at 2020-06-06T01:28:40.312Z | [{"name":"TimeInstant","type":"DateTime","value":"2020-06-06T01:28:40.00Z"}] | OK          | [{"name":"TimeInstant","type":"DateTime","value":"2020-06-06T01:28:40.00Z"}]
(3 rows)

最後に

ということで、Azure Managed ServiceとFIWAREを活用したIoTプラットフォームのデモを動かしてみました。今回はAzureで動かしてみましたが、AWS AmazonMQ、AWS RDS及びAWS EKSでも同様のIoTプラットフォームは実装できると思います(たぶん)。もしうまく動作させることができた方がいらっしゃったら、ぜひ教えて下さい。


  1. 2019年に開催した勉強会の資料: FIWARE勉強会 20190913 

  2. FIWARE Orionは現時点(2.4.0)でTLS接続オプションを設定できます 

  3. ServiceEndpointを利用するため、Azure Database for PostgreSQLでBasic SKUを使うことはできません: Azure Portal を使用して Azure Database for PostgreSQL - Single Server で VNet サービス エンドポイントと VNet ルールを作成および管理する 

  4. FIWARE IoTAgent JSONは、HTTP、MQTT3.1.1及びAMQP 0.9.1のプロトコルをサポートしていますが、Azure ServcieBusが提供するAMQP 1.0には対応していないためです 

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away