本記事までの経緯
以下の機能を有するシステムを作成しようと試行錯誤しています。
- 現在の気圧を照会する
- 直近2時間のデータ推移をグラフで表示する
- 急な気圧変化があった場合に通知する
- 気圧の変わり目(上昇傾向→下降傾向、又はその逆)を通知する
- 直近1時間の変化量を確認出来る
それに伴い、ここまでいくつか記事を起こしてきました。
今回は上記の記事の続きから始める記事となりますので、ご興味のある方は是非ご一読下さい。。
使用ハードウェア
- RaspberryPi 3B+
- 自作温湿度、気圧センサモジュール
開発、実験用はラズパイ4を使用していますが、長時間運用時は省エネ意識でラズパイ3で稼働させます。
トピック別リンク
ちょっとボリュームもりもりになってしまったので、目的別にリンクを張っておきます。
- LINEBOTに現在の気温や気圧を教えてもらう
- LINEBOTに状況変化を通知してもらう
- 気圧データを活用する
- スプレッドシートでリアルタイム更新グラフを作りたい
- スプレッドシートで特定範囲のデータを別のシートに自動コピペするには?
- スプレッドシートのグラフを画像化したい
- LINEBOTで画像送信
- いいからソース見せろ!
LINEBOTをインターフェースにしてデータを取得してみよう
先ずはおなじみLINEのオウム返しBOTを利用して、特定のキーワードを受け取るとスプレットシートからデータを取ってきてリプライ出来るようにします。
LINEBOTに関してはラズパイ×flask×ngrokの構成とします。
以前、この構成でBOTを作ってみたので下記の記事をご参考までに。
では、BOTの本体となるプログラム、app.py
を改良し、データを取得してみましょう。
ワークシートの取得
def get_sheet():
gc = gspread.service_account(filename="line-bot-project1-hogehoge.json")
workbook = gc.open_by_key(SPREADSHEET_KEY)
sh = workbook.worksheet("シート1")
return sh
先ずはスプレッドシートワークブックより、実際に操作するシートを取得しましょう。filename="line-bot-hogehoge.json"
は発行された秘密鍵です。ファイル名は適宜変更を。
ワークシートからデータを取得、リプライメッセージ生成
def select_temp_and_humidity(sheet):
values = sheet.get_all_records()
last = values[-1]
return float(last["temp"]), float(last["humidity"])
def create_response_text():
sheet = get_sheet()
temp, humidity = select_temp_and_humidity(sheet)
return f"待ってました!\n現在の気温は{round(temp)}℃、湿度は{round(humidity)}%です~"
では、スプレッドシートから気温と湿度を取得してみましょう。
先程定義したget_sheet
で、シート情報をsheet変数に代入します。
sheet
をselect_temp_and_humidity
に渡して、最新のデータを取得します。
get_all_records()
でシートのデータを全て取得します。
シートのヘッダをキーにした辞書型で帰ってくるので、一番新しいデータ(一番後ろ)のデータを取り、欲しい数値のキーを指定します。
あとはcreate_response_text
で返す文字列に、先ほど取得したtemp
とhumidity
の値を埋め込みます。
LINEBOTでリプライ
@handler.add(MessageEvent, message=TextMessage)
def handle_message(event):
common_messages = ["おはようございます", "どうしました?",
"無駄使いはだめですよ?", "今日は何を開発するんですか?", "変な事いわないでください!"]
my_comments = event.message.text
if "気温わかる?" in my_comments:
botRes = create_response_text()
else:
botRes = common_messages[random.randint(0, 4)]
line_bot_api.reply_message(
event.reply_token,
TextSendMessage(text=botRes))
if __name__ == "__main__":
app.run()
では、「気温わかる?」という文字列をキーに、先ほど作成した気温と湿度を返してくれる関数を実行出来るようにしましょう。
プログラムが書けたらLINEでメッセージを送ってみましょう。
私**「気温わかる?」**
実行結果
問題なく最新データをリプライしてくれました!
気温、湿度、その他と任意の種類のデータを指定できるのでめっちゃ便利です。
では、今度は最新のデータが適正値から外れて居たら通知してくれる機能を作りましょう。
LINEBOTに状況変化を通知してもらおう
次に、しきい値を超えた際に通知を送る機能を実装してみましょう。
app.py
(オウム返しBOT)から一旦離れて前回作成したデータをスプレットシートに定期書き込みするプログラムをベースに作成します。
不快指数が適正範囲から外れたら通知させてみる
以下のサイトを参考に、しきい値を設定します。
ふむふむ、60~75の範囲であれば快適っぽいですね。
では測定値がこの範囲を外れた場合に通知を送らせてみましょう。
しきい値を設定する
ALART_DISC_MIN = 60
ALART_DISC_MAX = 75
def invalid_disc(disc):
return disc < ALART_DISC_MIN or ALART_DISC_MAX < disc
まずはしきい値を設定し、適正範囲を定義します。
75を超えたら暑い、60以下になったら寒いと通知させる設定です。
通知プログラムを考えよう
def send_temp_message(data, temp, disc):
if not invalid_disc(float(data[-1]["disconfort"])):
if disc < ALART_DISC_MIN:
messages = TextSendMessage(
text=f"寒くなって来たよ・・・\n今{round(temp)}℃です~")
line_bot_api.push_message(USER_ID, messages=messages)
if disc > ALART_DISC_MAX:
messages = TextSendMessage(
text=f"暑くなって来たよ・・・\n今{round(temp)}℃です~")
line_bot_api.push_message(USER_ID, messages=messages)
先程設定したinvalid_disc
の条件を満たしたら処理を実行しましょう。
引数についてはこの関数を実行するまでの処理でセンサから取得した生データと、スプレッドシートから取得したデータを渡しています。
data
の中身はget_all_records()
です。
5分おきに処理を通すことになるので、特に何もしなければ不快指数が閾値を外れている間、五分おきに通知するようになってしまいます。
一度通知してくれれば問題ないので、処理の内容を工夫しましょう。
if not invalid_disc(float(data[-1]["disconfort"])):
メッセージ送信処理に入る前に前回記録した数値を渡して、前回値がしきい値を超えていない場合のみメッセージを送るようにします。
もう一つ、この逆の条件を設定して不快指数が適正値に戻ったらその旨を教えてもらいましょう。
def send_conf_disc_message(data):
if invalid_disc(float(data[-1]["disconfort"])):
messages = TextSendMessage(text="快適な状態に戻ったよ!")
line_bot_api.push_message(USER_ID, messages=messages)
先程と逆の設定になりますね。ではこの処理をmain
関数に組み込んでみましょう。
main処理に組み込む
def main():
print("System enable ...")
time_buff = 0
count = 0
while True:
while(time.time() - time_buff < 1):
pass
time_buff = time.time()
count += 1
if (count == 1) or (count % 300) == 0:
sheet1 = get_sheet()
data = sheet1.get_all_records()
# 必要なセンサの生データをcreateしてget
temp, humidity, pressure, disc = create_data()
# 不快指数が適正値を外れたら実行
if invalid_disc(disc):
send_temp_message(data, temp, disc)
# 不快指数が適正の時実行
if not invalid_disc(disc):
send_conf_disc_message(data)
# スプシの更新
sheet_update(sheet1, column, temp, humidity,
disc, pressure, change_pre)
if __name__ == "__main__":
main()
こんな感じで無限ループをギュンギュン回して5分毎に
- 作業シートの取得
- センサからデータを取得
- 取得したデータがしきい値を外れていないか確認
- 終わったらスプシの更新
という処理を通していきます。処理を実行するたびにget_sheet
しているのですが、どうやら一度シートの認証して一定時間経つと接続できなくなるようで、エラーを吐いていたのでこの位置に持ってきています。
幸か不幸か超快適な季節なので、動作確認の為にセンサーに軽くドライヤでもあててから実行してみましょう!
実行結果
ちゃんと通知してくれました。そして扇風機を当ててセンサの温度を下げます。
ちゃんと戻ったことも教えてくれました。
スプシのデータを見て頂ければわかる通り、連続して通知しないようになっています。
上手くいったようなので、今度は気圧変化をお知らせしてもらいましょう!
2種類の変化をチェックしよう
気圧のデータを活かしてやってみたいことは次の2点です。
- 直近の急な気圧変化を通知する
- 気圧の傾向(下降→上昇など)が変わった際に通知する
温度の扱いに比べて若干ひねりが必要ですが、どう落とし込むか考えてみましょう!
直近の大きな気圧変化を通知する
直近一時間の気圧差を求めて、その差が大きければ通知するようにすれば問題なさそうです。
2日程実際にログ取ってみたのですが、開発やってるこの期間天気が穏やかすぎて思ったように急な変化は観測できなかったので、動作確認を兼ねて今回はしきい値を1hPaとしておきます。
ALART_PRE_MIN = -1.0
ALART_PRE_MAX = 1.0
def change_pressure_term(change_pre):
return change_pre < ALART_PRE_MIN or ALART_PRE_MAX < change_pre
不快指数の時の要領でしきい値を設定します。
引数のchange_pre
は現在の気圧と1時間前の気圧の比較値です。
これが簡単に出来るのがスプレッドシートを利用する利点ですね。
change_pre = pressure - (data[-12]["pressure"])
こんな感じで比較値を取っておきます。
def send_pre_message(data, change_pre):
before = float(data[-1]["change_pre"])
if not change_pressure_term(before):
if change_pre > ALART_PRE_MAX:
messages = TextSendMessage(
text=f"気圧が大きく上がったよ!\n直近で{round(change_pre, 1)}hPa変化したけど、何か変化あった?")
line_bot_api.push_message(USER_ID, messages=messages)
if change_pre < ALART_PRE_MIN:
messages = TextSendMessage(
text=f"気圧が大きく下がったよ!\n直近で{round(change_pre, 1)}hPa変化したから、お天気悪くなるかも。")
line_bot_api.push_message(USER_ID, messages=messages)
直近1時間の変化値を連続で取っているので、一度通知すると変化値の収束に時間がかかるので、こちらも不快指数の時にやった方法と同じく、連続では通知を送らないようにしておきます。
気圧傾向の変化を通知する
こちらは上記で取得した気圧の変化値を利用して、直近1時間の平均値を計算し、その結果を比較します。直近1時間の変化値の平均がマイナスなら降下傾向、逆なら上昇傾向です。
この平均値を比較して前回値と今回値がマイナスからプラスに変わったタイミングを通知させましょう。
def make_pre_ave(data):
ps = [d.get("change_pre") for d in data[-12:]]
ave = sum(ps)/len(ps)
return ave
スプシのデータを渡して、直近12個(5分おきに記録しているので12個で1時間とします)の気圧変化値の平均を取る処理です。
辞書型なので、内包表記でキーを指定しつつ12個前から1つ前までのデータを拾います。あとはこのデータの合計を個数で割ったら平均が出ますね。
def change_pre_state(before_pre_ave, pre_ave):
return (before_pre_ave <= 0 and 0 <= pre_ave) or (before_pre_ave >= 0 and 0 >= pre_ave)
あとは通知条件です。こんな感じで指定して、マイナスからプラス(又はその逆)になった事を条件としましょう。before_pre_ave
については後程。
メッセージ送信関数はもう似た感じなので端折ります。
ではこれらのプログラムをmain
関数に適用しましょう!
ここまでの関数を実装しよう!
実際に組み込むとこんな感じになりました。
def main():
print("System enable ...")
time_buff = 0
count = 0
before_pre_ave = 0
while True:
while(time.time() - time_buff < 1):
pass
time_buff = time.time()
count += 1
if (count == 1) or (count % 300) == 0:
sheet1 = get_sheet()
data = sheet1.get_all_records()
temp, humidity, pressure, change_pre, disc, pre_ave = create_dataset(
data)
if invalid_disc(disc):
send_temp_message(data, temp, disc)
if not invalid_disc(disc):
send_conf_disc_message(data)
if change_pressure_term(change_pre):
send_pre_message(data, change_pre)
if change_pre_state(before_pre_ave, pre_ave):
if(count != 1):
send_change_message(pre_ave)
sheet_update(sheet1, column, temp, humidity,
disc, pressure, change_pre)
before_pre_ave = pre_ave
if __name__ == "__main__":
main()
センサの生データからプログラム側で加工するデータも増えてきましたので、関数にまとめてデータを生成します。
def create_dataset(data):
tp, hm = get_temp(data)
pr = get_pressure()
ch_pr = pr - (data[-12]["pressure"])
dc = disconfort_index(tp, hm)
pr_ave = make_pre_ave(data)
return tp, hm, pr, ch_pr, dc, pr_ave
main
関数のループの処理の最後にbefore_pre_ave = pre_ave
しておきます。こうすることで次の処理に入った時にbefore_pre_ave
とpre_ave
の比較が可能となります。
staticな感じでbefore_pre_ave
を使う為に無限ループに入る前に0を代入しているので、プログラム起動直後は絶対通知が飛んでしまいます。初回のみこれを回避するため、if(count != 1)
を追記しています。
実行結果
ちょうどいい感じにしきい値を超えたので通知してくれました。まぁ1hpaぐらいは急な変化とは言えないと思うので2とか3に調整して使いましょう。
変化値の平均値が切り替わったところで通知してくれました。
なんだこいつ、賢くなってきたぞ!
グラフはどうするよ
某アウトドアウォッチの機能に気圧グラフ表示機能があります。
直近2時間の気圧グラフを表示してくれるみたいですので、これに寄せて行きましょう。
こちらのデータベースはスプレッドシートなので、いい感じのグラフは一瞬でつくれますが、一工夫必要そうです。
- 直近2時間のデータをグラフ化する
- 上記のグラフをリアルタイムに更新する
よくあるシートの1番下までグラフの範囲を指定する方法ではグラフが溜まりすぎます。また、範囲指定したとしても常に新しい行にデータが追加されるのでうまく表示できません。
もう少しうまい方法がありそうですが、私の場合は今回ご紹介する方法で実現しました。
そもそもデータが無限に溜まる
そうなんです、一日稼働させると288行、1週間で2000行。5分おきにget_all_records
しないといけませんし、処理も重くなりそうです。
1日分あれば十分そうですが、何かと使えそうなので今回は3日分残してそれ以上データが溜まらないようにします。
DATA_MAX = 864
column = len(data)
if column >= DATA_MAX:
delete_rows(sheet1)
column -= 1
三日分にあたる864行を最大値に設定し、get_all_records
した辞書型リストの量(行数)を取り、それが設定した最大値になったら一番古い行を消去します。
def delete_rows(sheet1):
sheet1.add_rows(1)
sheet1.delete_rows(2)
ヘッダを除いた一番上の行を消して、1行空白行を追加することで常時864行のデータがシートにとどまってくれます。
グラフが変だ!
841行目から865行目(直近24行=2時間)まで範囲指定してグラフ作ればはい完成!と思っていたのですが、1番上の行が消去される関係上、グラフの範囲がずれて更新されませんでした。
空白行まで範囲に追加すると更新はされますがデータが溜まる一方で、2時間の範囲を指定して更新できません。
さんざん時間を溶かした挙句、実現方法を思いつきました。
自動で別のシートに最新2時間データをコピペする
これならグラフのデータ範囲もずれないのでいけそうですね!
当然リアルタイム更新のグラフにしたいわけですから、この処理は自動化します。
まずはシート2を作成して、get_sheet関数でシート2を取得し、戻り値を追加して一気にシートをゲットできるようにしましょう。
def sheet_copy(sheet2, data, temp, humidity, disc, pressure, change_pre):
del(data[0:841])
df = pd.DataFrame(data)
sheet2.update([df.columns.values.tolist()] + df.values.tolist())
sheet2.update(f"A25:F25",
[[str(create_datetime()), temp, humidity, int(disc), round(pressure, 1), round(change_pre, 1)]])
実際に出来たプログラムは上記ですが、いざやってみると「シートの特定範囲のデータを別のシートにコピペ」したいだけだった所、データ型の違いでエラーが出たりうまく行かず大苦戦でした。公式ドキュメントを参照したところpandas
を利用してデータフレーム型に変換し、それをアップデートする方法が紹介されていたので利用しました。
main
関数内での処理順の関係上、ここでアップデートするデータは前回値となります。(一通り処理を通してからメインのシートにデータを追加する為)
なので、コピペと同時に今回メインのシートに記録する内容をこちらにも適用します。
この方法を使う時は
pip3 install pandas
からのimport
しておきましょう。
ではこちらをmain
関数にぶっこんでいきます。
完成したプログラムはこちら
def main():
print("System enable ...")
time_buff = 0
count = 0
before_pre_ave = 0
while True:
while(time.time() - time_buff < 1):
pass
time_buff = time.time()
count += 1
if (count == 1) or (count % 300) == 0:
sheet1, sheet2 = get_sheet()
data = sheet1.get_all_records()
column = len(data)
if column >= DATA_MAX:
delete_rows(sheet1)
column -= 1
temp, humidity, pressure, change_pre, disc, pre_ave = create_dataset(
data)
if change_pressure_term(change_pre):
send_pre_message(data, change_pre)
if invalid_disc(disc):
send_temp_message(data, temp, disc)
if not invalid_disc(disc):
send_conf_disc_message(data)
if change_pre_state(before_pre_ave, pre_ave):
if(count != 1):
send_change_message(pre_ave)
sheet_update(sheet1, column, temp, humidity,
disc, pressure, change_pre)
sheet_copy(sheet2, data, temp, humidity,
disc, pressure, change_pre)
before_pre_ave = pre_ave
if __name__ == "__main__":
main()
実行結果
シート1の選択範囲がしっかりシート2に反映されています!
5分おきにデータが勝手に更新され、それに連動してグラフが変化します。これは感動!うまく行ったようです!
気圧グラフだと変化がショボいので温室度と不快指数のグラフを作って動かしてみました。
以下はグラフが変化した瞬間を捉えた貴重な映像。
グラフ、サクッとみるにはどうする?
リアルタイム更新されるグラフが出来たところで、いつでも見れるようにしたいですね。Google driveにあるのでいつでも見れますが、どうせならワンアクションでサクッと画像で見れるほうがスマートです。
となると、ここで再びオウム返しBOT、app.py
の出番です!
そしてスプレッドシートのグラフは、画像データとしてURL公開が出来ます。
しかもグラフが更新されたら公開している画像にも反映してくれるという優れもの。これはいい予感しかしない。
早速実装していきましょう!
スプレッドシートのグラフを画像化
方法はめっちゃ簡単です。グラフの右上のメニューアイコンから「グラフを公開」を選択します。
赤枠でかこったドロップダウンを画像に変えて、その下にあるURLを利用して画像を使えます。グラフの画像化と利用する準備は整いました。
後はオウム返しBOTがこのグラフ画像を送れるようにしましょう!
LINEで画像を送ろう
app.py
を改良して画像送信機能を追加します。
まずは新たなモジュールを追加しましょう。
# ここに"ImageSendMessage"を追加する
from linebot.models import (
MessageEvent, TextMessage, TextSendMessage, ImageSendMessage
)
あとは特定のキーワードに対して画像を返すようにします。
if "気圧グラフある?" in my_comments:
messages = ImageSendMessage(original_content_url=PRESS_URL,
preview_image_url=PRESS_URL)
line_bot_api.push_message(USER_ID, messages=messages)
time.sleep(2)
botRes = "送った!直近2時間の変化が見れるよ"
ImageSendMessage()の引数に、先ほどのグラフ画像URLを指定します。各種認証ファイルを格納しているjson
ファイルにまとめて記載しておけばこのように変数で放り込めるので便利です。
ベースがオウム返しBOTなので処理の流れとしては
メッセージ受信→何らかの処理→リプライ
が一連の流れとなるので、メッセージ送信に至る前に画像をとっとと送ってしまおうというのがこちらの処理です。送信処理が短いスパンで行われるので一応sleep
をかけてリプライを遅らせます。
では実際にグラフがちゃんと出来てるか、LINEBOTちゃんに聞いてみましょう。
実行結果
なんかご機嫌斜めのようですが、見事に画像でグラフを送ってくれました!!
折角なので、ついでに温湿度と不快指数のグラフも作ってみましたがそちらもしっかり送ってくれています。
5分毎にリアルタイム更新グラフなので、いつでもどこでも、直近2時間のデータ変化を参照できます。
- 現在の気圧を照会する
- 直近2時間のデータ推移をグラフで表示する
- 急な気圧変化があった場合に通知する
- 気圧の変わり目(上昇傾向→下降傾向、又はその逆)を通知する
- 直近1時間の変化量を確認出来る
これでようやく全ての機能+αぐらいの実装が完了しました!
厳密には5は記事内に書いていないですが、直近の大きな気圧変化を通知するの項目でご紹介した気圧の比較値を、ワークシートからデータを取得、リプライメッセージ生成に当てはめるだけなので割愛しています。
これにて超かわいい多機能気圧計の完成です!!
最後までお読みいただき、ありがとうございます!!
おまけとして現段階のソースコードを記載しておきます。何かの参考になれば幸いです。
完成したプログラム
BOTとやり取りするプログラム
以前作った下記のシステムと統合
from flask import Flask, request, abort
import gspread
import random
from linebot import (
LineBotApi, WebhookHandler
)
from linebot.exceptions import (
InvalidSignatureError
)
from linebot.models import (
MessageEvent, TextMessage, TextSendMessage, ImageMessage, ImageSendMessage
)
import json
import subprocess
import time
import datetime
file = open("info.json", "r")
info = json.load(file)
SPREADSHEET_KEY = info["SPREADSHEET_KEY"]
CHANNEL_ACCESS_TOKEN = info["CHANNEL_ACCESS_TOKEN"]
USER_ID = info["USER_ID"]
WEBHOOK_HANDLER = info["WEBHOOK_HANDLER"]
PRESS_URL = info["PRESS_URL"]
TEMP_URL = info["TEMP_URL"]
line_bot_api = LineBotApi(CHANNEL_ACCESS_TOKEN)
handler = WebhookHandler(WEBHOOK_HANDLER)
record = [""] * 6
botRes = []
app = Flask(__name__)
def create_datetime():
date = datetime.datetime.now()
date = date.replace(microsecond=0)
return date
def init_record(record):
record = [""] * 6
return(record)
def get_sheet():
gc = gspread.service_account(
filename="hogefuga.json")
workbook = gc.open_by_key(SPREADSHEET_KEY)
sh2 = workbook.worksheet("シート2")
return sh2
def get_sheet_all():
gc = gspread.service_account(
filename="foobar.json")
workbook = gc.open_by_key(SPREADSHEET_KEY)
sh2 = workbook.worksheet("シート2")
sh3 = workbook.worksheet("シート3")
return sh2, sh3
def select_temp_and_humidity(sheet2):
values = sheet2.get_all_records()
last = values[-1]
return float(last["temp"]), float(last["humidity"])
def select_disc(sheet2):
values = sheet2.get_all_records()
last = values[-1]
return float(last["disconfort"])
def select_press(sheet2):
values = sheet2.get_all_records()
last = values[-1]
return float(last["pressure"])
def followUp_press(sheet2):
values = sheet2.get_all_records()
now = values[-1]["pressure"]
before = values[-12]["pressure"]
return (float(now - before))
def create_response_text():
sheet2 = get_sheet()
temp, humidity = select_temp_and_humidity(sheet2)
return f"待ってました!\n現在の気温は{round(temp)}℃、湿度は{round(humidity)}%です~"
def create_response_disc():
sheet2 = get_sheet()
disc = select_disc(sheet2)
disc_message = ["ちょっと暑い", "暑すぎ!!\n服にカビ生やさないでよ",
"ちょっと寒いかも", "寒すぎ!!", "快適だよ~"]
if 80 > disc > 75:
return disc_message[0]
elif disc > 80:
return disc_message[1]
elif 55 < disc < 60:
return disc_message[2]
elif disc < 55:
return disc_message[3]
else:
return disc_message[4]
def create_response_pre():
sheet2 = get_sheet()
pressure = select_press(sheet2)
return f"今のの気圧ね、{round(pressure, 1)}hPaだよ!"
def create_response_followUp():
sheet2 = get_sheet()
follow_up = followUp_press(sheet2)
if -1 <= follow_up <= 0:
return f"直近1時間の変化は{round(follow_up, 1)}hPaだね。\n緩やかに下降中。"
elif 0 <= follow_up <= 1:
return f"直近1時間の変化は{round(follow_up, 1)}hPaだね。\n緩やかに上昇中。"
elif 1 < follow_up < 4:
return f"直近1時間の変化は{round(follow_up, 1)}hPaだね。\n急上昇中!お天気変わった?。"
elif -1 > follow_up > -4:
return f"直近1時間の変化は{round(follow_up, 1)}hPaだね。\n急降下中!お天気崩れてない?。"
elif 4 <= follow_up:
return f"直近1時間の変化は{round(follow_up, 1)}hPaだね。\nって、超急上昇中だよ!!"
elif -4 >= follow_up:
return f"直近1時間の変化は{round(follow_up, 1)}hPaだね。\nって、超急降下!!天気荒れてない!?"
elif follow_up == 0:
return "特に変化してないなぁ"
def sheet_update():
sheet2, sheet3 = get_sheet_all()
values = sheet2.get_all_records()
length = sheet3.get_all_records()
last = values[-1]
sheet3.update(f"A{len(length) + 2}:K",
[[str(create_datetime()), float(last["temp"]), float(last["humidity"]), float(round(last["pressure"])), float(last["change_pre"]), str(record[4]), str(record[1]), str(record[2]), str(record[3]), float(record[0]), str(record[5])]])
def create_message(my_comments):
global botRes
global record
common_messages = ["気圧のこと色々わかるよ!", "気温と湿度が分かるよ", "どうしたん?",
"無駄使いはあかんで?", "今日は何を開発するん?", "変な事言わんとって!", "日々進化してるよ"]
wait_messages = ["行ってらっしゃい!気を付けてね!",
"早く帰ってきてね!", "財布忘れてない??", "ちゃんとマスク持った?"]
# データゲット
if "気温わかる?" in my_comments:
botRes = create_response_text()
elif "快適?" in my_comments:
botRes = create_response_disc()
elif "気圧調べて" in my_comments:
botRes = create_response_pre()
elif "状況は?" in my_comments:
botRes = create_response_followUp()
# お留守番実行
elif "行ってきます" in my_comments:
botRes = wait_messages[random.randint(0, 3)]
subprocess.Popen(["python3", "facial_req_bot.py"],
cwd="./facial_recognition")
# グラフゲット
elif "気圧グラフある?" in my_comments:
messages = ImageSendMessage(original_content_url=PRESS_URL,
preview_image_url=PRESS_URL)
line_bot_api.push_message(USER_ID, messages=messages)
time.sleep(2)
botRes = "送った!直近2時間の変化が見れるよ"
elif "気温グラフある?" in my_comments:
messages = ImageSendMessage(original_content_url=TEMP_URL,
preview_image_url=TEMP_URL)
line_bot_api.push_message(USER_ID, messages=messages)
time.sleep(2)
botRes = "マシン使いが荒い・・・送った!\n直近二時間のデータまとめといたよ。"
# チートシート
elif "コマンド忘れた" in my_comments:
botRes = "え~!!はよ覚えてよ・・・\n\n気温わかる?:温湿度を見ます\n快適?:不快指数を見ます\n気圧調べて:気圧を見ます\n状況は?:直近一時間の気圧変化を見ます\n行ってきます:お留守番モードに入ります\n気圧or気温グラフある?:リアルタイムグラフ送ります\n釣れた:釣果記録モードに入ります\nおやすみ:本体の電源を切ります\n\nリアリティに欠けるのでちゃんと覚えて!"
elif my_comments in "おやすみ":
botRes = "おやすみなさい!また明日ね~!"
# コモンメッセージ
else:
botRes = common_messages[random.randint(0, 6)]
return botRes
@app.route("/callback", methods=['POST'])
def callback():
signature = request.headers['X-Line-Signature']
body = request.get_data(as_text=True)
app.logger.info("Request body: " + body)
try:
handler.handle(body, signature)
except InvalidSignatureError:
print("Invalid signature. Please check your channel access token/channel secret.")
abort(400)
return 'OK'
@handler.add(MessageEvent, message=TextMessage)
def handle_message(event):
my_comments = event.message.text
comment = create_message(my_comments)
line_bot_api.reply_message(
event.reply_token, TextSendMessage(text=comment))
if comment == "おやすみなさい!また明日ね~!":
subprocess.run(["sudo", "shutdown", "now"])
if __name__ == "__main__":
app.run()
スプレッドシートへの記録と、ユーザへの通知をするプログラム
from logging import makeLogRecord
import gspread
from oauth2client.service_account import ServiceAccountCredentials
from linebot import LineBotApi
from linebot.models import TextSendMessage
import json
import smbus
import RPi.GPIO as GPIO
import dht11
import time
import datetime
import pandas as pd
file = open("info.json", "r")
info = json.load(file)
CHANNEL_ACCESS_TOKEN = info["CHANNEL_ACCESS_TOKEN"]
SPREADSHEET_KEY = info["SPREADSHEET_KEY"]
USER_ID = info["USER_ID"]
line_bot_api = LineBotApi(CHANNEL_ACCESS_TOKEN)
# 気圧センサ各レジスタアドレス
ADDR = 0x5C
CTRL_REG1 = 0x20
PRE_OUT_XL = 0x28
# 書き込みデータ
WRITE_REG1 = 0x90
MASK_DATA = 0x80
# 不快指数適正範囲最大値、最低値
ALART_DISC_MIN = 60
ALART_DISC_MAX = 75
# 気圧変化閾値
ALART_PRE_MIN = -1.0
ALART_PRE_MAX = 1.0
# 三日分のデータ
DATA_MAX = 864
GPIO.setwarnings(True)
GPIO.setmode(GPIO.BCM)
# 気温センサインスタンス
instance = dht11.DHT11(pin=18)
# 気圧センサ初期化
bus = smbus.SMBus(1)
bus.write_byte_data(ADDR, CTRL_REG1, WRITE_REG1)
# 定期的にシートを更新しないとエラーになる?
def get_sheet():
scope = ["https://www.googleapis.com/auth/spreadsheets",
"https://www.googleapis.com/auth/drive"]
credentials = ServiceAccountCredentials.from_json_keyfile_name(
"hogefuga.json", scope)
gc = gspread.authorize(credentials)
workbook = gc.open_by_key(SPREADSHEET_KEY)
sh1 = workbook.worksheet("シート1")
sh2 = workbook.worksheet("シート2")
return sh1, sh2
# 不快指数が範囲外
def invalid_disc(disc):
return disc < ALART_DISC_MIN or ALART_DISC_MAX < disc
# 気圧変化が大きい場合
def change_pressure_term(change_pre):
return change_pre < ALART_PRE_MIN or ALART_PRE_MAX < change_pre
# 気圧の変化傾向が変わった場合
def change_pre_state(before_pre_ave, pre_ave):
return (before_pre_ave <= 0 and 0 <= pre_ave) or (before_pre_ave >= 0 and 0 >= pre_ave)
# タイムスタンプ生成
def create_datetime():
date = datetime.datetime.now()
date = date.replace(microsecond=0)
return date
# 温湿度データ取得
def get_temp(data):
result = instance.read()
if result.is_valid():
temp = result.temperature
humidity = result.humidity
print("Temperature: %-3.1f C" % result.temperature)
print("Humidity: %-3.1f %%" % result.humidity)
else:
# たまにバグってデータ取れないのでその時は前回値を使う
temp = float(data[-1]["temp"])
humidity = float(data[-1]["humidity"])
print("Temperature: %-3.1f C" % temp)
print("Humidity: %-3.1f %%" % humidity)
return temp, humidity
# 不快指数算出
def disconfort_index(temp, humidity):
temp = float(temp)
humidity = float(humidity)
return 0.81*temp + 0.01*humidity*(0.99*temp-14.3)+46.3
# 気圧データ取得
def get_pressure():
p_data = bus.read_i2c_block_data(ADDR, PRE_OUT_XL | MASK_DATA, 3)
pre = (p_data[2] << 16 | p_data[1] << 8 | p_data[0]) / 4096.0
print("%.2f hpa" % pre)
return pre
# 気圧変化平均値の生成
def make_pre_ave(data):
ps = [d.get("change_pre") for d in data[-12:]]
ave = sum(ps)/len(ps)
return ave
# スプシに送信するデータをまとめて生成
def create_dataset(data):
tp, hm = get_temp(data)
pr = get_pressure()
ch_pr = pr - (data[-12]["pressure"])
dc = disconfort_index(tp, hm)
pr_ave = make_pre_ave(data)
return tp, hm, pr, ch_pr, dc, pr_ave
# 不快指数が適正範囲外になったら通知
def send_temp_message(data, temp, disc):
if not invalid_disc(float(data[-1]["disconfort"])):
if disc < ALART_DISC_MIN:
messages = TextSendMessage(
text=f"寒くなって来たよ・・・\n今{round(temp)}℃です~")
line_bot_api.push_message(USER_ID, messages=messages)
if disc > ALART_DISC_MAX:
messages = TextSendMessage(
text=f"暑くなって来たよ・・・\n今{round(temp)}℃です~")
line_bot_api.push_message(USER_ID, messages=messages)
# 不快指数が適正範囲外から戻ったら通知
def send_conf_disc_message(data):
if invalid_disc(float(data[-1]["disconfort"])):
messages = TextSendMessage(text="快適な状態に戻ったよ!")
line_bot_api.push_message(USER_ID, messages=messages)
# 気圧が大きく変化したら通知
def send_pre_message(data, change_pre):
before = float(data[-1]["change_pre"])
if not change_pressure_term(before):
if change_pre > ALART_PRE_MAX:
messages = TextSendMessage(
text=f"気圧が大きく上がったよ!\n直近で{round(change_pre, 1)}hPa変化したけど、何か変化あった?")
line_bot_api.push_message(USER_ID, messages=messages)
if change_pre < ALART_PRE_MIN:
messages = TextSendMessage(
text=f"気圧が大きく下がったよ!\n直近で{round(change_pre, 1)}hPa変化したから、お天気悪くなるかも。")
line_bot_api.push_message(USER_ID, messages=messages)
# 気圧の傾向が変化したら通知
def send_change_message(pre_ave):
if pre_ave >= 0:
messages = TextSendMessage(
text="気圧が上昇傾向に変わったよ!。")
line_bot_api.push_message(USER_ID, messages=messages)
if pre_ave <= 0:
messages = TextSendMessage(
text="気圧が下降傾向に変わったよ!お天気の崩れないか心配・・・")
line_bot_api.push_message(USER_ID, messages=messages)
# スプシの古いデータ削除、消した分の行を追加
def delete_rows(sheet1):
sheet1.add_rows(1)
sheet1.delete_rows(2)
# スプシにデータを書き込み
def sheet_update(sheet1, column, temp, humidity, disc, pressure, change_pre):
sheet1.update(f"A{column + 2}:F",
[[str(create_datetime()), temp, humidity, int(disc), round(pressure, 1), round(change_pre, 1)]])
# 別シートに2時間分のデータをコピペ
def sheet_copy(sheet2, data, temp, humidity, disc, pressure, change_pre):
del(data[0:841])
df = pd.DataFrame(data)
sheet2.update([df.columns.values.tolist()] + df.values.tolist())
sheet2.update(f"A25:F25",
[[str(create_datetime()), temp, humidity, int(disc), round(pressure, 1), round(change_pre, 1)]])
def main():
print("System enable ...")
time_buff = 0
count = 0
before_pre_ave = 0
while True:
while(time.time() - time_buff < 1):
pass
time_buff = time.time()
count += 1
if (count == 1) or (count % 300) == 0:
sheet1, sheet2 = get_sheet()
data = sheet1.get_all_records()
column = len(data)
if column >= DATA_MAX:
delete_rows(sheet1)
column -= 1
temp, humidity, pressure, change_pre, disc, pre_ave = create_dataset(
data)
if change_pressure_term(change_pre):
send_pre_message(data, change_pre)
if invalid_disc(disc):
send_temp_message(data, temp, disc)
if not invalid_disc(disc):
send_conf_disc_message(data)
if change_pre_state(before_pre_ave, pre_ave):
if(count != 1):
send_change_message(pre_ave)
sheet_update(sheet1, column, temp, humidity,
disc, pressure, change_pre)
sheet_copy(sheet2, data, temp, humidity,
disc, pressure, change_pre)
before_pre_ave = pre_ave
if __name__ == "__main__":
main()
おわりに
今回はのシステムを作成しながら、あれやこれやとハマりどころが次々現れたり、機能を増やすほど機能が競合したり、エラー吐いたり、ソースコードが長くなるとコードを追うのが難しくなるので極力簡潔に書く工夫をしたり・・・
思い返すだけでも私自身がエラー吐きそうなぐらい大変でしたが、いよいよ機能から逆算して技術選択やプログラムを書く、といった流れを勉強できたのかなと思います。
LINEBOTが思ったよりめちゃめちゃ便利で、ローカルのハードを操作するコントローラとして今後も重宝しそうです。
機能を色々詰め込んだので、だんだんコードが読みづらくなってきました。
いよいよクラス、構造体を自前で作る必要が出てきたので、より綺麗なコードを書いて今回のプログラムも改善出来ればと思います。
まだ開発途中ですが、こんな機能を追加してみたり、今後もこの調子で楽しく開発しつつスキルアップして行きたいと思います。