4
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

SORACOMで日本と北米機器からping遅延を収集しGoogle Cloudで可視化

Last updated at Posted at 2021-08-22

モチベーション

国外を含む複数拠点にまたがるIoT機器などからのデータ収集では拠点ごとのネットワークに依存しないネットワーク構築ができると便利です。SORACOMはワールドワイドで使えるサービスを展開していて、国内・国外で使えるSIMカードが購入できます。ここでは定期的にデータ収集したいIoTデバイスをロケーションフリーでデプロイするユースケースを仮定し、ペイロードが小さいデータをクラウドで可視化します。具体的には日本と北米の2拠点に設置したLinuxボックスでSORACOMのSIMで接続したネットワーク経由でping遅延を測定し、それをGoogle Cloudで収集した際の実測値をまとめてみました。

評価システム構成

SORACOM_eval.png

ping測定方法

pythonpingを使用し、北米と日本の2拠点に設置したRaspberry Piで40バイトのICMPペイロードをwww.google.comに対し5回送信した時の平均値を計算し、時系列で収集します。厳密に同一時刻のデータ比較はできないため、収集期間に対しデータのばらつきを評価します。また、デバイスとクラウド間のJWTセッションの更新は5分ごとに行います。

ping収集結果のクラウド通知

Google Cloud IoTを使いMQTTでPub/Subイベントとして通知します。ともにSORACOMでのデータ通信が可能なキャリアネットワークを使用し、4G LTEルータあるいは3G USBドングルでデータ通信を行います。ペイロードのスキーマは次のようになります。

{
  "type": "record",
  "name": "ping",
  "fields": [
    {
      "name": "timestamp",
      "type": "long",
      "logicalType": "timestamp-millis"
    },
    {
      "name": "source",
      "type": "string"
    },
    {
      "name": "target",
      "type": "string"
    },
    {
      "name": "latency",
      "type": "float"
    }
  ]
}

具体的な送信メッセージ。あるタイムスタンプでwww.google.comへpingした時の遅延の平均をsoracom1ノードから通知する例です。sourceの種類は後述します。

{
  "timestamp": 1628883028000, 
  "source":"soracom1", 
  "target":"www.google.com", 
  "latency":128.3
}

ペイロードをPub/Subへ送信するコードはJWTトークンのリフレッシュを含むサンプルを参考にしました。遅延測定用のMQTTトピックを使用し、パラメータはGoogle Cloudでプロジェクトを作ったとき生成されるものを環境変数から読み込んでシェルスクリプトで実行します。

python ./cloudiot_mqtt_gateway.py \
  --registry_id=$REGISTRY_ID \
  --gateway_id=$GATEWAY_ID \
  --cloud_region=$CLOUD_REGION \
  --project_id=$GOOGLE_CLOUD_PROJECT \
  --private_key_file=rsa_private.pem \
  --algorithm=RS256 \
  --ca_certs=roots.pem \
  --mqtt_bridge_hostname=mqtt.googleapis.com \
  --mqtt_bridge_port=$MQTT_BRIDGE_PORT \
  --device_id=$DEVICE_ID \
  --jwt_expires_minutes=5

デモコードはCloud IoTでノードのメタ情報を送信できることを示すだけのものなので、次のpingの遅延を送信するコードを追記します。Python実行環境は評価したRaspberry Pi 3 Model B+で python 3.9をソースからビルドしてpipenvを構築しました。

....
        from pythonping import ping
 
        response_list = ping(PING_TARGET, size=40, count=5)
        payload = {
                "timestamp": int(time.time())*1000,
                "source": args.device_id,
                "target": PING_TARGET,
                "latency": response_list.rtt_avg_ms
        }

        print("Publishing message {}/{}: '{}' to {}".format(i, args.num_messages, json.dumps(payload), mqtt_topic))
        # [START iot_mqtt_jwt_refresh]
        seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
        if seconds_since_issue > 60 * jwt_exp_mins:
            print("Refreshing token after {}s".format(seconds_since_issue))
            jwt_iat = datetime.datetime.utcnow()
            client.loop()
            client.disconnect()
            client = get_client(
                args.project_id,
                args.cloud_region,
                args.registry_id,
                args.device_id,
                args.private_key_file,
                args.algorithm,
                args.ca_certs,
                args.mqtt_bridge_hostname,
                args.mqtt_bridge_port,
            )
        # [END iot_mqtt_jwt_refresh]
        # Publish "payload" to the MQTT topic. qos=1 means at least once
        # delivery. Cloud IoT Core also supports qos=0 for at most once
        # delivery.
        client.publish(mqtt_topic, json.dumps(payload), qos=1)
        time.sleep(60)
