概要
AWS IoTで気になる機能としてShadowがあります。これはAWS IoTの基本機能であるMQTTブローカー自体には含まれない機能で、MQTTの上にAWSが独自に構築した機能です。
下はLearn how AWS IoT works with this interactive tutorialでのShadowの説明の図。実際のデバイス(赤いランプが繋がれている)とは別に、クラウド上にデバイスの状態を反映した情報が存在することが視覚的に表されています。
僕の理解では、そもそもMQTTはインターネット(有線、無線)のようなネットワーク越しにデバイス(センサーが繋がってたりする)とブローカーが通信し、そのブローカーを介して別のデバイス(同じくセンサーや制御機器が繋がってたりする)あるいはスマートデバイス・PCが、お互いの情報や制御コマンドを送受信することを想定して設計されていると思っていて、そこには「通信の不安定さ」は必ず考慮に入れなければならない、というハードルがあります。
特に最近は、通信に4Gや3Gのような無線ネットワークが使われるようになってきていて、これらはDoPaの時代から比べればかなり安定してきているとは思いますが、それでも同一LAN内での通信に比べて不安定です。
MQTTでは、そのような問題に対してWillやRetainを仕様として用意しているわけですが、AWS IoTではWillもRetainもサポートしていません。そしてそれらの問題に対するよりシンプルな(かつおそらくAWS側にも都合のよい)提案がShadowだと思っていて、デバイスから常に最新の状態がブローカーに送られてきていれば、なにもデバイスに「状態」を取りに行かなくても、MQTTあるいはHTTP経由でサーバー上に存在する"状態"を取得すれば、デバイスに直接取りに行くより、はるかに安定的に且つ短時間で取得できるわけです。
- こういった機能から、
Shadow
というネーミングは、この機能をよく表現したシンプルで秀逸なネーミングだなーとか思います。
もうひとつ、Shadowには面白い機能があって、例えばsubscriberが update/delta
をsubscribeしてる時に、publisherが update
で state: {desired: {switch: "ON"}}
を含むデータを送ったとします。
state
フィールドの desired
はShadowに対して、状態変化を要求する意味があり、このメッセージはAWS IoT的には switch
の状態を ON
に変更したい、という意味があります。
(あくまで約束事の上での意味であり、そのような機能は自分で実装しなければなりません)
さて、この update
を受け取る側、すなわち制御される側では、update/accepted
をsubscribeして、全てのメッセージを受信することもできますが、先ほどの update/delta
をsubscribeした場合、state
中の reported
に含まれる同じフィールドの値 (この場合は switch
) と比較して、差異があった場合のみメッセージを配信します。
たとえば、update
で desired
を受け取ってShadowに反映した結果、Shadowの状態が
state: {reported: {switch: "ON"}, desired: {switch: "ON"}}
だった場合は配信されず、
state: {reported: {switch: "OFF"}, desired: {switch: "ON"}}
だった場合は、ブローカーからsubscribeしているデバイスへメッセージが配信されます。
デバイス側ではこれを元になんらかの制御を行い、reported: {switch: "ON"}
を含むメッセージをブローカーにpublishすると、Shadowにおけるreported
と desired
の状態は同じとなり、以後、同じ内容の制御要求は送信されなくなります。
これを使えば、publishするデバイスが一定時間おきに自身のデータをpublishする場合でも、subscriber側では state
の reported
と desired
に差異がある場合のみメッセージを受け取ることができるため、通信データ量の削減になり、FOMAやLTEの場合は(契約内容にもよりますが)通信料金の削減、デバイスでのバッテリー節約などができそうです。
以下でこれらの機能を試してみようと思います。
なお、以下のRubyでのサンプルアプリを動かすに当たって AWS IoTとRuby製MQTTクライアントでPub/Subしてみたで書いた準備は既に終わっているものとします。
それと、今回のサンプルは本来、publishする側もsubscribeする側もQoSは1でするべきと思いますが、github.com/njh/ruby-mqttでQoS1にすると、一回のpublishなら良いのですが、何回も連続してpublishするようにループして、送信間隔を空けるためにsleepしたりするとエラーで落ちてしまいました... したがって、とりあえず今回はQoS0でやってます。
Publisher
アプリケーションによって色々な想定があるでしょうが、ここではデバイスにON・OFFのデジタルスイッチが繋がっており、その状態が変化した時にShadowに向かってpublishするものとします。
簡単なコードを以下のように書きました。
require "mqtt"
require "json"
MQTT::Client.connect(host: "AAAAAAAAAA.iot.ap-northeast-1.amazonaws.com",
port: 8883,
ssl: true,
cert_file: "cert.pem",
key_file: "thing-private-key.pem",
ca_file: "rootCA.pem") do |client|
loop do
switch_state = (rand(0..10) % 2).zero? ? "OFF" : "ON"
client.publish("$aws/things/sample_things/shadow/update",
{:state => {:desired => {:switch => switch_state}}}.to_json)
puts "#{Time.now} : sent : #{switch_state}"
sleep 3
end
end
このpublisherはupdate
へのpublishのたびに state:desired
に "ON"
または "OFF"
をランダムに埋め込んだJSONデータをブローカーに対して送ります。
送信間隔は3秒です。
Subscriber
続いて、先ほどのpublishメッセージを受け取るsubscriberを実装します。
# -*- coding: utf-8 -*-
require "mqtt"
require "json"
MQTT::Client.connect(host: "AAAAAAAAAA.iot.ap-northeast-1.amazonaws.com",
port: 8883,
ssl: true,
cert_file: "cert.pem",
key_file: "thing-private-key.pem",
ca_file: "rootCA.pem") do |client|
client.subscribe("$aws/things/sample_things/shadow/update/delta")
loop do
topic,json = client.get
message = JSON.parse(json)
switch_state = message["state"]["switch"]
puts "#{Time.now} : recv : #{switch_state}"
client.publish("$aws/things/sample_things/shadow/update",
{:state => {:reported => {:switch => switch_state}}}.to_json)
end
end
subscriberはupdate/delta
をsubscribeして、データを繰り返し受信します。
また、受信のたびに、受信した "ON"
または "OFF"
を state:reported
としてブローカーに送信します。
Shadowの update
に対する update/delta
の動作
では sub_sample.rb
および pub_sample.rb
を実行して様子を観察します。
結果は以下のようになりました。
$ ruby pub_sample.rb
2015-10-27 12:00:34 +0900 : sent : OFF
2015-10-27 12:00:37 +0900 : sent : ON
2015-10-27 12:00:40 +0900 : sent : ON
2015-10-27 12:00:43 +0900 : sent : OFF
2015-10-27 12:00:46 +0900 : sent : ON
2015-10-27 12:00:49 +0900 : sent : OFF
2015-10-27 12:00:52 +0900 : sent : OFF
2015-10-27 12:00:55 +0900 : sent : OFF
2015-10-27 12:00:58 +0900 : sent : OFF
2015-10-27 12:01:01 +0900 : sent : OFF
2015-10-27 12:01:04 +0900 : sent : OFF
2015-10-27 12:01:07 +0900 : sent : ON
直後にsub_shadow.rb側でメッセージを受信しました。
$ ruby sub_sample.rb
2015-10-27 12:00:37 +0900 : recv : ON
2015-10-27 12:00:43 +0900 : recv : OFF
2015-10-27 12:00:46 +0900 : recv : ON
2015-10-27 12:00:49 +0900 : recv : OFF
2015-10-27 12:01:07 +0900 : recv : ON
たしかにドキュメントに記載してあるとおり、subscriberの側では状態が変化した時のみメッセージを受信しました。publisher側が2回以上同じデータを続けて送った時はsubscriber側ではなにもメッセージを受信していない様子がわかります。
感想など
今回はShadowの中の update
や update/delta
を試してみましたが、他にもTopicによって色々と機能があるようです。 update/delta
はMQTTでサービス構築する際には結構助けになる機能のように思います。厳密には update/delta
でメッセージを受信してから、実際の機器に反映して reported
をpublishし終わる前に同じ内容の state:desired
を持つメッセージをブローカーが受け取る可能性はあるので、絶対に同じ内容のstateを持つメッセージを複数回連続で受信しないとは言えないと思います。したがってアプリの仕様としては、いずれにしてもsbscriber側で受信した際に実際の状態との差異をチェックする必要はあるかもしれません。
ただ、特に無線モジュール等でデバイスとAWS IoTが繋がっている場合、通信料金や通信量上限は一番最初に立ちはだかってくる問題だと思います。しかしながら、自動スケールするMQTTブローカーでこのような機能を自作するのは簡単ではないでしょうし、このような機能が最初から用意してあるというのはアプリを作る側にとってはずいぶんと楽ができると思います。
欲を言えば、メッセージのフォーマットにJSONだけでなくMessagePackもあって、かつ両者を1つのブローカー上でtopicを変えるなどして混在して使用できたらなーとか思いました。
例えばセンサーや制御機器のつながったデバイスはMessagePackを使用してメッセージを update
し、Webアプリではそのデータを update/accepted
や update/delta
からJSONで取得するとか。
IoT的なネットワークではメッセージを大量に送ることになるでしょうから、センサーやGWからpublishするメッセージは、一回分を少しでも小さくしたいところですし、おそらくMessagePackのほうがバッテリーにも優しい気がします(計測したわけじゃないですけど)。
Shadowという機能はこれからのIoTアプリ開発を色々と助けてくれるでしょうし、AWS IoTを使わない場合であっても「Shadow」という考え方、サーバー上にデバイスの状態を仮想的に置いて、そこを介してデータの送受信を非同期に行う概念は自社でなんらかのIoTアプリを一から作る場合でもとっても参考になる考え方だなーと思いました。