前回の記事:【Python】Arduino で温度センサーから室温を取得して画面に表示する(前編) の続きです。
Python 側ではシリアル通信モジュールを利用して、USB シリアル通信から室温データを取得します。
取得は思ったよりも簡単でしたが、インターフェースが USB なのでデバイスの抜き差しに関するコーティングがそこそこ大変でした。
やりたいこと
- Arduino で温度センサーから室温を取得
- USB シリアル通信で RaspberryPi に室温データを送る
- Python で受信して画面に室温を表示する ← 今ここ!
後半で用意するもの
- Arduino Duemilanove と温度センサー一式
- Raspberry pi 3 Model B Rev 1.2 - Raspbian 9.6 (Stretch) - Python 3.5.3
こだわりポイント
- 起動時に USB が挿さっていない(シリアル通信が行われていない)場合は気温表示を無効化
- 表示中に USB が抜かれた場合は直ちに気温表示を無効化
- USB が刺さった(シリアル通信が開始・再開した)場合は直ちに表示を有効化
- USB はどのポートに挿しても問題なく認識する
- そもそも USB 抜き差し程度の例外エラーで Python プログラムが止まることはあり得ない
GitHub(Version 3.1)
処理フロー
- プログラム開始
- デバイスがあるか検索
- シリアルポートを初期化(デバイスがあれば)
- 通信を開始し、約 1 秒ごとに気温を取得
- 通信を終了後にプログラム終了
プログラム開始
シリアル通信の状態を管理するため、変数 self.ser_init
を定義します。
True のときは通信準備完了(通信中でない場合も含む)、False のときは通信準備中です。
# センサー表示クラス
class Sensor(Frame):
# コンストラクタ
def __init__(self, master):
# 通信準備中
self.ser_init=False
デバイスがあるか検索
dev_search()
で RaspberryPi に接続中のデバイスリストを取得します。
その中から Arduino Duemilanove(FT232R USB UART)があればセンサーデバイス名(/dev/ttyUSBx)を返却します。該当するものが無ければ空白を返します。
参考:python の pyserial で 自動でarduino unoに接続する - Qiita
# センサーデバイスを検索
def dev_search():
dev=""
# デバイスリストを取得
devices=serial.tools.list_ports.comports()
for device in devices:
# センサーデバイス名を取得
if device.usb_description()=="FT232R USB UART":
dev=device[0]
# センサーデバイス名を返却
return dev
シリアルポートを初期化(デバイスがあれば)
dev_search()
がデバイス名を返した場合はそのポートに接続を試みます。
問題なければ self.ser_init
変数を True にして通信準備完了状態に変更します。
例外エラー発生時は何もせず、次のタイミングで再接続を試みます。
また、通信中にデバイスが取り外された(通信準備完了かつ、認識されていない)場合は一旦接続を終了します。
# 表示を更新
def update(self):
# センサーデバイスを探す
dev=sensor_setting.dev_search()
try:
# 通信準備中かつ、センサーデバイスが OS に認識されている場合
if not self.ser_init and dev!="":
# シリアル通信の初期化
self.ser=serial.Serial(dev, sensor_setting.bps, timeout=sensor_setting.timeout)
# 通信準備完了
self.ser_init=True
# 例外時は何もしない
except:
pass
# 通信準備完了かつ、認識されていない場合
if dev=="" and self.ser_init:
# シリアル通信を終了
self.ser.close()
# 通信準備中
self.ser_init=False
通信を開始し、約 1 秒ごとに気温を取得
self.ser.open()
により、通信中となります。
バッファから 1 行受信し、改行コード削除、utf-8 へ変換、整数値変換を経て画面に表示します。
例外エラー発生時は無効表示に切り替えます。
try:
# シリアル通信を開始
if self.ser.is_open==False:
self.ser.open()
# 1行受信(b'気温¥r¥n' の形式で受信)
serval=self.ser.readline(self.ser.inWaiting())
# 改行コードを削除(b'気温')
serval=serval.strip()
# バイナリ形式から文字列に変換(気温)
serval=serval.decode("utf-8")
# 気温を更新
self.wst2.configure(text="{0}°c".format(round(float(serval))))
# 例外時
except:
# 気温表示を無効化
self.wst2.configure(text="-°c")
# 1秒後に再表示
self.master.after(1000, self.update)
通信を終了後にプログラム終了
プログラム終了時にクラスが破棄されますが、その前にデストラクタが呼び出されます。
そのタイミングでシリアル通信を終了させます。
# デストラクタ
def __del__(self):
# 通信準備完了の場合
if self.ser_init:
# シリアル通信を終了
self.ser.close()
ソースコード全体
from tkinter import Tk, Frame, Label
import os
import sensor_setting
import serial
import sys
import time
# センサー表示クラス
class Sensor(Frame):
# コンストラクタ
def __init__(self, master):
# 親クラスのコンストラクタ
super().__init__(master, bg="white")
# スペーサ(センサー表示上部の間隔調整)
self.wsp=Label(self, bg="white")
self.wsp.pack(pady=20)
# 温度表示
self.wst1=Label(self, text="気温", bg="white", font=("Sans", 20, "bold"))
self.wst1.pack(anchor="e", padx=20, pady=5)
self.wst2=Label(self, text="-°c", bg="lightblue", font=("Carlito", 40, "bold"))
self.wst2.pack(anchor="e", padx=20)
# 湿度表示
self.wsh1=Label(self, text="湿度", bg="white", font=("Sans", 20, "bold"))
self.wsh1.pack(anchor="e", padx=20, pady=5)
self.wsh2=Label(self, text="-%", bg="silver", font=("Carlito", 40, "bold"))
self.wsh2.pack(anchor="e", padx=20)
# 気圧表示
self.wsp1=Label(self, text="気圧", bg="white", font=("Sans", 20, "bold"))
self.wsp1.pack(anchor="e", padx=20, pady=5)
self.wsp2=Label(self, text="1013", bg="white", font=("Carlito", 40, "bold"))
self.wsp2.pack(anchor="e", padx=20)
# 通信準備中
self.ser_init=False
# デストラクタ
def __del__(self):
# 通信準備完了の場合
if self.ser_init:
# シリアル通信を終了
self.ser.close()
# 表示を更新
def update(self):
# センサーデバイスを探す
dev=sensor_setting.dev_search()
try:
# 通信準備中かつ、センサーデバイスが OS に認識されている場合
if not self.ser_init and dev!="":
# シリアル通信の初期化
self.ser=serial.Serial(dev, sensor_setting.bps, timeout=sensor_setting.timeout)
# 通信準備完了
self.ser_init=True
# 例外時は何もしない
except:
pass
# 通信準備完了かつ、認識されていない場合
if dev=="" and self.ser_init:
# シリアル通信を終了
self.ser.close()
# 通信準備中
self.ser_init=False
try:
# シリアル通信を開始
if self.ser.is_open==False:
self.ser.open()
# 1行受信(b'気温¥r¥n' の形式で受信)
serval=self.ser.readline(self.ser.inWaiting())
# 改行コードを削除(b'気温')
serval=serval.strip()
# バイナリ形式から文字列に変換(気温)
serval=serval.decode("utf-8")
# 気温を更新
self.wst2.configure(text="{0}°c".format(round(float(serval))))
# 例外時
except:
# 気温表示を無効化
self.wst2.configure(text="-°c")
# 1秒後に再表示
self.master.after(1000, self.update)
# 単独処理の場合
def main():
# メインウィンドウ作成
root=Tk()
# メインウィンドウタイトル
root.title("Sensor")
# メインウィンドウサイズ
root.geometry("1024x768")
# メインウィンドウの最大化
root.attributes("-zoom", "1")
# 常に最前面に表示
root.attributes("-topmost", True)
# メインウィンドウの背景色
root.configure(bg="white")
# Sensor クラスのインスタンスを生成
sensor=Sensor(root)
# 画面に配置
sensor.pack(expand=1, fill="y")
# センサー表示の更新を開始(update メソッド呼び出し)
sensor.update()
# メインループ
root.mainloop()
# import sensor による呼び出しでなければ単独処理 main() を実行
if __name__ == "__main__":
main()
from tkinter import Tk, messagebox
import serial.tools.list_ports
import sys
bps="9600" # 通信速度は固定
timeout=None # タイムアウトなし
# センサーデバイスを検索
def dev_search():
dev=""
# デバイスリストを取得
devices=serial.tools.list_ports.comports()
for device in devices:
# センサーデバイス名を取得
if device.usb_description()=="FT232R USB UART":
dev=device[0]
# センサーデバイス名を返却
return dev
# import sensor_setting による呼び出しでなければ単独処理を実行
if __name__ == "__main__":
# デバイスを検索
dev=dev_search()
# メッセージボックスだけ表示する
root=Tk()
root.withdraw()
# 検索結果を表示
if dev!="":
messagebox.showinfo("結果", "センサーデバイス名:\n" + dev)
else:
messagebox.showerror("結果", "センサーデバイスが見つかりませんでした")
# 終了
sys.exit()
成果
課題
- Python プログラムを複数プロセス同時に起動すると、古いプロセスが新しいプロセスにシリアル通信を奪われる
- 秒単位で目紛しく気温が変わるので 1 分単位に丸めた方が良い?
- エラー発生時の原因がわかりにくい