0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

AWS IoT Twinmakerを使って、複数データをまとめて管理してみる①

Last updated at Posted at 2023-03-19

AWS IoT系のサービスのひとつ、IoT Twinmakerをご存知でしょうか?
2022年4月頃にGAされた、デジタルツインを迅速・簡単に構築できるサービスで、宣伝文だけでキラキラ感満載の面白そうなサービスとなっています。

re:Invent2022でも、Twinmaker絡みのワークショップが開催されていて、AWSの公式からもちょこちょこ「使ってみた」系の記事も紹介されています!

ただ、複数のデータを管理してみるとどうなるかといった紹介があまりなかったので、今回実際に触ってみながらどれぐらい実用的に使えるのかといったことを試してみたので、3回にわたって紹介します!

AWS IoT Twinmakerって?

実世界のデジタルツインを容易に、素早くデジタル空間に再現できるサービスです。IoTセンサーを始めとした、様々なデータソースに簡単にアクセスでき、臨場感あふれる3Dビューを実現し、時空間的に分析可能な、マネージドサービスと紹介されています。

AWS IoT TwinMakerは、次に説明する4つの機能を順に使用して、各種データを取り込み、3Dのモデルと組み合わせてユーザーに表示するシステムを作り上げている。

  1. データコネクタ
  2. モデルビルダー
  3. シーンコンポーザー
  4. アプリケーション

それぞれ簡単に説明すると。。


1. データコネクタ

異なる複数のデータソースへ接続を行う箇所です。

  • 既存のデータソースに接続して参照可能。
  • AWS IoT SiteWise、Amazon S3、Amazon Kinesis Video Streamsへの接続を、標準でサポート。
  • 独自のコネクター データコネクタ(Amazon Timestream, Snowflake 等)も作成可能。

2. モデルビルダー

エンティティ(物理システムそのもの)の関係とデータのプロパティを定義する箇所です。

  • ナレッジグラフを作成するための柔軟なモデリング
  • データストアからデータを取得するためのシンプルな API

3. シーンコンポーザー

モデルとデータより、3Dビューの作成する箇所です。

  • 物理システムをリアルタイムに反映する、3Dビジュアルを構築
  • オープンソースのgLTF フォーマットに対応し、CADや点群ファイルからの変換にも対応
  • データやアラートを3Dビューに重ねて表示可能
  • 物理的な変化に合わせて簡単にアップデート可能

4. アプリケーション

ユーザーがオペレーションやプランニングを実施するためのウェブアプリを構築する箇所です。

  • Grafana向けのローコードプラグイン搭載
  • Open SourceのGrafanaとAmazon Managed Grafana の両方に対応
  • 開発者向けの SDK を使ってカスタムアプリケーションを開発、またはAWS パートナーソリューションとの統合が可能

ということです。データ取得から表示するところまで、一括で引き受けられるサービスなんですが、データを取得するところやアウトプットする際には、他のAWSサービスなどを使う必要があります。
image.png

ということで、今回は複数センサーが搭載されたRaspberry Piを使用して、上の構成図のようにIoT CoreやTimestream、IoT Twinmakerを経由してGrafanaに測定データを表示させてみました。

第一弾として、本記事ではIoT Coreを使用して、複数センサーからデータを取得する方法についてご紹介します!

AWS IoT Coreの設定

IoT Coreは、IoTデバイスとAWSを繋ぐ架け橋となるサービスです。IoT Coreで証明書を作成し、モノとして登録したうえでデバイス側に配置することで、デバイス別にAWSへデータを送信することが可能になります。

ということで、IoT Coreではポリシーの作成→証明書作成の順番でご紹介していきます。

ポリシー作成

まず、AWSアカウントやIoTのトピックを許可するポリシーをIoT Coreにて作成していきます。今回は、MQTTを使用してIoTメッセージを識別するため、IoT Core側ではデバイスからのメッセージのトピック名を指定して許可する必要があるわけです。

