5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

お題は不問!Qiita Engineer Festa 2024で記事投稿!
Qiita Engineer Festa20242024年7月17日まで開催中!

【開発】YOLOv8とSwitchBotを使用した睡眠検知と寝落ち自動消灯アプリを作ってみた

Last updated at Posted at 2024-06-16

はじめに

この記事では、YOLOv8とSwitchbotを使用した睡眠検知と自動消灯アプリについてまとめました。

また、この記事は 【開発】YOLOv8を使用した睡眠検知アプリを作ってみた の続きです。もし詳細が気になる方は参考にしてみてください。

コードに関しては、以下のリポジトリに上げているので、気になる方は参考にしてみてください。

1. SwitchBotとは

SwitchBotは、スマートホームの自動化をサポートするために設計された製品群を提供するブランドです。今回の開発では、以下の2つの製品を使用しました。

以下は公式サイトに記載されている商品説明です。

SwitchBot ハブ2

高齢の家族が自宅にいるけどエアコンがオフになっている場合、エアコン状態を確認した後、遠隔でオンにできます。ネットワークが切断されても、エアコンのコントロールが可能です。SwitchBotハブ2はエアコンの赤外線コードを学習する際に、自動的に対応する赤外線コードをローカルに保存します。 ルーターやサーバーに障害が発生した場合でも、スマホからBluetoothを使って操作することが可能です。

SwitchBot ボット

軽量&コンパクトで、壁スイッチ・炊飯器・コーヒーメーカー・給湯器など様々なスイッチ・ボタンに適用です。SwitchBotアプリには「押す」モードと「スイッチ」モードという2つの動作モードが選択可能。スイッチ・ボタンを押すのみの場合は、「押す」モードを選び、スイッチのオン・オフを切替たいなら、「スイッチ」モードを選択してください。また、交換可能なCR2型リチウム電池を使用して、約600日まで使用可能です。壁のタッチパネルに対応不可。

要約すると、ハブ2はその他デバイスを統合的に管理するデバイスです。また、外出時などWi-Fiがない状態でも遠隔で操作できる利点があります。ボットは、操作一つでスイッチを押す動作を行ってくれるデバイスです。電気スイッチだけでなく、パソコンやテレビ、炊飯器といった広範な場面で使用することが出来ます。(今回の開発ではハブ2は不要だったかもしれません…)

筆者は以下のようにボットを電気スイッチに取り付けることで、消灯を行いました。

21073.jpg

これらのデバイスはAPIが提供されており、自分好みのプログラムを組むことで自由度の高い処理を行うことが出来ます。

※ただし、一日のAPI呼び出し回数は 10000回なので注意してください。(リアルタイム処理でもない限り使い切ることはないと思いますが…)

APIの呼び出しにはアカウント登録やトークンの取得など事前の準備が必要になります。

公式サイトに詳しい手順が記載されているので、参考にしてみてください。

今回の記事では、以下の動画を参考にさせていただきました。SwitchBotのAPI処理に関する記事が少なかったので、大変参考になりました。

他のデバイスの操作方法などを知りたい方は、以下のリンクに記載されているので是非参考にしてみてください。

2. コード

作成したコードは以下になります。

基本的には前回の記事で実装したものにSwitchBotのAPI呼び出しを加えただけなので、ほとんど違いはありません。

処理の流れとしては以下の手順になります。

①YOLOを使用した睡眠検出。
②Window Time期間分の睡眠状態の比率mean_rateを算出する。
③mean_rateのsleep_thresholdを超過した期間がsleep_timeより長くなったとき寝落ちと見なす。
④寝落ち判定をしたとき、Botに「押す」動作のリクエストを送信する。
⑤消灯。
⑥画面をスリープ状態にする。

sitchbot_api.py
import json
import time
import hashlib
import hmac
import base64
import uuid
import requests
from rich import print

