LoginSignup
25
20

More than 5 years have passed since last update.

AWS IoT Shadowの update/delta で制御指令を無駄なく受け取ってみた

Last updated at Posted at 2015-10-27

概要

 AWS IoTで気になる機能としてShadowがあります。これはAWS IoTの基本機能であるMQTTブローカー自体には含まれない機能で、MQTTの上にAWSが独自に構築した機能です。
 下はLearn how AWS IoT works with this interactive tutorialでのShadowの説明の図。実際のデバイス(赤いランプが繋がれている)とは別に、クラウド上にデバイスの状態を反映した情報が存在することが視覚的に表されています。

スクリーンショット 2015-10-14 11.09.11.png

 僕の理解では、そもそもMQTTはインターネット(有線、無線)のようなネットワーク越しにデバイス(センサーが繋がってたりする)とブローカーが通信し、そのブローカーを介して別のデバイス(同じくセンサーや制御機器が繋がってたりする)あるいはスマートデバイス・PCが、お互いの情報や制御コマンドを送受信することを想定して設計されていると思っていて、そこには「通信の不安定さ」は必ず考慮に入れなければならない、というハードルがあります。
 特に最近は、通信に4Gや3Gのような無線ネットワークが使われるようになってきていて、これらはDoPaの時代から比べればかなり安定してきているとは思いますが、それでも同一LAN内での通信に比べて不安定です。

MQTTでは、そのような問題に対してWillやRetainを仕様として用意しているわけですが、AWS IoTではWillもRetainもサポートしていません。そしてそれらの問題に対するよりシンプルな(かつおそらくAWS側にも都合のよい)提案がShadowだと思っていて、デバイスから常に最新の状態がブローカーに送られてきていれば、なにもデバイスに「状態」を取りに行かなくても、MQTTあるいはHTTP経由でサーバー上に存在する"状態"を取得すれば、デバイスに直接取りに行くより、はるかに安定的に且つ短時間で取得できるわけです。

  • こういった機能から、Shadow というネーミングは、この機能をよく表現したシンプルで秀逸なネーミングだなーとか思います。

もうひとつ、Shadowには面白い機能があって、例えばsubscriberが update/delta をsubscribeしてる時に、publisherが updatestate: {desired: {switch: "ON"}} を含むデータを送ったとします。
state フィールドの desired はShadowに対して、状態変化を要求する意味があり、このメッセージはAWS IoT的には switch の状態を ON に変更したい、という意味があります。
(あくまで約束事の上での意味であり、そのような機能は自分で実装しなければなりません)

さて、この update を受け取る側、すなわち制御される側では、update/accepted をsubscribeして、全てのメッセージを受信することもできますが、先ほどの update/delta をsubscribeした場合、state 中の reported に含まれる同じフィールドの値 (この場合は switch ) と比較して、差異があった場合のみメッセージを配信します。

たとえば、updatedesired を受け取ってShadowに反映した結果、Shadowの状態が

state: {reported: {switch: "ON"}, desired: {switch: "ON"}}

だった場合は配信されず、

state: {reported: {switch: "OFF"}, desired: {switch: "ON"}}

だった場合は、ブローカーからsubscribeしているデバイスへメッセージが配信されます。

デバイス側ではこれを元になんらかの制御を行い、reported: {switch: "ON"} を含むメッセージをブローカーにpublishすると、Shadowにおけるreporteddesired の状態は同じとなり、以後、同じ内容の制御要求は送信されなくなります。

 これを使えば、publishするデバイスが一定時間おきに自身のデータをpublishする場合でも、subscriber側では statereporteddesired に差異がある場合のみメッセージを受け取ることができるため、通信データ量の削減になり、FOMAやLTEの場合は(契約内容にもよりますが)通信料金の削減、デバイスでのバッテリー節約などができそうです。

以下でこれらの機能を試してみようと思います。
なお、以下のRubyでのサンプルアプリを動かすに当たって AWS IoTとRuby製MQTTクライアントでPub/Subしてみたで書いた準備は既に終わっているものとします。

それと、今回のサンプルは本来、publishする側もsubscribeする側もQoSは1でするべきと思いますが、github.com/njh/ruby-mqttでQoS1にすると、一回のpublishなら良いのですが、何回も連続してpublishするようにループして、送信間隔を空けるためにsleepしたりするとエラーで落ちてしまいました... したがって、とりあえず今回はQoS0でやってます。

Publisher

アプリケーションによって色々な想定があるでしょうが、ここではデバイスにON・OFFのデジタルスイッチが繋がっており、その状態が変化した時にShadowに向かってpublishするものとします。
簡単なコードを以下のように書きました。

pub_sample.rb
require "mqtt"
require "json"

MQTT::Client.connect(host: "AAAAAAAAAA.iot.ap-northeast-1.amazonaws.com",
                     port: 8883,
                     ssl: true,
                     cert_file: "cert.pem",
                     key_file: "thing-private-key.pem",
                     ca_file: "rootCA.pem") do |client|

  loop do
    switch_state = (rand(0..10) % 2).zero? ? "OFF" : "ON"

    client.publish("$aws/things/sample_things/shadow/update",
                   {:state => {:desired => {:switch => switch_state}}}.to_json)

    puts "#{Time.now} : sent : #{switch_state}"
    sleep 3
  end