IoT Coreのページより、左ペインの「ポリシー」を選択し、作成していきます。
ポリシーはJson形式で記載していきます。任意のポリシー名を入力し、以下該当箇所を埋めたうえでポリシーを作成してください。なお、今回はデバイスごとにポリシーを作成するので、トピック名についてはデバイスを識別できるように、raspberry/Rpi1などにしました。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "iot:Publish",
      "Resource": "arn:aws:iot:[AWSアカウントのリージョン]:[AWSアカウントNo]:topic/[デバイス側で設定するトピック名]"
    },
    {
      "Effect": "Allow",
      "Action": "iot:Subscribe",
      "Resource": "arn:aws:iot:[AWSアカウントのリージョン]:[AWSアカウントNo]:topicfilter/[デバイス側で設定するトピック名]"
    },
    {
      "Effect": "Allow",
      "Action": "iot:Connect",
      "Resource": "*"
    }
  ]
}

証明書の作成

証明書の作成を行います。今回は、複数デバイスを登録するため、後述するモノ証明書については、登録するデバイス分、準備が必要となります。

まず、左ペインよりモノを選択してください。そこから、デバイス単位のモノを作成していきます。こちらは、特に制約などはないですが、後ほどどのデバイスと対応しているか、わかりやすくするために、Rpi1などと登録することをオススメします。
作成途中で、ポリシー選択画面が表示されるので、先ほど作成したポリシーを選択しましょう。
そして、重要ポイント。証明書類については、ひとまず全部ダウンロードしておきましょう。(実際に使うのは一部ですが)

これで、IoT Core側で、デバイスから情報を受け取る準備は完了です。
ちなみに、IoT Coreのセキュリティを高めたいケースなどでは、AWS IoT Device Defenderなどといったサービスを使用することが推奨されていそうですが、今回は個人的に試してみてるだけなのでお見送り。。

Raspberry Piの設定

ラズパイについては、Raspberry Pi 4を5台使用しました。
キッティングについては、世の中にたくさん参考になる記事がありますので、そちらをご参考にしてください。

一通り、キッティングが完了したら、試しにセンサーが正しく観測できるか試してみましょう。
私は今回、以下のセンサーを用いてデータ取得を実施しました。

取得対象データ センサー
CO2 CO2センサモジュール MH-Z19C
温度、湿度、気圧 BME680 温湿度・気圧・ガスセンサモジュールキット
照度 TSL25721使用 照度センサーモジュール
人感 焦電型赤外線(人感)センサーモジュール SB412A
臭気 TGS8100 においセンサモジュールキット

上記記事の通りに設定すれば、デバイス側の設定は完了です!
ちなみに、先ほどIoT Coreでダウンロードした、ルートCA1証明書や公開キー・秘密キー (pem.key) をここで使用します。