....

ペイロードのsourceは"soracomX"(Xは番号)のように命名し、使用したネットワークと機器の組み合わせによって次のように区別します。

Xの値 1 2 3 6 4 5
拠点 (北米) 北米 北米 北米 日本 日本
GW ケーブルモデム 4G LTE(Yifun) 4G LTE(Yifun) 4G LTE(GL.iNet) 3G AK-020 3G AK-020
サービス Xfinity/400M/20M plan01s.4xfast plan01s.slow plan01s.slow plan01s.4xfast plan01s.slow

セルラースピードの区分は日本・北米とも次のようになっています。キャリアは北米のテスト環境ではAT&T Wireless、日本ではスペックによるとドコモとなっています。

Screen Shot 2021-08-14 at 5.38.06 AM.png

北米のセッションスナップショット

8月14日、8月21日にそれぞれYifun、GL.iNetの4G LTEルータをネットワーク接続したときにどのキャリアと繋がったかを示すセッション一覧を示します。SIMが1枚しかないので機器を交換しながらテストしました。YifunとGL.iNetのIMEMは末尾3桁がそれぞれ579、374で、8月21日のスナップショットではYifunが削除されてGL.iNetが接続となりキャリアはAT&T Wirelessで変更なし、ということが分かります。

8/14

Screen Shot 2021-08-14 at 5.45.43 AM.png

8/21

Screen Shot 2021-08-21 at 5.53.11 PM.png

日本のセッションスナップショット

8月16日にAK-020をネットワークへ接続したときの様子です。こちらは機器交換はなく同一のハードウエアを一連のテストで使用しました。

8/16

Screen Shot 2021-08-17 at 2.26.26 PM.png

Cloud FunctionでPub/Subイベントの振り分け

Pub/Subイベントは受信側としてGoogle Cloud Storage(GCS)にCSVファイルで保存できます。次のサンプルコードは雛形をベースとしたものでファイル名を(source名).jsonとし最新データをソースごとに上書きしています。CSVを入れたGCSはData Studioのソースとして使えるので、ファイル数がさほど多くなければそのままデータを集約しレポート化する用途にも使えます。長期的なデータ収集で最新のみをキャッシュしたいとき、安価で信頼性のあるデータソースとなり得ます。

Pub/SubイベントをGoogle Storage CSVへ変換

import csv
import json
import datetime
import time
import base64
from google.cloud import storage


def hello_pubsub(event, context):
    request_json_str = base64.b64decode(event['data']).decode('utf-8')

    request_json = json.loads(request_json_str)
    print(request_json)
    upload_blob(request_json)


