はじめに
Raspberry Piでシリアル通信したいと思い、USBシリアル変換を使った通信を試しました。
「Raspberry Piでシリアル通信試してみた」を参考にさせてもらいました。
コマンド送信→コマンド受信の順で処理することを前提とし、受信処理に受信タイムアウトを追加しました。
環境
- Raspberry Pi3
- Python 3.7.2
- pySerial 3.4
インストール
- 下記のコマンドを入力し、pySerialをインストールします。
- pipenvのインストールは割愛します。
$ pipenv install pyserial
USBシリアル変換
FT232 USBシリアル変換ケーブル を使いました。
FT232チップを搭載したものならば、ドライバの追加なしで認識するようです。
以下のコマンドで接続先を確認してください。
$ ls -la /dev/ttyUSB*
1台目であれば接続先は、'/dev/ttyUSB0'になると思います。
注:
Rasberry PiとPC(Windows/Mac/Linux)で通信する場合は、クロスケーブルを使用してください。
Rasberry PiもPCなのでPC間はクロスケーブル、通信機器との接続はストレート(が多い)です。
コード
シリアル通信クラス
- コマンド送信→コマンド受信の順で処理することを前提としています。(タイムアウト付き)
- 明示的にシリアルポートをオープン/クローズできます。(個人的な実装で必要だったため)
# -*- coding: utf-8 -*-
import serial
import time
import threading
"""
シリアル通信クラス
"""
class SampleComm:
# 初期化
def __init__(self):
# オープンフラグ
self.isPortOpen = False
# 受信データ
self.recvData = bytearray()
# イベント生成
self.event = threading.Event()
# データ受信待ち(タイムアウト付き[sec])
def recv(self, timeout=3):
# タイムアウト用時間取得
time_start = time.time()
time_end = time_start
# スレッド待ちイベントクリア
self.event.clear()
# 受信データクリア
self.recvData.clear()
# 受信結果 True:成功 False:失敗(タイムアウト)
result = False
# データ受信待ち
while not self.event.is_set():
# タイムアウトチェック
time_end = time.time()
if (time_end - time_start > timeout):
# データ送受信停止して失敗(タイムアウト)とする
result = False
self.stop()
print("timeout:{0}sec".format(timeout))
break
# 受信データ読み取り
buff = self.comm.read()
# 受信データ判定
if len(buff) > 0:
# 受信データ追加
self.recvData.extend(buff)
# (仮)¥nを受信済なら成功とする
if (self.recvData.find(b'\n')) >= 0:
# データ送受信停止して成功とする
result = True
self.stop()
break
# 結果を返す
return (result, self.recvData)
# データ送信
def send(self, data):
self.comm.write(data)
# データ送受信停止
def stop(self):
self.event.set()
# シリルポートオープン
def open(self, tty, baud='115200'):
try:
self.comm = serial.Serial(tty, baud, timeout=0.1)
self.isPortOpen = True
except Exception as e:
self.isPortOpen = False
return self.isPortOpen
# シリアルポートクローズ(明示的に閉じる)
def close(self):
self.stop()
if (self.isPortOpen):
self.comm.close()
self.isPortOpen = False
if __name__ == "__main__":
# シリアルを開く
comm = SampleComm()
comm.open('/dev/ttyUSB0', '115200')
# データ送信
comm.send('test'.encode())
# データ受信(タイムアウト=10sec)
result, data = comm.recv(10)
print(result)
print(data)
# シリアルを閉じる
comm.close();
使用方法
pySerialのバージョン2.5以上は、write()の引数がbytearray()です。
文字列を送信する場合は、適切なencode()をしてください。
str.encode()はデフォルトで'utf-8'です。
# シリアルを開く
comm = SampleComm()
comm.open('/dev/ttyUSB0', '115200')
# データ送信
comm.send('sample'.encode())
# データ受信(タイムアウト=10sec)
result, data = comm.recv(10)
print(result)
print(data)
# シリアルを閉じる
comm.close();
sample.pyを実行しても動作確認できます。
$ python sample.py
おわりに
機器との通信は、コマンド送信→コマンド受信の順で処理することが多く、受信処理にタイムアウト付きで実行したかったので作りました。
受信完了の判定などは適宜変更する必要があります。
シリアル通信で同じようなケースの少しでも参考になれば幸いです。