あとは、センサーの公式HPなどを参考にしつつ、ラズベリーパイ側で実行するコードを記載して完了です。
参考までに、上記センサーの情報は以下スクリプトで送信していました。
(10秒ごとに取得データをAWSに送信するプログラムになっています。


from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import logging
import time
import argparse
import json
import re
import subprocess
import RPi.GPIO as GPIO
from time import sleep
import bme680
import smbus
from gpiozero import MCP3002
from gpiozero import OutputDevice
from gpiozero.pins.pigpio import PiGPIOFactory

i2c = smbus.SMBus(1)

AllowedActions = ['both', 'publish', 'subscribe']

# Custom MQTT message callback
def customCallback(client, userdata, message):
    print("Received a new message: ")
    print(message.payload)
    print("from topic: ")
    print(message.topic)
    print("--------------\n\n")

# Read in command-line parameters
parser = argparse.ArgumentParser()
parser.add_argument("-e", "--endpoint", action="store", required=True, dest="host", help="Your AWS IoT custom endpoint")
parser.add_argument("-r", "--rootCA", action="store", required=True, dest="rootCAPath", help="Root CA file path")
parser.add_argument("-c", "--cert", action="store", dest="certificatePath", help="Certificate file path")
parser.add_argument("-k", "--key", action="store", dest="privateKeyPath", help="Private key file path")
parser.add_argument("-id", "--clientId", action="store", dest="clientId", default="basicPubSub",
                    help="Targeted client id")
parser.add_argument("-t", "--topic", action="store", dest="topic", default="raspberry/Rpi1", help="Targeted topic")
parser.add_argument("-m", "--mode", action="store", dest="mode", default="both",
                    help="Operation modes: %s"%str(AllowedActions))
parser.add_argument("-M", "--message", action="store", dest="message", default="Hello World!",
                    help="Message to publish")

args = parser.parse_args()
host = args.host
rootCAPath = args.rootCAPath
certificatePath = args.certificatePath
privateKeyPath = args.privateKeyPath
port = args.port
useWebsocket = args.useWebsocket
clientId = args.clientId
topic = args.topic

# Port defaults
if args.useWebsocket and not args.port:  # When no port override for WebSocket, default to 443
    port = 443
if not args.useWebsocket and not args.port:  # When no port override for non-WebSocket, default to 8883
    port = 8883

# Configure logging
logger = logging.getLogger("AWSIoTPythonSDK.core")
logger.setLevel(logging.DEBUG)
streamHandler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)

# Init AWSIoTMQTTClient
myAWSIoTMQTTClient = None
if useWebsocket:
    myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId, useWebsocket=True)
    myAWSIoTMQTTClient.configureEndpoint(host, port)
    myAWSIoTMQTTClient.configureCredentials(rootCAPath)
else:
    myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId)
    myAWSIoTMQTTClient.configureEndpoint(host, port)
    myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath)

# AWSIoTMQTTClient connection configuration
myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20)
myAWSIoTMQTTClient.configureOfflinePublishQueueing(-1)  # Infinite offline Publish queueing
myAWSIoTMQTTClient.configureDrainingFrequency(2)  # Draining: 2 Hz
myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10)  # 10 sec
myAWSIoTMQTTClient.configureMQTTOperationTimeout(5)  # 5 sec

# センサーごとのプログラム
# Connect and subscribe to AWS IoT
myAWSIoTMQTTClient.connect()
if args.mode == 'both' or args.mode == 'subscribe':
    myAWSIoTMQTTClient.subscribe(topic, 0, customCallback)
time.sleep(2)

# Jinkan Sensor
human_pin = 13
GPIO.setmode(GPIO.BCM)
GPIO.setup(human_pin, GPIO.IN)

# BME680
try:
    sensor = bme680.BME680(bme680.I2C_ADDR_PRIMARY)
except (RuntimeError, IOError):
    sensor = bme680.BME680(bme680.I2C_ADDR_SECONDARY)
sensor.set_gas_heater_temperature(320)
sensor.set_gas_heater_duration(150)
sensor.select_gas_heater_profile(0)

# TSL25721
TSL2572_ADR      = 0x39
TSL2572_COMMAND  = 0x80
TSL2572_TYPE_REP = 0x00
TSL2572_TYPE_INC = 0x20
TSL2572_ALSIFC   = 0x66

TSL2572_SAI   = 0x40
TSL2572_AIEN  = 0x10
TSL2572_WEN   = 0x80
TSL2572_AEN   = 0x02
TSL2572_PON   = 0x01

TSL2572_ENABLE   = 0x00
TSL2572_ATIME    = 0x01
TSL2572_WTIME    = 0x03
TSL2572_AILTL    = 0x04
TSL2572_AILTH    = 0x05
TSL2572_AIHTL    = 0x06
TSL2572_AIHTH    = 0x07
TSL2572_PRES     = 0x0C
TSL2572_CONFIG   = 0x0D
TSL2572_CONTROL  = 0x0F
TSL2572_ID       = 0x12
TSL2572_STATUS   = 0x13
TSL2572_C0DATA   = 0x14
TSL2572_C0DATAH  = 0x15
TSL2572_C1DATA   = 0x16
TSL2572_C1DATAH  = 0x17

