LoginSignup
9
5

Pythonのpyserialとthreadingでリアルタイムなシリアル通信をする。

Last updated at Posted at 2023-11-19

はじめに

Raspbeiry Pi Pico等の電子デバイスをシリアル通信(UART)で制御する場合、Windows PCであれば、定番のTera Termを使えばよいが、あえてPythonでシリアル通信をする方法がないか?を調べてみたので記事にする。

目次

pyserial

Pythonでシリアル通信するライブラリにpyserialがある。pip install pyserialでライブラリをインストールする。pyserialはRTS/CTSのフローコントロールもできるよう。今回はTx/Rxのみ使う。ドキュメントは下記。

戻る

ポート番号のリストとポートのオープン

ポート番号のリストはserial.tools.list_ports.comports()で取得する。取得したリストで、.deviceを指定するとWindowsであればCOM番号を取得できる。Linuxだと/dev/tty。

port_list.py
import serial
import serial.tools.list_ports

for port in serial.tools.list_ports.comports():
	print(f'# {port = } / {port.device = }')

# port = <serial.tools.list_ports_common.ListPortInfo object at 0x00000XXXXXXXXXXX> / port.device = 'COM1'
# port = <serial.tools.list_ports_common.ListPortInfo object at 0x00000YYYYYYYYYYY> / port.device = 'COM2'

ポートのオープンは、serial.Serialを使う。パラメータportに先ほど取得したポート番号を設定する。その他boudrateparityの値を設定すれば、シリアル通信のポートがOpenされる。

port_open.py
import serial
import serial.tools.list_ports

# 'COM2' 9600bps Parityなしの場合
Serial_Port=serial.Serial(port='COM2', baudrate=9600, parity= 'N')

戻る

シリアル通信の送信・受信

シリアル通信の送信受信はwrite():送信、read():受信でできる。データの送受信はバイナリデータで行うため、文字列を送受信する場合はencode()/decode()する必要がある。read()は、指定したbyte数分だけデータを受信する。readline()を使えば、改行コードまでのデータを続けて受信する。注意点として、指定されたbyte数(改行)分シリアル通信で受信するまでread()/readline()内で処理が留まる。ただし、ポートのオープン時にtimeoutを指定していれば、timeoutで処理を抜けることもできる。

tx_rx.py
import serial
import serial.tools.list_ports

# 'COM2' 9600bps Parityなしの場合
Serial_Port=serial.Serial(port='COM2', baudrate=9600, parity= 'N')

#送信(tx)
data=input()+'\r\n'
data=data.encode('utf-8')
Serial_Port.write(data)

#受信(rx)
data=Serial_Port.readline() # 1byte受信なら data=Serial_Port.read(1)
data=data.strip()
data=data.decode('utf-8')
print(data)

戻る

シリアル通信例

シリアル通信の送受信をリアルタイムに実施できるよう、threadingを使って、送信と受信を同時並行処理できるようにしている。例では、接続先の機器が\r\nの改行コード単位で処理することを想定して、送信時に改行コード\r\nを付与しているが、改行コード不要であれば、input()の後の\r\nは削ればOK。

RaspberryPiでUART制御する際、/dev/ttyS0を自動検出しないので、下記行を追加。RaspberryPiでない場合は不要。

if '/dev/ttyAMA0' in serial_ports.values():
    serial_ports[str(len(serial_ports))]='/dev/ttyS0'
serial_test.py
import os
import serial
import serial.tools.list_ports
import threading

#----------------------
# データを送信
#----------------------
def serial_write():
    global Serial_Port
    
    while(1):
        if Serial_Port !='':
            data=input()+'\r\n'
            data=data.encode('utf-8')
            Serial_Port.write(data)

#----------------------
# データを受信
#----------------------
def serial_read():
    global Serial_Port
    while(1):
        if Serial_Port !='':
            #data=Serial_Port.read(1)
            data=Serial_Port.readline()
            data=data.strip()
            data=data.decode('utf-8')
            print(data)

#----------------------
# シリアルポートをOpen
#----------------------
def serial_open():
    global Serial_Port
    
    #portリストを取得
    serial_ports={}
    for i,port in enumerate(serial.tools.list_ports.comports()):
        serial_ports[str(i)]=port.device
    
    #RaspberryPiのminiUART検出できないので、/dev/ttyAMA0があれば自動的に/dev/ttyS0を追加
    if '/dev/ttyAMA0' in serial_ports.values():
        serial_ports[str(len(serial_ports))]='/dev/ttyS0'

    port_val = serial_ports[ input(f'ポート番号を選んでください。{serial_ports}:') ]
    boud_val = int(input('ボーレートbpsを数値で入力してください。:'))
    prty_val = input(f'パリティーを選んでください。[N:None, O:Odd, E:Even]:')
    
    Serial_Port=serial.Serial(port=port_val, baudrate=boud_val, parity= prty_val)
    print(f'open{port_val}/{boud_val}bps/parity:{prty_val}')


if __name__ == '__main__':

    Serial_Port=''
    
    #port open
    serial_open()
    
    thread_1 = threading.Thread(target=serial_write)
    thread_2 = threading.Thread(target=serial_read)

    thread_1.start()
    thread_2.start()

戻る

以上

9
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
5