1. 目的と完成予想
電子天秤A&D FX-200iの測定データをcsvとして自動保存するプログラムをPythonで実装する.
完成イメージ図は下のようである.電子天秤上の容器に連続的に液体を流し込む.その時の重さの時系列変化を知りたいので,連続的にPC側から計測結果を取得し,自動でCSVへ書き出してくれるプログラムを作成する.
非エンジニアである自分の備忘録として記述しているので,内容には誤りや正確性を欠く表現が含まれているかもしれない.ご容赦いただきたい.
また,この記事は友人へ向けて書いたものでもあるため,基礎的な内容も多く冗長になってしまった.最終的に完成したコードを見たい方は,ページの一番下をご覧頂きたい.
2. 基礎知識
2.1. RS-232C
この電子天秤はRS-232CインタフェースでPCと接続することができる.RS-232Cとは,制御機器や計測機器などに採用されている,最も普及しているシリアル通信の規格であり,機器とPCなどを1対1で接続することができる.
2.2. FX-200iのデータ送信方法と設定項目
FX-200iには,「データ出力モード」という設定項目がある(マニュアル p33参照).データ出力モードは本体設定の「dout > Prt」から設定可能である.このモードを変更することで,RS-232Cのデータ送信の動作を決定することができる.
各モードの詳細についてはマニュアルを一読されたいが,今回使用するのは ストリームモード である.
”ストリーム”という英単語から推測できる通り,「川の流れ」を想像すると良い.川の上流から取得したデータを,頭から順番に下流に流す.
ストリームモードを選択する際には,「 表示書換周期 」と「 ボーレート 」の設定値も検討する必要がある.
a) 表示書換周期
表示書換周期とは, 計測値を一秒間に何回取得するか のことである.これは,本体設定の「bASFnc > SPd」で設定可能で,5, 10, 20 [回/秒]の中から選択可能である.
b) ボーレート
また,ボーレートはRS-232C通信に関係する話である.これは,シリアル通信の通信速度を表しており(厳密には違うらしい),「SiF > bPS」から設定可能である.この値は,後述する Pythonプログラム内の値と同一にする 必要がある.
3. Pythonプログラム
3.1. pySerialで機器と接続
Pythonでは, pySerial というライブラリを使用してシリアル通信を簡単に行うことができる.
PythonがインストールされているPCで,下を実行することでライブラリをインストールできる.
> pip install pyserial
PCにRS-232Cデバイスを接続すると,PC上で自動的に COMポート が割り当てられる.この割り当てられた番号を指定して機器へアクセスするので,PC上で何番へ割り当てられたから確認する必要がある.
Windowsのデバイスマネージャーから簡単に確認できるので,確認方法の詳細はA&Dが出している以下のPDFを参照されたい.
COMポートの確認が済めば,簡単にPythonからシリアル通信が行える.
一番単純な書き方は下の方法である.
import serial
serial1 = serial.Serial('COM4')
これで接続処理が完了する.さらに,機器側で設定したRS-232Cのボーレートや機器接続のタイムアウトなどを盛り込むと下のようになる.先述の通り,このボーレートの値は,機器側と同じにする必要がある.
import serial
serial1 = serial.Serial('COM4', 19200, timeout=0.1)
3.2. 計測データ取得
FX-200iはストリームモードにしてあるので,何もしなくても連続的にデータが流れてきている状態である.
データを取得するには下のコードを実行する.
line_data = serial1.readline()
print(line_data)
先ほど接続した機器の変数 serial1 に対して,一行を読み込む変数 readline() を呼んでいる.
これを実行すると
>> b'US\xac+\xb01\xb23\xb.1\xb21\xa0\xa0\xe7\x8d\n'
>> b'S\xd4\xac+\xb01\xb23\xb.1\xb20\xa0\xa0\xe7\x8d\n'
>> b'S\xd4\xac+\xb01\xb23\xb.1\xb20\xa0\xa0\xe7\x8d\n'
などが得られる.この一見意味不明な文字列を成形して計測データを得る.
ここで,取扱説明書を確認してみると,FX-200iから送られる測定データは下のようなフォーマットとなっている.(取扱説明書p35より)
これによると,一つの測定データは,「計測値の安定状態」「測定データ」「単位」「ターミネータ」の4つからなることがわかる.
(実はFX-200iから送信されるデータのフォーマットはいくつか種類があるが,デフォルトで選択されているフォーマット「A&D標準フォーマット」をFX-200i側で選択している前提で話を進める.)
話を戻し,説明書と実際に得られたデータを比較してみるといくつかの規則性があることがわかる.
計測データには,まず初めに正負を表す + - があり,その後数値が続く.計測結果の数値は,2桁ごとに「\xb」が挟まれており,それを取り除くと計測結果が得られる(バックスラッシュと円マークは同じ).
これらをもとに,シリアル通信から得られたデータを少数の数値へ直すコードは下のようになる.
result = str(serial.readline())[2:-1]
print(result)
# 安定時を表す記号
stable_symbol = r'S\xd4\xac'
unstable_symbol = r'US\xac'
stable_or_not = ''
# 安定不安定の判定
if stable_symbol in result:
stable_or_not = 'o'
# 必要ない記号の除去
# 安定時
result = result.replace(stable_symbol, '')
# 不安定時
result = result.replace(unstable_symbol, '')
# 数値前の記号
result = result.replace(r'\xb', '')
# 単位の分離
result = result.replace(r'\xa0', '')
# 単位、改行
result = result.replace(r'\xe7\x8d\n', '')
# 正負判定
weight = 0
try:
if '+' in result:
weight = float(result.replace(r'+', ''))
elif '-' in result:
weight = float(result.replace(r'-', '')) * -1
except ValueError:
weight = 9999999
行数は多いが,やっていることはシンプルである.
1. 安定か不安定かを表す記号が含まれているのかを判定する.
2. 安定状態ならば "stable_or_not" という変数に "o" と入れておく.
3. "replace()"という関数で,必要のない部分を除去していく.
4. + - の符号判定をし,+が数値の最初にあれば,数値をそのまま変数 "weight" に格納し,-が含まれていれば,数値に-1を乗して変数 "weight" に格納する.
5. 測定不能の場合には,9999999を変数 "weight" に格納する.
といった手順である.
3.3. 測定データの格納
上のデータ取得処理をwhileで回して,ストリームで流れてきたデータを随時取得していく.そして,その得られた測定結果(安定状態,重さ)と,現在測定を始めて何秒目なのかのデータをまとめて配列へ格納する.
output_list = [time_counter, weight, stable_or_not]
result_list.append(output_list)
上の "output_list"という配列は,一回分の計測結果を保存するための配列で,[測定開始からの時間(秒),重さ,安定か不安定かの状態] の3つの要素からなる.その配列を,測定結果をすべて保存する配列 "result_list" へ "append()" 格納している.したがって,"result_lsit" の行数は,測定したデータの個数分となる.
測定を止めない限り,この全測定データを保持している配列 "result_list" は増大していくが,今日のPCに搭載しているメモリの容量を考えると問題とはならない容量である.
3.4. CSVへ書き出し
まずは,CSVファイルの保存先フォルダを作成する."os.makedirs" という関数を使い,保存先として指定したフォルダがない場合には自動作成させる.
output_dir = 'C:/Users/hogehoge/Desktop/FX-200i'
os.makedirs(output_dir, exist_ok=True)
指定したディレクトリがすでにある場合,"os.makedirs"はデフォルトではエラーを起こす.しかし,引数として "exist_ok=True"を与えると,すでに指定したディレクトリがある場合でもエラーを起こさない.
保存するフォルダは完成したので,次は保存するファイル名である.
start_time = datetime.datetime.now()
formatted_start_time = start_time.strftime('%y%m%d_%H%M%S')
file_name = f'FX-201i_{formatted_start_time}.csv'
ファイル名に計測スタート時の年月日と時刻を入れたいので,プログラム開始時に "datetime.datetime.now()" を呼び,予め時刻を取得しておく.
それを "strftime()" 関数でフォーマットしてファイル名に使用している.
先に作成したファイル名 "output_dir" と 上で作成したファイル名 "file_name" を結合させることで,保存先のパスが完成する.
output_csv_path = os.path.join(output_dir, file_name)
パス同士の結合には,"os.path.join()" を用いると良い.osごとにパスの区切り文字は異なるが,最適な区切り文字を選び結合してくれる.
ここまでくればあとはCSVとして書き出すだけである.書き出すデータの配列 "result_list" と,書き出し先のパス "output_csv_path" が揃ったので憂うことは何もない.
with open(output_csv_path, mode='w') as file:
writer = csv.writer(file, lineterminator='\n')
writer.writerows(result_list)
これだけで,2次元配列のCSV出力は完了する.
3.5. 例外処理
今回の作成したコードの中には,複数の try-exceptの例外処理が存在する.
例外処理は
try:
# 処理内容
except Exception:
# 例外処理
と書く.try の中に行いたい処理を記述する.tryの中の処理をPython側は実行し,tryの中でエラーが発生した場合に except のブロックへ飛ばされる.
上では "except Exception" と書いたが,"Exception" の部分には,発生するエラーの名称を記述する.エラーごとに行いたい処理は変わるので,
try:
# 処理内容
except ValueError:
# 例外処理1
except KeyboardInterrupt:
# 例外処理2
と,複数の種類のエラーに対応できるコードが書ける.
一口にエラーと言っても,発生するエラーによって様々なエラーの名称が付けられている.すべてのエラーをまとめた "Exception" というエラー名を使用することもできるが,"Exception" を使用した例外処理は最悪の方法である.中でどのような例外が発生しているかに目を向けずに黙らせる.そんな方法である.なので,2つ上のコードで書いたExceptionは使わないほうが良い.
今回のプログラムでは,データ取得の無限ループから抜けるために "KeyboardInterrupt" というCtrl+Cを押した際に上がるエラーや,シリアル通信の接続失敗に対応するために,"serial.serialutil.SerialException"というエラーなどを使用した.
初心者が最初から例外を意識してプログラムを記述するのは困難である.実際にコードを走らせながら上がったエラーに対して対応する形で例外処理を書く,そんなモチベーションで例外処理を書くといいと思う.
3.6. 完成したコード
最終的に完成したコードは次の通りである.
import csv
import datetime
import os
import sys
import serial
from serial.serialutil import SerialException
# データ取得関数
# main関数内から呼ばれ,whileの無限ループを回しながらデータを取得していく
def get_data(serial1, freq):
# データ取得頻度
getting_data_per_sec = freq
# すべての測定結果を格納する2次元配列
result_list = []
try:
# 取得したデータの取得時刻(秒)
current_time_sec = 0
# 取得用ループ
# 無限ループのため,Ctrl+C(停止ボタン)で抜けるしかない
while 1:
# シリアル通信から1行取得
result = str(serial1.readline())[2:-1]
# 安定時を表す記号
stable_symbol = r'S\xd4\xac'
unstable_symbol = r'US\xac'
stable_or_not = ''
# 安定不安定の判定
if stable_symbol in result:
stable_or_not = 'o'
# 必要ない記号の除去
# 安定時
result = result.replace(stable_symbol, '')
# 不安定時
result = result.replace(unstable_symbol, '')
# 数値前の記号
result = result.replace(r'\xb', '')
# 単位の分離
result = result.replace(r'\xa0', '')
# 単位、改行
result = result.replace(r'\xe7\x8d\n', '')
# 正負判定
weight = 0
try:
if '+' in result:
weight = float(result.replace(r'+', ''))
elif '-' in result:
weight = float(result.replace(r'-', '')) * -1
except ValueError:
weight = 9999999
# 計測したデータの時刻を計算
current_time_sec += 1 / getting_data_per_sec
# 少数2桁に丸める
current_time_sec = round(current_time_sec, 2)
# 計測データを表示
print(f'{current_time_sec}sec, {weight}, {stable_or_not}')
# 結果のリスト作成
output_list = [current_time_sec, weight, stable_or_not]
result_list.append(output_list)
# Ctrl+Cで停止命令を受けたときの処理
# 実行元(main関数)へすべての結果の配列を返す
except KeyboardInterrupt:
return result_list
# 計測途中にシリアル通信に失敗したときのエラー処理
# これまで取得したデータを実行元(main関数)へ返す
except SerialException:
print('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
print('シリアル通信に失敗しました')
print('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
return result_list
def main():
# 機器設定
# ---------------------------------------------------------------------
# ポート番号 デバイスマネージャーで確認
com = "COM4"
# シリアル通信のビットレート 図り本体と同じ値を入力 S1F > bPS
bit_rate = 19200
# 表示書き換え周期[data / sec] はかり本体と同じ値を入力 BASFnc > SPd
getting_data_per_sec = 20
# 保存先フォルダのパス
output_dir = 'C:/Users/hogehoge/Desktop/FX-200i'
# ---------------------------------------------------------------------
# 指定した保存先フォルダが存在していない場合にフォルダを作成する
os.makedirs(output_dir, exist_ok=True)
# 開始時刻の取得
start_time = datetime.datetime.now()
# メインの処理はこの中
try:
# 機器とpySerialで接続
# withで開いているので,このブロックから抜ける際に,
# 自動でシリアル通信を閉じてくれる
with serial.Serial(com, bit_rate, timeout=0.1) as serial1:
# 計測データ取得
# 取得した全計測データの配列を result_list として受け取る
result_list = get_data(serial1, getting_data_per_sec)
# データを保存するcsvのパスを作成
formatted_start_time = start_time.strftime('%y%m%d_%H%M%S')
file_name = f'FX-201i_{formatted_start_time}.csv'
output_csv_path = os.path.join(output_dir, file_name)
# csvに書き出し
with open(output_csv_path, mode='w') as file:
writer = csv.writer(file, lineterminator='\n')
writer.writerows(result_list)
print(f'\ncsv書き出し完了> {output_csv_path}')
# シリアル通信のエラー処理
# 機器との接続に失敗するとここへ飛ばされる
except SerialException:
print('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
print('シリアル通信に失敗しました')
print('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
sys.exit(1)
# 書き出し処理
# 途中でエラーが発生しプログラムが停止しても,必ず実行される
if __name__ == '__main__':
main()