class Bot:
    def __init__(self):
        bot_id = 'C93830325118'
        self.apiHeader = {}
        # open token
        token_path = 'credentials/token.txt'
        secret_path = 'credentials/secret.txt'

        # open token file
        with open(token_path, 'r') as file:
            token = file.read().replace('\n', '')

        # open secret file
        with open(secret_path, 'r') as file:
            secret = file.read().replace('\n', '')

        nonce = uuid.uuid4()
        t = int(round(time.time() * 1000))
        string_to_sign = '{}{}{}'.format(token, t, nonce)
        string_to_sign = bytes(string_to_sign, 'utf-8')
        secret = bytes(secret, 'utf-8')
        sign = base64.b64encode(hmac.new(secret, msg=string_to_sign, digestmod=hashlib.sha256).digest())

        #Build api header JSON
        self.apiHeader['Authorization']=token
        self.apiHeader['Content-Type']='application/json'
        self.apiHeader['charset']='utf8'
        self.apiHeader['t']=str(t)
        self.apiHeader['sign']=str(sign, 'utf-8')
        self.apiHeader['nonce']=str(nonce)

        self.bot_id = bot_id
        self.devices_url = f'https://api.switch-bot.com/v1.1/devices/{bot_id}/status'
        self.command = f'https://api.switch-bot.com/v1.1/devices/{bot_id}/commands'
        self.input_data = {
            'commandType': 'command',
            'command': 'press',
            'parameter': 'default'
        }

    def get_status(self):
        res = requests.get(self.devices_url, headers=self.apiHeader)
        data = res.json()
        power = data['body']['power']
        deviceMode = data['body']['deviceMode']
        return power, deviceMode

    def press(self):
        res = requests.post(self.command, headers=self.apiHeader, json=self.input_data)
        data = res.json()
        return data

    def switch(self):
        power, deviceMode = self.get_status()
        if power == 'on':
            self.input_data['command'] = 'turnOff'
        else:
            self.input_data['command'] = 'turnOn'
        res = requests.post(self.command, headers=self.apiHeader, json=self.input_data)
        data = res.json()
        return data
    
if __name__ == '__main__':
    device = Bot()
    print(device.press())
    print(device.switch())
    print(device.get_status())
app.py
import tkinter as tk
from tkinter import font
import numpy as np
import matplotlib.pyplot as plt
import cv2
from ultralytics import YOLO
from PIL import Image, ImageTk
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import matplotlib.animation as animation
import time
import ctypes
from switchbot_api.switchbot_api import Bot

class SleepRecognition:
    def __init__(self, warn_threshold=0.5, sleep_threshold=0.9, sleep_time=10):
        self.is_sleep = False
        self.warn_threshold = warn_threshold
        self.sleep_threshold = sleep_threshold
        self.sleep_time = sleep_time
        self.start_time = 0
    
    def check_sleep(self, mean_rate):
        if mean_rate > self.sleep_threshold:
            if not self.is_sleep:
                self.start_time = time.time()
                self.is_sleep = True
            else:
                if time.time() - self.start_time > self.sleep_time:
                    return True
                else:
                    print("sleeping")
        else:
            self.is_sleep = False
        return False

# Define the function
def monitor_off():
    # -1 to turn off the monitor
    ctypes.windll.user32.SendMessageW(0xFFFF, 0x112, 0xF170, 2)

def get_class_name(all_classes, all_confidences):
    if len(all_classes) == 0:
        return 0  # 検出されない場合もawakeとする
    else:
        awake_index = np.array(np.where(all_classes == 0))
        sleep_index = np.array(np.where(all_classes == 1))

        if awake_index.shape[1] == 0 and sleep_index.shape[1] != 0:
            return 1
        elif awake_index.shape[1] != 0 and sleep_index.shape[1] == 0:
            return 0
        else:
            awake_probs = np.max(all_confidences[awake_index])
            sleep_probs = np.max(all_confidences[sleep_index])

            if awake_probs > sleep_probs:
                return 0
            else:
                return 1

