21
28

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Raspberry Pi + PySerialでシリアル通信

Posted at

はじめに

Raspberry Piでシリアル通信したいと思い、USBシリアル変換を使った通信を試しました。
Raspberry Piでシリアル通信試してみた」を参考にさせてもらいました。
コマンド送信→コマンド受信の順で処理することを前提とし、受信処理に受信タイムアウトを追加しました。

環境

  • Raspberry Pi3
  • Python 3.7.2
  • pySerial 3.4

インストール

  • 下記のコマンドを入力し、pySerialをインストールします。
  • pipenvのインストールは割愛します。
$ pipenv install pyserial

USBシリアル変換

FT232 USBシリアル変換ケーブル を使いました。
FT232チップを搭載したものならば、ドライバの追加なしで認識するようです。

以下のコマンドで接続先を確認してください。

$ ls -la /dev/ttyUSB*

1台目であれば接続先は、'/dev/ttyUSB0'になると思います。

注:
Rasberry PiとPC(Windows/Mac/Linux)で通信する場合は、クロスケーブルを使用してください。
Rasberry PiもPCなのでPC間はクロスケーブル、通信機器との接続はストレート(が多い)です。

コード

シリアル通信クラス

  • コマンド送信→コマンド受信の順で処理することを前提としています。(タイムアウト付き)
  • 明示的にシリアルポートをオープン/クローズできます。(個人的な実装で必要だったため)
sample.py
# -*- coding: utf-8 -*-

import serial
import time
import threading

"""
シリアル通信クラス
"""
class SampleComm:
    # 初期化
    def __init__(self):
        # オープンフラグ
        self.isPortOpen = False
        # 受信データ
        self.recvData = bytearray()
        # イベント生成
        self.event = threading.Event()

    # データ受信待ち(タイムアウト付き[sec])
    def recv(self, timeout=3):
        # タイムアウト用時間取得
        time_start = time.time()
        time_end = time_start
        # スレッド待ちイベントクリア
        self.event.clear()
        # 受信データクリア
        self.recvData.clear()
        # 受信結果 True:成功 False:失敗(タイムアウト)
        result = False

        # データ受信待ち
        while not self.event.is_set():
            # タイムアウトチェック
            time_end = time.time()
            if (time_end - time_start > timeout):
                # データ送受信停止して失敗(タイムアウト)とする
                result = False
                self.stop()
                print("timeout:{0}sec".format(timeout))
                break

            # 受信データ読み取り
            buff = self.comm.read()

            # 受信データ判定
            if len(buff) > 0:
                # 受信データ追加
                self.recvData.extend(buff)
                # (仮)¥nを受信済なら成功とする
                if (self.recvData.find(b'\n')) >= 0:
                    # データ送受信停止して成功とする
                    result = True
                    self.stop()
                    break

        # 結果を返す
        return (result, self.recvData)

    # データ送信
    def send(self, data):
        self.comm.write(data)

    # データ送受信停止
    def stop(self):
        self.event.set()

    # シリルポートオープン
    def open(self, tty, baud='115200'):
        try:
            self.comm = serial.Serial(tty, baud, timeout=0.1)
            self.isPortOpen = True
        except Exception as e:
            self.isPortOpen = False

        return self.isPortOpen

    # シリアルポートクローズ(明示的に閉じる)
    def close(self):
        self.stop()
        if (self.isPortOpen):
            self.comm.close()
        self.isPortOpen = False

if __name__ == "__main__":
    # シリアルを開く
    comm = SampleComm()
    comm.open('/dev/ttyUSB0', '115200')

    # データ送信
    comm.send('test'.encode())
    # データ受信(タイムアウト=10sec)
    result, data = comm.recv(10)
    print(result)
    print(data)

    # シリアルを閉じる
    comm.close();

使用方法

pySerialのバージョン2.5以上は、write()の引数がbytearray()です。
文字列を送信する場合は、適切なencode()をしてください。
str.encode()はデフォルトで'utf-8'です。

# シリアルを開く
comm = SampleComm()
comm.open('/dev/ttyUSB0', '115200')

# データ送信
comm.send('sample'.encode())
# データ受信(タイムアウト=10sec)
result, data = comm.recv(10)
print(result)
print(data)

# シリアルを閉じる
comm.close();

sample.pyを実行しても動作確認できます。

$ python sample.py

おわりに

機器との通信は、コマンド送信→コマンド受信の順で処理することが多く、受信処理にタイムアウト付きで実行したかったので作りました。
受信完了の判定などは適宜変更する必要があります。
シリアル通信で同じようなケースの少しでも参考になれば幸いです。

21
28
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
21
28

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?