1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

SwitchBotをAIエージェント化してSlackから家電を操作

Posted at

SwitchBotをAIエージェント化してSlackから家電を操作

SwitchBot、便利ですよね。でも、SwitchBotで操作する家電が増えてくるとこんな悩みが出てきました。

  • 「あれ、エアコン操作用のウィジェットどこだっけ?」とスマホの画面で迷子。
switch_bot_capture
  • 在宅にてPCで作業中、エアコンをつけたいけど、スマホやエアコンのリモコンが近くにない

  • 「部屋をもう少し暖かくしたい」といった曖昧な操作をしたい。

そこで今回、「AIエージェント」を活用して、SlackのチャットからSwitchBotを操作できる環境を構築しました。

SlackからSwitchBotを操作するイメージ
完成形.png

メリット

  • マルチプラットフォーム対応: Windows PCからでもSlack経由でサクッと操作可能。

  • UIからの解放: 大量のウィジェットから探す手間がなく、文字入力だけでOK。

  • 文脈の理解: 「もう少し部屋を暖かくして」といった指示が通じる。

システム構成

ユーザーがSlackで送ったメッセージをPythonプログラムが受け取り、LLMが「どのデバイスをどう操作するか」を判断してSwitchBot APIを叩く構成です。

準備するもの

  1. SwitchBot ハブ製品(ハブ2やハブミニなど)

  2. SwitchBotで操作可能な家電(ライト、カーテン、プラグなど)

  3. OpenAI API Key

  4. SwitchBot API Key & Secret

    • アプリの「プロフィール」>「設定」>「アプリバージョン」を10回連打して「開発者向けオプション」を出し、そこから取得します。
  5. Slack AppのトークンSLACK_BOT_TOKEN, SLACK_APP_TOKEN

  6. Pythonを実行できる環境

実装のポイント

1. AIエージェントの核となる処理

単なるチャットボットではなく、Function Calling(tools) を利用して、LLMが「関数を実行する」と判断できるようにしています。
toolsはSwitchBotで操作しているデバイスに合わせて定義する必要があります。

私のSwitchBotを操作するために実装したソースコードを、この記事の最後に参考として載せています。

import openai

def process_user_request(user_input, conversation_history=None):
    """ユーザーのリクエストをLLMで解釈し、適切なツールを呼び出す"""
    
    # デバイス情報を動的に取得(事前にリスト化しておくと精度が上がります)
    devices = get_device_info() # 自作のデバイス一覧取得関数
    device_info_text = "\n".join([f"- {d['name']} ({d['type']})" for d in devices])

    system_prompt = f"""あなたはSwitchbotデバイスを操作するアシスタントです。
以下のデバイスが利用可能です:
{device_info_text}

重要: 関数を呼び出す際は、上記のデバイスリストから実際のデバイス名を使用してください。
会話履歴を参照して文脈を理解してください。
例:前のメッセージで「シーリングライト」について話していた場合、「暗くして」は「シーリングライトを暗くする」という意味です。
"""

    messages = [{"role": "system", "content": system_prompt}]
    if conversation_history:
        messages.extend(conversation_history)
    messages.append({"role": "user", "content": user_input})

    # OpenAI API呼び出し
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        tools=tools, # SwitchBot操作用の関数(Tool)を定義して渡す
        tool_choice="auto"
    )

    return response.choices[0].message

2. SwitchBot API v1.1 の署名生成

SwitchBot APIを利用するには、セキュリティのための署名(Signature)生成が必要です。

import time
import hashlib
import hmac
import base64

def generate_sign(token, secret):
    """SwitchBot API v1.1 用の署名を生成する"""
    nonce = "" # 空でも可、あるいはuuidなど
    t = str(int(time.time() * 1000))
    string_to_sign = '{}{}{}'.format(token, t, nonce)
    string_to_sign = bytes(string_to_sign, 'utf-8')
    secret = bytes(secret, 'utf-8')
    sign = base64.b64encode(hmac.new(secret, msg=string_to_sign, digestmod=hashlib.sha256).digest())
    
    return sign, str(t), nonce

3. Slack Boltによる受け口

Slackからの入力を待ち受ける部分は、slack-bolt ライブラリを使用し、Socket Modeで起動するとローカル環境でもテストしやすくなります。

