モチベーション
国外を含む複数拠点にまたがるIoT機器などからのデータ収集では拠点ごとのネットワークに依存しないネットワーク構築ができると便利です。SORACOMはワールドワイドで使えるサービスを展開していて、国内・国外で使えるSIMカードが購入できます。ここでは定期的にデータ収集したいIoTデバイスをロケーションフリーでデプロイするユースケースを仮定し、ペイロードが小さいデータをクラウドで可視化します。具体的には日本と北米の2拠点に設置したLinuxボックスでSORACOMのSIMで接続したネットワーク経由でping遅延を測定し、それをGoogle Cloudで収集した際の実測値をまとめてみました。
評価システム構成
ping測定方法
pythonpingを使用し、北米と日本の2拠点に設置したRaspberry Piで40バイトのICMPペイロードをwww.google.comに対し5回送信した時の平均値を計算し、時系列で収集します。厳密に同一時刻のデータ比較はできないため、収集期間に対しデータのばらつきを評価します。また、デバイスとクラウド間のJWTセッションの更新は5分ごとに行います。
ping収集結果のクラウド通知
Google Cloud IoTを使いMQTTでPub/Subイベントとして通知します。ともにSORACOMでのデータ通信が可能なキャリアネットワークを使用し、4G LTEルータあるいは3G USBドングルでデータ通信を行います。ペイロードのスキーマは次のようになります。
{
"type": "record",
"name": "ping",
"fields": [
{
"name": "timestamp",
"type": "long",
"logicalType": "timestamp-millis"
},
{
"name": "source",
"type": "string"
},
{
"name": "target",
"type": "string"
},
{
"name": "latency",
"type": "float"
}
]
}
具体的な送信メッセージ。あるタイムスタンプでwww.google.comへpingした時の遅延の平均をsoracom1ノードから通知する例です。sourceの種類は後述します。
{
"timestamp": 1628883028000,
"source":"soracom1",
"target":"www.google.com",
"latency":128.3
}
ペイロードをPub/Subへ送信するコードはJWTトークンのリフレッシュを含むサンプルを参考にしました。遅延測定用のMQTTトピックを使用し、パラメータはGoogle Cloudでプロジェクトを作ったとき生成されるものを環境変数から読み込んでシェルスクリプトで実行します。
python ./cloudiot_mqtt_gateway.py \
--registry_id=$REGISTRY_ID \
--gateway_id=$GATEWAY_ID \
--cloud_region=$CLOUD_REGION \
--project_id=$GOOGLE_CLOUD_PROJECT \
--private_key_file=rsa_private.pem \
--algorithm=RS256 \
--ca_certs=roots.pem \
--mqtt_bridge_hostname=mqtt.googleapis.com \
--mqtt_bridge_port=$MQTT_BRIDGE_PORT \
--device_id=$DEVICE_ID \
--jwt_expires_minutes=5
デモコードはCloud IoTでノードのメタ情報を送信できることを示すだけのものなので、次のpingの遅延を送信するコードを追記します。Python実行環境は評価したRaspberry Pi 3 Model B+で python 3.9をソースからビルドしてpipenvを構築しました。
....
from pythonping import ping
response_list = ping(PING_TARGET, size=40, count=5)
payload = {
"timestamp": int(time.time())*1000,
"source": args.device_id,
"target": PING_TARGET,
"latency": response_list.rtt_avg_ms
}
print("Publishing message {}/{}: '{}' to {}".format(i, args.num_messages, json.dumps(payload), mqtt_topic))
# [START iot_mqtt_jwt_refresh]
seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
if seconds_since_issue > 60 * jwt_exp_mins:
print("Refreshing token after {}s".format(seconds_since_issue))
jwt_iat = datetime.datetime.utcnow()
client.loop()
client.disconnect()
client = get_client(
args.project_id,
args.cloud_region,
args.registry_id,
args.device_id,
args.private_key_file,
args.algorithm,
args.ca_certs,
args.mqtt_bridge_hostname,
args.mqtt_bridge_port,
)
# [END iot_mqtt_jwt_refresh]
# Publish "payload" to the MQTT topic. qos=1 means at least once
# delivery. Cloud IoT Core also supports qos=0 for at most once
# delivery.
client.publish(mqtt_topic, json.dumps(payload), qos=1)
time.sleep(60)
....
ペイロードのsourceは"soracomX"(Xは番号)のように命名し、使用したネットワークと機器の組み合わせによって次のように区別します。
Xの値 | 1 | 2 | 3 | 6 | 4 | 5 |
---|---|---|---|---|---|---|
拠点 | (北米) | 北米 | 北米 | 北米 | 日本 | 日本 |
GW | ケーブルモデム | 4G LTE(Yifun) | 4G LTE(Yifun) | 4G LTE(GL.iNet) | 3G AK-020 | 3G AK-020 |
サービス | Xfinity/400M/20M | plan01s.4xfast | plan01s.slow | plan01s.slow | plan01s.4xfast | plan01s.slow |
セルラースピードの区分は日本・北米とも次のようになっています。キャリアは北米のテスト環境ではAT&T Wireless、日本ではスペックによるとドコモとなっています。
北米のセッションスナップショット
8月14日、8月21日にそれぞれYifun、GL.iNetの4G LTEルータをネットワーク接続したときにどのキャリアと繋がったかを示すセッション一覧を示します。SIMが1枚しかないので機器を交換しながらテストしました。YifunとGL.iNetのIMEMは末尾3桁がそれぞれ579、374で、8月21日のスナップショットではYifunが削除されてGL.iNetが接続となりキャリアはAT&T Wirelessで変更なし、ということが分かります。
8/14
8/21
日本のセッションスナップショット
8月16日にAK-020をネットワークへ接続したときの様子です。こちらは機器交換はなく同一のハードウエアを一連のテストで使用しました。
8/16
Cloud FunctionでPub/Subイベントの振り分け
Pub/Subイベントは受信側としてGoogle Cloud Storage(GCS)にCSVファイルで保存できます。次のサンプルコードは雛形をベースとしたものでファイル名を(source名).jsonとし最新データをソースごとに上書きしています。CSVを入れたGCSはData Studioのソースとして使えるので、ファイル数がさほど多くなければそのままデータを集約しレポート化する用途にも使えます。長期的なデータ収集で最新のみをキャッシュしたいとき、安価で信頼性のあるデータソースとなり得ます。
Pub/SubイベントをGoogle Storage CSVへ変換
import csv
import json
import datetime
import time
import base64
from google.cloud import storage
def hello_pubsub(event, context):
request_json_str = base64.b64decode(event['data']).decode('utf-8')
request_json = json.loads(request_json_str)
print(request_json)
upload_blob(request_json)
def upload_blob(msg):
"""Uploads a file to the bucket."""
destination_blob_name = f"{msg['source']}.csv"
source_file_name = f'/tmp/{destination_blob_name}'
bucket_name = 'soracomalarms'
fieldnames = ['timestamp', 'source', 'target', 'latency']
msg['timestamp'] = datetime.datetime.utcfromtimestamp(int(msg['timestamp'])//1000).strftime('%Y-%m-%d %H:%M:%S')
with open(source_file_name, 'w') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerow(msg)
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)
print(
"File {} uploaded to {}.".format(
source_file_name, destination_blob_name
)
)
Pub/SubイベントをFirebase Realtime DBへ投入
Pub/Subイベントの受信側の別の例として、時系列データ保存のユースケースを考えます。具体的にはGoogle Cloud Functionを使用しFirebaseのリアルタイムデータベースに書き込んでいきます。データの分析を終えたら処理済み部分を捨てていくような用途を想定します。ペイロードが小さく中規模のデータで簡単なPythonによる簡単なクエリが実装できる安価なデータベースということで採用しました。Pub/Subイベントをトリガとするデータベースへの書き込みは次のコードを参考にしました。
// Firebase App (the core Firebase SDK) is always required and
// must be listed before other Firebase SDKs
var firebase = require("firebase/app");
// Add the Firebase products that you want to use
require("firebase/auth");
require("firebase/firestore");
const functions = require("firebase-functions");
const admin = require('firebase-admin');
admin.initializeApp();
// Set the configuration for your app
var config = {
apiKey: "<YOUR_API_KEY>",
authDomain: "<YOUR_PROJECT_ID>.firebaseapp.com",
databaseURL: "https://<YOUR_DB_NAME>.firebaseio.com",
projectId: "<YOUR_PROJECT_ID>",
storageBucket: "<YOUR_PROJECT_ID>.appspot.com",
messagingSenderId: "<YOUR_ID>",
appId: "<YOUR_APP_ID>",
measurementId: "<YOUR_MEASUREMENT_ID>"
};
firebase.initializeApp(config);
const db = admin.database();
function writeData(timestamp, source, target, latency) {
db.ref('latencies/' + timestamp).set({
source: source,
target: target,
latency: latency
});
}
// Take the text parameter passed to this HTTP endpoint and insert it into
// Firestore under the path /messages/:documentId/original
exports.helloPubSub = functions.pubsub.topic('<YOUR_TOPIC>').onPublish((message) => {
let timestamp = null;
let source = null;
let target = null;
let latency = null;
try {
timestamp = message.json.timestamp;
source = message.json.source;
target = message.json.target;
latency = message.json.latency;
} catch (e) {
functions.logger.error('PubSub message was not JSON', e);
}
writeData(timestamp, source, target, latency);
});
収集データの可視化
収集データの可視化ニーズとして、最新状態のダッシュボード・定期的なレポート・都度のデータ分析、などのユースケースを考えます。
GCP内のCSVファイルをData Studio・Flaskで可視化
リアルタイム状態を表示する場合、FlaskのようなWebサーバで先に作成したGCS内のCSVファイルをページ表示のタイミングで読み取り、UIに表示させることができます。Data StudioにGCSを直接つないで参照することもできますが、クエリ時にレポートの条件を変えないとキャッシュが更新されなかったりキャッシュの自動更新頻度もデータの更新と同期して行えないのでこの用途には不向きです。逆にダッシュボードを定期レポートにするような用途にはData Studioが有効活用できそうです。
FirebaseリアルタイムデータベースをJupyter Notebookで分析
時系列データへのクエリを行い必要なデータ処理を行うため、Jupyter NotebookからFirebaseリアルタイムデータベースへアクセスし遅延の集計処理を行います。データベースのAPIへ簡単なクエリを実行し可視化するなら、Jupyterから操作できるとツールへのアクセスが容易で便利です。
各拠点で使用した機器とネットワーク
日本国内向けの環境は、Raspberry Pi 3 model B+、データSIMと3G USBドングル(AK-020)をSORACOMから借用しました。soracom.ioのWebページからSIMのスペックはplan-Dであることと、サービスはs1が利用できていることが確認できました。セルラー情報はSIMの種類のためか空欄で確認できませんでした。
AK-020はpppセッションでネットワーク接続する際モデムコマンドによる制御が必要ですが、デバイスをUSBポートに挿すと自動的にppp接続まで完了するスクリプトが提供されています。接続デバイス以外のインタフェースは無効になるのでソフトウエアの設定はブロードバンドで行い、出荷前にスクリプトを実行しておき現地でデバイスをさしてもらうような運用が想定できます。
北米向けの環境は、IoTデバイスは日本向けの環境と合わせRaspberry Pi 3 Model B+を使用し、ネットワークデバイスは手元にあった 4G LTEルータ2機種(Yifan YF325, GL.iNet GL-X300B/Collie)にSORACOM Air SIMを交互に挿入して使用しました。
Raspberry Pi 3 Model B+
Yifan YF325
GL.iNet GL-X300B/Collie
SORACOM Air
可視化のサンプル
Flaskによる最新状態ダッシュボード
各ソースからのPub/Subイベントの最新データをGCSに入れたCSVファイルから読み取り、HTMLで表形式にしたものです。
Google Data Studioによるダッシュボード・定期レポート
Flaskと同じデータのData Studio版です。最新データしか見えていないのでデータポイントが1点のみです。時系列データにフィルタを適用して定期レポートとして利用することができます。
Firebaseリアルタイムデータベース
ソース別にデータをツリーとして格納した場合の例です。ソースごとに遅延の集計を行います。
計測結果
ソースごとの遅延の平均
実測値をサンプル数で割った平均をそのまま示します。ethは比較参照用に北米に設置したPCをEthernetケーブル接続し、同様のping測定をテスト全工程にわたって行った時の結果です。平均の分母となるサンプル数はソースごとに異なっています。
遅延のばらつき
機器やSIMが1セットしかないためソースごとに集計タイミングの異なるテスト結果をそのまま示します。
日本
北米
ヒストグラム
料金
SORACOM Air (北米)
日本のSIMでは課金状態が確認できないため、北米での結果を示します。SORACOM Air/plan01sで機器1台が毎分Pingを計測し先に示したスキーマでPub/Subイベントで通知するユースケースでは、1日あたり0.60ドルほど必要でした。
Google Cloud Platform
テスト期間中、ある時刻に北米2台、日本1台、の高々3台のIoTデバイスが動作しPub/Subイベントを毎分生成、また各デバイスともJWTセッションの更新リクエストが5分おきに発生します。まずGCPが受けたPub/Subイベントリクエストの推移を示します。
具体的な課金対象となるingress/egressの様子はGCPのMonitoring画面の"Billable bytes transferred"メトリクスから取得できます。特に19−20日は3台が定常状態で動作していたので時間当たりのトラフィックもほぼ一定です。
詳細を確認するため、8月20日1:00am から30分のスナップショットを見ると5分おきにピークを持つ繰り返し波形が発生しているのが分かります。
課金は8月18-20日のGCP全体の合計として0.08ドルでした。Cloud StorageへのCSV上書き+キャッシュ更新頻度(Data Freshness)を1時間としたData Studioからのリクエスト、Google FunctionによるCloud Storage/Firebase realtime DBへ向けたAPI呼び出しが課金されているようです。
まとめ
SORACOM Air/データSIMでネットワーク接続したデバイスは日本・北米ともに安定的にデータ収集できました。使用した遅延・サービス・デバイス固有の特性については次のようなことが分かりました。
- 4G LTE/US の方が全般的に3G USBドングル/JP より遅延が小さい
- 4G LTEを見る限り、4xfastとslowは帯域幅のみの違いで遅延の差はほとんどない
- 3Gドングル/JP は4G LTEルータ/US よりも遅延のばらつきが大きい
- GL.iNet 4G LTEルータはYifan 4G LTEルータよりも遅延が若干小さい?
Appendix:検証中に気づいた点
- 北米拠点で4G LTEルータは設定を変えていないが同一サービスが"3G or GSM"と認識されてしまうことがあった。
-
Pub/SubイベントをFirebaseへ投入するためのCloud Functionのドキュメントは動作コードを確定するのに時間を要した。
-
Firebase側のCloud FunctionのデバッグはFirebase Emulator Suiteをローカル環境にインストールして使うとデプロイ時間が0になりバグ検出→修正サイクルを最適化できる。
-
FirebaseのFunctionとGoogle Cloud Functionはともに同じ内容が表示されていて今後統合がさらに進んでいくのだろうかと感じた。
-
Raspberry Pi 3 model B+で Python3.9をビルドする際、ffiとsslパッケージをaptでインストールしてからmakeし、setuptoolsを最新にしないと動作するpipenvを作るのに何度かpython をビルドし直す必要があった。前者への対応として、対処策とは逆にリリースの新しい方向へ 6.soから7.soへのシンボリックリンクをマニュアルで作成した。
サンプルコード
GCSへアクセスするFlaskプロジェクトとFirebaseからのデータ分析に使ったNotebookを含んだリポジトリを次に置きました。