Python de GUI で、マルチスレッド
ということで、前回は、tkinterは、メインスレッドじゃないとNG的なとこまではやったので、マルチスレッドのメインスレッドじゃ無い方で、シリアル通信してみます。
題材が、電圧をA/Dコンバータ➡Serial通信で送ってくるというやつなので、通信というより、Serial(COM)経由で、データを送りつけてくる仕様となります。
PythonでSerial通信
Pythonのシリアル通信は、その名の通り、
import serial
で、OKですね。
では使ってみましょう。
最初にすべてを。
import socket
import time
import serial
import serial.tools.list_ports
from hiraide import hiraide
# シリアル通信でデータの送受信を行う
class hiraMiniSerial:
def __init__(self):
pass
#---------------------------------------
def __enter__(self):
print("前処理")
return self
#---------------------------------------------
def __exit__(self, exc_type, exc_value, traceback):
print("後処理")
pass
#---------------------------------------------
def __del__(self):
print("__del__")
pass
#---------------------------------------------
bou = 115200
#bou = 38400
ser = None
use_port = None
#---------------------------------------------
#
#---------------------------------------------
def uart_write_read(self,w_data, r_size):
self.ser.write(w_data) # Write
print('SSend: '+ str(w_data))
r_data = self.ser.read_until(size=r_size) # Read
print('SRecv: ' + str(r_data))
return r_data
#---------------------------------------------
# 有効なCOMポートを自動的に探して返す
#---------------------------------------------
def search(self):
#print(serial)
coms = serial.tools.list_ports.comports()
print("coms",coms)
comlist = []
for com in coms:
comlist.append(com.device)
ct = 0
for comc in com:
print("com[" + str(ct) + "]::" + str(comc) + "::")
ct = ct + 1
if str(com[1]).startswith("Silicon Labs CP210x USB to UART Bridge"):
self.use_port = com
return True
elif str(com[1]).startswith("CP2104 USB to UART Bridge Controller"):#RaspberryPi経由
self.use_port = com
return True
elif str(com[1]).startswith("USB Serial Port"):#M5StickCはこんなアバウトなやつやってん。^^;FT232RLモジュール?
self.use_port = com
return True
elif str(com[1]).startswith("M5Stack Intf"):#M5StickCのRaspberryPi経由
self.use_port = com
return True
else:
#print("##",com[1],"##");
continue
return False
#---------------------------------------------
# 接続
#---------------------------------------------
def connect(self):
#print("self.use_port",self.use_port)
#print("self.bou",self.bou)
print("#---------------------------")
print("port:",self.use_port)
print("bou :",self.bou)
print("#---------------------------")
try:
self.ser = serial.Serial(self.use_port[0], self.bou)
return True
except Exception as e:
return False
#---------------------------------------------
def write(self,data):
try:
data = data + "\n"
#self.ser.write(data) # 出力
self.ser.write(str(data).encode('utf-8')) # 出力
return True
except Exception as e:
return False
#---------------------------------------------
def writeRead(self,data,r_size):
try:
self.ser.write(data)
r_data = self.ser.read_until(size=r_size) # Read
return r_data
except Exception as e:
return ""
#---------------------------------------------
def read(self,r_size):
try:
r_data = self.ser.read_until(size=r_size) # Read
return r_data
except Exception as e:
return ""
#---------------------------------------------
def readline(self):
try:
r_data = self.ser.readline() # ReadLine
return r_data
except Exception as e:
return "*"
#---------------------------------------------
def close(self):
try:
self.ser.close()
return True
except Exception as e:
return False
#---------------------------------------------
def setParent(self,p):
self.parent = p
#---------------------------------------------
if __name__ == '__main__':
for w in range (100):
with hiraMiniSerial() as A:
if A.search():
if A.connect():
ct=0
while True:
ct = ct + 1
try:
s = A.readline()
ss = s.decode()
if(ss == "START\n"):
print("start !")
ct=0
stttime = (time.time()) #秒単位
continue
elif(ss == "STOP\n"):
eddtime = (time.time()) #秒単位
devtime = eddtime - stttime
print("stop !")
#----------------------------------------
else:
print("data:",s)
except Exception as e:
pass
else:
time.sleep(1)
print("Not Found COM")
まず、searchってやつを呼んでますよね。
これは、SerialのLISTから、合致するものをセレクトしている部分です。
SerialのLISTは、
coms = serial.tools.list_ports.comports()
からとってきます。
そして、
for com in coms:
でまわして、
com及び、com[0]~com[2]が、
coms [<serial.tools.list_ports_common.ListPortInfo object at 0x000001FE0959BDD0>]
com COM3 - USB Serial Port (COM3)
com[0]::COM3::
com[1]::USB Serial Port (COM3)::
com[2]::USB VID:PID=0403:6001 SER=A50285BIA::
みたいな感じになります。(::は区切りとして出力)
このcom[1]をみて、自分が思った名称かどうかを比較して判断します。
とはいうものの、Amazonで【FT232RLモジュール】あたりを買うと、だいたい、
Windowsにおいては、【USB Serial Port (COM**)】って名前になるので、
まぁ、複数これがあったら、通信してみて判断するしかないんですけどね。
参考までに、いろいろやってみた結果の文字列です。
Silicon Labs CP210x USB to UART Bridge #家にあるLaser加工機のCOM
CP2104 USB to UART Bridge Controller #RaspberryPi経由
USB Serial Port #M5StickC
M5Stack Intf #M5StickCのRaspberryPi経由
まぁ今回は通信させるまではやってません。
使うときは、COMはたぶん1コなので。
Serial通信サンプル結果
前処理
coms [<serial.tools.list_ports_common.ListPortInfo object at 0x000001EAD136BCD0>]
com COM3 - USB Serial Port (COM3)
com[0]::COM3::
com[1]::USB Serial Port (COM3)::
com[2]::USB VID:PID=0403:6001 SER=A50285BIA::
#---------------------------
port: COM3 - USB Serial Port (COM3)
bou : 115200
#---------------------------
start !
data: b' 973\n'
data: b' 981\n'
data: b' 180\n'
data: b' 0\n'
data: b' 47\n'
~~省略~~
data: b'1023\n'
data: b'1023\n'
stop !
GUIへの組み込み
で、このCOMをどうするかというと、
COMの受信をスレッド化して、その値をGUI部分で受け取って表示しようっていうことになります。
では、メインのSRCから。
import time
import threading
import multiprocessing
import tkinter as TKTK
from tkinter import messagebox
from adMeterCtrl import adMeterCtrl
#-----------------------------------------
class hiraGui3:
#---------------------------------------
title = "TTTitle"
geometry = "600x200+200+400"
AR = []
TEXTAR = []
#---------------------------------------
def clicked(self):
messagebox.showinfo("self.1","self.2")
#--------------------------
def AD_METER_Exec(self):
self.AD = adMeterCtrl()
self.AD.setParent(self)
time.sleep(2) # 2秒待つ。
self.AD.goCOM() # 基本、もどってこない
#--------------------------
def preExec(self):
adMeterCtrl
R = threading.Thread(target = self.AD_METER_Exec)
R.start()
#--------------------------
def raedFromCOM(self,str):
print("str:",str)
self.TEXTAR[0]["text"] = str
#--------------------------
def exec(self):
self.T = TKTK.Tk()
self.T.title( self.title )
self.T.geometry( self.geometry )
n = len(self.AR)
for w in range(n):
#----------------------------------------
if(self.AR[w]["type"] == "BTN"):
btn_1 = TKTK.Button(
self.T
,text=self.AR[w]["text"]
,command=self.AR[w]["com"]
)
btn_1.pack(expand=True)
#----------------------------------------
elif(self.AR[w]["type"] == "LBL"):
lbl_1 = TKTK.Label(
self.T
,text=self.AR[w]["text"]
)
lbl_1.pack(expand=True)
self.TEXTAR.append(lbl_1)
#----------------------------------------
self.T.mainloop()
#--------------------------
def setTitle(self,v):
self.title = v
#--------------------------
def setGeometry(self,v):
self.geometry = v
#--------------------------
#-------------------------------------------------------
if __name__ == '__main__':
GUI = hiraGui3()
GUI.setTitle("★タイトルですよ★")
GUI.setGeometry("600x400+200+400")
o2 = {
"type":"LBL"
,"text":"らべらべらべる"
}
o = {
"type":"BTN"
,"text":"牡丹だよ"
,"com": GUI.clicked
}
GUI.AR.append(o)
GUI.AR.append(o2)
GUI.preExec()
GUI.exec()
では、順をおって。
GUI = hiraGui3()
GUI.setTitle("★タイトルですよ★")
GUI.setGeometry("600x400+200+400")
クラスを生成して、タイトルと、Windowの大きさのための変数を設定。
設定は、変数への代入だけですね。
o2 = {
"type":"LBL"
,"text":"らべらべらべる"
}
o = {
"type":"BTN"
,"text":"牡丹だよ"
,"com": GUI.clicked
}
GUI.AR.append(o)
GUI.AR.append(o2)
これはGUIの部品を外だしにしたくてこのような書き方をしてるだけです。
あとから、この内容にそって設定しますね。
GUI.preExec()
これによって、スレッドを作成して、スレッド側を実行させます。
実際には、adMeterCtrlクラスを生成し、goCOM関数を呼び出しています。
ただし、メインが準備する前に動作開始してもなんなので、2秒待っています。
そして、、、
GUI.exec()
にて、実行開始です。
実際には、tkinterでWindowを生成し、mainloop関数を呼び出します。
raedFromCOM関数は、adMeterCtrlクラスのインスタンスから呼ばれることになります。
Serial読み込みを統括するクラス
前の項で出た、adMeterCtrlクラスです。
import time
import threading
import multiprocessing
import tkinter as TKTK
from tkinter import messagebox
from hiraMiniSerial import hiraMiniSerial
#-----------------------------------------
class adMeterCtrl:
#---------------------------------------
def setParent(self,v):
self.parent = v
print("parent",self.parent)
#---------------------------------------
def __init__(self):
pass
#---------------------------------------
def __enter__(self):
#print("前処理")
return self
#--------------------------
def __exit__(self, exc_type, exc_value, traceback):
#print("後処理")
pass
#--------------------------
def __del__(self):
pass
#--------------------------
def goCOM(self):
stttime = 0
eddtime = 0
devtime = 0
with hiraMiniSerial() as A:
if A.search():
if A.connect():
ct=0
while True:
ct = ct + 1
try:
s = A.readline()
ss = s.decode()
if(ss == "START\n"):
ct=0
stttime = (time.time()) #秒単位
self.parent.readFromCOM("START")
continue
elif(ss == "STOP\n"):
eddtime = (time.time()) #秒単位
devtime = eddtime - stttime
comstr = "STTOP:" + str(ct) + ":" + str(devtime)
self.parent.readFromCOM( comstr)
#----------------------------------------
else:
self.parent.readFromCOM("DATA:" + ss)
except Exception as e:
self.parent.readFromCOM("ERR:EXCEPTION")
else:
self.parent.readFromCOM("ERR:CONNECT ERROR")
else:
time.sleep(1)
self.parent.readFromCOM("ERR:NOT FOUND COM")
#-------------------------------------------------------
if __name__ == '__main__':
pass
こんな感じで。
まぁ普通に動作したので、あとは、GUIをそれっぽく完成させます。
長くなったので、、つづく