11
14

More than 3 years have passed since last update.

シリアルポートからくるデータをリアルタイムにプロットする

Last updated at Posted at 2021-08-27

作ったもの

Pythonの標準機能とMatplotlibとPySerialなどよく使われるライブラリだけで、シリアルポートからくるデータをリアルタイムにプロットしてみました。

画像

コードはGitHubに

なぜ作ったか

Arduinoなどのマイコンにセンサを繋ぎ、測定値をシリアルでパソコンに送り込むことがよくあります。Arduino IDEにはSerialPlotterの機能があるので、リアルタイムに測定値を眺めることがすぐにできてとても便利です。

ところが表示のレンジが勝手に変わってしまうのを止めたかったり、何かしらの計算をしながら表示したかったり。自分でちょっとしたカスタマイズをしてリアルタイムにグラフを眺めたいことがあります。

Instructablesなどをみていると、Processingを利用して自作アプリを作る例を見かけます。例えばオープンソースの脈波計 Pulse Sensorのアプリ
pulse sensor processing app

できれば普段使い慣れたPythonでやりたい。しかもなるべく特殊なライブラリは入れずに。

方針

なるべく特別なライブラリを使わずにやりたい。とりあえずシンプルな機能だけにして、ボイラープレートとして使えるコードにする。
シリアルポートを扱うのはPySerial、グラフ表示は定番のmatplotlibを使う。

ぱっとできそうですが、いくつか工夫が必要です。
* 絶え間なくシリアルからくるデータを受け取りつつGUIを止めないために、標準のthreadingモジュールを使って別スレッドでデータを受け取る
* Matplotlibはもともと静的グラフを描画するライブラリなので、それなりの速度でリアルタイムにアップデートし続けるにはanimation機能を使う
* 別スレッドで受け取ったデータをanimationのスレッドで安全に使うための処理
* 一定数の最新のデータを保持するためのデータ構造をどうするか

私自身ソフトのプロではなく、いろいろ調べながらやっています。間違ったり余計なことをやっていることもあると思うので、アドバイスお願いします。

コード

コード全体はGitHub上で確認してください。

シリアルからデータを受けるワーカー

class DataWorker(threading.Thread):
    """
    Worker to get date from serial, parse data and put into numpy array
    """
    def __init__(self, ser, data):
        """
        ser: PySerial object
        data: Numpy array to hold data
        """
        threading.Thread.__init__(self)
        self._data = data
        self._ser = ser
        self._running = False
        self._lock = threading.Lock()

    def stop(self):
        self._running = False

    def run(self):
        """
        Worker loop
        """
        logging.debug("Start running worker")
        self._running = True
        line = ""
        while self._running:
            try:
                if self._ser.is_open:
                    line = self._ser.readline()
            except Exception as e:
                logging.error("Serial port Exception: "+ str(e))
            if line:
                line2 = line.decode("utf-8").strip()
                logging.info(line2)
                if line2 == '':
                    #logging.info("Emtpy line")
                    pass
                elif line2.startswith('#'):
                    logging.info("Found comment line")
                else:
                    self.parseLine(line2)
        logging.debug("Finished running worker")

別スレッドで処理するためにthreading.Threadを継承したクラスを作ります。
runメソッドにループでシリアルポートから1行ずつ読み込んで処理。
本当はシリアルポートのバッファ見ながら自分で改行コードで区切ってとかやるのでしょうが、PySerialのreadline()でラクしてます。
self._running変数を介してループすることで、外部からstop()できるようにするのが常套手段(?)

データはnumpy.ndarrayに格納して受け渡す

    def parseLine(self, line):
        """
        Implement your line parsing code here
        This example expects all comma separated values are float
        """
        try:
            values = [float(_) for _ in line.split(',')]
            # make sure the number of values are same as expected
            if len(values) == self._data.shape[0]:
                with self._lock:
                    self._data = np.roll(self._data, -1) # move all data to left by 1
                    self._data[:,-1] = values # replace last data with new values
            else:
                logging.error("Wrong number of values")
                logging.error(line)
        except Exception as e:
            logging.error("error parsing: "+str(e))
            logging.error(line)

    @property
    def snapshot(self):
        """ make a copy of current data and return
        """
        with self._lock:
            data = self._data
        return data

