作ったもの
Pythonの標準機能とMatplotlibとPySerialなどよく使われるライブラリだけで、シリアルポートからくるデータをリアルタイムにプロットしてみました。
コードはGitHubに。
なぜ作ったか
Arduinoなどのマイコンにセンサを繋ぎ、測定値をシリアルでパソコンに送り込むことがよくあります。Arduino IDEにはSerialPlotterの機能があるので、リアルタイムに測定値を眺めることがすぐにできてとても便利です。
ところが表示のレンジが勝手に変わってしまうのを止めたかったり、何かしらの計算をしながら表示したかったり。自分でちょっとしたカスタマイズをしてリアルタイムにグラフを眺めたいことがあります。
Instructablesなどをみていると、Processingを利用して自作アプリを作る例を見かけます。例えばオープンソースの脈波計 Pulse Sensorのアプリ
できれば普段使い慣れたPythonでやりたい。しかもなるべく特殊なライブラリは入れずに。
方針
なるべく特別なライブラリを使わずにやりたい。とりあえずシンプルな機能だけにして、ボイラープレートとして使えるコードにする。
シリアルポートを扱うのはPySerial、グラフ表示は定番のmatplotlibを使う。
ぱっとできそうですが、いくつか工夫が必要です。
- 絶え間なくシリアルからくるデータを受け取りつつGUIを止めないために、標準のthreadingモジュールを使って別スレッドでデータを受け取る
- Matplotlibはもともと静的グラフを描画するライブラリなので、それなりの速度でリアルタイムにアップデートし続けるにはanimation機能を使う
- 別スレッドで受け取ったデータをanimationのスレッドで安全に使うための処理
- 一定数の最新のデータを保持するためのデータ構造をどうするか
私自身ソフトのプロではなく、いろいろ調べながらやっています。間違ったり余計なことをやっていることもあると思うので、アドバイスお願いします。
コード
コード全体はGitHub上で確認してください。
シリアルからデータを受けるワーカー
class DataWorker(threading.Thread):
"""
Worker to get date from serial, parse data and put into numpy array
"""
def __init__(self, ser, data):
"""
ser: PySerial object
data: Numpy array to hold data
"""
threading.Thread.__init__(self)
self._data = data
self._ser = ser
self._running = False
self._lock = threading.Lock()
def stop(self):
self._running = False
def run(self):
"""
Worker loop
"""
logging.debug("Start running worker")
self._running = True
line = ""
while self._running:
try:
if self._ser.is_open:
line = self._ser.readline()
except Exception as e:
logging.error("Serial port Exception: "+ str(e))
if line:
line2 = line.decode("utf-8").strip()
logging.info(line2)
if line2 == '':
#logging.info("Emtpy line")
pass
elif line2.startswith('#'):
logging.info("Found comment line")
else:
self.parseLine(line2)
logging.debug("Finished running worker")
別スレッドで処理するためにthreading.Threadを継承したクラスを作ります。
runメソッドにループでシリアルポートから1行ずつ読み込んで処理。
本当はシリアルポートのバッファ見ながら自分で改行コードで区切ってとかやるのでしょうが、PySerialのreadline()でラクしてます。
self._running変数を介してループすることで、外部からstop()できるようにするのが常套手段(?)
データはnumpy.ndarrayに格納して受け渡す
def parseLine(self, line):
"""
Implement your line parsing code here
This example expects all comma separated values are float
"""
try:
values = [float(_) for _ in line.split(',')]
# make sure the number of values are same as expected
if len(values) == self._data.shape[0]:
with self._lock:
self._data = np.roll(self._data, -1) # move all data to left by 1
self._data[:,-1] = values # replace last data with new values
else:
logging.error("Wrong number of values")
logging.error(line)
except Exception as e:
logging.error("error parsing: "+str(e))
logging.error(line)
@property
def snapshot(self):
""" make a copy of current data and return
"""
with self._lock:
data = self._data
return data
データのラインが来たらパースしてnumpy.ndarrayに格納します。
とりあえずコンマ区切りですべてfloatだとして処理してますが、必要に応じて変更してください。
プロットするために、決まった個数の最新データだけを変数に入れます。リングバッファとしてPython標準ではcollections.dequeが使えるようです。でもnumpy.roll()でスライドさせていくのも悪くないようなのでnumpy.ndarrayを使うことにします。ndarrayの方が2次元配列で一気に処理できるし、maptplotlibに渡すと時もラクそうなので。
スレッド間でデータを受け渡しすることになるので、アクセスするときにはthreading.Lock()を使い保護する。
animation用のクラスを定義
class Plotter():
"""
Class to hold plot figure
provides functions for animation
"""
def __init__(self, fig, ax, x_data, data_worker):
"""
fig: Figure
ax: Axes
x_data: array of x values
data_worker: instance of DataWorker that provide data
"""
self._x_data = x_data
self.data_worker = data_worker
self.fig = fig
self.ax = ax
def initial_plot(self):
"""
Adjust plot command
Make sure to return lines to update
"""
# obtain latest data from worker
data = self.data_worker.snapshot
# draw a line with 1st row data
self.lineRed, = self.ax.plot(self._x_data, data[0,:], 'r-')
# adjust plot
self.ax.set_xlim(0, 500)
self.ax.set_ylim(100000,200000)
return self.lineRed,
def update_plot(self, frame):
# obtain latest data from worker
data = self.data_worker.snapshot
# update line's ydata
self.lineRed.set_ydata(data[0,:])
return self.lineRed,
matplotlibで動的なプロットをするにはanimationの仕組みを使います。FuncAnimationクラスには、最初のプロットをする関数と各フレームで描画する関数を渡すので、それを1つのクラスにまとめて定義しておきます。
animationのポイントは、アップデートしたいArtistを最初のプロットをするときに変数に入れておくこと。ここではinitial_plot()の中でself.ax.plot()で作られるLine2Dをself.lineRedに格納したうえでreturnします。
アニメーションの各フレームで呼ばれる update_plot()の中では、DataWorkerから最新データを受け取り、Line2D.set_ydataに最新データを渡すことで高速にアップデートします。
セットアップ
if __name__ == "__main__":
fmt = "%(message)s"
logging.basicConfig(format=fmt, level=logging.INFO)
port = "/dev/tty.usbmodem14201"
baudrate = 115200
ser = serial.Serial(port, baudrate, timeout=1)
# we expect two values from serial port
# ValueA,ValueB
n_values = 2 # expecting number of values in a line
n_points = 500 # number of latest data points to hold
# make numpy array to hold data
arr = np.empty(n_values * n_points)
arr[:] = np.nan # fill with NaN as default
data = np.reshape(arr, (n_values, n_points))
arr_x = np.arange(n_points) # make array for x axis
fig = plt.figure()
ax = fig.add_subplot(111)
dataworker = DataWorker(ser, data)
plotter = Plotter(fig, ax, arr_x, dataworker)
dataworker.start()
ani = FuncAnimation(fig,
plotter.update_plot,
frames=None,
init_func=plotter.initial_plot,
blit=True,
interval=50,
)
plt.show()
dataworker.stop()
シリアルポートやボーレートを決めてPySerial.Serialオブジェクトをインスタンス化。
1行に何個の値が来るかと、何データ分プロットするかを決めたらndarrayを作ります。
とりあえずnp.nanにしておけばグラフの初期化がラク。
FuncAnimationにもろもろ渡してあげれば出来上がりです。
今後
ボイラープレートなのであまり複雑にする気はないですが、パーサーは外から与えるようにするなどDataWorkerクラスは変更せずに使い回せるようにしたいところ。受け取ったデータをファイルに保存するコードも入れていきたい。loggingの機能を使えばすぐできるはず。
あとはちゃんとしたGUIアプリにしようとすると、QtなりのGUIライブラリを使いたくなる。QtとMatplotlibの組み合わせも癖があるので、Qt版のボイラープレートも作っておきたいところ。Qt6+PySide6に対応したmatplotlib v3.5が出たらやってみたい。