def upload_blob(msg):
    """Uploads a file to the bucket."""

    destination_blob_name = f"{msg['source']}.csv"
    source_file_name = f'/tmp/{destination_blob_name}'
    bucket_name = 'soracomalarms'

    fieldnames = ['timestamp', 'source', 'target', 'latency']
    msg['timestamp'] = datetime.datetime.utcfromtimestamp(int(msg['timestamp'])//1000).strftime('%Y-%m-%d %H:%M:%S')
    with open(source_file_name, 'w') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerow(msg)

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    blob.upload_from_filename(source_file_name)

    print(
        "File {} uploaded to {}.".format(
            source_file_name, destination_blob_name
        )
    )

Pub/SubイベントをFirebase Realtime DBへ投入

Pub/Subイベントの受信側の別の例として、時系列データ保存のユースケースを考えます。具体的にはGoogle Cloud Functionを使用しFirebaseのリアルタイムデータベースに書き込んでいきます。データの分析を終えたら処理済み部分を捨てていくような用途を想定します。ペイロードが小さく中規模のデータで簡単なPythonによる簡単なクエリが実装できる安価なデータベースということで採用しました。Pub/Subイベントをトリガとするデータベースへの書き込みは次のコードを参考にしました。

// Firebase App (the core Firebase SDK) is always required and
// must be listed before other Firebase SDKs
var firebase = require("firebase/app");

// Add the Firebase products that you want to use
require("firebase/auth");
require("firebase/firestore");

const functions = require("firebase-functions");
const admin = require('firebase-admin');
admin.initializeApp();

// Set the configuration for your app
var config = {
    apiKey: "<YOUR_API_KEY>",
    authDomain: "<YOUR_PROJECT_ID>.firebaseapp.com",
    databaseURL: "https://<YOUR_DB_NAME>.firebaseio.com",
    projectId: "<YOUR_PROJECT_ID>",
    storageBucket: "<YOUR_PROJECT_ID>.appspot.com",
    messagingSenderId: "<YOUR_ID>",
    appId: "<YOUR_APP_ID>",
    measurementId: "<YOUR_MEASUREMENT_ID>"
};
firebase.initializeApp(config);

const db = admin.database();

function writeData(timestamp, source, target, latency) {
  db.ref('latencies/' + timestamp).set({
    source: source,
    target: target,
    latency: latency 
  });
}

// Take the text parameter passed to this HTTP endpoint and insert it into 
// Firestore under the path /messages/:documentId/original
exports.helloPubSub = functions.pubsub.topic('<YOUR_TOPIC>').onPublish((message) => {
  let timestamp = null;
  let source = null;
  let target = null;
  let latency = null;
  try {
    timestamp = message.json.timestamp;
    source = message.json.source;
    target = message.json.target;
    latency = message.json.latency;
  } catch (e) {
    functions.logger.error('PubSub message was not JSON', e);
  }
  writeData(timestamp, source, target, latency);
  
});

収集データの可視化

収集データの可視化ニーズとして、最新状態のダッシュボード・定期的なレポート・都度のデータ分析、などのユースケースを考えます。

GCP内のCSVファイルをData Studio・Flaskで可視化

リアルタイム状態を表示する場合、FlaskのようなWebサーバで先に作成したGCS内のCSVファイルをページ表示のタイミングで読み取り、UIに表示させることができます。Data StudioにGCSを直接つないで参照することもできますが、クエリ時にレポートの条件を変えないとキャッシュが更新されなかったりキャッシュの自動更新頻度もデータの更新と同期して行えないのでこの用途には不向きです。逆にダッシュボードを定期レポートにするような用途にはData Studioが有効活用できそうです。

FirebaseリアルタイムデータベースをJupyter Notebookで分析

時系列データへのクエリを行い必要なデータ処理を行うため、Jupyter NotebookからFirebaseリアルタイムデータベースへアクセスし遅延の集計処理を行います。データベースのAPIへ簡単なクエリを実行し可視化するなら、Jupyterから操作できるとツールへのアクセスが容易で便利です。

各拠点で使用した機器とネットワーク

日本国内向けの環境は、Raspberry Pi 3 model B+、データSIMと3G USBドングル(AK-020)をSORACOMから借用しました。soracom.ioのWebページからSIMのスペックはplan-Dであることと、サービスはs1が利用できていることが確認できました。セルラー情報はSIMの種類のためか空欄で確認できませんでした。

Screen Shot 2021-08-17 at 2.40.05 PM.png

AK-020はpppセッションでネットワーク接続する際モデムコマンドによる制御が必要ですが、デバイスをUSBポートに挿すと自動的にppp接続まで完了するスクリプトが提供されています。接続デバイス以外のインタフェースは無効になるのでソフトウエアの設定はブロードバンドで行い、出荷前にスクリプトを実行しておき現地でデバイスをさしてもらうような運用が想定できます。

北米向けの環境は、IoTデバイスは日本向けの環境と合わせRaspberry Pi 3 Model B+を使用し、ネットワークデバイスは手元にあった 4G LTEルータ2機種(Yifan YF325, GL.iNet GL-X300B/Collie)にSORACOM Air SIMを交互に挿入して使用しました。

Raspberry Pi 3 Model B+

RPI.png

Yifan YF325

IMG_5435.png

GL.iNet GL-X300B/Collie

GLINet.jpg

SORACOM Air

Soracom_sim.png
 

可視化のサンプル

Flaskによる最新状態ダッシュボード

各ソースからのPub/Subイベントの最新データをGCSに入れたCSVファイルから読み取り、HTMLで表形式にしたものです。

Screen Shot 2021-08-17 at 3.06.16 PM.png

Google Data Studioによるダッシュボード・定期レポート

Flaskと同じデータのData Studio版です。最新データしか見えていないのでデータポイントが1点のみです。時系列データにフィルタを適用して定期レポートとして利用することができます。

Screen Shot 2021-08-17 at 3.03.45 PM.png

Firebaseリアルタイムデータベース

ソース別にデータをツリーとして格納した場合の例です。ソースごとに遅延の集計を行います。

Screen Shot 2021-08-17 at 3.11.26 PM.png

計測結果

ソースごとの遅延の平均

実測値をサンプル数で割った平均をそのまま示します。ethは比較参照用に北米に設置したPCをEthernetケーブル接続し、同様のping測定をテスト全工程にわたって行った時の結果です。平均の分母となるサンプル数はソースごとに異なっています。

Screen Shot 2021-08-22 at 7.11.36 AM.png

遅延のばらつき

機器やSIMが1セットしかないためソースごとに集計タイミングの異なるテスト結果をそのまま示します。

日本

latency_per_source_jp.png

北米

latency_per_source_us.png

ヒストグラム

ソースそれぞれについて遅延のばらつきを示したものです。
latency_summary.png

料金

SORACOM Air (北米)

日本のSIMでは課金状態が確認できないため、北米での結果を示します。SORACOM Air/plan01sで機器1台が毎分Pingを計測し先に示したスキーマでPub/Subイベントで通知するユースケースでは、1日あたり0.60ドルほど必要でした。

billing_us.png

Google Cloud Platform

テスト期間中、ある時刻に北米2台、日本1台、の高々3台のIoTデバイスが動作しPub/Subイベントを毎分生成、また各デバイスともJWTセッションの更新リクエストが5分おきに発生します。まずGCPが受けたPub/Subイベントリクエストの推移を示します。

GCP_requests.png

具体的な課金対象となるingress/egressの様子はGCPのMonitoring画面の"Billable bytes transferred"メトリクスから取得できます。特に19−20日は3台が定常状態で動作していたので時間当たりのトラフィックもほぼ一定です。

traffic_all.png

詳細を確認するため、8月20日1:00am から30分のスナップショットを見ると5分おきにピークを持つ繰り返し波形が発生しているのが分かります。

!Screen Shot 2021-08-23 at 7.27.42 AM.png

課金は8月18-20日のGCP全体の合計として0.08ドルでした。Cloud StorageへのCSV上書き+キャッシュ更新頻度(Data Freshness)を1時間としたData Studioからのリクエスト、Google FunctionによるCloud Storage/Firebase realtime DBへ向けたAPI呼び出しが課金されているようです。

GCP -billing.png

まとめ

SORACOM Air/データSIMでネットワーク接続したデバイスは日本・北米ともに安定的にデータ収集できました。使用した遅延・サービス・デバイス固有の特性については次のようなことが分かりました。

  • 4G LTE/US の方が全般的に3G USBドングル/JP より遅延が小さい
  • 4G LTEを見る限り、4xfastとslowは帯域幅のみの違いで遅延の差はほとんどない
  • 3Gドングル/JP は4G LTEルータ/US よりも遅延のばらつきが大きい
  • GL.iNet 4G LTEルータはYifan 4G LTEルータよりも遅延が若干小さい?

Appendix:検証中に気づいた点

  • 北米拠点で4G LTEルータは設定を変えていないが同一サービスが"3G or GSM"と認識されてしまうことがあった。

Celltower.png

  • Pub/SubイベントをFirebaseへ投入するためのCloud Functionのドキュメントは動作コードを確定するのに時間を要した。

  • Firebase側のCloud FunctionのデバッグはFirebase Emulator Suiteをローカル環境にインストールして使うとデプロイ時間が0になりバグ検出→修正サイクルを最適化できる。

  • FirebaseのFunctionとGoogle Cloud Functionはともに同じ内容が表示されていて今後統合がさらに進んでいくのだろうかと感じた。

  • Raspberry Pi 3 model B+で Python3.9をビルドする際、ffiとsslパッケージをaptでインストールしてからmakeし、setuptoolsを最新にしないと動作するpipenvを作るのに何度かpython をビルドし直す必要があった。前者への対応として、対処策とは逆にリリースの新しい方向へ 6.soから7.soへのシンボリックリンクをマニュアルで作成した。

サンプルコード

GCSへアクセスするFlaskプロジェクトとFirebaseからのデータ分析に使ったNotebookを含んだリポジトリを次に置きました。

4
1
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?