データのラインが来たらパースしてnumpy.ndarrayに格納します。
とりあえずコンマ区切りですべてfloatだとして処理してますが、必要に応じて変更してください。

プロットするために、決まった個数の最新データだけを変数に入れます。リングバッファとしてPython標準ではcollections.dequeが使えるようです。でもnumpy.roll()でスライドさせていくのも悪くないようなのでnumpy.ndarrayを使うことにします。ndarrayの方が2次元配列で一気に処理できるし、maptplotlibに渡すと時もラクそうなので。

スレッド間でデータを受け渡しすることになるので、アクセスするときにはthreading.Lock()を使い保護する。

animation用のクラスを定義

class Plotter():
    """
    Class to hold plot figure
    provides functions for animation
    """
    def __init__(self, fig, ax, x_data, data_worker):
        """
        fig: Figure
        ax: Axes
        x_data: array of x values
        data_worker: instance of DataWorker that provide data
        """
        self._x_data = x_data
        self.data_worker = data_worker
        self.fig = fig
        self.ax = ax

    def initial_plot(self):
        """
        Adjust plot command
        Make sure to return lines to update
        """
        # obtain latest data from worker
        data = self.data_worker.snapshot
        # draw a line with 1st row data
        self.lineRed, = self.ax.plot(self._x_data, data[0,:], 'r-')
        # adjust plot
        self.ax.set_xlim(0, 500)
        self.ax.set_ylim(100000,200000)
        return self.lineRed,

    def update_plot(self, frame):
        # obtain latest data from worker
        data = self.data_worker.snapshot
        # update line's ydata
        self.lineRed.set_ydata(data[0,:])
        return self.lineRed,

matplotlibで動的なプロットをするにはanimationの仕組みを使います。FuncAnimationクラスには、最初のプロットをする関数と各フレームで描画する関数を渡すので、それを1つのクラスにまとめて定義しておきます。

animationのポイントは、アップデートしたいArtistを最初のプロットをするときに変数に入れておくこと。ここではinitial_plot()の中でself.ax.plot()で作られるLine2Dをself.lineRedに格納したうえでreturnします。

アニメーションの各フレームで呼ばれる update_plot()の中では、DataWorkerから最新データを受け取り、Line2D.set_ydataに最新データを渡すことで高速にアップデートします。

セットアップ

if __name__ == "__main__":
    fmt = "%(message)s"
    logging.basicConfig(format=fmt, level=logging.INFO)

    port = "/dev/tty.usbmodem14201"
    baudrate = 115200
    ser = serial.Serial(port, baudrate, timeout=1)

    # we expect two values from serial port
    # ValueA,ValueB
    n_values = 2 # expecting number of values in a line
    n_points = 500 # number of latest data points to hold
    # make numpy array to hold data
    arr = np.empty(n_values * n_points)
    arr[:] = np.nan # fill with NaN as default
    data = np.reshape(arr, (n_values, n_points))

    arr_x = np.arange(n_points) # make array for x axis

    fig = plt.figure()
    ax = fig.add_subplot(111)
    dataworker = DataWorker(ser, data)
    plotter = Plotter(fig, ax, arr_x, dataworker)
    dataworker.start()
    ani = FuncAnimation(fig, 
                        plotter.update_plot, 
                        frames=None,
                        init_func=plotter.initial_plot, 
                        blit=True, 
                        interval=50,
                        )
    plt.show()
    dataworker.stop()

シリアルポートやボーレートを決めてPySerial.Serialオブジェクトをインスタンス化。
1行に何個の値が来るかと、何データ分プロットするかを決めたらndarrayを作ります。
とりあえずnp.nanにしておけばグラフの初期化がラク。

FuncAnimationにもろもろ渡してあげれば出来上がりです。

今後

ボイラープレートなのであまり複雑にする気はないですが、パーサーは外から与えるようにするなどDataWorkerクラスは変更せずに使い回せるようにしたいところ。受け取ったデータをファイルに保存するコードも入れていきたい。loggingの機能を使えばすぐできるはず。

あとはちゃんとしたGUIアプリにしようとすると、QtなりのGUIライブラリを使いたくなる。QtとMatplotlibの組み合わせも癖があるので、Qt版のボイラープレートも作っておきたいところ。Qt6+PySide6に対応したmatplotlib v3.5が出たらやってみたい。

11
14
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
14