はじめに
こんにちは!社会人一年目?のタレカツです!
今回はIoT Coreを使用したクラウド目覚まし時計である、cloud (alarm) watch(おそらくclockの方が正解)を作ってみました。実装した機能としては、ラズベリーパイにスピーカーとカメラがついており、時間になると音が鳴りつづけ、カメラ画像を送信して表情で音を止めるというものになっています。
コードなどはQ Developerを使用して作成しており、Pythonで書かれています。Q Developerと作りながら勉強しているような感じではあるので、間違いやご指摘などございましたらお申し付けください。
システム構成
システム構成は以下のものになっています。
IoT Coreとは
IoT Coreはインターネットに接続されたデバイスから、AWSのその他のサービスとの連携を図ったり、他のデバイスと安全に通信するための、クラウドプラットフォームです。デバイスとAWSを接続することで、接続したデバイスとデータのやり取りをしたり、今回は触っていないですが、デバイスのアプリケーションの操作も可能だと明記されています。いくつかの通信プロトコルが選択でき、今回はIoTユースケースでよく使用されているMQTTと呼ばれるプロトコルを使用しました。
MQTTとは
軽量のパブリッシュ-サブスクライブ (Pub/Sub) のプロトコルでネットワーク帯域幅とデバイスのリソースを最小限に抑えるように設計されています。TLSを使用した通信もサポートしています。MQTTは四つの構成要素があります。
- メッセージ
通信データそのものであり、様々なデータ型で送られます。図のファイルの部分です。 - トピック
全てのメッセージには関連づけられたトピックがあり、以下のように/で階層レベルで区切られて表現されます。送信する側はトピックを設定してどの受信者に受信させるか、受信側はそのトピックに応じて受信します。
device/sensor1
device/sensor2 - サブスクライバー
サブスクライバーは関心のあるトピックに登録して、メッセージを受信します。 - パブリッシャー
パブリッシャーはメッセージを送信するコンポーネントであり、トピックに関するメッセージを作成して、そのトピックを登録したサブスクライバーに送信します。1対多の通信であり、相手であるサブスクライバーを気にする必要はありません。
IoT Core初期設定
初期設定ではIoT Coreにて、モノの作成を行わないといけないのですが、接続1個のデバイスを接続からと管理→モノ→モノを作成から行うことができます。一つ目の方では、セットアップに沿って作業が進み、最後テストまで行なってくれます。また、両方で接続する際の認証に必要な証明書が発行されるので、ダウンロードしてください。
また以下のような形でルールを作成します。SQLステートメントのSELECT * FROM smile/image(最初は喜び判定のみだったため)と書かれていますが、FROMの後に書かれているのが、トピック名であり、今回送られてくる画像を表情判定するLambdaが呼び出されています。
目覚まし設定画面
静的Webホスティングでページを公開してボタンを押すことでDynamoDBに保存されているデータを更新します。こちらはとりあえず作った物なので説明は省きます。
目覚まし起動待機システム
ラズベリーパイ側
ラズベリーパイでは、Pythonのプログラムが動いています。下はmqtt接続を行う際の部分のコードになります。上の3行ではスレッドの管理やホスト名の解決の設定を行なっており、接続の管理を行っています。
その後、接続オブジェクトを作成して、接続を実行しています。オブジェクトを作成する際に、endpointやkeyを設定します。この際に先ほどIoT Coreで作成した証明書やendpointの確認が必要になります。
IoT Coreからメッセージを待つのでサブスクライバーとしての設定が必要になります。topicとqos(メッセージの配信の品質レベル)、メッセージ受信時の処理が設定されています。.result()で書かれている行は、接続の確立するのを待機しています。
呼び出した関数では、次に設定する目覚ましを実行するための別のPythonプログラムを実行しています。
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
self.mqtt_connection = mqtt_connection_builder.mtls_from_path(
endpoint=self.endpoint,
cert_filepath=self.cert_filepath,
pri_key_filepath=self.pri_key_filepath,
client_bootstrap=client_bootstrap,
ca_filepath=self.ca_filepath,
client_id=self.client_id,
clean_session=False,
keep_alive_secs=6
)
connect_future = self.mqtt_connection.connect()
connect_future.result()
print("IoT Core接続完了")
# アラームトピック購読
subscribe_future, packet_id = self.mqtt_connection.subscribe(
topic=f"alarm/{self.client_id}",
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=self.on_alarm_received
)
subscribe_future.result()
print(f"アラームトピック購読完了: alarm/{self.client_id}")
Lambda側
Lambda側は1分ごとに以下のコードの処理をしており、DynamoDBからデータを持ってきて設定された時間と同じであれば、先ほどのラズベリーパイ側にパブリッシャーとしてメッセージを送信します。
dynamodb = boto3.resource('dynamodb')
iot_client = boto3.client('iot-data')
table = dynamodb.Table('AlarmSettings')
# 現在時刻(UTC + 9時間でJST)
utc_now = datetime.utcnow()
now = utc_now + timedelta(hours=9)
current_time = now.strftime('%H:%M')
current_day = now.strftime('%a').lower()
print(f"現在時刻: {current_time}, 曜日: {current_day}")
try:
# アラーム設定を取得
response = table.scan()
triggered_alarms = []
for item in response['Items']:
device_id = item['device_id']
alarm_time = item['alarm_time']
enabled = item.get('enabled', False)
repeat_days = item.get('repeat_days', [])
print(f"チェック中: {device_id}, 時間: {alarm_time}, 有効: {enabled}")
# アラーム条件チェック
if (enabled and
alarm_time == current_time and
current_day in repeat_days):
print(f"アラーム発動: {device_id}")
# アラーム通知をIoTデバイスに送信
message = {
"action": "start_alarm",
"device_id": device_id,
"alarm_time": alarm_time,
"timestamp": now.isoformat()
}
iot_client.publish(
topic=f"alarm/{device_id}",
qos=1,
payload=json.dumps(message)
)
目覚まし実行システム
ラズベリーパイ側
実行プログラムでは、実行した際にスピーカーから音楽の再生とカメラを起動します。接続する際の設定などは、先ほどの待機のプログラムとした後に、以下のような設定で待機します。こちらのトピックは、後ほど説明するカメラ画像を送信して、表情判定の結果が受け取ることのできるtopicを設定しています。その際に、どれだけその表情であるのかを閾値として判定して、プログラムを終了するかを決めています。
subscribe_future, packet_id = self.mqtt_connection.subscribe(
topic="smile/result",
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=self.on_message_received
)
別のスレッドで以下のようなカメラループが動いており、self.play_seで最初にどういった表情が必要かの説明が再生されます。3秒ごとにAWSに画像が送信されます。また30秒ごとにも説明が入るようになっています。
データを送信する通信効率を上げるために、エンコードをしてBase64形式に変換して、json形式として扱っています。パブリッシャーとして、先ほどのルールで設定したtopicに対して送信されます。
def camera_loop(self):
print("カメラループ開始")
frame_count = 0
self.last_check_time2 = 0
self.play_se(self.target_emotions[0]) # リストの最初の要素を使用
while self.is_running:
ret, frame = self.cap.read()
if ret:
frame_count += 1
# フレーム情報表示(30フレームごと)
if frame_count % 30 == 0:
print(f"フレーム {frame_count} 処理中... 音声状態: {'再生中' if self.is_playing else '停止中'}")
current_time = time.time()
if current_time - self.last_check_time > 3:
print("画像をAWSに送信中...")
self.play_se('notification') # 送信時に効果音
self.send_image(frame)
self.last_check_time = current_time
if current_time - self.last_check_time2 > 30:
self.play_se(self.target_emotions[0]) # リストの最初の要素を使用
self.last_check_time2 = current_time
else:
print("フレーム取得失敗")
time.sleep(0.1)
def send_image(self, frame):
"""画像をIoT Coreに送信"""
try:
_, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 70])
image_base64 = base64.b64encode(buffer).decode('utf-8')
message = {
"device_id": self.client_id,
"timestamp": time.time(),
"image": image_base64,
"target_emotions": self.target_emotions
}
self.mqtt_connection.publish(
topic="smile/image",
payload=json.dumps(message),
qos=mqtt.QoS.AT_LEAST_ONCE
)
print("画像送信完了")
Lambda側
Lambdaでは、送られてきたメッセージから画像と今回の表情を設定します。そして、Rekognitionを用いて表情検出を行います。表情の検出が70%以上だった場合、検出された表情が格納されます。そして、結果をIoT Coreに送信するのですが、ターゲットの表情が検出されたか、表情の種類や信頼度などが、先ほどラズベリーパイで受けとるために設定したtopicと同じtopicで送信されます。
rekognition = boto3.client('rekognition')
iot_client = boto3.client('iot-data')
logger.info(f"受信イベント: {json.dumps(event, default=str)}")
# Base64画像をデコード
image_data = base64.b64decode(event['image'])
# ターゲット表情を取得(デフォルトはランダム選択)
target_emotions = event.get('target_emotions', [])
logger.info(f"ターゲット表情: {target_emotions}")
# 表情検出
response = rekognition.detect_faces(
Image={'Bytes': image_data},
Attributes=['EMOTIONS']
)
# 検出された表情を整理
detected_emotions = []
target_detected = False
max_confidence = 0
detected_target_emotion = ""
for face in response['FaceDetails']:
for emotion in face['Emotions']:
emotion_info = {
'type': emotion['Type'],
'confidence': emotion['Confidence']
}
detected_emotions.append(emotion_info)
# ターゲット表情かチェック
if emotion['Type'] in target_emotions and emotion['Confidence'] > 70:
if emotion['Confidence'] > max_confidence:
target_detected = True
max_confidence = emotion['Confidence']
detected_target_emotion = emotion['Type']
# 結果をIoT Coreに送信
result = {
'detected_emotions': detected_emotions,
'target_emotions': target_emotions,
'emotion_detected': target_detected,
'detected_emotion': detected_target_emotion,
'confidence': max_confidence,
'timestamp': event.get('timestamp', context.aws_request_id),
'device_id': event.get('device_id', 'unknown'),
}
iot_client.publish(
topic='smile/result',
payload=json.dumps(result)
)
実際に使ってみた
感想
今回ラズベーパイにマイクやスピーカーが鳴らない時や、一度Windowsで作成してしまい以降にかなり手間取ってしまった。おそらく、プログラミングの問題で、強制終了した際にマイクを別で使ったままになっているなどかと考えています。また、環境を移行しやすくするために、コンテナで行うなどしていれば早かったかのかと考えています。
コードの部分はAIにかなり頼り切りなってしまっており、マルチスレッドを用いたようなプログラムはAIがないと、書けなかったと思うので、もう少しコードの理解を深めたい。
ただ、自身が使ってみたいようなものを作るのは楽しいので、とても楽しく作成できました。
今後
今回は、ラズパイ側に赤外線センサーを使った自動点灯やエアコン熱風機能などに加えて、表情で止めるだけではなく、ランダム性を加えることによって目が覚めるような、目覚ましが作成できると思っているので、もう少し機能を加えたいと考えています!
参考