class Application(tk.Frame):
    def __init__(self, master, video_source=0, model_weight=None):
        super().__init__(master)

        self.master.geometry("1500x750")
        self.master.title("Tkinter with Video Streaming and Capture")
        self.model = YOLO(model_weight)

        # ---------------------------------------------------------
        # ポイント設定
        # ---------------------------------------------------------
        self.camera_x = 10
        self.camera_y = 10
        self.camera_w = 640
        self.camera_h = 480
        self.camera_padx = 10
        self.camera_pady = 10

        self.graph_x = self.camera_x + self.camera_w + self.camera_padx
        self.graph_y = self.camera_y
        self.graph_w = 640
        self.graph_h = 480
        self.graph_padx = 10
        self.graph_pady = 10
        # ---------------------------------------------------------
        # グラフの設定
        # ---------------------------------------------------------
        self.class_array = np.array([])
        self.mean_rate_array = np.array([])
        self.window_time = 100
        self.next_time = 3
        self.processing_time = 40
        self.delay = self.next_time + self.processing_time  # [mili seconds]
        self.reset_threshold = int(self.window_time / (self.delay / 1000))
        self.xrange = np.arange(0, self.window_time, self.delay / 1000)
        self.warn_threshold = 0.5
        self.sleep_threshold = 0.9
        self.wait_time = 10

        # ---------------------------------------------------------
        # フォント設定
        # ---------------------------------------------------------
        self.font_frame = font.Font(family="Meiryo UI", size=15, weight="normal")
        self.font_btn_big = font.Font(family="Meiryo UI", size=20, weight="bold")
        self.font_btn_small = font.Font(family="Meiryo UI", size=15, weight="bold")

        self.font_lbl_bigger = font.Font(family="Meiryo UI", size=45, weight="bold")
        self.font_lbl_big = font.Font(family="Meiryo UI", size=30, weight="bold")
        self.font_lbl_middle = font.Font(family="Meiryo UI", size=15, weight="bold")
        self.font_lbl_small = font.Font(family="Meiryo UI", size=12, weight="normal")

        # ---------------------------------------------------------
        # ビデオソースのオープン
        # ---------------------------------------------------------

        self.vcap = cv2.VideoCapture(video_source)
        self.width = self.vcap.get(cv2.CAP_PROP_FRAME_WIDTH)
        self.height = self.vcap.get(cv2.CAP_PROP_FRAME_HEIGHT)

        # ---------------------------------------------------------
        # 状態の更新フラグ
        # ---------------------------------------------------------
        self.is_updated = False

        # ---------------------------------------------------------
        # スリープ判定
        # ---------------------------------------------------------
        #睡眠を判定する時間間隔
        self.sleep_time = 10
        self.sleep_recognition = SleepRecognition(warn_threshold=self.warn_threshold, sleep_threshold=self.sleep_threshold, sleep_time=self.sleep_time)

        # ---------------------------------------------------------
        # SwitchBot API
        # ---------------------------------------------------------
        self.switchbot = Bot()
    
        # ---------------------------------------------------------
        # ウィジェットの作成
        # ---------------------------------------------------------

        self.create_widgets()

        self.update()

    def create_widgets(self):
        # Frame_Camera
        self.frame_cam = tk.LabelFrame(self.master, text='Camera', font=self.font_frame)
        self.frame_cam.place(x=self.camera_x, y=self.camera_y, width=self.camera_w, height=self.camera_h)
        self.frame_cam.grid_propagate(0)

        # 画像用Canvas
        self.canvas1 = tk.Canvas(self.frame_cam, width=self.camera_w, height=self.camera_h)
        self.canvas1.grid(column=0, row=0, padx=10, pady=10)

        # Graph
        self.frame_graph = tk.LabelFrame(self.master, text='Graph', font=self.font_frame)
        self.frame_graph.place(x=self.graph_x, y=self.graph_y, width=self.graph_w, height=self.graph_h)
        self.frame_graph.grid_propagate(0)

        self.fig = plt.Figure()
        self.ax = self.fig.add_subplot(111)
        self.ax.axhline(y=self.warn_threshold, color='orange', linestyle='--')
        self.ax.axhline(y=self.sleep_threshold, color='red', linestyle='--')
        self.ax.set_xlabel('Time [s]')
        self.ax.set_ylabel('Sleep Rate')
        self.ax.text(0, self.warn_threshold, 'warn', color='orange')
        self.ax.text(0, self.sleep_threshold, 'sleep', color='red')
        self.ax.set_xlim(0, self.window_time)
        self.ax.set_ylim(0, 1)

        self.canvas2 = FigureCanvasTkAgg(self.fig, master=self.frame_graph)
        self.canvas2.get_tk_widget().pack(fill=tk.BOTH, expand=True)

        # Control
        self.control = tk.LabelFrame(self.master, text='Control', font=self.font_frame)
        self.control.place(x=10, y=550, width=self.camera_w + self.graph_padx + self.graph_w + self.graph_padx + 150, height=200)
        self.control.grid_propagate(0)

        # Window Time
        self.lbl_window_time = tk.Label(self.control, text="Window Time", font=self.font_lbl_small)
        self.lbl_window_time.grid(column=0, row=0, padx=10, pady=10)
        self.entry_window_time = tk.Entry(self.control, font=self.font_lbl_small)
        self.entry_window_time.grid(column=1, row=0, padx=10, pady=10)

        # Warn Threshold
        self.lbl_warn_threshold = tk.Label(self.control, text="Warn Threshold", font=self.font_lbl_small)
        self.lbl_warn_threshold.grid(column=2, row=0, padx=10, pady=10)
        self.entry_warn_threshold = tk.Entry(self.control, font=self.font_lbl_small)
        self.entry_warn_threshold.grid(column=3, row=0, padx=10, pady=10)

        # Sleep Threshold
        self.lbl_sleep_threshold = tk.Label(self.control, text="Sleep Threshold", font=self.font_lbl_small)
        self.lbl_sleep_threshold.grid(column=4, row=0, padx=10, pady=10)
        self.entry_sleep_threshold = tk.Entry(self.control, font=self.font_lbl_small)
        self.entry_sleep_threshold.grid(column=5, row=0, padx=10, pady=10)

        # Update Button
        self.btn_update = tk.Button(self.control, text='Update', font=self.font_btn_big, command=self.update_settings)
        self.btn_update.grid(column=6, row=0, padx=20, pady=10)

        # Close Button
        self.btn_close = tk.Button(self.control, text='Close', font=self.font_btn_big, command=self.press_close_button)
        self.btn_close.grid(column=7, row=0, padx=20, pady=10)

        # ---------------------------------------------------------
        # Check sleep
        self.lbl_sleep = tk.Label(self.control, text="Sleep Time", font=self.font_lbl_small)
        self.lbl_sleep.grid(column=0, row=1, padx=10, pady=10)

        self.entry_sleep = tk.Entry(self.control, font=self.font_lbl_small)
        self.entry_sleep.grid(column=1, row=1, padx=10, pady=10)
        # ---------------------------------------------------------
        #Show Status
        # ---------------------------------------------------------
        self.lbl_status = tk.Label(self.master, text=f"Status: Window Time: {self.window_time} [s] Warn Threshold: {self.warn_threshold} Sleep Threshold: {self.sleep_threshold} Sleep Time: {self.sleep_time} [s]", font=self.font_lbl_small)
        self.lbl_status.place(x=20, y=720)



    def update_settings(self):
        try:
            self.window_time = int(self.entry_window_time.get())
            self.warn_threshold = float(self.entry_warn_threshold.get())
            self.sleep_threshold = float(self.entry_sleep_threshold.get())
            self.sleep_time = int(self.entry_sleep.get())
            self.reset_threshold = int(self.window_time / (self.delay / 1000))

            self.sleep_recognition = SleepRecognition(warn_threshold=self.warn_threshold, sleep_threshold=self.sleep_threshold, sleep_time=self.sleep_time)

            self.xrange = np.arange(0, self.window_time, self.delay / 1000)

            self.mean_rate_array = np.array([])
            self.class_array = np.array([])
            self.xrange = np.arange(0, self.window_time, self.delay / 1000)

            self.x = self.xrange[:len(self.mean_rate_array)]
            self.y = self.mean_rate_array

            self.ax.clear()
            self.ax.axhline(y=self.warn_threshold, color='orange', linestyle='--')
            self.ax.axhline(y=self.sleep_threshold, color='red', linestyle='--')

            self.ax.set_xlabel('Time [s]')
            self.ax.set_ylabel('Sleep Rate')

            self.ax.text(0, self.warn_threshold, 'warn', color='orange')
            self.ax.text(0, self.sleep_threshold, 'sleep', color='red')

            self.ax.set_xlim(0, self.window_time)
            self.ax.set_ylim(0, 1)

            self.lbl_status = tk.Label(self.master, text=f"Status: Window Time: {self.window_time} [s] Warn Threshold: {self.warn_threshold} Sleep Threshold: {self.sleep_threshold} Sleep Time: {self.sleep_time} [s]", font=self.font_lbl_small)
            self.lbl_status.place(x=20, y=720)
            self.canvas2.draw()
        except ValueError:
            print("Invalid input for one of the settings")

    def update(self):
        # ビデオソースからフレームを取得
        _, frame = self.vcap.read()

        results = self.model(frame, show=False, conf=0.65, iou=0.5, device='cuda:0')

        all_classes = results[0].boxes.cls.cpu().numpy()
        all_confidences = results[0].boxes.conf.cpu().numpy()

        detected_class = get_class_name(all_classes, all_confidences)  # 0:awake, 1:sleep


        self.class_array  = np.append(self.class_array, detected_class)

        self.reset_threshold = int(self.window_time / (self.delay / 1000))

        if len(self.class_array) > self.reset_threshold:
            self.class_array = self.class_array[1:]

        self.wait_threshold = int(self.wait_time / (self.delay / 1000))
        if len(self.class_array) > self.wait_threshold:
            mean_rate = np.sum(self.class_array[self.class_array != None]) / len(self.class_array)
        else:
            mean_rate = 0  # 初期のの推論結果は変動幅が大きいため0とする

        self.mean_rate_array = np.append(self.mean_rate_array, mean_rate)

        if len(self.mean_rate_array) > self.reset_threshold:
            self.mean_rate_array = self.mean_rate_array[1:]

        self.x = self.xrange[:len(self.mean_rate_array)]
        self.y = self.mean_rate_array

        # グラフの更新
        self.ax.clear()

        # グラフに閾値を表示
        self.ax.axhline(y=self.warn_threshold, color='orange', linestyle='--')
        self.ax.axhline(y=self.sleep_threshold, color='red', linestyle='--')

        # ラベルの設定
        self.ax.set_xlabel('Time [s]')
        self.ax.set_ylabel('Sleep Rate')

        self.ax.text(0, self.warn_threshold, 'warn', color='orange')
        self.ax.text(0, self.sleep_threshold, 'sleep', color='red')

        # 閾値を超えたら色を変える
        if self.y[-1] > self.sleep_threshold:
            self.ax.plot(self.x, self.y, color='red')
        elif self.y[-1] > self.warn_threshold:
            self.ax.plot(self.x, self.y, color='orange')
        else:
            self.ax.plot(self.x, self.y, color='blue')
        self.canvas2.draw()
        
        # 画像の更新
        annotated_img = results[0].plot()
        annotated_img = cv2.cvtColor(annotated_img, cv2.COLOR_BGR2RGB)
        self.photo = ImageTk.PhotoImage(image=Image.fromarray(annotated_img))

        # self.photo -> Canvas
        self.canvas1.create_image(0, 0, image=self.photo, anchor=tk.NW)

        self.master.after(self.next_time, self.update)

        #睡眠判定
        is_sleep = self.sleep_recognition.check_sleep(mean_rate)
        if is_sleep:
            print("sleep")
            self.switchbot.press()
            monitor_off()
            time.sleep(10)
            self.master.destroy()
        else:
            #print("awake")
            pass

    def press_close_button(self):
        self.master.destroy()

