Hass.io
は、Raspberry Pi などでホームオートメーションハブを構築するシステムです。>> Home Assistant
IFTTT
だとかGoogle Home
とか、IoT家電とか、そもそも ラズパイ でゴニョゴニョできるナニとかを連携して、DIY的近未来が楽しめます。日没時刻に「日が沈みます。お洗濯の取り込みは大丈夫ですか?」と Google Home に発言させるのも比較的簡単で。嫁さんにも好評です。
WEB API で センサーを作る
正攻法的なSensor
と言えば、温度センサーなどの物理的なカンジの情報取得でしょう。x℃になったら冷房を弱にする、というように、センサーが検知した値をトリガーに、何かアクションを実行するのがイメージしやすいですね。
でも、Hass.ioには天気予報センサーというものがデフォルトで利用できます。これは 天気予報の Web API を定期的に Pull して取得したデータを返すセンサーです。
おもしろいですね。「雨が降りそうです、お洗濯ものを取り込んでください」と言うギミックが作れそうです。
でも気に食わないのは、このデフォルトの天気予報センサー(Yrと言います)がノルウェーの天気予報サービスだという点です。予報精度が良いか悪いかはわかりませんが、日本の天気をノルウェーに聞きに行くのは馬鹿げています。
仕掛け
兎にも角にも、自分が利用したい WEB API をセンサーとして利用するのは面白そうです。Hass.io のセンサーの仕様を調べてみましょう。
- Yr はコンポーネントだ
- Yr はPythonで書かれている
- どうやら
config\custom_components\sensor\
配下に配置するとセンサーとして認識されるようだ
まぁ正攻法では Python をベンキョーするということだ。
Yr のコードを丁寧に読んでたら、2日目ぐらいで意味が分かってきた。
非同期処理(つまりWEBの応答には時間が要するので、その間、他の処理を止めない実行方法)はちょいと後回しにして、そのほかの Sensor としての要件を満たすサンプルコードを書いてみました。一分ごとにテキトーな数字と文字をランダムで返します。
WEB API にアクセスする処理に差し替え、埋め込めば、定期的にインターネットから情報を取得するセンサーにカスタムできるはずです。(その完成品は、また次回お披露目します)
※ほかにも天気予報情報と連動させる方法は沢山あります。例えばIFTTTからHass.ioのWebhookをキックしても良いでしょうし、MQTTで中継させることもできるでしょう。
サンプルソース
より実用的なHass.io Yahoo 気象情報APIセンサーもご参照ください
"""
定期的に非同期でデータをPULLするセンサー
参照ソースは Yr
https://github.com/home-assistant/home-assistant/blob/master/homeassistant/components/sensor/yr.py
"""
import asyncio
import voluptuous as vol
import homeassistant.helpers.config_validation as confValue
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (
CONF_LATITUDE, CONF_LONGITUDE, CONF_MONITORED_CONDITIONS, ATTR_ATTRIBUTION, CONF_NAME
)
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.event import (async_track_utc_time_change, async_call_later)
import random #デモデータ作成用
# ------------------------------------------------------------------------------
# 設定
DEFAULT_NAME = 'Sensor Base'
CONF_ATTRIBUTION = "開発ツール > Status などに表示される帰属"
# データ取得間隔(秒)
FETCH_INTERVAL = 60;
# このコンポーネントでセンシングするセンサーのリストを定義
# スキーマ設定、エンティティ定義で参照される
SENSOR_TYPES = {
'num': ['Number', 'num'],
'msg': ['String', None],
}
# スキーマ設定
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Optional(CONF_MONITORED_CONDITIONS, default=['a']):
vol.All(confValue.ensure_list, vol.Length(min=1), [vol.In(SENSOR_TYPES)]),
vol.Optional(CONF_LATITUDE): confValue.latitude,
vol.Optional(CONF_LONGITUDE): confValue.longitude,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): confValue.string,
})
# ------------------------------------------------------------------------------
# エンティティ定義
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
latitude = config.get(CONF_LATITUDE, hass.config.latitude)
longitude = config.get(CONF_LONGITUDE, hass.config.longitude)
name = config.get(CONF_NAME)
# [monitored_conditions](正規化済み)のリスト毎にセンサーを生成
# エンティティリスト entities に詰める
entities = []
for sensor_type in config[CONF_MONITORED_CONDITIONS]:
entities.append(mySensorEntities(name, sensor_type))
# 非同期エンティティリストとして登録
async_add_entities(entities)
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# データ取得Classを作成
dataFetcher = myDataFetcher(hass, entities)
# 時間がパターンと一致した場合に起動する同期リスナーを追加します。
# (必要か?)
# http://dev-docs.home-assistant.io/en/master/api/helpers.html
async_track_utc_time_change(hass, dataFetcher.updating_devices, second=0)
# 最初の取得と同期、ループの開始
await dataFetcher.fetching_data()
# ------------------------------------------------------------------------------
# エンティティ定義
# ほぼ定型?
class mySensorEntities(Entity):
def __init__(self, name, sensor_type):
self.client_name = name
self._state = None
self.type = sensor_type
self._name = SENSOR_TYPES[sensor_type][0]
self._unit_of_measurement = SENSOR_TYPES[sensor_type][1]
@property
def name(self):
return '{} {}'.format(self.client_name, self._name)
@property
def state(self):
return self._state
@property
def should_poll(self):
return False
@property
def device_state_attributes(self):
return {
ATTR_ATTRIBUTION: CONF_ATTRIBUTION,
}
@property
def unit_of_measurement(self):
return self._unit_of_measurement
# ------------------------------------------------------------------------------
# データ取得Class
class myDataFetcher:
def __init__(self, hass, entities):
self.data = {}
self.hass = hass
self.entities = entities
# データを取得し、センサーデータを更新、ループする
async def fetching_data(self, *_):
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# データを取得して self.data に詰める
# 例示としてテキトーなデータを生成
# 本当はAPIアクセスなどの非同期処理
numA = random.randint(0, 50)
self.data = {
'num_raw': int(numA),
'msg_raw': "Message " + str(numA),
}
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# 後処理
# センサークラスに更新を発火
await self.updating_devices()
# 指定秒後に再実行
async_call_later(self.hass, FETCH_INTERVAL, self.fetching_data)
# センサーデータを更新する
async def updating_devices(self, *_):
# データが無いなら何もしない
if not self.data:
return
# センサーごとに更新データをupdateStateTasksに詰める
updateStateTasks = []
for checkEntity in self.entities:
newState = None
if checkEntity.type == 'num':
newState = int(self.data['num_raw'])
elif checkEntity.type == 'msg':
newState = str(self.data['msg_raw'])
# 値に変化があれば更新タスクに追加
if newState != checkEntity._state:
checkEntity._state = newState
updateStateTasks.append(checkEntity.async_update_ha_state())
# updateStateTasks があれば更新を発火
if updateStateTasks:
await asyncio.wait(updateStateTasks, loop=self.hass.loop)
configuration.yaml の記述
pullsensorsample.py
というファイル名で保存したなら、設定はこんな感じ
sensor:
- platform: pullsensorsample
monitored_conditions:
- num
- msg