はじめに
NI-DAQ (NATIONAL INSTRUMENTSのData acquisition device AD/DAデバイス)をこれまでLabviewで使っていましたが、Pythonで使うことになったので、その覚えとして記載します。
使用したデバイスはNI-USB6002というAD/DA変換デバイスです。Analog 信号をDigitalに変えてコンピューターに取り込みます(Analog Input)。このデバイスはAnalog出力、Digital入出力もできます。今回は、Analog Inputのプログラムです。
環境
Windows 10 pro 64bit
Anaconda
python 3.7
インストール
[Control NI DAQ Device with Python and NI DAQmx](Control NI DAQ Device with Python and NI DAQmx - National Instruments)
(1)NI-DAQmxのインストール
NI-DAQmx のダウンロード
インストールに結構時間がかかります。
(2)NI-DAQmx PythonAPIのインストール
https://nidaqmx-python.readthedocs.io/en/latest/
pip install nidaqmx
conda-forgeにはないようでした。
参考になったサイト
Python (halvorsen.blog)
DAQ with Python
このサイトに公式のサンプルがあります。
https://github.com/ni/nidaqmx-python/tree/master/nidaqmx_examples
なお、PyDAQmx
というModuleもあるようですが今回こちらは使用していません。
###NI-USB6002を用いた Analog Input データ収録
先ずUSBでPCとNI-USB6002をつなぐと、テスト画面が現れます。この画面でデバイスのテストとデバイス名を確認しておいてください。
公式サンプルにもある ai_voltage_sw_timed.py を参考にプログラムを作成します。このプログラムは、readを呼ぶとデータが取得できます。特に、1 Channel 1 Sample Read を使います。
####Start-Read-Close、With構文プログラムでの速度の比較
DAQの基本は、ファイルの書き込みと同じように Start-Read/Write-Closeです。Wtih構文も使えますが、この後で示すプログラムのように計測時間に違いが出ました。
Start-Read-Closeで書いたもの、With構文で書いたものとの速度比較プログラムです。
なお、測定における時間は、精度の良いtime.perf_counter()
を利用します。
time.time()は精度があまりよくない?
"""
DAQ ai_1c_1sの比較プログラム
readを呼ぶごとに1Channelの1つのデータを読み込む
(1)Start-Read-closeを使ったもの
(2)Wtih構文を使ったもの
比較結果 (1)を使った方が速かった。
DeviceはNI-USB6002 Analog Input channelを利用
"""
import datetime
import sys
from pathlib import Path
# nidaqmx モジュールがなぜか認識されなかったので記載しています。
# home = Path.home()
# 仮想環境を利用している場合は仮想環境のホルダーを指定
# module_path = home/'anaconda3/Lib/site-packages/'
# sys.path.append(str(module_path))
import time
import numpy as np
from matplotlib import pyplot as plt
import nidaqmx
def save_data(tydata,outpath):
now = datetime.datetime.now()
out_path =Path(outpath)
save_file_name = f'{out_path}/ai_1c1s_{now.strftime("%Y%m%d_%H%M%S")}.csv'
print(save_file_name)
np.savetxt(save_file_name, tydata, delimiter=',')
def ai_1c_1s(device_name, min_set, max_set, m_time):
"""
Open-Read-Closeで書いたも
"""
y_all = []
t_all = []
task = nidaqmx.Task()
task.ai_channels.add_ai_voltage_chan(device_name, min_val=min_set, max_val=max_set)
task.start()
time_rate = 0.0005 # PCのタイミングに依存しますが、私のPCではこの程度の時間
N = int(m_time/time_rate)
print('start')
start_time = time.perf_counter()
for k in range(N):
data = task.read()
d_time = time.perf_counter()-start_time
y_all.append(data)
t_all.append(d_time)
task.stop()
task.close()
print('Finished')
print('----Summary----')
print(f'Data length: time: {len(t_all)}, data: {len(y_all)}')
print(f'Real total time: {t_all[-1]}, dt: {t_all[-1]/len(t_all)}')
ty_data = np.c_[t_all, y_all]
return ty_data
def ai_1c_1s_with(device_name, min_set, max_set, m_time):
"""
Withを使った書き方
"""
y_all = []
t_all = []
with nidaqmx.Task() as task:
task.ai_channels.add_ai_voltage_chan(device_name, min_val=min_set, max_val=max_set)
# time_rate = 0.003 #with を使うとこの程度
time_rate = 0.0005
N = int(m_time/time_rate)
print('start')
start_time = time.perf_counter()
for k in range(N):
data = task.read()
d_time = time.perf_counter()-start_time
y_all.append(data)
t_all.append(d_time)
print('Finished')
print('----Summary----')
print(f'Data length: time: {len(t_all)}, data: {len(y_all)}')
print(f'Real total time: {t_all[-1]}, dt: {t_all[-1]/len(t_all)}')
ty_data = np.c_[t_all, y_all]
return ty_data
def main1(device_name, min_set,max_set, m_time ):
ty_data = ai_1c_1s(device_name,min_set,max_set, m_time )
# print('Save')
# save_data(tydata=ty_data, outpath='./')
return ty_data
def main2(device_name,min_set,max_set, m_time ):
ty_data = ai_1c_1s_with(device_name,min_set,max_set, m_time )
# print('Save')
# save_data(tydata=ty_data, outpath='./')
return ty_data
if __name__ == '__main__':
device_name ="Dev2/ai0"
min_set, max_set, m_time = -1, 1, 2
tydata1=main1(device_name,min_set,max_set, m_time)
tydata2=main2(device_name,min_set,max_set, m_time)
plt.plot(tydata1[:,0],tydata1[:,1])
plt.plot(tydata2[:,0],tydata2[:,1])
plt.show()
####結果
PC任せに4000個データを取ったときの時間
Open-Read-Closeの場合
0.00053[s] --> 0.5ms
----Summary----
Data length: time: 4000, data: 4000
Real total time: 2.149167060852051, dt: 0.0005372917652130127
withの場合
0.0034[s] --> 3.4ms
----Summary----
Data length: time: 4000, data: 4000
Real total time: 13.97138786315918, dt: 0.003492846965789795
このような結果なので、とにかく時間分解能を上げて取りたいときはStart-read-Closeを使った方がよさそうです。(そんなに速くなくてmsのずれは問題ないということであれば、With構文を使った書き方でもよいと思います。)
公式サンプルには1 Channel N Samples Read: もありますが、よく理解できなかったので使っていません。
####数十から数百msの間隔での測定プログラム
数十から数百msの間隔での測定ならば、測定関数の中にWaitを入れることである程度制御することができます。
import datetime
import sys
from pathlib import Path
import time
import numpy as np
from matplotlib import pyplot as plt
import nidaqmx
def save_data(tydata,outpath):
now = datetime.datetime.now()
out_path =Path(outpath)
save_file_name = f'{out_path}/ai_1c1s_{now.strftime("%Y%m%d_%H%M%S")}.csv'
print(save_file_name)
np.savetxt(save_file_name, tydata, delimiter=',')
def ai_1c_1s_wait(device_name, min_set, max_set, m_time, wait=0):
y_all = []
t_all = []
task = nidaqmx.Task()
task.ai_channels.add_ai_voltage_chan(device_name, min_val=min_set, max_val=max_set)
task.start()
if wait == 0 :
time_rate = 0.0005
else:
time_rate = wait-0.0005
N = int(m_time/time_rate)
print('start')
start_time = time.perf_counter()
for k in range(N):
data = task.read()
d_time = time.perf_counter()-start_time
y_all.append(data)
t_all.append(d_time)
time.sleep(wait)
task.stop()
task.close()
print('Finished')
print('----Summary----')
print(f'Data length: time: {len(t_all)}, data: {len(y_all)}')
print(f'Real total time: {t_all[-1]}, dt: {t_all[-1]/len(t_all)}')
ty_data = np.c_[t_all, y_all]
return ty_data
def main(device_name, min_set,max_set, m_time, wait ):
ty_data = ai_1c_1s_wait(device_name,min_set,max_set, m_time, wait)
# print('Save')
# save_data(tydata=ty_data, outpath='./')
return ty_data
if __name__ == '__main__':
device_name ="Dev2/ai0"
min_set, max_set, m_time = -1, 1, 2
tydata1=main(device_name,min_set,max_set, m_time, wait=0.01)
plt.plot(tydata1[:,0],tydata1[:,1])
plt.show()
####GUI付の測定プログラム
実際には、このADデバイスをできるだけ時間分解能を高くして、ロガーの様に使いたく、また、データ収集開始後、他の操作をできるだけ早く行うために、操作性が良いGUIをつけて見ました。
GUIはPysimpleGUIを用いています。PysimpleGUIモジュールの使い方は、こちらを参照してください。PysimpleGUIで3つの方法でmatplotlibのグラフ表示
import datetime
import io
import sys
from pathlib import Path
import time
import numpy as np
from matplotlib import pyplot as plt
import PySimpleGUI as sg
import nidaqmx
def save_data(tydata,outpath):
now = datetime.datetime.now()
out_path =Path(outpath)
save_file_name = f'{out_path}/ai_1c1s_{now.strftime("%Y%m%d_%H%M%S")}.csv'
print(save_file_name)
np.savetxt(save_file_name, tydata, delimiter=',')
def make_fig(x_data,y_data,make=True):
fig = plt.figure(figsize=(8,5))
if make:
ax = fig.add_subplot(111)
ax.plot(x_data, y_data, "-o")
plt.grid()
return fig
else:
return fig
def draw_plot_image(fig):
item = io.BytesIO()
plt.savefig(item, format='png')
plt.clf()
# plt.close('all')
return item.getvalue()
font_set = ('IPAゴシック', 24)
layout = [[sg.Text('Standby',
auto_size_text=True,
font=font_set, text_color='#ffffff',
background_color= 'blue',
pad=((10, 10), (10, 10)),
border_width=12,
key='-sign-')],
[sg.Button('Start', size=(8, 1),
pad=((10, 10), (10, 10)), font=font_set, key='-start-')],
[sg.Button('Save',key='-save-'),sg.Cancel()],
[sg.Text('Device Name & Channel: '),
sg.InputText(default_text="Dev2/ai0", size=(8, 1), key='-dev-')],
[sg.Text('Max Voltage: '), sg.InputText("1", size=(3, 1), key='-max-'),
sg.Text('Min Voltage: '), sg.InputText(
"-1", size=(3, 1), key='-min-'),
sg.Text('Measure Time[s]: '), sg.InputText("2", size=(3, 1), key='-m_time-')],
[sg.Text('Output Path:',size=(9, 1), ), sg.InputText(
f'{Path.cwd()}', size=(60, 1), key='-out_path-')],
[sg.Output(size=(70, 5))],
[sg.Image(filename='',key='-plot-')]
]
sg.theme('Dark Blue 3')
window = sg.Window('DAQ Control', layout,
location=(0, 0), alpha_channel=1.0,
no_titlebar=False, grab_anywhere=False).Finalize()
while True:
event, values = window.read(timeout=20)
if event in (None, 'Cancel'):
break
elif event == '-start-':
y_all = []
t_all = []
dev_name = str(values['-dev-'])
min = float(values['-min-'])
max = float(values['-max-'])
act_time = int(values['-m_time-'])
# https://teratail.com/questions/253502
options = {}
options["background_color"] = "red"
task = nidaqmx.Task()
task.ai_channels.add_ai_voltage_chan(dev_name, min_val=min, max_val=max)
task.start()
time_rate = 0.0005
N = int(act_time/time_rate)
window['-sign-'].update(**options)
window['-sign-'].update('Measure')
print('start')
start_time = time.perf_counter()
for k in range(N):
data = task.read()
d_time = time.perf_counter()-start_time
y_all.append(data)
t_all.append(d_time)
task.stop()
task.close()
options["background_color"] = "blue"
window['-sign-'].update('Standby')
window['-sign-'].update(**options)
print('Finished')
print('----Summary----')
print(f'Data length: time: {len(t_all)}, data: {len(y_all)}')
print(f'Real total time: {t_all[-1]}, dt: {t_all[-1]/len(t_all)}')
ty_data = np.c_[t_all, y_all]
fig_ = make_fig(x_data=t_all, y_data=y_all, make=True)
fig_bytes = draw_plot_image(fig_)
window['-plot-'].update(data=fig_bytes)
elif event == '-save-':
print('Save')
save_data(tydata=ty_data, outpath=values['-out_path-'])
window.close()
####実行例
立ち上げると以下のような画面が出てきます。
Startボタンを押すと、
測定が開始されると一番下の表示部に'start'と表示されます。また同時に、青地のStanbyが赤字のMeasureに変わります。変わったところで、他の機器の動作を行います。(測定対象物のスイッチを入れるなど)
このデータを保存したい場合は、Saveボタンを押します。
###まとめ
PCの速度任せで、できるだけ分解能を高くデータを取得するには、With構文を使わない方が速かったです。測定時間が等間隔ではない可能性があるので、後でScipyの線形補完などを利用して等間隔のデータに近似するのもよいかもしれません。