from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler

app = App(token="xoxb-your-token")

@app.event("message")
def handle_message_events(body, logger):
    event = body["event"]
    # スレッドや履歴の管理ロジックをここに挟む
    response_msg = process_user_request(event["text"])
    # 実際のアクション(APIコール)とSlackへの返信を実行

今後の展望:クラウド化

現在はローカル環境で実行しているため、ソースコードが実行中でないと、SlackからSwitchBotを操作できない状況です。
24時間稼働させるためにクラウド上で実行できるようにしようと思っています。

最後に

今流行りのAIエージェントを日常に組み込むことって、何かワクワクしますよね。
そのワクワクを優先しすぎて、作成したものが「本当に必要なものなんだっけ」と思うことも。
それでも、やりたいと思ったことを今後も実現していきたいので、また新しくワクワクするものが見つかったら、新たに記事を投稿します。

本記事投稿主のSwitchBotを操作するためのソースコード

生成AIにコーディングしてもらったため、あまりよいソースコードではないことをご了承ください。
ファイルを実行するには、下記ファイルをすべて同じフォルダ配下に格納します。

slack_bot.py(このファイルを実行)
import os
import time
import threading
from datetime import datetime
from dotenv import load_dotenv
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from agent import process_user_request
import schedule
from scheduler import load_schedules, register_schedule, get_schedules

# 環境変数を読み込む
load_dotenv()

# Slackクライアントを初期化
slack_client = WebClient(token=os.getenv("SLACK_BOT_TOKEN"))
CHANNEL_ID = os.getenv("SLACK_CHANNEL_ID")

# 最後に処理したメッセージのタイムスタンプ
last_processed_ts = None
bot_user_id = None

# スレッドごとの会話履歴を保存
# { "thread_ts": [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}] }
thread_conversations = {}

# スレッドごとの最後に処理したタイムスタンプ
# { "thread_ts": "ts" }
thread_last_ts = {}

def get_bot_user_id():
    """ボット自身のユーザーIDを取得"""
    try:
        response = slack_client.auth_test()
        return response["user_id"]
    except SlackApiError as e:
        print(f"エラー: ボットユーザーIDの取得に失敗 - {e}")
        return None

def get_new_messages():
    """新しいメッセージを取得"""
    global last_processed_ts

    try:
        # チャンネルの履歴を取得
        kwargs = {
            "channel": CHANNEL_ID,
            "limit": 10  # 直近10件を取得
        }

        # 前回のタイムスタンプがある場合、それ以降のみ取得
        if last_processed_ts:
            kwargs["oldest"] = last_processed_ts

        response = slack_client.conversations_history(**kwargs)
        messages = response["messages"]

        # デバッグ: メッセージ数をログ出力
        print(f"[DEBUG] conversations_history: {len(messages)}件取得")

        # 既に処理済みのメッセージを除外
        new_messages = []
        for msg in messages:
            if last_processed_ts and msg["ts"] <= last_processed_ts:
                # 既に処理済み
                continue
            new_messages.append(msg)

        if new_messages:
            print(f"[DEBUG] 新しいメッセージ: {len(new_messages)}")
            # メッセージは新しい順なので、最初のメッセージのtsを保存
            last_processed_ts = new_messages[0]["ts"]

        return new_messages

    except SlackApiError as e:
        print(f"エラー: メッセージ取得に失敗 - {e}")
        return []

def get_thread_replies(thread_ts):
    """スレッド内の新しい返信を取得"""
    try:
        # スレッドの返信を取得
        response = slack_client.conversations_replies(
            channel=CHANNEL_ID,
            ts=thread_ts,
            oldest=thread_last_ts.get(thread_ts, thread_ts)  # 前回のタイムスタンプ以降
        )

        messages = response["messages"]

        # 最初のメッセージ(親メッセージ)は除外
        # 新しい返信のみを返す
        new_replies = [msg for msg in messages if msg["ts"] != thread_ts]

        # 最後に処理したタイムスタンプを更新
        if new_replies:
            thread_last_ts[thread_ts] = new_replies[-1]["ts"]

        return new_replies

    except SlackApiError as e:
        print(f"エラー: スレッド返信取得に失敗 - {e}")
        return []

