PythonからAWS IoTに送るデータを圧縮して、AWS IoT Rule Actions側(+Lambda連携)で解凍する方法を書きます。
データ送信スクリプト
以下のPythonスクリプトを使ってデータを送信します。
import sys
import json
import bz2
import boto3
# 適当なデータを準備
data = {
"int": 1,
"float": 1.1,
"str": "abcdefghijklmnopqrstuvwxyz",
"japanese": "あいうえおかきくけこさしすせそたちつてと",
}
# Bytes形式に変換しサイズ確認
json_data = bytes(json.dumps(data),"utf-8")
print(f"original size -> {sys.getsizeof(json_data)} bytes") # -> 230bytes
# bz2で圧縮しサイズ確認
json_data_bz2 = bz2.compress(json_data)
print(f"bz2 compressed size -> {sys.getsizeof(json_data_bz2)} bytes") # -> 184bytes
# AWS IoTに圧縮済みデータをPublish
client = boto3.client("iot-data")
topic_name = "my/topic"
client.publish(topic=topic_name, payload=json_data_bz2)
注釈は以下の通りです。
- 上記のテスト用のデータだと圧縮率低いです。IoTを通すような小さいデータだと圧縮かけた方が大きくなる場合もあるので注意してください。
- bz2を使っていますが、zlibやgzipなど他の圧縮形式の方がサイズが小さくなる場合もあるので、サンプルデータ等でサイズを確認してみることをお勧めします
AWS側のMQTTテストクライアントで確認してみると、圧縮済みのバイナリデータが飛んできていることがわかります。このままだと使えないので、こちらを解凍して元のJSONデータを復元します。
解凍用のLambdaを準備
Lambdaを準備します。こちらもPythonで作成しました。今回の処理はARMアーキテクチャでも動きますので、安価なARMを選択しています。
コードは以下の通りです。
先程のAWS IoTに渡ってきたバイナリデータを event["payload"]
に渡します。この際にAWS IoTからLambdaへはByteをそのまま渡せないので、base64エンコードして渡してくるので、base64でデコードもしています。
import json
import base64
import bz2
def lambda_handler(event, context):
decompressed_data = json.loads(bz2.decompress(base64.b64decode(event["payload"])).decode("utf-8"))
return decompressed_data
AWS IoT Rule作成
上で作成したLambdaにデータを渡すためのAWS IoTルールを作成します。
my/topic
のトピックから取得したバイナリデータを encode(*,"base64")
でbase64エンコードして、JSON形式でLambdaにインプットします。
返り値は as payload
として返すようにします。
SELECT
aws_lambda("arn:aws:lambda:ap-northeast-1:xxxxxx:function:xxxxxx",{"payload":encode(*,"base64")}) as payload
FROM 'my/topic'
アクションに再パブリッシュで my/topic/decompressed
のトピックに投げるように設定しておきます。
Lambdaのトリガー登録
先程作成したLambdaに、先程作成したAWS IoTルールをトリガーとして設定します。
結果確認
上手く設定できていれば解凍されたデータが my/topic/decompressed
のトピックで受信できることが確認できると思います。
最後に
扱うデータに対して圧縮の効果がどれくらいあるのか、通信量の削減が、Lambda等のAWSコスト増加と比較してコストメリットがあるのかなどなど、確認が必要な項目はありますが、役立つ場面もありそうな気がしています。
参考URL