TL;DR
pySerialを使ってシリアルポートから受信する際に,組込系の受信割り込みめいた処理を行いたいときはTornadoのioloopを使ってポーリングすると似たようなことが出来る.
なにがしたいか
pySerialを使うとOS非依存でシリアルポートを使えて便利です.
ただ、基本的にはポーリングでの利用がメインになるので1,任意の条件をみたした時にコールバック関数を呼ぶ的な動作は単体だとできません.
そこで,Tornadoをスケジューラとして使ってマルチタスクっぽいのを実現するメモで書いたTornadoでマルチタスクっぽいのを実現する方法を使って,コールバック関数っぽい処理を実現してみました.
(本当はこういうことしない方がそれっぽいのかもしれませんが,組み込み系のUART受信割り込みを処理するのに慣れた身だとこういう書き方が一番楽なんです・・・)
ソースコード
基本的には,Tornadoのioloopを使ってpySerialを一定周期で読みに行って,条件をみたしたらコールバック関数を呼ぶ形です.
下の例では,終端文字('\n'
)を受信したらコールバック関数が呼ばれるようになっています2.
# -*- coding: utf-8 -*-
import time
import tornado.ioloop
import serial
class SerialPolling(object):
def __init__(self, port, baudrate, delimiter=b'\n'):
self.serial_port = serial.Serial(port, baudrate)
self.delimiter = delimiter
self.ioloop = tornado.ioloop.IOLoop.current()
self.read_buffer = b""
self.LOOP_FREQ = 1000
self.loop()
def loop(self):
self.ioloop.add_timeout(time.time()+1/self.LOOP_FREQ, self.loop)
self.read()
def read(self):
self.read_buffer += self.serial_port.read(self.serial_port.inWaiting())
% 以下条件確認.任意の条件に変更可能
for segment in self.read_buffer.split(self.delimiter)[:-1]:
self.callback(segment)
self.read_buffer = self.read_buffer.split(self.delimiter)[-1]
def callback(self, recieved_segment):
print("Receieved: ", recieved_segment)
if __name__ == "__main__":
serial_porlling = SerialPolling("COM16", 115200)
tornado.ioloop.IOLoop.current().start()
前回の記事で書いたように,ioloop.add_timeout
を使ってloop()
がLOOP_FREQ
Hzで実行されるようにしてあります.
loop()
から呼ばれたread()
は,まずpySerialの受信バッファの内容をすべてread_buffer
に移したあとで条件確認とコールバック関数の呼び出しを行います.
今回の例では,split()
関数を用いて終端文字で文字列を分割,分割後の要素ごとにコールバック関数を呼び出すようにしてあります.
(read_buffer
内が"abc\ndef\ngh"
であれば,callback("abc")
,callback("def")
が呼び出され,read_buffer
内に"gh"
が残る.)
この個所の条件を任意に変更することで,様々なプロトコルに対応可能です.