はじめに
先日SORACOM Discovery 2020 ONLINEというイベントが開催されました。
日本発のIoTプラットフォームとして注目されているソラコムの年間最大のイベントで、現在の状況を鑑みてオンラインで開催されました。僕も会社から視聴したり、ナイトイベントに登壇したりしておりまして、せっかくなので新発表についてのブログも書いてみます。
題材はインラインプロセッシング SORACOM Orbitの下り通信の活用方法です。特にバッテリー駆動などのデバイスで、シンプルな構成のまま通信回数、通信量を削減することができます。
SORACOM Orbitとは
SORACOM Orbitはデバイスから送信されるデータを、SORACOMプラットフォーム内で加工してから、クラウドに送信することができる、というサービスです。例えばデバイスが処理能力や通信量の関係でバイナリデータしか送信できないけど、対応するクラウドはJSONしか受け取ってくれない、という場合に便利なサービスです。
このような用途ではバイナリパーサーが使用できるのですが、バイナリパーサーは固定されたフォーマットを変換することはできるものの、データ量が設定によって変わったり、データのヘッダによってデータの解釈を変えたり、配列的なデータを扱う場合には使えない、ということもあり、用途はかなり限られていたと思います。(それでもこれがあるのはSORACOMの大きな特長でした。今でもバイナリパーサーは便利に、無料で使えます)
バイナリパーサーはもっと便利になる可能性を感じていて、僕もソラコムサンタとかにも要望を上げたりしてました。Orbitの開発者の方もすごいバイナリパーサーを作りたい、という動機で開発したとのことです。
SORACOM Orbitでは、ユーザーがデータ変換用のプログラムを書き、それをSORACOMプラットフォーム内に登録することができるようになっており、バイナリパーサーではできなかった柔軟なデータ変換が可能です。また、SORACOM Airメタデータのタグを取得することもできるようになっており、タグのデータを付加してデータを送信したり、タグの内容により変換処理を変える、といったこともできるようになっております。
そしてSORACOM Orbitではバイナリパーサーにはできなかった大きな特長がありまして、それは下り(クラウド→デバイス)の通信も変換できる、ということです。バイナリパーサーではデバイスからクラウドへの通信を変換することはできましたが、クラウドからデバイスへの通信は変換できませんでした。主な用途がバイナリとJSONとの相互変換であるということを考えた時、バイナリ→JSONの変換は比較的容易(ライブラリなしでもできる)であるものの、JSON→バイナリへの変換は結構大変です。これをOrbitにより改善することができます。
また、「デバイスリードパターン」と呼ばれる、「デバイスからクラウドにデータをアップロードし、その戻りのデータでデバイスの制御をする」といった制御方法を用いる場合、設計の複雑さや通信量がかなり改善できるのでは、という案を思いついたので、これを試してみました。
デバイスリードパターン
クラウドからデバイスを遠隔制御したい、という場合、いくつかパターンがあります。ソラコムの説明では大きく分けて以下の3つのパターンに分かれるとのことです。
- IPアクセスパターン
- アプリケーションパターン
- デバイスリードパターン
詳しくはこちらのスライドをご覧ください。
要件によってこれらのパターンを使い分けるのですが、特にバッテリー駆動で長時間動作させる制約があり、可能な限り動作や通信を止めて省電力にしたい場合にはデバイスリードパターンを使うことになると思います。
センサー値をアップロードするケースを考えると、センサー値をアップロードするリクエストのレスポンスに設定を入れる方法と、センサー値のアップロードのリクエストと設定情報の要求のリクエストを分ける方法がありますが、一長一短あります。
センサー値をアップロードするリクエストのレスポンスに設定を入れる方法の場合
このような構成になります。
この方法の良いところは1回の通信で必要な処理が完結するところです。通信は電力消費の大部分を占めることになるため、通信回数が少ないことは省電力に有利になります。これだけでこの方法を採用する理由は十分です。
改善したい点は以下の3点です。
まず保存や設定取得にあたる部分をプログラミングする必要があります。SORACOM Funkができて、実行環境にAWS Lambdaを選択できるようになったのはよい(Funkができる以前はBeamと自前の実行環境が必要だった)のですが、それでもサーバーサイドのプログラミングは面倒ですし、安定して動作させるのは苦労します。センサー値を保存する、といった処理の場合は、SORACOM HarvestやSORACOM Funnel + Amazon Kinesis Data Firehoseなどの専用サービスに任せたいところですが、この構成の場合応答を返す必要があるため使えません。これが一番大きいですね。
次にLambda関数にSORACOMのWeb APIを呼び出すための認証情報を持たせる必要があります。あまり認証情報を他の環境に持たせて、管理したくはないですね。
また、1つのLambda関数で、2つの役割(データの保存と設定の取得)を兼ねているのも、設計上あまりよくありません。通常は役割ごとに関数を分け、それぞれ呼び出すのですが、通信回数を減らす必要上役割を兼ねることになりますので、Lambda内で責務分離をする必要があるでしょう。
センサー値のアップロードのリクエストと設定情報の要求のリクエストを分ける方法の場合
このような構成になります。
この方法の良いところは、センサーを保存する処理を専用のサービスに任せられるため自分でサーバー側のプログラミングをする必要がないところと、認証情報はSIM認証が使えるため管理が必要ないこと、役割が分離されて明確であることですね。非常にシンプルな構成になっています。センサー値の行き先はSORACOM Harvestにしていますが、用途に応じてSORACOM Funnelにしたり、Unified Endpointに送ってそこから先はユーザーの設定に任せることもできます。GPSマルチユニットSORACOM Editionはこの構成ですね。
この構成の改善点は、通信回数が増えることです。センサー値をアップロードするごとに設定受信もする場合、単純に通信回数は倍になりますし、Harvestへの通信はUDPで送ることもできますがメタデータサービスへの通信はHTTPのみであるため、ややパケット数や通信量は多くなります。そのため、例えばGPSマルチユニットSORACOM Editionでは設定の取得は1日1回もしくはボタン操作による、など、設定取得を減らして通信量を下げる必要がありますが、その場合設定してから反映されるまでに時間がかかることになります。このあたりの頻度の調整は悩むところですね。
1回の通信で複数のサービスと連携する場合
Unified Endpointを使って、1回の通信で複数のサービスと連携することで、ちょっと改善できます。
プログラミングと実行環境が必要な点、認証情報の管理が必要な点は変わっていませんが、センサー値の保存は専用サービスのSORACOM Harvest、設定の取得はSORACOM Funk + Lambdaと明確に役割を分離して任せることができています。特にセンサー値保存処理の安定性が上がることが期待できますね。
やや構成が複雑になることと、サービスの起動回数が増えてコストが上がることが改善点と言えるでしょう。
Orbitを使った場合
さて、本題のOrbitを使うことでどのように改善できるかですが、以下の構成のようにできます。
やっていることはさきほどの「1回の通信で複数のサービスと連携する場合」とほとんど同じなのですが、かなり構成がシンプルになりました。ポイントは下りの通信にOrbitを適用し、タグから取得した設定を付加してデバイスに渡すことです。これによりここまでの構成の課題が以下のように改善されています。
- デバイスからの通信は1回で済む
- HarvestやFunnelのような応答を返せない専用サービスも利用可能
- センサー値の処理はOrbitの転送先、設定の取得はOrbitと役割が分離されている
- 外部からSORACOMにアクセスするための認証情報の管理が必要ない
なかなか良い感じですね。この構成の課題としては、Orbit用のWASMのプログラミング、ビルド、モジュールの管理が必要なことと、Orbitの使用とリクエストのコストがかかることと言えます。センサー値を1分に1回アップロードすると、
20(利用料金) + 44640(リクエスト/月) * 40(円 / 10000リクエスト) / 10000(リクエスト) = 198.56
となりますので、おおよそ月200円です。消費電力が問題になるデバイスでそこまで頻繁にアップロードはしないと考えられますので、1時間に1回アップロードであれば、
20(利用料金) + 744(リクエスト/月) * 40(円 / 10000リクエスト) / 10000(リクエスト) = 49.76
でおおよそ月50円となりますので、そこまで問題にならないコストかなと思います。
作るもの
Wio LTEに様々なセンサーがついており、それを一定間隔でSORACOM Harvestにアップロードする、というユースケースを考えます。
使用するセンサーやアップロード間隔はSORACOM Airのタグを変更することで設定でき、設定はセンサー値をアップロードした時に反映されることとします。
構成は先ほどの図のままですね。
デバイス側のプログラム
デバイス側のプログラムはWio LTEのサンプルから各種センサーの取得部分を持ってきて、設定に応じてセンサー値を取得して、アップロードする、といったものです。受信したデータは以下のようなフォーマットになります。
ステータスコード(3桁の数字)(スペース)バイナリ設定データ(3byte)
ステータスコードとスペースはUnified Endpointが自動的に付加します。バイナリ設定データをOrbitはOrbitが生成し、デバイス内の設定として使います。Orbitがデバイスがそのまま使える形で生成しているため、JSONのパースが必要なく、JSONのライブラリが必要ありません。ただし、バイナリを使う場合はデバイスとOrbitのCPUアーキテクチャによる違い(エンディアンや数値型のサイズ、アラインメントなど)に気をつけましょう。理想的には受信したデータをそのままデバイスの構造体にコピーすれば設定が完了するのですが、どこまでOrbitで吸収するかは考える必要があります。
初回空データを送っているのは、起動直後では設定値が分からないためです。省電力動作の際には「Wio Extension - RTC」のEEPROM領域に設定を保存しておいて、それを使うなどが必要になるかと思います。
#include <WioLTEforArduino.h>
#include <ADXL345.h> // https://github.com/Seeed-Studio/Accelerometer_ADXL345
#include <Ultrasonic.h> // https://github.com/Seeed-Studio/Grove_Ultrasonic_Ranger
#include <stdio.h>
#define RECEIVE_TIMEOUT (10000)
#define BUTTON_PIN (WIOLTE_D38)
#define MAGNETIC_SWITCH_PIN (WIOLTE_D20)
#define SENSOR_PIN (WIOLTE_A6)
#define ULTRASONIC_PIN (WIOLTE_A4)
WioLTE Wio;
ADXL345 Accel;
Ultrasonic UltrasonicRanger(ULTRASONIC_PIN);
struct SensorInUse {
bool button : 1 = false;
bool magSwitch : 1 = false;
bool temparature : 1 = false;
bool humidity : 1 = false;
bool distance : 1 = false;
bool accel : 1 = false;
bool reserved : 2 = 0;
} sensorInUse;
uint16_t interval = 0;
uint8_t setting[3] = { 0, 0, 0 };
uint16_t getUint16FromBytes(const uint8_t *input){
uint16_t ret = 0;
ret += ((uint16_t)input[0]) << 8;
ret += ((uint16_t)input[1]) << 0;
return ret;
}
void updateSetting(const uint8_t *input){
memcpy(setting, input, sizeof(setting));
interval = getUint16FromBytes((const uint8_t *)&setting[0]);
memcpy(&sensorInUse, &setting[2], 1);
}
void setup() {
delay(200);
SerialUSB.println("");
SerialUSB.println("--- START ---------------------------------------------------");
SerialUSB.println("### I/O Initialize.");
Wio.Init();
SerialUSB.println("### Power supply ON.");
Wio.PowerSupplyLTE(true);
delay(500);
SerialUSB.println("### Turn on or reset.");
if (!Wio.TurnOnOrReset()) {
SerialUSB.println("### ERROR! ###");
return;
}
SerialUSB.println("### Connecting to \"soracom.io\".");
if (!Wio.Activate("soracom.io", "sora", "sora")) {
SerialUSB.println("### ERROR! ###");
return;
}
Wio.PowerSupplyGrove(true);
pinMode(BUTTON_PIN, INPUT);
pinMode(MAGNETIC_SWITCH_PIN, INPUT);
TemperatureAndHumidityBegin(SENSOR_PIN);
Accel.powerOn();
SerialUSB.println("### Setup completed.");
}
void loop() {
char data[1024];
memset(data, 0, sizeof(data));
strcpy(data, "{");
if (sensorInUse.temparature || sensorInUse.humidity){
float temp;
float humi;
if (!TemperatureAndHumidityRead(&temp, &humi)) {
SerialUSB.println("ERROR!");
goto err;
}
if (sensorInUse.button){
int buttonState = digitalRead(BUTTON_PIN);
char buttonString[16];
sprintf(buttonString,"\"button\":%d,", buttonState);
strcat(data, buttonString);
}
if (sensorInUse.magSwitch){
int magSwitchState = digitalRead(MAGNETIC_SWITCH_PIN);
char magSwitchString[16];
sprintf(magSwitchString,"\"magSwitch\":%d,", magSwitchState);
strcat(data, magSwitchString);
}
if (sensorInUse.temparature){
char temparatureString[16];
sprintf(temparatureString,"\"temp\":%.1f,", temp);
strcat(data, temparatureString);
}
if (sensorInUse.humidity){
char humidityString[16];
sprintf(humidityString,"\"humi\":%.1f,", humi);
strcat(data, humidityString);
}
}
if (sensorInUse.accel){
int x;
int y;
int z;
Accel.readXYZ(&x, &y, &z);
char accelString[64];
sprintf(accelString,"\"x\":%d,\"y\":%d,\"z\":%d,", x, y, z);
strcat(data, accelString);
}
if (sensorInUse.distance){
long distance;
distance = UltrasonicRanger.MeasureInCentimeters();
char distanceString[16];
sprintf(distanceString,"\"distance\":%ld,", distance);
strcat(data, distanceString);
}
if (strlen(data) == 1){
strcpy(data, "{}");
} else {
data[strlen(data) - 1] = '}';
}
int connectId;
connectId = Wio.SocketOpen("uni.soracom.io", 23080, WIOLTE_UDP);
if (connectId < 0) {
SerialUSB.println("### Open ERROR! ###");
goto err;
}
SerialUSB.print("Send:");
SerialUSB.print(data);
SerialUSB.println("");
if (!Wio.SocketSend(connectId, data)) {
SerialUSB.println("### Send ERROR! ###");
goto err_close;
}
int length;
char recvBuf[1024];
length = Wio.SocketReceive(connectId, recvBuf, sizeof(recvBuf), RECEIVE_TIMEOUT);
if (length < 0) {
SerialUSB.println("### Receive ERROR! ###");
goto err_close;
}
if (length == 0) {
SerialUSB.println("### Receive TIMEOUT! ###");
goto err_close;
}
SerialUSB.print("Receive:");
SerialUSB.print(recvBuf);
SerialUSB.println("");
if (memcmp(setting, &recvBuf[4], sizeof(setting)) != 0){
SerialUSB.println("Setting Updated");
updateSetting((const uint8_t *)&recvBuf[4]);
}
err_close:
if (!Wio.SocketClose(connectId)) {
SerialUSB.println("### Close ERROR! ###");
goto err;
}
err:
delay(interval);
}
////////////////////////////////////////////////////////////////////////////////////////
//
int TemperatureAndHumidityPin;
void TemperatureAndHumidityBegin(int pin)
{
TemperatureAndHumidityPin = pin;
DHT11Init(TemperatureAndHumidityPin);
}
bool TemperatureAndHumidityRead(float* temperature, float* humidity)
{
byte data[5];
DHT11Start(TemperatureAndHumidityPin);
for (int i = 0; i < 5; i++) data[i] = DHT11ReadByte(TemperatureAndHumidityPin);
DHT11Finish(TemperatureAndHumidityPin);
if(!DHT11Check(data, sizeof (data))) return false;
if (data[1] >= 10) return false;
if (data[3] >= 10) return false;
*humidity = (float)data[0] + (float)data[1] / 10.0f;
*temperature = (float)data[2] + (float)data[3] / 10.0f;
return true;
}
////////////////////////////////////////////////////////////////////////////////////////
//
void DHT11Init(int pin)
{
digitalWrite(pin, HIGH);
pinMode(pin, OUTPUT);
}
void DHT11Start(int pin)
{
// Host the start of signal
digitalWrite(pin, LOW);
delay(18);
// Pulled up to wait for
pinMode(pin, INPUT);
while (!digitalRead(pin)) ;
// Response signal
while (digitalRead(pin)) ;
// Pulled ready to output
while (!digitalRead(pin)) ;
}
byte DHT11ReadByte(int pin)
{
byte data = 0;
for (int i = 0; i < 8; i++) {
while (digitalRead(pin)) ;
while (!digitalRead(pin)) ;
unsigned long start = micros();
while (digitalRead(pin)) ;
unsigned long finish = micros();
if ((unsigned long)(finish - start) > 50) data |= 1 << (7 - i);
}
return data;
}
void DHT11Finish(int pin)
{
// Releases the bus
while (!digitalRead(pin)) ;
digitalWrite(pin, HIGH);
pinMode(pin, OUTPUT);
}
bool DHT11Check(const byte* data, int dataSize)
{
if (dataSize != 5) return false;
byte sum = 0;
for (int i = 0; i < dataSize - 1; i++) {
sum += data[i];
}
return data[dataSize - 1] == sum;
}
////////////////////////////////////////////////////////////////////////////////////////
実装(Orbit)
Orbit側のプログラムは、SORACOM Orbitの開発ガイドに従って進めます。こちらは開発環境がDocker Imageで配布され、そのDocker Image上で開発するためのVSCode設定とソースコード、ツール一式を配布する、という形態を取っており、WASMという比較的新しくてビルド環境をそろえるのに苦労しそうな開発がこんな簡単に始められるのには感動しますね。
プログラムはAssembly Script、Rust、C(C++)で書けますが、今回はC(C++)で記載しました。
今回はdownlinkのみです。
タグからバイナリ設定データを作っているため、入力は読み込みません。
出力はバイナリであるためバイナリデータを指定して、content_typeをapplication/octet-streamに設定しています。
#include <string>
#include <emscripten.h>
#include "soracom/orbit.h"
char format[] = "application/octet-stream";
int32_t downlink_body();
struct SensorInUse {
bool button : 1;
bool magSwitch : 1;
bool temparature : 1;
bool humidity : 1;
bool distance : 1;
bool accel : 1;
bool reserved : 2;
};
struct OutputObject {
uint16_t interval;
struct SensorInUse sensor;
};
extern "C" {
EMSCRIPTEN_KEEPALIVE
int32_t downlink() {
return downlink_body();
}
}
std::string get_tag(const std::string& tag_name) {
const char* tag_value = NULL;
size_t tag_value_len = 0;
int32_t err = soracom_get_tag_value(tag_name.c_str(), tag_name.size(), &tag_value, &tag_value_len);
if (err < 0) {
return "";
}
return std::string(tag_value);
}
int putUint16ToBytes(uint16_t input, uint8_t *output){
output[0] = (uint8_t)(input >> 8);
output[1] = (uint8_t)(input >> 0);
return 2;
}
int createOutputString(OutputObject* obj, uint8_t* output) {
int len = 0;
len += putUint16ToBytes(obj->interval, &output[len]);
memcpy(&output[2], &obj->sensor, 1);
len += 1;
return len;
}
int32_t downlink_body() {
struct OutputObject obj;
std::string intervalString = get_tag("interval");
obj.interval = atoi(intervalString.c_str());
if (obj.interval == 0){
obj.interval = 60000;
}
obj.sensor.button = get_tag("button").compare("true") == 0;
obj.sensor.magSwitch = get_tag("magSwitch").compare("true") == 0;
obj.sensor.temparature = get_tag("temparature").compare("true") == 0;
obj.sensor.humidity = get_tag("humidity").compare("true") == 0;
obj.sensor.distance = get_tag("distance").compare("true") == 0;
obj.sensor.accel = get_tag("accel").compare("true") == 0;
obj.sensor.reserved = 0;
char output[1024];
int len = createOutputString(&obj, (uint8_t *)output);
orbit_set_output(output, len);
orbit_set_output_content_type(format, strlen(format));
return 0;
}
ビルドとSoraletの作成、アップロード
プログラムが書けたらビルドして、Soralet(SORACOM内でのWASMの容れ物)を生成し、そこにビルド済みのWASMモジュールをアップロードします。こちらも開発環境のDocker Imageの中にツールが入っているので、そのまま使うと良いでしょう。以下のコマンドを実行すると良いです。(Soraletの生成とモジュールのアップロードやコンソールからでもできます)
make release
soracom soralets create --soralet-id device-lead
soracom soralets upload --soralet-id device-lead --content-type application/octet-stream --body @build/soralet-optimized.wasm
SIMのグループ設定
使用するSIMにOrbitのグループ設定をしましょう。グループ設定の下の方にOrbitの設定があります。
ONにして、CODE SRNにはさきほど生成したSoraletとバージョンを指定します。バージョンは開発中は$LATEST、本番稼働では指定バージョンにするような使い方が想定されているものと思います。
CONTENT-TYPEは今回は入力を使っていないので何でも良いです。
DIRECTIONはDOWNLINKにチェック、メタデータも使用するのでチェックします。
これでこのグループ内のSIMでSORACOM Orbitが使える状態となりました。
最後にUnified Endpointの行き先としてSORACOM HarvestをONにしておきましょう。
動作
それでは実際に動作させてみましょう。最初はタグを以下の状態にしています。全センサーON、アップロード間隔は10秒です。
10秒ごとに全センサー値をアップロードしていますね。
設定を変えてみましょう。アップロード間隔は30秒、湿度センサーと磁気スイッチからは取得しないようにしました。
30秒ごとのアップロードに変わりましたね。
もちろんHarvestに保存されているデータも間隔やセンサー種が変わりました。
通信回数、通信量を抑えつつ、シンプルな形でセンサー値のアップロードと設定の更新ができる構成になりましたね。今回はHarvestでしたが、Unified Endpointの先は何であってもよいです。戻りのデータはSORACOM Orbitが作っているので、その先のサービスに依らずデータを生成できる、というところも良いと思います。(逆にFunkやBeamからの戻りのデータが必要な場合は、設定データの後に追加するなどの対処が必要)
おわりに
SORACOMはデバイスへの負荷を最小限に、クラウドと安全に連携できる様々なサービス群が魅力ですが、SORACOM Orbitによりさらに柔軟なデータの変換ができるようになりました。
また戻りの通信も色々細工できるので、これを使ったデバイスへのフィードバックも面白いですね。
特にデバイス側の開発者は、これを使えばデバイス側の負担を下げられるかも、ということは知っておいて良いと思います。