#TSL2572 setings
atime = 0xC0
gain = 1.0

def initTSL2572() :
  if (getTSL2572reg(TSL2572_ID)!=[0x34]) :
    #check TSL2572 ID
    return -1
  setTSL2572reg(TSL2572_COMMAND | TSL2572_TYPE_INC | TSL2572_CONTROL,0x00)
  setTSL2572reg(TSL2572_COMMAND | TSL2572_TYPE_INC | TSL2572_CONFIG,0x00)
  setTSL2572reg(TSL2572_COMMAND | TSL2572_TYPE_INC | TSL2572_ATIME,atime)
  setTSL2572reg(TSL2572_COMMAND | TSL2572_TYPE_INC | TSL2572_ENABLE,TSL2572_AEN | TSL2572_PON)
  return 0

def setTSL2572reg(reg,dat) :
  i2c.write_byte_data(TSL2572_ADR,reg,dat)

def getTSL2572reg(reg) :
  dat = i2c.read_i2c_block_data(TSL2572_ADR,TSL2572_COMMAND | TSL2572_TYPE_INC | reg,1)
  return dat

def getTSL2572adc() :
  dat = i2c.read_i2c_block_data(TSL2572_ADR,TSL2572_COMMAND | TSL2572_TYPE_INC | TSL2572_C0DATA,4)
  adc0 = (dat[1] << 8) | dat[0]
  adc1 = (dat[3] << 8) | dat[2]
  return[adc0,adc1]
  
# GPI8100
PIN_PULSE = 21
Vref = 3.3
factory = PiGPIOFactory()
pulse_pin = OutputDevice(PIN_PULSE, pin_factory=factory)
pulse_pin.off()
adc_ch0 = MCP3002(channel=0, max_voltage=Vref, pin_factory=factory)

# Publish to the same topic in a loop forever
loopCount = 0
while True:
    if args.mode == 'both' or args.mode == 'publish':
        co2 = re.findall(r"\d+", str(subprocess.check_output(['sudo', 'python3', '-m', 'mh_z19'])))[1]
        human = GPIO.input(human_pin)
        adc = getTSL2572adc()
        cpl = 0.0
        lux1 = 0.0
        lux2 = 0.0
        cpl = (2.73 * (256 - atime) * gain)/(60.0)
        lux1 = ((adc[0] * 1.00) - (adc[1] * 1.87)) / cpl
        lux2 = ((adc[0] * 0.63) - (adc[1] * 1.00)) / cpl
        pulse_pin.on()
        val = adc_ch0.value
        pulse_pin.off()
        
        message = {}
        message['co2'] = int(co2)
        message['human'] = human
        message['temperature'] = round(sensor.data.temperature,2)
        message['pressure'] = round(sensor.data.pressure,2)
        message['humidity'] = round(sensor.data.humidity,2)
        message['gas'] = round(sensor.data.gas_resistance,2)
        message['smell'] = round(val,2)
        
        if ((lux1 <= 0) and (lux2 <= 0)) :
            message['light'] = 0
        elif (lux1 > lux2) :
            message['light'] = round(lux1,2)
        elif (lux1 < lux2) :
            message['light'] = round(lux2,2)
        
        messageJson = json.dumps(message)
        myAWSIoTMQTTClient.publish(topic, messageJson, 1)
        if args.mode == 'publish':
            print('Published topic %s: %s\n' % (topic, messageJson))
        loopCount += 1
    time.sleep(10)

だいぶ余分なソースコードも残っていますが、必要に応じて適宜書き換えちゃってください!

次回は、Timestreamでデータを整理し、Twinmakerに連携するところを解説していきます!

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?