Watson IoT PlatformのアプリケーションをFlaskで実装しようとしてハマった話をまとめておきます。
Flaskの理解が浅かったせいで、やはりちゃんと理解して使わないといけないと猛省しました。
環境
Watson IoT Platformのアプリケーション向けにいくつかのプログラミングインターフェイスが提供されていますが、今回MQTTを使用してIoTデバイスの接続状況を報告するDevice Statusをsubscribeするアプリケーションを作ってみました。
最終的にはIBM Cloud FoundryにコードをプッシュしたかったのでFlaskで実装することにしましたが、ここでしっかりとFlaskの理解を深めておくべきでした....
Python 2.7
paho-mqtt 1.5.0
Flask 1.1.2
準備
IBM Cloud Foundryでアプリケーションの作成
下記の記事を参考にパブリック・アプリケーションを作成します。
https://cloud.ibm.com/docs/cloud-foundry-public?topic=cloud-foundry-public-getting-started
上記手順の中で出てくるサンプルコードをgitから入手し、これに手を入れて取り敢えずHTTPリクエストに応答できるようにします。
from flask import Flask
import os
app = Flask(__name__, static_url_path='')
port = int(os.getenv('PORT', 8000))
@app.route('/')
def root():
return 'Hello, world'
if __name__ == '__main__':
app.run(host='0.0.0.0', port=port, debug=True)
このプログラムをローカル実行してみます。
# python hello.py
* Serving Flask app "hello" (lazy loading)
* Environment: production
WARNING: This is a development server. Do not use it in a production deployment.
Use a production WSGI server instead.
* Debug mode: on
* Running on http://0.0.0.0:8000/ (Press CTRL+C to quit)
* Restarting with stat
* Debugger is active!
* Debugger PIN: 210-659-291
確認のためcurlでアクセスしてみます。無事ポート8000でHTTPリクエストに応答できているようです。
# curl localhost:8000
Hello, world
これで最低限HTTPリクエストに応答できるアプリケーションができました。
Watson IoT PlatformのAPIキーとトークンの作成
Watson IoT Platformは、IoTデバイスが収集した情報をpublishするためのMQTTクライアント(二つの方式がありそれぞれデバイス、ゲートウェイと呼ぶ)と、IoTデバイスが収集したデータを受け取って処理したりIoTデバイスのステータス確認や操作をするMQTTクライアント(アプリケーション)の二種類のクライアント接続が可能です。
今回はアプリケーションとして接続し、Watson IoT Platformが管理するDevice Statusの変更通知を受け取るため当該トピックをsubscribeします。接続の詳細については下記のリンクに記載されています。
https://www.ibm.com/support/knowledgecenter/en/SSQP8H/iot/platform/applications/mqtt.html
手始めにWatson IoT PlatformにアプリケーションとしてMQTT接続するために、APIキーとトークンを作成します。下記のリンクを参照して作業を進めます。
https://www.ibm.com/support/knowledgecenter/en/SSQP8H/iot/platform/applications/app_dev_index.html
作成されたAPIキーとトークンを使ってMQTT接続することで、IoTデバイスがWatson IoT Platformに接続あるいは切断するたびにDevice Statusの変更通知を受けることができるようになるはずです。
アプリケーションを実装してみたところ...
Flaskを使用したhello.pyに、作成したAPIキーとトークンでWatson IoT Platformへ接続するコードを追加します。
Watson IoT PlatformへMQTT接続するために、client id、userid, passwordを指定する必要があります。
それぞれWatson IoT Platformびより割り当てられているorganization idとAPIキーおよびトークンを指定のフォーマットで連結し作成します。このフォーマットについては下記のリンクのMQTT authenticationに詳しく記載されています。
https://www.ibm.com/support/knowledgecenter/en/SSQP8H/iot/platform/applications/mqtt.html
以下のサンプルコードではorganization idを'oooooo', APIキーを'kkkkkkkkkk', トークンを'tttttttttttttttttt'としていますが、コードを動かす際にはWatson IoT Platformにより提供された値に変更する必要があります。
from flask import Flask
import os
import paho.mqtt.client as mqtt
from datetime import datetime
def on_connect(client, userdata, flags, respons_code):
client.on_message = on_message
client.subscribe('iot-2/type/+/id/+/mon')
print(datetime.now().strftime("%Y/%m/%d %H:%M:%S") + ": mqtt connected")
def on_disconnect(client, userdata, rc):
print(datetime.now().strftime("%Y/%m/%d %H:%M:%S") + ": mqtt disconnected")
def on_message(client, userdata, msg):
print(datetime.now().strftime("%Y/%m/%d %H:%M:%S") + ": mqtt message arrived")
client = mqtt.Client(client_id='a:oooooo:appl1', protocol=mqtt.MQTTv311)
client.username_pw_set('a-oooooo-kkkkkkkkkk', password='tttttttttttttttttt')
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.connect('de.messaging.internetofthings.ibmcloud.com', 1883, 120)
client.loop_start()
app = Flask(__name__, static_url_path='')
port = int(os.getenv('PORT', 8000))
@app.route('/')
def root():
return "Hello, world"
if __name__ == '__main__':
app.run(host='0.0.0.0', port=port, debug=True)
このようなコードを準備し動作確認してみることにしました。ポート8000からのHTTPリクエストに応答しつつ、MQTTメッセージの受信が確認できるだろうという期待です。
しかし実際にはコードを実行すると....
# python hello.py
* Serving Flask app "hello" (lazy loading)
* Environment: production
WARNING: This is a development server. Do not use it in a production deployment.
Use a production WSGI server instead.
* Debug mode: on
* Running on http://0.0.0.0:8000/ (Press CTRL+C to quit)
* Restarting with stat
2020/05/13 18:13:26: mqtt connected
* Debugger is active!
* Debugger PIN: 210-659-291
2020/05/13 18:13:26: mqtt message arrived
2020/05/13 18:13:27: mqtt disconnected
2020/05/13 18:13:27: mqtt connected
2020/05/13 18:13:27: mqtt message arrived
2020/05/13 18:13:28: mqtt connected
2020/05/13 18:13:28: mqtt disconnected
2020/05/13 18:13:29: mqtt message arrived
2020/05/13 18:13:30: mqtt disconnected
2020/05/13 18:13:30: mqtt connected
2020/05/13 18:13:30: mqtt message arrived
2020/05/13 18:13:31: mqtt disconnected
2020/05/13 18:13:31: mqtt connected
このように一旦MQTTでの接続が確立されるもののすぐに切れてしまいました。その後paho-mqttが予期しないセッションの切断を受け自動的に再接続していますが、それもまたすぐに切れてしまい、以降再接続・切断を繰り返していきます。
APIキーやトークンが間違っていた場合そもそもMQTT接続に失敗するはずですし、MQTTのkeepaliveでタイムアウトしているのかとメッセージの間隔を調整してみたりもしたのですがこれも改善にはつながりませんでした。
たまたま通信経路のどこがに障害があるのかとも疑い数日放置してみましたが症状は変わらずでした。
解決編
困り果ててしまいstackoverflowに質問を投稿してみましたところ、二人の方からclient idが重複しているのではないかとアドバイスしていただきました。
確かにMQTT Brokerに同一のclient idでの接続はできません。なるほどとは思ったものの重複する理由が思い当たりません。hello.pyは同時に1プロセスしか実行していません。またclient idはorganizagtion idとAPIキーを連結して生成するのがルールのなのですが、organization idはともかくAPIキーを他の目的に流用したりもしていません。何らかの理由でWatson IoT Platform上で古い接続情報が残ってしまっていてそれで接続できなくなるのかもしれないと思い、APIキーを新しく生成してみたのですが、そちらを使っても症状は変わりませんでした。
どうやらclient idの重複が原因ではなかったのかとあきらめかけていたのですが、再度hello.pyの出力を眺めていると'Restarting with stat'の文字列が目に入りました。
そう言えばこれは何だろう?
これまでMQTT部分を疑い色々確認してきたのですが、このFlaskの謎メッセージについて調べてみることにしました。
結果、
- Flaskをデバッグモードを有効化して実行すると、アプリケーションを構成するコードの変更を検知して自動的にアプリケーションを再起動する仕組みがある。
- この時Flaskは暗黙に子プロセスを起動し、親プロセスがコードの変更の監視と子プロセスの再起動を担当し、子プロセスがWEBアプリケーションとしてHTTPリクエストの処理をする。
というようなことが分かってきました。
確認してみると、
# ps -f | grep hello.py
501 20745 2576 0 10:39PM ttys005 0:00.35 /System/Library/Frameworks/Python.framework/Versions/2.7/Resources/Python.app/Contents/MacOS/Python hello.py
501 20748 20745 0 10:39PM ttys005 0:00.36 /System/Library/Frameworks/Python.framework/Versions/2.7/Resources/Python.app/Contents/MacOS/Python /Users/(省略)/hello.py
確かに親子関係にある二つのhello.pyプロセスが実行されています。
この状況であればcllient idの重複は納得できます。親プロセス(pid: 20745)と子プロセス(pid: 20748)からほぼ同時に同じclient idを使用してMQTT接続にいってしまうわけです。うーん、なるほど。
ではどうやってclient idの重複を回避するのか。デバッグモードを無効にすれば簡単ですが、せっかく用意されている便利機能を使えないのも残念です。さらに調べると親プロセスは子プロセスを起動する前に'WEERKZUG_RUN_MAIN'ということが分かりました。この環境変数を確認し、親プロセスではMQTT接続をしないようにすれば大丈夫そうです。
コードの修正と動作確認
上記デバッグモードに関する理解を踏まえ、
- MQTTの接続をする前に環境変数'WEERKZUG_RUN_MAIN'のチェックをし、この環境変数が定義されている場合のみ接続する。
- プロセス終了前にMQTTの接続を切断し、切断の完了を待つようにする。
- ついでにon_message()の処理を追加し、所望のデバイスのステータスを追跡するようにし、HTTPリクエストに応答してステータスを報告する。
のように再度hello.pyを修正しました。二番目の修正は親プロセスがコードの変更を検知してreloadする際に、子プロセス1を停止して子プロセス2を新たに起動するわけですが、子プロセス1のMQTT接続の切断を待たずに子プロセス2が起動してしまうと同じことが起きてしまいそうなので、念のため追加しておくことにしました。
from flask import Flask
import os
import threading
import json
import paho.mqtt.client as mqtt
from datetime import datetime
def on_connect(client, userdata, flags, respons_code):
client.on_message = on_message
client.subscribe('iot-2/type/+/id/+/mon')
print(datetime.now().strftime("%Y/%m/%d %H:%M:%S") + ": mqtt connected")
cond = threading.Condition()
notified = False
def on_disconnect(client, userdata, rc):
global notified
print(datetime.now().strftime("%Y/%m/%d %H:%M:%S") + ": mqtt disconnected")
with cond:
notified = True
cond.notify()
status = 'Unknown'
def on_message(client, userdata, msg):
global status
response_json = msg.payload.decode("utf-8")
response_dict = json.loads(response_json)
if(response_dict["ClientID"] == '追跡したいデバイス・ゲートウェイのclient idをここに指定'):
if( response_dict["Action"] == "Disconnect" ):
status = "Disconnected"
elif( response_dict["Action"] == "Connect" ):
status = "Connected"
if os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
client = mqtt.Client(client_id='a:oooooo:appl1', protocol=mqtt.MQTTv311)
client.username_pw_set('a-oooooo-kkkkkkkkkk', password='tttttttttttttttttt')
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.connect('de.messaging.internetofthings.ibmcloud.com', 1883, 120)
client.loop_start()
app = Flask(__name__, static_url_path='')
port = int(os.getenv('PORT', 8000))
@app.route('/')
def root():
return "Status: " + status
if __name__ == '__main__':
app.run(host='0.0.0.0', port=port, debug=True)
if os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
client.loop_stop()
client.disconnect()
with cond:
if( not notified ):
cond.wait()
上記のようにコードを修正して再度実行してみました。
# python hello.py
* Serving Flask app "hello" (lazy loading)
* Environment: production
WARNING: This is a development server. Do not use it in a production deployment.
Use a production WSGI server instead.
* Debug mode: on
* Running on http://0.0.0.0:8000/ (Press CTRL+C to quit)
* Restarting with stat
* Debugger is active!
* Debugger PIN: 210-659-291
2020/05/13 23:04:28: mqtt connected
127.0.0.1 - - [13/May/2020 23:04:38] "GET / HTTP/1.1" 200 -
接続・切断の繰り返しが止まりました!
curlでHTTPリクエストしてみると、
# curl localhost:8000
Status: Disconnected
MQTTメッセージの処理も上手くいっているようです。
試しに登録済みのIoTデバイスを接続してみます。
今回は、mosquitto_subを使ってDevice commandをsubscribeすることでIoTデバイスの代わりとします。
設定や接続の方法については下記のリンクの記事が詳しいです。
https://qiita.com/kuraoka/items/5380f6b5e97e8cd1ad98
# mosquitto_sub -h de.messaging.internetofthings.ibmcloud.com -u use-token-auth -P "tttttttttttttttttt" -i (hello.pyで監視しているclient id) -t 'iot-2/type/(type)/id/(id)/cmd/control/fmt/json'
上記のようにIoTデバイスからMQTT接続がされた状態で、再度curlでHTTPリクエストをしてみます。
# curl localhost:8000
Status: Connected
今度はConnectedのステータスが返ってきました。mosquitto_subでMQTT接続したことで、Device statusメッセージがhello.pyに送信され、on_message()関数の中でstatus変数が変更されたようです。
これで全て期待通りです。
まとめ
今回初めてFlaskを使用してみました。サンプルコードから最小のコードを作成しそこから実装したい部分を付け足していこうとしたら、思わぬところで足を掬われてしまった格好です。こういう試行錯誤をするために習作したという考え方もあるのですが、やはり多少なりとも最初に勉強してから使うべきだったようです。
Watson IoT PlatformはIoTデバイスを管理・制御しデータ収集するための強力なクラウドサービスです。
IoTデバイスが収集したデータをクラウドにアップしていく部分がシステムの根幹になるわけですが、IoTデバイスのステータスを監視したり、必要に応じて制御をするアプリケーションもクラウド上で実装したくなるであろうと思い、今回の習作を思い立ちました。これでステータスの監視部分については最低限の動作が確認できたので、Device commandの送信によるIoTデバイスの制御などもう少し手を広げて調査していきたいと思います。