def main():
    pretrained_weight_path = "yolov8_sleep_recognition/runs/detect/train10/weights/best.pt"

    root = tk.Tk()
    app = Application(master=root, video_source=0, model_weight=pretrained_weight_path)
    app.mainloop()

if __name__ == "__main__":
    main()

実行した動画がこちらになります。

分かりにくいかもしれませんが、閾値を一定時間超過すると自動で消灯する処理になっていることが分かると思います。

まとめ

ここまで読んでくださり、ありがとうございます。

今回の記事は、前回の記事で説明したコードをほぼ流用しているので説明する部分が少なくなってしまい、必然的に記事の内容も薄くなってしまいました。

念願の寝落ち自動消灯を実現することが出来ました。これで長い間苦しめられてきた寝落ちによる体調不良ともおさらばできます。

前回の記事ではSwitchBotのWi-Fiトラブルにより、自動消灯の処理までは実装できませんでしたが、このことを研究室の友人に話すと、「 モバイルホットスポット 使ってみたら?」という助言をいただきました。彼のおかげで、今回の開発は実現したといっても過言ではありません(本当に)。

YOLOを使用しているだけあって、推論精度、速さ共に高水準を維持していますが、まだまだ改善の余地があります。物体検出をしていることもあり、服の色や組み合わせで推論精度が大きく変わってしまう、固定カメラを使用しているため異なる画角の画像は使用できない等の問題点があります。この問題はいずれ解決したいと思います。

参考文献

SwitchBot(スイッチボット)公式サイト
【SwitchBot API】PythonでSwitchBotデバイスを操作してみた
SwitchBot Open API Documents - GitHub

5
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?