def send_message(text, thread_ts=None):
    """Slackにメッセージを送信"""
    try:
        slack_client.chat_postMessage(
            channel=CHANNEL_ID,
            text=text,
            thread_ts=thread_ts  # スレッドで返信
        )
    except SlackApiError as e:
        print(f"エラー: メッセージ送信に失敗 - {e}")

def process_message(message, is_thread_reply=False):
    """メッセージを処理

    Args:
        message: Slackメッセージ
        is_thread_reply: スレッド内の返信かどうか
    """
    global bot_user_id, thread_conversations

    # ボットのユーザーIDを取得(初回のみ)
    if bot_user_id is None:
        bot_user_id = get_bot_user_id()

    # デバッグログ
    print(f"[DEBUG] process_message: user={message.get('user')}, bot={bot_user_id}, subtype={message.get('subtype')}, text={message.get('text', '')[:30]}")

    # ボット自身のメッセージは無視
    if message.get("user") == bot_user_id:
        print(f"[DEBUG] ボット自身のメッセージをスキップ")
        return

    # サブタイプがあるメッセージ(編集、削除など)は無視
    if "subtype" in message:
        print(f"[DEBUG] サブタイプメッセージをスキップ: {message.get('subtype')}")
        return

    text = message.get("text", "").strip()
    ts = message.get("ts")
    thread_ts = message.get("thread_ts", ts)  # スレッドのタイムスタンプ

    if not text:
        print(f"[DEBUG] 空のメッセージをスキップ")
        return

    # スレッド内の返信かどうかを表示
    thread_indicator = "[スレッド返信] " if is_thread_reply else ""
    print(f"[{datetime.now().strftime('%H:%M:%S')}] {thread_indicator}受信: {text}")

    try:
        # 会話履歴を取得(スレッドの場合)
        conversation_history = None
        if is_thread_reply or thread_ts != ts:
            # スレッド内の返信の場合、会話履歴を使用
            conversation_history = thread_conversations.get(thread_ts, [])
            if conversation_history:
                print(f"  → 会話履歴を参照: {len(conversation_history)}")
        else:
            # 新しいメインメッセージの場合、会話履歴をクリア
            thread_conversations[thread_ts] = []

        # ユーザーのメッセージを履歴に追加
        thread_conversations.setdefault(thread_ts, []).append({
            "role": "user",
            "content": text
        })

        # AIエージェントで処理
        response = process_user_request(text, conversation_history)
        print(f"[{datetime.now().strftime('%H:%M:%S')}] 応答: {response}")

        # ボットの応答を履歴に追加
        thread_conversations[thread_ts].append({
            "role": "assistant",
            "content": response
        })

        # スレッドで返信
        send_message(response, thread_ts=thread_ts)

    except Exception as e:
        error_msg = f"エラーが発生しました: {str(e)}"
        print(f"[{datetime.now().strftime('%H:%M:%S')}] {error_msg}")
        send_message(error_msg, thread_ts=thread_ts)

def run_scheduler_thread():
    """スケジューラーをバックグラウンドで実行"""
    # スケジュールを読み込み
    load_schedules()
    schedules_list = get_schedules()

    # 既存のスケジュールを登録
    for item in schedules_list:
        register_schedule(item)

    print(f"スケジューラー起動: {len(schedules_list)}件のスケジュールを読み込みました")

    # スケジューラーを実行
    while True:
        schedule.run_pending()
        time.sleep(10)  # 10秒ごとにチェック

