はじめに
Raspbeiry Pi Pico等の電子デバイスをシリアル通信(UART)で制御する場合、Windows PCであれば、定番のTera Termを使えばよいが、あえてPythonでシリアル通信をする方法がないか?を調べてみたので記事にする。
目次
pyserial
Pythonでシリアル通信するライブラリにpyserial
がある。pip install pyserial
でライブラリをインストールする。pyserialはRTS/CTSのフローコントロールもできるよう。今回はTx/Rxのみ使う。ドキュメントは下記。
ポート番号のリストとポートのオープン
ポート番号のリストはserial.tools.list_ports.comports()
で取得する。取得したリストで、.device
を指定するとWindowsであればCOM番号を取得できる。Linuxだと/dev/tty。
import serial
import serial.tools.list_ports
for port in serial.tools.list_ports.comports():
print(f'# {port = } / {port.device = }')
# port = <serial.tools.list_ports_common.ListPortInfo object at 0x00000XXXXXXXXXXX> / port.device = 'COM1'
# port = <serial.tools.list_ports_common.ListPortInfo object at 0x00000YYYYYYYYYYY> / port.device = 'COM2'
ポートのオープンは、serial.Serial
を使う。パラメータport
に先ほど取得したポート番号を設定する。その他boudrate
とparity
の値を設定すれば、シリアル通信のポートがOpenされる。
import serial
import serial.tools.list_ports
# 'COM2' 9600bps Parityなしの場合
Serial_Port=serial.Serial(port='COM2', baudrate=9600, parity= 'N')
シリアル通信の送信・受信
シリアル通信の送信受信はwrite()
:送信、read()
:受信でできる。データの送受信はバイナリデータで行うため、文字列を送受信する場合はencode()/decode()
する必要がある。read()
は、指定したbyte数分だけデータを受信する。readline()
を使えば、改行コードまでのデータを続けて受信する。注意点として、指定されたbyte数(改行)分シリアル通信で受信するまでread()/readline()
内で処理が留まる。ただし、ポートのオープン時にtimeout
を指定していれば、timeoutで処理を抜けることもできる。
import serial
import serial.tools.list_ports
# 'COM2' 9600bps Parityなしの場合
Serial_Port=serial.Serial(port='COM2', baudrate=9600, parity= 'N')
#送信(tx)
data=input()+'\r\n'
data=data.encode('utf-8')
Serial_Port.write(data)
#受信(rx)
data=Serial_Port.readline() # 1byte受信なら data=Serial_Port.read(1)
data=data.strip()
data=data.decode('utf-8')
print(data)
シリアル通信例
シリアル通信の送受信をリアルタイムに実施できるよう、threading
を使って、送信と受信を同時並行処理できるようにしている。例では、接続先の機器が\r\n
の改行コード単位で処理することを想定して、送信時に改行コード\r\n
を付与しているが、改行コード不要であれば、input()の後の\r\n
は削ればOK。
RaspberryPiでUART制御する際、/dev/ttyS0を自動検出しないので、下記行を追加。RaspberryPiでない場合は不要。
if '/dev/ttyAMA0' in serial_ports.values():
serial_ports[str(len(serial_ports))]='/dev/ttyS0'
import os
import serial
import serial.tools.list_ports
import threading
#----------------------
# データを送信
#----------------------
def serial_write():
global Serial_Port
while(1):
if Serial_Port !='':
data=input()+'\r\n'
data=data.encode('utf-8')
Serial_Port.write(data)
#----------------------
# データを受信
#----------------------
def serial_read():
global Serial_Port
while(1):
if Serial_Port !='':
#data=Serial_Port.read(1)
data=Serial_Port.readline()
data=data.strip()
data=data.decode('utf-8')
print(data)
#----------------------
# シリアルポートをOpen
#----------------------
def serial_open():
global Serial_Port
#portリストを取得
serial_ports={}
for i,port in enumerate(serial.tools.list_ports.comports()):
serial_ports[str(i)]=port.device
#RaspberryPiのminiUART検出できないので、/dev/ttyAMA0があれば自動的に/dev/ttyS0を追加
if '/dev/ttyAMA0' in serial_ports.values():
serial_ports[str(len(serial_ports))]='/dev/ttyS0'
port_val = serial_ports[ input(f'ポート番号を選んでください。{serial_ports}:') ]
boud_val = int(input('ボーレートbpsを数値で入力してください。:'))
prty_val = input(f'パリティーを選んでください。[N:None, O:Odd, E:Even]:')
Serial_Port=serial.Serial(port=port_val, baudrate=boud_val, parity= prty_val)
print(f'open{port_val}/{boud_val}bps/parity:{prty_val}')
if __name__ == '__main__':
Serial_Port=''
#port open
serial_open()
thread_1 = threading.Thread(target=serial_write)
thread_2 = threading.Thread(target=serial_read)
thread_1.start()
thread_2.start()
以上