end

このpublisherはupdateへのpublishのたびに state:desired"ON" または "OFF" をランダムに埋め込んだJSONデータをブローカーに対して送ります。
送信間隔は3秒です。

Subscriber

続いて、先ほどのpublishメッセージを受け取るsubscriberを実装します。

sub_sample.rb
# -*- coding: utf-8 -*-
require "mqtt"
require "json"

MQTT::Client.connect(host: "AAAAAAAAAA.iot.ap-northeast-1.amazonaws.com",
                     port: 8883,
                     ssl: true,
                     cert_file: "cert.pem",
                     key_file: "thing-private-key.pem",
                     ca_file: "rootCA.pem") do |client|

  client.subscribe("$aws/things/sample_things/shadow/update/delta")

  loop do
    topic,json = client.get
    message = JSON.parse(json)
    switch_state = message["state"]["switch"]
    puts "#{Time.now} : recv : #{switch_state}"

    client.publish("$aws/things/sample_things/shadow/update",
                   {:state => {:reported => {:switch => switch_state}}}.to_json)
  end
end

subscriberはupdate/deltaをsubscribeして、データを繰り返し受信します。
また、受信のたびに、受信した "ON" または "OFF"state:reported としてブローカーに送信します。

Shadowの update に対する update/delta の動作

では sub_sample.rb および pub_sample.rb を実行して様子を観察します。
結果は以下のようになりました。

pub_sample.rb
$ ruby pub_sample.rb 

2015-10-27 12:00:34 +0900 : sent : OFF
2015-10-27 12:00:37 +0900 : sent : ON
2015-10-27 12:00:40 +0900 : sent : ON
2015-10-27 12:00:43 +0900 : sent : OFF
2015-10-27 12:00:46 +0900 : sent : ON
2015-10-27 12:00:49 +0900 : sent : OFF
2015-10-27 12:00:52 +0900 : sent : OFF
2015-10-27 12:00:55 +0900 : sent : OFF
2015-10-27 12:00:58 +0900 : sent : OFF
2015-10-27 12:01:01 +0900 : sent : OFF
2015-10-27 12:01:04 +0900 : sent : OFF
2015-10-27 12:01:07 +0900 : sent : ON

直後にsub_shadow.rb側でメッセージを受信しました。

sub_sample.rb
$ ruby sub_sample.rb

2015-10-27 12:00:37 +0900 : recv : ON
2015-10-27 12:00:43 +0900 : recv : OFF
2015-10-27 12:00:46 +0900 : recv : ON
2015-10-27 12:00:49 +0900 : recv : OFF
2015-10-27 12:01:07 +0900 : recv : ON

たしかにドキュメントに記載してあるとおり、subscriberの側では状態が変化した時のみメッセージを受信しました。publisher側が2回以上同じデータを続けて送った時はsubscriber側ではなにもメッセージを受信していない様子がわかります。

感想など

今回はShadowの中の updateupdate/delta を試してみましたが、他にもTopicによって色々と機能があるようです。 update/delta はMQTTでサービス構築する際には結構助けになる機能のように思います。厳密には update/delta でメッセージを受信してから、実際の機器に反映して reported をpublishし終わる前に同じ内容の state:desired を持つメッセージをブローカーが受け取る可能性はあるので、絶対に同じ内容のstateを持つメッセージを複数回連続で受信しないとは言えないと思います。したがってアプリの仕様としては、いずれにしてもsbscriber側で受信した際に実際の状態との差異をチェックする必要はあるかもしれません。

 ただ、特に無線モジュール等でデバイスとAWS IoTが繋がっている場合、通信料金や通信量上限は一番最初に立ちはだかってくる問題だと思います。しかしながら、自動スケールするMQTTブローカーでこのような機能を自作するのは簡単ではないでしょうし、このような機能が最初から用意してあるというのはアプリを作る側にとってはずいぶんと楽ができると思います。
 
 欲を言えば、メッセージのフォーマットにJSONだけでなくMessagePackもあって、かつ両者を1つのブローカー上でtopicを変えるなどして混在して使用できたらなーとか思いました。
 例えばセンサーや制御機器のつながったデバイスはMessagePackを使用してメッセージを update し、Webアプリではそのデータを update/acceptedupdate/delta からJSONで取得するとか。
 IoT的なネットワークではメッセージを大量に送ることになるでしょうから、センサーやGWからpublishするメッセージは、一回分を少しでも小さくしたいところですし、おそらくMessagePackのほうがバッテリーにも優しい気がします(計測したわけじゃないですけど)。
 
 Shadowという機能はこれからのIoTアプリ開発を色々と助けてくれるでしょうし、AWS IoTを使わない場合であっても「Shadow」という考え方、サーバー上にデバイスの状態を仮想的に置いて、そこを介してデータの送受信を非同期に行う概念は自社でなんらかのIoTアプリを一から作る場合でもとっても参考になる考え方だなーと思いました。

25
20
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
25
20