def main():
    """メインループ"""
    global last_processed_ts, bot_user_id

    print("=== Switchbot Slackボット(ポーリング方式) ===")
    print(f"チャンネルID: {CHANNEL_ID}")

    # ボットユーザーIDを取得
    bot_user_id = get_bot_user_id()
    if not bot_user_id:
        print("エラー: ボットの初期化に失敗しました")
        return

    print(f"ボットユーザーID: {bot_user_id}")

    # スケジューラーを別スレッドで起動
    scheduler_thread = threading.Thread(target=run_scheduler_thread, daemon=True)
    scheduler_thread.start()

    print("監視を開始します...\n")

    # 初回起動時は最新メッセージのタイムスタンプを取得(過去メッセージは処理しない)
    try:
        initial_response = slack_client.conversations_history(
            channel=CHANNEL_ID,
            limit=1
        )
        if initial_response["messages"]:
            last_processed_ts = initial_response["messages"][0]["ts"]
            print(f"初期化: 最新メッセージts={last_processed_ts}\n")
        else:
            last_processed_ts = None
            print("初期化: チャンネルにメッセージがありません\n")
    except Exception as e:
        print(f"初期化エラー: {e}\n")
        last_processed_ts = None

    polling_interval = 3  # ポーリング間隔(秒)

    # アクティブなスレッドのリスト(スレッドTSのセット)
    active_threads = set()

    try:
        while True:
            # 1. メインチャンネルの新しいメッセージを取得
            messages = get_new_messages()

            print(f"[DEBUG] ポーリング: {len(messages)}件のメッセージ, アクティブスレッド: {len(active_threads)}")

            # メッセージは新しい順なので、古い順に処理するため反転
            for message in reversed(messages):
                ts = message.get("ts")
                thread_ts = message.get("thread_ts")

                # メインメッセージの場合
                if not thread_ts or thread_ts == ts:
                    print(f"[DEBUG] メインメッセージを処理: ts={ts}")
                    process_message(message, is_thread_reply=False)

                    # このメッセージがスレッドの親になる可能性があるので、アクティブなスレッドに追加
                    active_threads.add(ts)

            # 2. アクティブなスレッドの返信をチェック
            for thread_ts in list(active_threads):
                replies = get_thread_replies(thread_ts)

                if replies:
                    print(f"[DEBUG] スレッド {thread_ts}: {len(replies)}件の返信")

                for reply in replies:
                    process_message(reply, is_thread_reply=True)

            # 次のポーリングまで待機
            time.sleep(polling_interval)

    except KeyboardInterrupt:
        print("\n\nボットを終了します。")
    except Exception as e:
        print(f"\n予期しないエラー: {e}")

if __name__ == "__main__":
    main()

agent.py
import os
import json
from dotenv import load_dotenv
from openai import OpenAI
from utils import get_devices, control_device, set_ceiling_light_brightness, set_ceiling_light_color_temp
from scheduler import add_schedule, get_schedules, remove_schedule

# 環境変数を読み込む
load_dotenv()

# OpenAIクライアントを初期化
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# デバイスタイプごとの操作コマンドマッピング
DEVICE_COMMANDS = {
    "Smart Lock Ultra": {
        "unlock": "unlock",
        "lock": "lock",
        "開ける": "unlock",
        "ロック解除": "unlock",
        "閉める": "lock",
        "ロック": "lock",
    },
    "Curtain3": {
        "open": "turnOn",
        "close": "turnOff",
        "開ける": "turnOn",
        "閉める": "turnOff",
        "半開き": "setPosition",
    },
    "Plug Mini (JP)": {
        "on": "turnOn",
        "off": "turnOff",
        "オン": "turnOn",
        "オフ": "turnOff",
        "つける": "turnOn",
        "消す": "turnOff",
    },
    "Ceiling Light": {
        "on": "turnOn",
        "off": "turnOff",
        "オン": "turnOn",
        "オフ": "turnOff",
        "つける": "turnOn",
        "消す": "turnOff",
    }
}

def get_device_info():
    """デバイス情報を取得して整形"""
    devices_data = get_devices()
    device_list = []

    if devices_data.get("statusCode") == 100:
        for device in devices_data["body"]["deviceList"]:
            device_list.append({
                "id": device["deviceId"],
                "name": device["deviceName"],
                "type": device["deviceType"]
            })

    return device_list

def find_device_by_name(device_name, devices):
    """デバイス名からデバイスを検索(部分一致)"""
    for device in devices:
        if device_name.lower() in device["name"].lower() or device["name"].lower() in device_name.lower():
            return device
    return None

def execute_device_command(device_id, device_type, action):
    """デバイスコマンドを実行"""
    command_map = DEVICE_COMMANDS.get(device_type, {})
    command = command_map.get(action)

    if not command:
        # デフォルトコマンド
        if action in ["on", "オン", "つける", "開ける"]:
            command = "turnOn"
        elif action in ["off", "オフ", "消す", "閉める"]:
            command = "turnOff"
        else:
            command = action

    result = control_device(device_id, command)
    return result

