はじめに
Pythonのpyserialとthreadingでリアルタイムなシリアル通信をする。で作ったコードにKivyでGUIを付けてみた。下図が作成したアプリ。Raspbeiry Pi Pico等の電子デバイスをシリアル通信(UART)で制御するため用。
kivyで使ったウィジェット
今回kivyで使ったウィジェットはButton
、Spinner
、TextInput
。kivyについてはPythonのGUIライブラリKivyを使うを参照。
Spinner
でポート番号(PortNo)、ボーレート(Speed(bps))、パリティー(Parity)を選択できるようにして、Openボタンでシリアル通信を開始。ポート番号は、PortNoをクリックしたときにリアルタイムにポート番号をスキャンする形にした。
データ送信は、TextInput
で設定、EnterキーもしくはSendOutボタンで送信できるようにした。
送受信したデータはTextInput
に出力。1/30秒(1秒30フレーム)ごとに更新するようにした。出力する文字の上限は最大50000文字で調整。
作成したコード
作成したコードは下記。送信、受信とも改行単位で通信する前提のため、改行なくリアルタイムに通信する場合は、self.Serial_Port.write
に設定するパラメータの調整と、self.Serial_Port.readline
をself.Serial_Port.read
に変更必要。
グローバル変数の排他処理など作り込みは甘いが、それなりに動作することは確認。pythonファイルにkivyコードも埋め込んでいる。
RaspberryPiでUART制御する際、/dev/ttyS0を自動検出しないので、下記行を追加。RaspberryPiでない場合は不要。
if '/dev/ttyAMA0' in self.root.ids.spinner_port.values:
self.root.ids.spinner_port.values.append('/dev/ttyS0')
from kivy.app import App
from kivy.lang import Builder
from kivy.clock import Clock
from kivy.uix.boxlayout import BoxLayout #boxのレイアウト
import japanize_kivy # https://github.com/momijiame/japanize-kivy 日本語化
import time
import serial
import serial.tools.list_ports
import threading
#kivy GUI
Layout = Builder.load_string('''
#第一階層
BoxLayout:
orientation:'vertical'
#Serial設定===============================
BoxLayout:
orientation:'horizontal'
size_hint_y:0.05 #y軸のサイズ
# Port
Spinner:
id :spinner_port
size_hint_x :0.25 #x軸のサイズ
text :'PortNo'
values :''
on_press :app.port_list() # Spinnerを押されたとき
# Speed
Spinner:
id :spinner_speed
size_hint_x :0.25 #x軸のサイズ
text :'Speed(bps)'
values :'9600','38400','115200'
# Parity
Spinner:
id :spinner_parity
size_hint_x :0.25 #x軸のサイズ
text :'Parity'
values :'N:None','O:Odd','E:Even'
#Open
Button:
id:button_open
size_hint_x :0.25 #x軸のサイズ
background_color:(0,1,0,1)
text:'Open'
on_press:app.serial_open()
#送信データ設定==========================
BoxLayout:
orientation:'horizontal'
size_hint_y:0.05 #y軸のサイズ
# Text
TextInput:
id:textinput_sendout
text: ''
font_size:15
size_hint_x :0.90 #x軸のサイズ
on_text:app.enter_out()
# SendOut
Button:
id:button_sendout
size_hint_x :0.10 #x軸のサイズ
background_color:(0,1,0,1)
text:'SendOut'
on_press:app.send_out()
#送受信データ出力========================
TextInput:
id:textinput_serial
text: ''
font_size:15
size_hint_y:0.95 #y軸のサイズ
''')
#--------------------
# Serial通信
#--------------------
class Serial_app(App):
#Global
Serial_Port=''
OutPut=''
Thread_tx=''
Thread_rx=''
Running='on'
Cnt_enter_out=0
#----------------------
# Kivy初期化
#----------------------
def build(self):
self.title = 'Serial_app'
#データ送信受信関数を設定
self.Thread_tx = threading.Thread(target=self.serial_write)
self.Thread_rx = threading.Thread(target=self.serial_read)
self.Thread_tx.start()
self.Thread_rx.start()
#Kivyの周期関数を設定
Clock.schedule_interval(self.update_gui, 1/30)
return Layout
#----------------------
# Kivy終了
#----------------------
def on_stop(self):
print('Please EnterKey.')
self.Running='off'
#----------------------
# 1/30 sec 毎に画面更新
#----------------------
def update_gui(self, dt):
self.root.ids.textinput_serial.text = self.OutPut
#50000文字以上になると古い文字から削除
if len(self.OutPut)>50000:
self.OutPut = self.OutPut[50000:]
#----------------------
# SendOutボタンでデータ送信
#----------------------
def send_out(self):
self.Serial_Port.write((self.root.ids.textinput_sendout.text+'\r\n').encode('utf-8'))
#----------------------
# データ入力でEnterでデータ送信
# Enterで2回入ってくるので偶数回の時だけ処理する。#2回入る理由確認中。
#----------------------
def enter_out(self):
if self.root.ids.textinput_sendout.text.endswith('\n'):
if self.Cnt_enter_out%2==0:
self.root.ids.textinput_sendout.text.strip()
self.Serial_Port.write((self.root.ids.textinput_sendout.text+'\r\n').encode('utf-8'))
self.root.ids.textinput_sendout.text=''
self.Cnt_enter_out+=1
#----------------------
# thread_tx(データを送信)
#----------------------
def serial_write(self):
while(1):
if self.Running == 'on':
if self.Serial_Port !='':
data=input()+'\r\n'
data=data.encode('utf-8')
self.Serial_Port.write(data)
else:
time.sleep(0.1)
else:
break
#----------------------
# thread_rx(データを受信)
#----------------------
def serial_read(self):
while(1):
if self.Running == 'on':
if self.Serial_Port !='':
data=self.Serial_Port.readline() #1行受信
data=data.decode('utf-8')
self.OutPut+=data
else:
time.sleep(0.1)
else:
break
#----------------------
# シリアルポートのlist
#----------------------
def port_list(self):
self.root.ids.spinner_port.values.clear()
for port in serial.tools.list_ports.comports():
self.root.ids.spinner_port.values.append(f'{port.device}')
#RaspberryPiのminiUART検出できないので、/dev/ttyAMA0があれば自動的に/dev/ttyS0を追加
if '/dev/ttyAMA0' in self.root.ids.spinner_port.values:
self.root.ids.spinner_port.values.append('/dev/ttyS0')
#----------------------
# シリアルポートをOpen
#----------------------
def serial_open(self):
port_val = self.root.ids.spinner_port.text
boud_val = self.root.ids.spinner_speed.text
prty_val = self.root.ids.spinner_parity.text[:1] # 'N:None','O:Odd','E:Even'のN/O/Eを抜き出す。
#OpenPort
if self.Serial_Port =='':
try:
self.Serial_Port=serial.Serial(port=port_val, baudrate=boud_val, parity= prty_val)
self.OutPut = f'OpenPort({port_val},{boud_val},{prty_val})'
except:
self.OutPut = 'The parameter is not available.'
if __name__ == '__main__':
Serial_app().run()
以上