はじめに
IoTやSmartCityというコンテキストで、FIWAREという言葉を聞くようになりました。FIWAREはもともと欧州を中心に開発保守されているコンポーネント群ですが、Let's FIWARE等の日本語技術情報も増えてきたこともあり1、日本でも少しずつ注目されて始めているようです。
実際、FIWAREの公式チュートリアル(Let's FIWAREの日本語訳)を参照すれば、docker-composeを用いてFIWAREの主要コンポーネントの動作を簡単に試してみることができます。しかしエンジニアの視点からすると、性能や可用性、運用面からdocker-composeのまま実システムとして動かすのは無理があるため、実運用に向けた次の一歩が気になるところです。特にIoTプラットフォームとして捉えた場合、デバイスから送信されるデータがバーストした場合、あるいは何らかのトラブルでFIWARE側が停止した場合でも耐えられるように、デバイスとの接続部分にはクラウドのManaged Message Queueを活用したくなります。
ということで今回は、AzureのManaged Serviceをできる限り活用し、なるべく運用しやすいFIWARE IoTプラットフォームを作ってみたいと思います。
今回のアーキテクチャ
今回はFIWAREをIoTプラットフォームとして活用する際に必要となる最低限のコンポーネントとして、FIWARE OrionとFIWARE IoTAgent JSON、及びFIWARE CygnusをAzure AKS上に構築します。またFIWARE OrionやFIWARE IoTAgent JSONが依存するMongoDBクラスタもAKS上に構築します。
加えて、IoTデバイスとメッセージを送受信するManaged QueueとしてAzure ServiceBusを、及びIoTデバイスから収集したデータを蓄積するDBとしてAzure Database for PostgreSQLを活用します。
なお図中にある amqp converter ですが、これはFIWAREとAzure ServiceBusを中継する役割を担っています。
なお本当は、MongoDBクラスタを自力で運用するのではなく、Azure CosmosDBのMongoDB用APIでどうにかするつもりでした。しかしFIWARE OrionがMongoDBの細かい挙動に依存する実装になっていたため、多少のパッチ程度ではAzure CosmosDB上に実装されたMongoDB互換APIで動作させることができませんでした。残念。
2022年1月時点での最新版 Orion 3.4.0 + CosmosDB MongoDB API 4.0 であれば、全く問題無しというわけではないが、概ね動作します。(Azure CosmosDBとFIWARE Orionを連携させる) [2022/01 更新]
またMongoDBのManaged Serviceとして、MongoDB Atlasを使う手もあります。しかし現時点でのFIWARE IoTAgent JSON(1.14.0)はMongoDBとTLS通信する設定ができない2ため、やはりこちらも動作しません。ただしIoTAgentにTLSオプション等を設定可能にするPull Requestはすでにmasterにmergeされているため、次回のリリース以降ではMongoDB Atlasを使えるようになるはずです。
検証環境
種類 | コンポーネント | バージョン |
---|---|---|
クライアント | macOS Mojave | 10.14.6 |
azure-cli | 2.7.0 | |
kubectl | 1.18.3 | |
helm | 3.2.1 | |
docker | 19.03.8 | |
envsubst | 0.20.2 | |
jq | 1.6 | |
Azure | Azure AKS | 1.16.9 |
Azure ServiceBus | ||
Azure Database for PostgreSQL | 11.6 | |
FIWARE | FIWARE Orion | 2.4.0 |
FIWARE IoTAgent JSON | 1.14.0 | |
FIWARE Cygnus | 2.1.0 | |
その他 | MongoDB | 4.2.7 |
amqp converter | 0.1.0 |
環境構築
では早速、Azure Managed ServiceとFIWAREを連携させたIoTプラットフォームの環境構築をしていきましょう。これから説明するスクリプト類は、githubの nmatsui/qiita_azure_fiware に公開しています。自分で試す場合は、このリポジトリをcloneしていただけば手間が省けると思います。
環境変数の設定
まずは今回のIoTプラットフォームでAzureやFIWAREが必要とする変数を設定します。
variables/env.azure.temp
をvariables/env.azure
にコピーして、Azureのテナント名(TENANT
)やAzure Database for PostgreSQLのパスワード(PG_ADMIN_PASSWORD
)等を修正してください。
Azure用の環境変数の設定
TENANT="<<your tenant>>"
LOCATION="japaneast"
RESOURCE_GROUP="qiita_azure_fiware"
AKS="qafaks"
NODE_VM_SIZE="Standard_B2S"
NODE_COUNT="3"
AKS_VERSION="1.16.9"
SERVICEBUS="qafsbus"
PG="qafpg"
PG_ADMIN="postgres"
PG_ADMIN_PASSWORD="<<your password>>"
PG_SKU="GP_Gen5_2"
PG_VERSION="11"
PG_DATABASE="postgres"
同様に、variables/env.fiware.temp
をvariables/env.fiware
にコピーします。テンプレートのENTITIES
には、今回のIoTプラットフォームに接続するデバイスを定義します。
FIWARE用の環境変数の設定
FIWARE_SERVICE="demo"
FIWARE_SERVICEPATH="/demo/qiita"
ENTITIES="[{\"type\":\"device\",\"id\":\"device01\"}]"
Azure環境の構築
ではCLI経由でAzure AKS、Azure ServiceBus及びAzure Database for PostgreSQLを起動します。
CLIのログインとResourceGroupの作成
まずはazure-cliでログインして、ResourceGroupを作成します。
ログインとResourceGroupの作成
#!/bin/bash
source $(cd $(dirname $0);pwd)/../../variables/env.azure
## login azure
az login --tenant ${TENANT}
#!/bin/bash
source $(cd $(dirname $0);pwd)/../../variables/env.azure
## create resource group
az group create --name ${RESOURCE_GROUP} --location ${LOCATION}
Azure AKSの起動
最初にAzure AKSを起動します。5~10分程度時間がかかります。
Azure AKSの起動
#!/bin/bash
source $(cd $(dirname $0);pwd)/../../variables/env.azure
## start aks
az aks create --resource-group ${RESOURCE_GROUP} --name ${AKS} --node-count ${NODE_COUNT} --node-vm-size ${NODE_VM_SIZE} --generate-ssh-keys --kubernetes-version ${AKS_VERSION}
sleep 30
az aks get-credentials --resource-group ${RESOURCE_GROUP} --name ${AKS} --overwrite-existing
起動後に、kubectlが正しくAKSに接続できていることを確認してください。
kubectlの接続確認
$ kubectl get nodes
NAME STATUS ROLES AGE VERSION
aks-nodepool1-40227215-vmss000000 Ready agent 9m50s v1.16.9
aks-nodepool1-40227215-vmss000001 Ready agent 9m22s v1.16.9
aks-nodepool1-40227215-vmss000002 Ready agent 9m47s v1.16.9
Azure ServiceBusの起動
次にAzure ServiceBusを起動し、FIWARE側から接続するための「すべてのQueueを読み書きできるアクセスポリシー」を定義しておきます。また各IoTデバイスに対応した送信用Queueと受信用Queueも生成し、それぞれ送信用アクセスポリシーと読み取り用アクセスポリシーを定義しておきます。
Azure ServiceBusの起動
#!/bin/bash
source $(cd $(dirname $0);pwd)/../../variables/env.azure
source $(cd $(dirname $0);pwd)/../../variables/env.fiware
## start service bus
az servicebus namespace create --resource-group ${RESOURCE_GROUP} --name ${SERVICEBUS} --sku Standard
az servicebus namespace authorization-rule create --resource-group ${RESOURCE_GROUP} --namespace-name ${SERVICEBUS} --name service --rights Send Listen
len=$(echo ${ENTITIES} | jq '. | length')
for i in $( seq 0 $((${len} - 1)) ); do
type=$(echo ${ENTITIES} | jq -r ".[$i].type")
id=$(echo ${ENTITIES} | jq -r ".[$i].id")
az servicebus queue create --resource-group ${RESOURCE_GROUP} --namespace-name ${SERVICEBUS} --name "${type}.${id}.up"
az servicebus queue authorization-rule create --resource-group ${RESOURCE_GROUP} --namespace-name ${SERVICEBUS} --queue-name "${type}.${id}.up" --name device --rights Send
az servicebus queue create --resource-group ${RESOURCE_GROUP} --namespace-name ${SERVICEBUS} --name "${type}.${id}.down"
az servicebus queue authorization-rule create --resource-group ${RESOURCE_GROUP} --namespace-name ${SERVICEBUS} --queue-name "${type}.${id}.down" --name device --rights Listen
done
Azure Database for PostgreSQLの起動
最後にAzure Database for PostgreSQLを起動し、AKSのノードが所属するVNETからのアクセスのみ許可するようにService Endpointとアクセスルールを設定します3。
Azure Database for PostgreSQLの起動
#!/bin/bash
source $(cd $(dirname $0);pwd)/../../variables/env.azure
## start postgres
az postgres server create --resource-group ${RESOURCE_GROUP} --name ${PG} --location ${LOCATION} --admin-user ${PG_ADMIN} --admin-password ${PG_ADMIN_PASSWORD} --sku-name ${PG_SKU} --version ${PG_VERSION}
VNET=$(az network vnet list --resource-group MC_${RESOURCE_GROUP}_${AKS}_${LOCATION} --output json)
VNET_NAME=$(echo ${VNET} | jq -r '.[0].name')
SUBNET_NAME=$(echo ${VNET} | jq -r '.[0].subnets[0].name')
SUBNET_ID=$(echo ${VNET} | jq -r '.[0].subnets[0].id')
az network vnet subnet update --resource-group MC_${RESOURCE_GROUP}_${AKS}_${LOCATION} --name ${SUBNET_NAME} --vnet-name ${VNET_NAME} --service-endpoints Microsoft.SQL
sleep 30
az postgres server vnet-rule create --resource-group ${RESOURCE_GROUP} --server-name ${PG} --name ${PG}_rule --subnet ${SUBNET_ID}
なお今回はPostgreSQLにしましたが、FIWAREがIoTデータをストアできるDatabaseは他にもMySQLやMongoDBなど豊富にあります。詳細はFIWARE Cygnusのドキュメントを確認してください。
FIWARE環境の構築
ここまでで必要なAzure Managed Serviceが準備できましたので、ここからAzure AKS上にMongoDBとFIWAREのコンポーネントを立ち上げに入ります。
MongoDBの起動
まずHelmを用いてMongoDBクラスタを起動します。5~10分ぐらい時間がかかります。
MongoDBの起動
#!/bin/bash
source $(cd $(dirname $0);pwd)/../../variables/env.fiware
CWD=$(cd $(dirname $0);pwd)
## start mongodb
helm repo add stable https://kubernetes-charts.storage.googleapis.com
helm repo update
helm install stable/mongodb-replicaset --name-template mongodb -f ${CWD}/yaml/mongodb-replicaset-values-aks.yaml
MongoDBのPodがすべて起動した後、MongoDBクラスタの状態を確認してください。正しく起動していれば、PRIMARYのコンテナが一つSECONDARYのコンテナが二つ立ち上がっているはずです。
MongoDBクラスタの状態確認
$ kubectl get services -l release=mongodb -l app=mongodb-replicaset
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
mongodb ClusterIP None <none> 27017/TCP 10m
mongodb-client ClusterIP None <none> 27017/TCP 10m
$ kubectl get pods -l release=mongodb -l app=mongodb-replicaset
NAME READY STATUS RESTARTS AGE
mongodb-0 1/1 Running 0 9m52s
mongodb-1 1/1 Running 0 6m50s
mongodb-2 1/1 Running 0 4m6s
$ kubectl exec mongodb-0 -c mongodb-replicaset -- mongo --eval 'printjson(rs.status().members.map(function(e) {return {name: e.name, stateStr:e.stateStr};}))'
MongoDB shell version v4.2.3
connecting to: mongodb://127.0.0.1:27017/?compressors=disabled&gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("3fc54d94-485a-4e2c-8361-e54b3890e3e5") }
MongoDB server version: 4.2.3
[
{
"name" : "mongodb-0.mongodb.default.svc.cluster.local:27017",
"stateStr" : "PRIMARY"
},
{
"name" : "mongodb-1.mongodb.default.svc.cluster.local:27017",
"stateStr" : "SECONDARY"
},
{
"name" : "mongodb-2.mongodb.default.svc.cluster.local:27017",
"stateStr" : "SECONDARY"
}
]
FIWARE Orionの起動
次にFIWARE Orionを立ち上げます。
FIWARE Orionの起動
#!/bin/bash
source $(cd $(dirname $0);pwd)/../../variables/env.fiware
CWD=$(cd $(dirname $0);pwd)
## start orion
kubectl apply -f ${CWD}/yaml/orion-service.yaml
kubectl apply -f ${CWD}/yaml/orion-deployment.yaml
FIWARE OrionのPodが二つ立ち上がっていることを確認してください。
FIWARE OrionのPod確認
$ kubectl get services -l app=orion
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
orion ClusterIP 10.0.229.82 <none> 1026/TCP 2m51s
$ kubectl get pods -l app=orion
NAME READY STATUS RESTARTS AGE
orion-86cdc7c48c-44695 1/1 Running 0 3m17s
orion-86cdc7c48c-9dx74 1/1 Running 0 3m17s
FIWARE IoTAgent JSONの起動
次にFIWARE IoTAgent JSONを立ち上げます。ただし現時点(1.14.0)のFIWARE IoTAgent JSONは、Azure ServiceBusに直接接続することができません4。
そのため、AMQP1.0とHTTPのプロトコル変換を行うコンポーネント(RoboticBase/amqp10-converter)をFIWARE IoTAgent JSONのSideCarとして立ち上げ、Azure ServiceBusとFIWARE IoTAgent JSONの仲立ちをさせます。
FIWARE IoTAgent JSONの起動
#!/bin/bash
source $(cd $(dirname $0);pwd)/../../variables/env.azure
source $(cd $(dirname $0);pwd)/../../variables/env.fiware
CWD=$(cd $(dirname $0);pwd)
## start iotagent json
SERVICE_KEY=$(az servicebus namespace authorization-rule keys list --resource-group ${RESOURCE_GROUP} --namespace-name ${SERVICEBUS} --name service | jq -r '.primaryKey')
kubectl create secret generic servicebus-credentials --from-literal=primaryKey=$(echo ${SERVICE_KEY})
kubectl apply -f ${CWD}/yaml/iotagent-json-service.yaml
SERVICEBUS=${SERVICEBUS} ENTITIES=${ENTITIES} FIWARE_SERVICE=${FIWARE_SERVICE} FIWARE_SERVICEPATH=${FIWARE_SERVICEPATH} envsubst < ${CWD}/yaml/iotagent-json-deployment.yaml | kubectl apply -f -
FIWARE IoTAgent JSONのPodが二つ立ち上がり、それぞれ 2/2 Running
であることを確認してください。
FIWARE IoTAgent JSONのPod確認
$ kubectl get services -l app=iotagent-json
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
iotagent-json ClusterIP 10.0.162.149 <none> 4041/TCP,7896/TCP 76s
$ kubectl get pods -l app=iotagent-json
NAME READY STATUS RESTARTS AGE
iotagent-json-576dd9f4f9-jxcbq 2/2 Running 0 93s
iotagent-json-576dd9f4f9-zs56f 2/2 Running 0 93s
FIWARE Cygnusの起動
最後にFIWARE Cygnusを立ち上げます。
FIWARE Cygnusの起動
#!/bin/bash
source $(cd $(dirname $0);pwd)/../../variables/env.azure
source $(cd $(dirname $0);pwd)/../../variables/env.fiware
CWD=$(cd $(dirname $0);pwd)
## start cygnus
kubectl create secret generic postgresql-credential --from-literal=password=${PG_ADMIN_PASSWORD}
PG_HOST=$(az postgres server show --resource-group ${RESOURCE_GROUP} --name ${PG} | jq -r '.fullyQualifiedDomainName')
kubectl apply -f ${CWD}/yaml/cygnus-postgresql-service.yaml
PG_HOST=${PG_HOST} PG_ADMIN=${PG_ADMIN} PG=${PG} PG_DATABASE=${PG_DATABASE} envsubst < ${CWD}/yaml/cygnus-postgresql-deployment.yaml | kubectl apply -f -
FIWARE CygnusのPodが二つ立ち上がっていることを確認してください。
FIWARE CygnusのPod確認
$ kubectl get services -l app=cygnus-postgresql
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
cygnus-postgresql ClusterIP 10.0.198.80 <none> 5055/TCP,5080/TCP 2m3s
$ kubectl get pods -l app=cygnus-postgresql
NAME READY STATUS RESTARTS AGE
cygnus-postgresql-5494f4c7d4-c65k2 1/1 Running 0 117s
cygnus-postgresql-5494f4c7d4-lqblk 1/1 Running 0 117s
Azure Database for PostgreSQLにテーブルを作成
今回のデモでは、FIWARE Cygnusを "columnモード" (CYGNUS_POSTGRESQL_ATTR_PERSISTENCE="column"
)で動作させています。このモードはIoTデバイスから送信されるデータがそのまま一つの行としてInsertされるのでわかりやすいのですが、必要なテーブルを自動作成してくれません。そのため最初に自分でCREATE SCHEMA及びCREATE TABLEを実行する必要があります。
schemaとtableの作成
#!/bin/bash
source $(cd $(dirname $0);pwd)/../../variables/env.azure
source $(cd $(dirname $0);pwd)/../../variables/env.fiware
PG_HOST=$(az postgres server show --resource-group ${RESOURCE_GROUP} --name ${PG} | jq -r '.fullyQualifiedDomainName')
TBL="dummy"
if [[ ${FIWARE_SERVICEPATH} =~ ^/?(.+)$ ]]; then
TBL=${BASH_REMATCH[1]////_}
fi
## sql
SQL=$(cat << __EOS__
CREATE SCHEMA IF NOT EXISTS ${FIWARE_SERVICE};
DROP TABLE IF EXISTS ${FIWARE_SERVICE}.${TBL};
CREATE TABLE IF NOT EXISTS ${FIWARE_SERVICE}.${TBL} (
id SERIAL NOT NULL,
recvTime TIMESTAMP WITH TIME ZONE NOT NULL,
fiwareServicePath TEXT NOT NULL,
entityId TEXT NOT NULL,
entityType TEXT NOT NULL,
temperature NUMERIC,
temperature_md JSON,
humidity NUMERIC,
humidity_md JSON,
open_info TEXT,
open_info_md JSON,
open_status TEXT,
open_status_md JSON,
PRIMARY KEY (id)
);
DROP INDEX IF EXISTS ${TBL}_entityId_idx;
CREATE INDEX IF NOT EXISTS ${TBL}_entityId_idx ON ${FIWARE_SERVICE}.${TBL} (entityId);
DROP INDEX IF EXISTS ${TBL}_recvTime_idx;
CREATE INDEX IF NOT EXISTS ${TBL}_recvTime_idx ON ${FIWARE_SERVICE}.${TBL} (recvTime);
__EOS__
)
echo "SQL=${SQL}"
## create schema and table
kubectl run --rm -i --restart=OnFailure --image postgres:${PG_VERSION} psql-client -- /bin/bash -c \
"PGPASSWORD=${PG_ADMIN_PASSWORD} psql --host=${PG_HOST} --port=5432 --dbname=${PG_DATABASE} --username=${PG_ADMIN}@${PG} --command '${SQL}'"
FIWAREの設定
IoTプラットフォームの環境が構築できましたので、FIWAREにIoTデバイスの設定を投入します。各FIWAREコンポーネントの詳細は、それぞれのドキュメントをご参照ください。
Service Groupの登録
まずFIWARE IoTAgent JSONへService Groupを登録します。
Service Groupの登録
#!/bin/bash
source $(cd $(dirname $0);pwd)/../../variables/env.fiware
## register service group
cnt=0
len=$(echo ${ENTITIES} | jq '[.[].type] | unique | length')
for i in $(seq 0 $((${len} - 1))); do
type=$(echo ${ENTITIES} | jq -r "[.[].type] | unique | .[$i]")
kubectl run -i --rm --restart=OnFailure --image curlimages/curl curl$((++cnt)) -- \
-i "http://iotagent-json:4041/iot/services/" \
-H "Fiware-Service: ${FIWARE_SERVICE}" \
-H "Fiware-ServicePath: ${FIWARE_SERVICEPATH}" \
-H "Content-Type: application/json" \
-X POST -d @- <<__EOS__
{
"services": [
{
"apikey": "${type}",
"cbroker": "http://orion:1026",
"resource": "/iot/json",
"entity_type": "${type}"
}
]
}
__EOS__
done
kubectl run -i --rm --restart=OnFailure --quiet=true --image curlimages/curl curl$((++cnt)) -- \
-sS "http://iotagent-json:4041/iot/services/" \
-H "Fiware-Service: ${FIWARE_SERVICE}" \
-H "Fiware-ServicePath: ${FIWARE_SERVICEPATH}" \
| jq .
登録に成功すると、次のようなFIWARE IoTAgent JSONに登録されたService Group情報が表示されます。
Service Groupの登録情報
{
"count": 1,
"services": [
{
"commands": [],
"lazy": [],
"attributes": [],
"_id": "5eda2776967c9e00061354d6",
"resource": "/iot/json",
"apikey": "device",
"service": "demo",
"subservice": "/demo/qiita",
"__v": 0,
"static_attributes": [],
"internal_attributes": [],
"entity_type": "device"
}
]
}
Deviceの登録
次にFIWARE IoTAgent JSONへService Groupを登録します。
Deviceの登録
#!/bin/bash
source $(cd $(dirname $0);pwd)/../../variables/env.fiware
## register devices
cnt=0
len=$(echo ${ENTITIES} | jq '. | length')
for i in $( seq 0 $((${len} - 1)) ); do
type=$(echo ${ENTITIES} | jq -r ".[$i].type")
id=$(echo ${ENTITIES} | jq -r ".[$i].id")
kubectl run -i --rm --restart=OnFailure --image curlimages/curl curl$((++cnt)) -- \
-i "http://iotagent-json:4041/iot/devices/" \
-H "Fiware-Service: ${FIWARE_SERVICE}" \
-H "Fiware-ServicePath: ${FIWARE_SERVICEPATH}" \
-H "Content-Type: application/json" \
-X POST -d @- <<__EOS__
{
"devices": [
{
"device_id": "${id}",
"entity_name": "${id}",
"entity_type": "${type}",
"timezone": "Asia/Tokyo",
"protocol": "json",
"attributes": [
{
"name": "temperature",
"type": "number"
},
{
"name": "humidity",
"type": "number"
}
],
"commands": [
{
"name": "open",
"type": "command"
}
],
"transport": "HTTP",
"endpoint": "http://localhost:3000/amqp10/cmd/${type}/${id}"
}
]
}
__EOS__
done
kubectl run -i --rm --restart=OnFailure --quiet=true --image curlimages/curl curl$((++cnt)) -- \
-sS "http://iotagent-json:4041/iot/devices/" \
-H "Fiware-Service: ${FIWARE_SERVICE}" \
-H "Fiware-ServicePath: ${FIWARE_SERVICEPATH}" \
| jq .
kubectl run -i --rm --restart=OnFailure --quiet=true --image curlimages/curl curl$((++cnt)) -- \
-sS "http://orion:1026/v2/entities/" \
-H "Fiware-Service: ${FIWARE_SERVICE}" \
-H "Fiware-ServicePath: ${FIWARE_SERVICEPATH}" \
| jq .
登録に成功すると、次のようなFIWARE IoTAgent JSONに登録されたDevice情報と、FIWARE IoTAgent JSONによって自動作成されたFIWARE OrionのEntity情報が表示されます。
Deviceの登録情報
{
"count": 1,
"devices": [
{
"device_id": "device01",
"service": "demo",
"service_path": "/demo/qiita",
"entity_name": "device01",
"entity_type": "device",
"endpoint": "http://localhost:3000/amqp10/cmd/device/device01",
"transport": "HTTP",
"attributes": [
{
"object_id": "temperature",
"name": "temperature",
"type": "number"
},
{
"object_id": "humidity",
"name": "humidity",
"type": "number"
}
],
"lazy": [],
"commands": [
{
"object_id": "open",
"name": "open",
"type": "command"
}
],
"static_attributes": [],
"protocol": "json"
}
]
}
Entityの登録情報
[
{
"id": "device01",
"type": "device",
"TimeInstant": {
"type": "DateTime",
"value": "2020-06-05T22:37:38.00Z",
"metadata": {}
},
"humidity": {
"type": "number",
"value": " ",
"metadata": {}
},
"open_info": {
"type": "commandResult",
"value": " ",
"metadata": {}
},
"open_status": {
"type": "commandStatus",
"value": "UNKNOWN",
"metadata": {}
},
"temperature": {
"type": "number",
"value": " ",
"metadata": {}
},
"open": {
"type": "command",
"value": "",
"metadata": {}
}
}
]
Subscriptionの登録
最後に、FIWARE Orionのデータが更新された(=Deviceから新しいデータが送信されてきた)時に、FIWARE Cygnus経由でデータをPostgreSQLへ保存する設定を登録します。
Subscriptionの登録
#!/bin/bash
source $(cd $(dirname $0);pwd)/../../variables/env.fiware
## register subscription
cnt=0
len=$(echo ${ENTITIES} | jq '. | length')
for i in $( seq 0 $((${len} - 1)) ); do
type=$(echo ${ENTITIES} | jq -r ".[$i].type")
id=$(echo ${ENTITIES} | jq -r ".[$i].id")
kubectl run -i --rm --restart=OnFailure --image curlimages/curl curl$((++cnt)) -- \
-i "http://orion:1026/v2/subscriptions/" \
-H "Fiware-Service: ${FIWARE_SERVICE}" \
-H "Fiware-ServicePath: ${FIWARE_SERVICEPATH}" \
-H "Content-Type: application/json" \
-H "Content-Type: application/json" \
-X POST -d @- <<__EOS__
{
"subject": {
"entities": [{
"id": "${id}",
"type": "${type}"
}],
"condition": {
"attrs": ["temperature", "humidity", "open_status", "open_info"]
}
},
"notification": {
"http": {
"url": "http://cygnus-postgresql:5055/notify"
},
"attrs": ["temperature", "humidity", "open_status", "open_info"],
"attrsFormat": "legacy"
}
}
__EOS__
echo ""
done
kubectl run -i --rm --restart=OnFailure --quiet=true --image curlimages/curl curl$((++cnt)) -- \
-sS "http://orion:1026/v2/subscriptions/" \
-H "Fiware-Service: ${FIWARE_SERVICE}" \
-H "Fiware-ServicePath: ${FIWARE_SERVICEPATH}" \
| jq .
登録に成功すると、FIWARE Orionに登録されたSubscription情報が表示されます。
Subscriptionの登録情報
[
{
"id": "5eda3857742e072c41a8ff45",
"status": "active",
"subject": {
"entities": [
{
"id": "device01",
"type": "device"
}
],
"condition": {
"attrs": [
"temperature",
"humidity",
"open_status",
"open_info"
]
}
},
"notification": {
"timesSent": 1,
"lastNotification": "2020-06-05T23:19:35.00Z",
"attrs": [
"temperature",
"humidity",
"open_status",
"open_info"
],
"onlyChangedAttrs": false,
"attrsFormat": "legacy",
"http": {
"url": "http://cygnus-postgresql:5055/notify"
},
"lastSuccess": "2020-06-05T23:19:35.00Z",
"lastSuccessCode": 200
}
}
]
動作確認
IoTプラットフォーム環境の準備が整いましたので、AMQP1.0で通信できるダミーのIoTデバイスとFIWAREをAzure ServiceBus経由で接続してみましょう。
ダミーデバイスの準備
ダミーのIoTデバイス用のDockerコンテナを作成します。
ダミーIoTデバイス用Dockerコンテナの作成
#!/bin/bash
CWD=$(cd $(dirname $0);pwd)
## create docker container
docker build -t my/dummy_client:0.0.1 ${CWD}/dummy_client
コンテナが作成されたことを確認してください。
ダミーIoTデバイス用Dockerコンテナの確認
$ docker image ls | grep my/dummy_client
my/dummy_client 0.0.1 f8a2e9348ad1 2 minutes ago 89.9MB
ダミーIoTデバイスから温度と湿度をFIWAREへ送信
では、ダミーIoTデバイスからAzure ServiceBusのQueueへ温度と湿度のデータを送ってみましょう。IoTプラットフォーム環境が正しく構築できていれば、Queueに到着したメッセージをFIWAREが取得し、そのデータをPostgreSQLに蓄積します。
ダミーIoTデバイスからデータを送信
先程作成したDockerコンテナを用い、AMQP1.0で温度と湿度のデータをAzure ServcieBusへ送信します。
温度と湿度のデータ送信
#!/bin/bash
source $(cd $(dirname $0);pwd)/../variables/env.azure
source $(cd $(dirname $0);pwd)/../variables/env.fiware
## send attributes
HOST="${SERVICEBUS}.servicebus.windows.net"
PORT="5671"
len=$(echo ${ENTITIES} | jq '. | length')
for i in $( seq 0 $((${len} - 1)) ); do
type=$(echo ${ENTITIES} | jq -r ".[$i].type")
id=$(echo ${ENTITIES} | jq -r ".[$i].id")
SEND_QUEUE="${type}.${id}.up"
SENDER_USERNAME="device"
SENDER_PASSWORD=$(az servicebus queue authorization-rule keys list --resource-group ${RESOURCE_GROUP} --namespace-name ${SERVICEBUS} --queue-name ${SEND_QUEUE} --name ${SENDER_USERNAME} | jq -r '.primaryKey')
RECEIVE_QUEUE="${type}.${id}.down"
RECEIVER_USERNAME="device"
RECEIVER_PASSWORD=$(az servicebus queue authorization-rule keys list --resource-group ${RESOURCE_GROUP} --namespace-name ${SERVICEBUS} --queue-name ${RECEIVE_QUEUE} --name ${RECEIVER_USERNAME} | jq -r '.primaryKey')
docker run -e AMQP_HOST=${HOST} -e AMQP_PORT=${PORT} \
-e AMQP_SEND_QUEUE=${SEND_QUEUE} -e AMQP_SENDER_USERNAME=${SENDER_USERNAME} -e AMQP_SENDER_PASSWORD=${SENDER_PASSWORD} \
-e AMQP_RECEIVE_QUEUE=${RECEIVE_QUEUE} -e AMQP_RECEIVER_USERNAME=${RECEIVER_USERNAME} -e AMQP_RECEIVER_PASSWORD=${RECEIVER_PASSWORD} \
my/dummy_client:0.0.1 attrs
done
送信したデータが次のように表示されます。
sending msg: {
body: '{"attrs":{"temperature":26.857522721066257,"humidity":64.19745805586517}}'
}
[connection-1] await sendMessage -> Delivery id: 0, settled: true
sent attributes successfully
FIWAREの状態を確認
Azure ServiceBusへ送信したデータがFIWAREに反映されていることを確認してください。
FIWAREの状態確認
#!/bin/bash
source $(cd $(dirname $0);pwd)/../variables/env.fiware
## show entities
cnt=0
kubectl run -i --rm --restart=OnFailure --quiet=true --image curlimages/curl curl$((++cnt)) -- \
-sS "http://orion:1026/v2/entities/" \
-H "Fiware-Service: ${FIWARE_SERVICE}" \
-H "Fiware-ServicePath: ${FIWARE_SERVICEPATH}" \
| jq .
[
{
"id": "device01",
"type": "device",
"TimeInstant": {
"type": "DateTime",
"value": "2020-06-06T01:05:49.00Z",
"metadata": {}
},
"humidity": {
"type": "number",
"value": 64.197458056,
"metadata": {
"TimeInstant": {
"type": "DateTime",
"value": "2020-06-06T01:05:49.00Z"
}
}
},
"open_info": {
"type": "commandResult",
"value": " ",
"metadata": {}
},
"open_status": {
"type": "commandStatus",
"value": "UNKNOWN",
"metadata": {}
},
"temperature": {
"type": "number",
"value": 26.857522721,
"metadata": {
"TimeInstant": {
"type": "DateTime",
"value": "2020-06-06T01:05:49.00Z"
}
}
},
"open": {
"type": "command",
"value": "",
"metadata": {}
}
}
]
PostgreSQLのテーブル確認
またAzure ServiceBusへ送信したデータがFIWAREを経由してPostgreSQLに蓄積されていることも確認できます。
PostgreSQLの状態確認
#!/bin/bash
source $(cd $(dirname $0);pwd)/../variables/env.azure
source $(cd $(dirname $0);pwd)/../variables/env.fiware
PG_HOST=$(az postgres server show --resource-group ${RESOURCE_GROUP} --name ${PG} | jq -r '.fullyQualifiedDomainName')
TBL="dummy"
if [[ ${FIWARE_SERVICEPATH} =~ ^/?(.+)$ ]]; then
TBL=${BASH_REMATCH[1]////_}
fi
## sql
SQL=$(cat << __EOS__
SELECT * FROM ${FIWARE_SERVICE}.${TBL};
__EOS__
)
echo "SQL=${SQL}"
## show data
kubectl run --rm -i --restart=OnFailure --image postgres:${PG_VERSION} psql-client -- /bin/bash -c \
"PGPASSWORD=${PG_ADMIN_PASSWORD} psql --host=${PG_HOST} --port=5432 --dbname=${PG_DATABASE} --username=${PG_ADMIN}@${PG} --command '${SQL}'"
SQL=SELECT * FROM demo.demo_qiita;
id | recvtime | fiwareservicepath | entityid | entitytype | temperature | temperature_md | humidity | humidity_md | open_info | open_info_md | open_status | open_status_md
----+----------------------------+-------------------+----------+------------+--------------+------------------------------------------------------------------------------+--------------+------------------------------------------------------------------------------+-----------+--------------+-------------+----------------
1 | 2020-06-06 01:05:49.276+00 | /demo/qiita | device01 | device | 26.857522721 | [{"name":"TimeInstant","type":"DateTime","value":"2020-06-06T01:05:49.00Z"}] | 64.197458056 | [{"name":"TimeInstant","type":"DateTime","value":"2020-06-06T01:05:49.00Z"}] | | [] | UNKNOWN | []
(1 row)
FIWAREからダミーIoTデバイスへコマンドを送信
それでは最後に、FIWAREからダミーIoTデバイスへ窓を開ける命令を送ってみましょう。
FIWAREからコマンドを送信
FIWAREのREST APIを用いて、デバイスに "open" コマンドを送ります。
"open"コマンドの送信
#!/bin/bash
source $(cd $(dirname $0);pwd)/../variables/env.fiware
## send command
cnt=0
len=$(echo ${ENTITIES} | jq '. | length')
for i in $( seq 0 $((${len} - 1)) ); do
type=$(echo ${ENTITIES} | jq -r ".[$i].type")
id=$(echo ${ENTITIES} | jq -r ".[$i].id")
kubectl run -i --rm --restart=OnFailure --image curlimages/curl curl$((++cnt)) -- \
-i "http://orion:1026/v2/entities/${id}/attrs?type=${type}" \
-H "Fiware-Service: ${FIWARE_SERVICE}" \
-H "Fiware-ServicePath: ${FIWARE_SERVICEPATH}" \
-H "Content-Type: application/json" \
-X PATCH -d @- <<__EOS__
{
"open": {
"value": "window1"
}
}
__EOS__
done
kubectl run -i --rm --restart=OnFailure --quiet=true --image curlimages/curl curl$((++cnt)) -- \
-sS "http://orion:1026/v2/entities/" \
-H "Fiware-Service: ${FIWARE_SERVICE}" \
-H "Fiware-ServicePath: ${FIWARE_SERVICEPATH}" \
| jq .
ダミーIoTデバイスがまだコマンドを受信していたいため、FIWAREが管理しているコマンドの状態はPENDING
になっています。
[
{
"id": "device01",
"type": "device",
"TimeInstant": {
"type": "DateTime",
"value": "2020-06-06T01:08:28.00Z",
"metadata": {}
},
"humidity": {
"type": "number",
"value": 64.197458056,
"metadata": {
"TimeInstant": {
"type": "DateTime",
"value": "2020-06-06T01:05:49.00Z"
}
}
},
"open_info": {
"type": "commandResult",
"value": " ",
"metadata": {}
},
"open_status": {
"type": "commandStatus",
"value": "PENDING",
"metadata": {
"TimeInstant": {
"type": "DateTime",
"value": "2020-06-06T01:08:28.00Z"
}
}
},
"temperature": {
"type": "number",
"value": 26.857522721,
"metadata": {
"TimeInstant": {
"type": "DateTime",
"value": "2020-06-06T01:05:49.00Z"
}
}
},
"open": {
"type": "command",
"value": "",
"metadata": {}
}
}
]
またdownstreamキューを確認すると、Active Messageが1つ溜まっていることがわかります。
"open"コマンドの送信
#!/bin/bash
source $(cd $(dirname $0);pwd)/../variables/env.azure
source $(cd $(dirname $0);pwd)/../variables/env.fiware
## show service bus queue
len=$(echo ${ENTITIES} | jq '. | length')
for i in $( seq 0 $((${len} - 1)) ); do
type=$(echo ${ENTITIES} | jq -r ".[$i].type")
id=$(echo ${ENTITIES} | jq -r ".[$i].id")
az servicebus queue show --resource-group ${RESOURCE_GROUP} --namespace-name ${SERVICEBUS} --name "${type}.${id}.down" | jq '{name: .name, count: .countDetails}'
done
{
"name": "device.device01.down",
"count": {
"activeMessageCount": 1,
"deadLetterMessageCount": 0,
"scheduledMessageCount": 0,
"transferDeadLetterMessageCount": 0,
"transferMessageCount": 0
}
}
ダミーIoTデバイスがコマンドを受信し、処理結果を送信
先程作成したDockerコンテナを用い、AMQP1.0でコマンドをAzure ServcieBusから受信し、処理結果をAzure ServcieBusへ送信します。
"open"コマンドの受信と処理結果の送信
#!/bin/bash
source $(cd $(dirname $0);pwd)/../variables/env.azure
source $(cd $(dirname $0);pwd)/../variables/env.fiware
## send attributes
HOST="${SERVICEBUS}.servicebus.windows.net"
PORT="5671"
len=$(echo ${ENTITIES} | jq '. | length')
for i in $( seq 0 $((${len} - 1)) ); do
type=$(echo ${ENTITIES} | jq -r ".[$i].type")
id=$(echo ${ENTITIES} | jq -r ".[$i].id")
SEND_QUEUE="${type}.${id}.up"
SENDER_USERNAME="device"
SENDER_PASSWORD=$(az servicebus queue authorization-rule keys list --resource-group ${RESOURCE_GROUP} --namespace-name ${SERVICEBUS} --queue-name ${SEND_QUEUE} --name ${SENDER_USERNAME} | jq -r '.primaryKey')
RECEIVE_QUEUE="${type}.${id}.down"
RECEIVER_USERNAME="device"
RECEIVER_PASSWORD=$(az servicebus queue authorization-rule keys list --resource-group ${RESOURCE_GROUP} --namespace-name ${SERVICEBUS} --queue-name ${RECEIVE_QUEUE} --name ${RECEIVER_USERNAME} | jq -r '.primaryKey')
docker run -e AMQP_HOST=${HOST} -e AMQP_PORT=${PORT} \
-e AMQP_SEND_QUEUE=${SEND_QUEUE} -e AMQP_SENDER_USERNAME=${SENDER_USERNAME} -e AMQP_SENDER_PASSWORD=${SENDER_PASSWORD} \
-e AMQP_RECEIVE_QUEUE=${RECEIVE_QUEUE} -e AMQP_RECEIVER_USERNAME=${RECEIVER_USERNAME} -e AMQP_RECEIVER_PASSWORD=${RECEIVER_PASSWORD} \
my/dummy_client:0.0.1 cmd
done
ダミーIoTデバイスが受信したコマンドを処理した結果が表示されます。
start consuming cmd
receiving msg { cmd: { open: 'window1' } }
sending msg: {
body: '{"cmdexe":{"open":"processed window1 at 2020-06-06T01:28:40.312Z"}}'
}
[connection-2] await sendMessage -> Delivery id: 0, settled: true
FIWAREの状態を確認
FIWAREが管理しているコマンドの状態がOK
になり、ダミーIoTデバイスから送信された処理結果が反映されていることを確認してください。
FIWAREの状態確認
#!/bin/bash
source $(cd $(dirname $0);pwd)/../variables/env.fiware
## show entities
cnt=0
kubectl run -i --rm --restart=OnFailure --quiet=true --image curlimages/curl curl$((++cnt)) -- \
-sS "http://orion:1026/v2/entities/" \
-H "Fiware-Service: ${FIWARE_SERVICE}" \
-H "Fiware-ServicePath: ${FIWARE_SERVICEPATH}" \
| jq .
[
{
"id": "device01",
"type": "device",
"TimeInstant": {
"type": "DateTime",
"value": "2020-06-06T01:28:40.00Z",
"metadata": {}
},
"humidity": {
"type": "number",
"value": 64.197458056,
"metadata": {
"TimeInstant": {
"type": "DateTime",
"value": "2020-06-06T01:05:49.00Z"
}
}
},
"open_info": {
"type": "commandResult",
"value": "processed window1 at 2020-06-06T01:28:40.312Z",
"metadata": {
"TimeInstant": {
"type": "DateTime",
"value": "2020-06-06T01:28:40.00Z"
}
}
},
"open_status": {
"type": "commandStatus",
"value": "OK",
"metadata": {
"TimeInstant": {
"type": "DateTime",
"value": "2020-06-06T01:28:40.00Z"
}
}
},
"temperature": {
"type": "number",
"value": 26.857522721,
"metadata": {
"TimeInstant": {
"type": "DateTime",
"value": "2020-06-06T01:05:49.00Z"
}
}
},
"open": {
"type": "command",
"value": "",
"metadata": {}
}
}
]
PostgreSQLのテーブル確認
またコマンドの処理状況がPostgreSQLに蓄積されていることも確認できます。
PostgreSQLの状態確認
#!/bin/bash
source $(cd $(dirname $0);pwd)/../variables/env.azure
source $(cd $(dirname $0);pwd)/../variables/env.fiware
PG_HOST=$(az postgres server show --resource-group ${RESOURCE_GROUP} --name ${PG} | jq -r '.fullyQualifiedDomainName')
TBL="dummy"
if [[ ${FIWARE_SERVICEPATH} =~ ^/?(.+)$ ]]; then
TBL=${BASH_REMATCH[1]////_}
fi
## sql
SQL=$(cat << __EOS__
SELECT * FROM ${FIWARE_SERVICE}.${TBL};
__EOS__
)
echo "SQL=${SQL}"
## show data
kubectl run --rm -i --restart=OnFailure --image postgres:${PG_VERSION} psql-client -- /bin/bash -c \
"PGPASSWORD=${PG_ADMIN_PASSWORD} psql --host=${PG_HOST} --port=5432 --dbname=${PG_DATABASE} --username=${PG_ADMIN}@${PG} --command '${SQL}'"
SQL=SELECT * FROM demo.demo_qiita;
id | recvtime | fiwareservicepath | entityid | entitytype | temperature | temperature_md | humidity | humidity_md | open_info | open_info_md | open_status | open_status_md
----+----------------------------+-------------------+----------+------------+--------------+------------------------------------------------------------------------------+--------------+------------------------------------------------------------------------------+-----------------------------------------------+------------------------------------------------------------------------------+-------------+------------------------------------------------------------------------------
1 | 2020-06-06 01:05:49.276+00 | /demo/qiita | device01 | device | 26.857522721 | [{"name":"TimeInstant","type":"DateTime","value":"2020-06-06T01:05:49.00Z"}] | 64.197458056 | [{"name":"TimeInstant","type":"DateTime","value":"2020-06-06T01:05:49.00Z"}] | | [] | UNKNOWN | []
2 | 2020-06-06 01:08:28.534+00 | /demo/qiita | device01 | device | 26.857522721 | [{"name":"TimeInstant","type":"DateTime","value":"2020-06-06T01:05:49.00Z"}] | 64.197458056 | [{"name":"TimeInstant","type":"DateTime","value":"2020-06-06T01:05:49.00Z"}] | | [] | PENDING | [{"name":"TimeInstant","type":"DateTime","value":"2020-06-06T01:08:28.00Z"}]
3 | 2020-06-06 01:28:40.559+00 | /demo/qiita | device01 | device | 26.857522721 | [{"name":"TimeInstant","type":"DateTime","value":"2020-06-06T01:05:49.00Z"}] | 64.197458056 | [{"name":"TimeInstant","type":"DateTime","value":"2020-06-06T01:05:49.00Z"}] | processed window1 at 2020-06-06T01:28:40.312Z | [{"name":"TimeInstant","type":"DateTime","value":"2020-06-06T01:28:40.00Z"}] | OK | [{"name":"TimeInstant","type":"DateTime","value":"2020-06-06T01:28:40.00Z"}]
(3 rows)
最後に
ということで、Azure Managed ServiceとFIWAREを活用したIoTプラットフォームのデモを動かしてみました。今回はAzureで動かしてみましたが、AWS AmazonMQ、AWS RDS及びAWS EKSでも同様のIoTプラットフォームは実装できると思います(たぶん)。もしうまく動作させることができた方がいらっしゃったら、ぜひ教えて下さい。
-
2019年に開催した勉強会の資料: FIWARE勉強会 20190913 ↩
-
FIWARE Orionは現時点(2.4.0)でTLS接続オプションを設定できます ↩
-
ServiceEndpointを利用するため、Azure Database for PostgreSQLでBasic SKUを使うことはできません: Azure Portal を使用して Azure Database for PostgreSQL - Single Server で VNet サービス エンドポイントと VNet ルールを作成および管理する ↩
-
FIWARE IoTAgent JSONは、HTTP、MQTT3.1.1及びAMQP 0.9.1のプロトコルをサポートしていますが、Azure ServcieBusが提供するAMQP 1.0には対応していないためです ↩