# Function calling用の関数定義
tools = [
    {
        "type": "function",
        "function": {
            "name": "control_switchbot_device",
            "description": "Switchbotデバイスを操作します。デバイス名と操作内容を指定してください。",
            "parameters": {
                "type": "object",
                "properties": {
                    "device_name": {
                        "type": "string",
                        "description": "操作するデバイスの名前(例:カーテン、プラグ、スマートロック、シーリングライト)"
                    },
                    "action": {
                        "type": "string",
                        "description": "実行する操作(例:on, off, open, close, lock, unlock, オン、オフ、開ける、閉める)"
                    }
                },
                "required": ["device_name", "action"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "set_light_brightness",
            "description": "シーリングライトの明るさを調整します。",
            "parameters": {
                "type": "object",
                "properties": {
                    "device_name": {
                        "type": "string",
                        "description": "シーリングライトのデバイス名"
                    },
                    "brightness": {
                        "type": "integer",
                        "description": "明るさ(1-100の範囲)。例:50で50%の明るさ"
                    }
                },
                "required": ["device_name", "brightness"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "set_light_color_temperature",
            "description": "シーリングライトの色温度を調整します。暖色、昼白色、寒色などに変更できます。",
            "parameters": {
                "type": "object",
                "properties": {
                    "device_name": {
                        "type": "string",
                        "description": "シーリングライトのデバイス名"
                    },
                    "color_temp": {
                        "type": "string",
                        "description": "色温度。warm/暖色(暖かい色)、neutral/昼白色(中間色)、cool/寒色/白(白い色)のいずれか"
                    }
                },
                "required": ["device_name", "color_temp"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "add_device_schedule",
            "description": "デバイスを指定時刻に自動操作するスケジュールを追加します。",
            "parameters": {
                "type": "object",
                "properties": {
                    "device_name": {
                        "type": "string",
                        "description": "デバイス名(例:シーリングライト、カーテン)"
                    },
                    "time": {
                        "type": "string",
                        "description": "実行時刻(HH:MM形式、例:07:00、19:30)"
                    },
                    "action": {
                        "type": "string",
                        "description": "実行する操作(例:turnOn、turnOff、setBrightness)"
                    },
                    "brightness": {
                        "type": "integer",
                        "description": "明るさ(setBrightnessの場合のみ必要、1-100)"
                    },
                    "color_temp": {
                        "type": "string",
                        "description": "色温度(setColorTemperatureの場合のみ必要)"
                    }
                },
                "required": ["device_name", "time", "action"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "get_schedules",
            "description": "登録されているスケジュールの一覧を取得します。",
            "parameters": {
                "type": "object",
                "properties": {},
                "required": []
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "get_device_list",
            "description": "利用可能なSwitchbotデバイスの一覧を取得します。",
            "parameters": {
                "type": "object",
                "properties": {},
                "required": []
            }
        }
    }
]

def process_user_request(user_input, conversation_history=None):
    """ユーザーのリクエストを処理

    Args:
        user_input: ユーザーの入力
        conversation_history: 会話履歴のリスト([{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}])
    """
    # デバイス情報を取得
    devices = get_device_info()
    device_info_text = "\n".join([f"- {d['name']} ({d['type']})" for d in devices])

    # システムプロンプト
    system_prompt = f"""あなたはSwitchbotデバイスを操作するアシスタントです。
以下のデバイスが利用可能です:

{device_info_text}

重要: 関数を呼び出す際は、上記のデバイスリストから実際のデバイス名を使用してください。
デバイス名は部分一致で検索されますが、できるだけ正確な名前を使ってください。
例:
- ユーザー: 「カーテン」→ device_name: 「カーテン1」または「カーテン」
- ユーザー: 「シーリングライト」→ device_name: 「白いシーリングライト」または「シーリングライト」

シーリングライトには以下の機能があります:
- ON/OFF: つける、消す
- 明るさ調整: 1-100%で指定(例:「明るさを50%に」「半分の明るさに」)
- 色温度調整: 暖色/warm(暖かい色)、昼白色/neutral(中間色)、寒色/cool/白(白い色)

会話履歴を参照して文脈を理解してください。
例:前のメッセージで「シーリングライト」について話していた場合、「暗くして」という指示は「シーリングライトを暗くする」という意味です。
"""

    # OpenAI APIを呼び出し
    messages = [
        {"role": "system", "content": system_prompt}
    ]

    # 会話履歴を追加
    if conversation_history:
        messages.extend(conversation_history)

    # 現在のユーザー入力を追加
    messages.append({"role": "user", "content": user_input})

    response = client.chat.completions.create(
        model="gpt-4o-mini",  # コスト効率の良いモデル
        messages=messages,
        tools=tools,
        tool_choice="auto"
    )

    response_message = response.choices[0].message
    tool_calls = response_message.tool_calls

    # Function callingが呼び出された場合
    if tool_calls:
        for tool_call in tool_calls:
            function_name = tool_call.function.name
            function_args = json.loads(tool_call.function.arguments)

            if function_name == "get_device_list":
                return f"利用可能なデバイス:\n{device_info_text}"

            elif function_name == "control_switchbot_device":
                device_name = function_args["device_name"]
                action = function_args["action"]

                # デバイスを検索
                device = find_device_by_name(device_name, devices)

                if not device:
                    return f"エラー: '{device_name}'というデバイスが見つかりません。\n利用可能なデバイス:\n{device_info_text}"

                # デバイスを操作
                result = execute_device_command(device["id"], device["type"], action)

                if result.get("statusCode") == 100:
                    return f"OK: {device['name']}{action}しました!"
                else:
                    return f"エラー: {device['name']}の操作に失敗しました。\n詳細: {result}"

            elif function_name == "set_light_brightness":
                device_name = function_args["device_name"]
                brightness = function_args["brightness"]

                # デバイスを検索
                device = find_device_by_name(device_name, devices)

                if not device:
                    return f"エラー: '{device_name}'というデバイスが見つかりません。\n利用可能なデバイス:\n{device_info_text}"

                # 明るさを設定
                result = set_ceiling_light_brightness(device["id"], brightness)

                if result.get("statusCode") == 100:
                    return f"OK: {device['name']}の明るさを{brightness}%に設定しました!"
                else:
                    return f"エラー: {device['name']}の明るさ設定に失敗しました。\n詳細: {result}"

            elif function_name == "set_light_color_temperature":
                device_name = function_args["device_name"]
                color_temp = function_args["color_temp"]

                # デバイスを検索
                device = find_device_by_name(device_name, devices)

                if not device:
                    return f"エラー: '{device_name}'というデバイスが見つかりません。\n利用可能なデバイス:\n{device_info_text}"

                # 色温度を設定
                result = set_ceiling_light_color_temp(device["id"], color_temp)

                if result.get("statusCode") == 100:
                    return f"OK: {device['name']}の色温度を{color_temp}に設定しました!"
                else:
                    return f"エラー: {device['name']}の色温度設定に失敗しました。\n詳細: {result}"

            elif function_name == "add_device_schedule":
                device_name = function_args["device_name"]
                time_str = function_args["time"]
                action = function_args["action"]

                # デバイスを検索
                device = find_device_by_name(device_name, devices)

                if not device:
                    return f"エラー: '{device_name}'というデバイスが見つかりません。\n利用可能なデバイス:\n{device_info_text}"

                # パラメータを取得
                params = {}
                if "brightness" in function_args:
                    params["brightness"] = function_args["brightness"]
                if "color_temp" in function_args:
                    params["color_temp"] = function_args["color_temp"]

                # スケジュールを追加
                schedule_item = add_schedule(
                    time_str,
                    device["id"],
                    device["name"],
                    action,
                    **params
                )

                param_text = ""
                if params:
                    param_text = f"{', '.join([f'{k}={v}' for k, v in params.items()])}"

                return f"OK: {device['name']}{time_str}{action}{param_text}するスケジュールを追加しました!"

            elif function_name == "get_schedules":
                schedule_list = get_schedules()

                if not schedule_list:
                    return "登録されているスケジュールはありません。"

                result_text = "登録済みスケジュール:\n"
                for i, item in enumerate(schedule_list):
                    params_text = ""
                    if item.get("params"):
                        params_text = f" ({', '.join([f'{k}={v}' for k, v in item['params'].items()])})"

                    result_text += f"{i+1}. {item['time']} - {item['device_name']} - {item['action']}{params_text}\n"

                return result_text

    # Function callingが呼び出されなかった場合
    return response_message.content

def main():
    """メインループ"""
    print("=== Switchbot AIエージェント ===")
    print("自然言語でSwitchbotデバイスを操作できます")
    print("例: 'カーテンを開けて', 'プラグをオンにして', 'デバイス一覧を見せて'")
    print("終了するには 'exit' または 'quit' と入力してください\n")

    while True:
        user_input = input("あなた: ").strip()

        if user_input.lower() in ["exit", "quit", "終了"]:
            print("エージェントを終了します。")
            break

        if not user_input:
            continue

        try:
            response = process_user_request(user_input)
            print(f"エージェント: {response}\n")
        except Exception as e:
            print(f"エラーが発生しました: {str(e)}\n")

if __name__ == "__main__":
    main()
utils.py
import os
import requests
from dotenv import load_dotenv

# .envファイルから環境変数を読み込む
load_dotenv()

# トークンとシークレットを取得
token = os.getenv("SWITCH_BOT_TOKEN")
client_secret = os.getenv("SWITCH_BOT_CLIENT_SECRET")

if not token:
    raise ValueError("SWITCH_BOT_TOKENが.envファイルに設定されていません")

# APIヘッダーを設定
headers = {
    "Authorization": token,
    "Content-Type": "application/json"
}

# デバイス一覧を取得
def get_devices():
    """Switchbotデバイスの一覧を取得"""
    res = requests.get(
        "https://api.switch-bot.com/v1.1/devices",
        headers=headers
    )
    return res.json()

# デバイスステータスを取得
def get_device_status(device_id):
    """デバイスのステータスを取得

    Args:
        device_id: デバイスID
    """
    url = f"https://api.switch-bot.com/v1.1/devices/{device_id}/status"
    res = requests.get(url, headers=headers)
    return res.json()

# デバイスを操作
def control_device(device_id, command, parameter="default"):
    """Switchbotデバイスを操作

    Args:
        device_id: デバイスID
        command: コマンド (例: "turnOn", "turnOff", "press")
        parameter: パラメータ (デフォルト: "default")
    """
    url = f"https://api.switch-bot.com/v1.1/devices/{device_id}/commands"
    data = {
        "command": command,
        "parameter": parameter
    }
    res = requests.post(url, headers=headers, json=data)
    return res.json()

# シーリングライトの明るさを設定
def set_ceiling_light_brightness(device_id, brightness):
    """シーリングライトの明るさを設定

    Args:
        device_id: デバイスID
        brightness: 明るさ (1-100)
    """
    if not 1 <= brightness <= 100:
        raise ValueError("明るさは1-100の範囲で指定してください")

    return control_device(device_id, "setBrightness", str(brightness))

# シーリングライトの色温度を設定
def set_ceiling_light_color_temp(device_id, color_temp):
    """シーリングライトの色温度を設定

    Args:
        device_id: デバイスID
        color_temp: 色温度 (2700-6500K) または "warm"/"cool"などの文字列
    """
    # 文字列指定の場合は数値に変換
    temp_map = {
        "warm": 2700,      # 暖色
        "暖色": 2700,
        "あたたかい": 2700,
        "neutral": 4500,   # 中性
        "昼白色": 4500,
        "cool": 6500,      # 寒色
        "white": 6500,
        "寒色": 6500,
        "": 6500,
    }

    if isinstance(color_temp, str):
        color_temp = temp_map.get(color_temp.lower(), 4500)

    if not 2700 <= color_temp <= 6500:
        raise ValueError("色温度は2700-6500の範囲で指定してください")

    return control_device(device_id, "setColorTemperature", str(color_temp))

if __name__ == "__main__":
    # デバイス一覧を表示
    devices = get_devices()
    print("=== Switchbotデバイス一覧 ===")
    print(devices)
requirements.txt
python-dotenv==1.0.0
requests==2.31.0
openai==1.54.0
slack-bolt==1.18.0
slack-sdk==3.26.0
schedule==1.2.0
.env_template
SWITCH_BOT_TOKEN=
SWITCH_BOT_CLIENT_SECRET=
SLACK_BOT_TOKEN=
SLACK_CHANNEL_ID=
OPENAI_API_KEY=
1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?