LoginSignup
18
16

More than 3 years have passed since last update.

PythonでNI-DAQの利用

Posted at

はじめに

NI-DAQ (NATIONAL INSTRUMENTSのData acquisition device AD/DAデバイス)をこれまでLabviewで使っていましたが、Pythonで使うことになったので、その覚えとして記載します。
使用したデバイスはNI-USB6002というAD/DA変換デバイスです。Analog 信号をDigitalに変えてコンピューターに取り込みます(Analog Input)。このデバイスはAnalog出力、Digital入出力もできます。今回は、Analog Inputのプログラムです。

環境

Windows 10 pro 64bit
Anaconda
python 3.7

インストール

Control NI DAQ Device with Python and NI DAQmx

(1)NI-DAQmxのインストール
NI-DAQmx のダウンロード
インストールに結構時間がかかります。

(2)NI-DAQmx PythonAPIのインストール
https://nidaqmx-python.readthedocs.io/en/latest/
pip install nidaqmx
conda-forgeにはないようでした。

参考になったサイト
Python (halvorsen.blog)
DAQ with Python

このサイトに公式のサンプルがあります。
https://github.com/ni/nidaqmx-python/tree/master/nidaqmx_examples
なお、PyDAQmx というModuleもあるようですが今回こちらは使用していません。

NI-USB6002を用いた Analog Input データ収録

先ずUSBでPCとNI-USB6002をつなぐと、テスト画面が現れます。この画面でデバイスのテストとデバイス名を確認しておいてください。

nitest.PNG

公式サンプルにもある ai_voltage_sw_timed.py を参考にプログラムを作成します。このプログラムは、readを呼ぶとデータが取得できます。特に、1 Channel 1 Sample Read を使います。

Start-Read-Close、With構文プログラムでの速度の比較

DAQの基本は、ファイルの書き込みと同じように Start-Read/Write-Closeです。Wtih構文も使えますが、この後で示すプログラムのように計測時間に違いが出ました。
Start-Read-Closeで書いたもの、With構文で書いたものとの速度比較プログラムです。
なお、測定における時間は、精度の良いtime.perf_counter()を利用します。
time.time()は精度があまりよくない?

"""
DAQ ai_1c_1sの比較プログラム

readを呼ぶごとに1Channelの1つのデータを読み込む

(1)Start-Read-closeを使ったもの
(2)Wtih構文を使ったもの
比較結果 (1)を使った方が速かった。

DeviceはNI-USB6002 Analog Input channelを利用

"""

import datetime
import sys

from pathlib import Path
# nidaqmx モジュールがなぜか認識されなかったので記載しています。
# home = Path.home()
# 仮想環境を利用している場合は仮想環境のホルダーを指定
# module_path = home/'anaconda3/Lib/site-packages/'
# sys.path.append(str(module_path))

import time
import numpy as np
from matplotlib import pyplot as plt

import nidaqmx


def save_data(tydata,outpath):
    now = datetime.datetime.now()
    out_path =Path(outpath)
    save_file_name =  f'{out_path}/ai_1c1s_{now.strftime("%Y%m%d_%H%M%S")}.csv'
    print(save_file_name)
    np.savetxt(save_file_name, tydata, delimiter=',')


def ai_1c_1s(device_name, min_set, max_set, m_time):
    """
    Open-Read-Closeで書いたも
    """

    y_all = []
    t_all = []

    task = nidaqmx.Task()

    task.ai_channels.add_ai_voltage_chan(device_name, min_val=min_set, max_val=max_set)
    task.start()

    time_rate = 0.0005 # PCのタイミングに依存しますが、私のPCではこの程度の時間
    N = int(m_time/time_rate)

    print('start')

    start_time = time.perf_counter()
    for k in range(N):
        data = task.read()
        d_time = time.perf_counter()-start_time
        y_all.append(data)
        t_all.append(d_time)

    task.stop()
    task.close()   

    print('Finished')
    print('----Summary----')
    print(f'Data length: time: {len(t_all)}, data: {len(y_all)}')
    print(f'Real total time: {t_all[-1]}, dt: {t_all[-1]/len(t_all)}')

    ty_data = np.c_[t_all, y_all]

    return ty_data


def ai_1c_1s_with(device_name, min_set, max_set, m_time):
    """
    Withを使った書き方

    """
    y_all = []
    t_all = []

    with nidaqmx.Task() as task:

        task.ai_channels.add_ai_voltage_chan(device_name, min_val=min_set, max_val=max_set)

        # time_rate = 0.003 #with を使うとこの程度
        time_rate = 0.0005
        N = int(m_time/time_rate)

        print('start')

        start_time = time.perf_counter()
        for k in range(N):
            data = task.read()
            d_time = time.perf_counter()-start_time
            y_all.append(data)
            t_all.append(d_time)

    print('Finished')
    print('----Summary----')
    print(f'Data length: time: {len(t_all)}, data: {len(y_all)}')
    print(f'Real total time: {t_all[-1]}, dt: {t_all[-1]/len(t_all)}')

    ty_data = np.c_[t_all, y_all]

    return ty_data


def main1(device_name, min_set,max_set, m_time ):

    ty_data = ai_1c_1s(device_name,min_set,max_set, m_time )
    # print('Save')
    # save_data(tydata=ty_data, outpath='./')
    return ty_data

def main2(device_name,min_set,max_set, m_time ):

    ty_data = ai_1c_1s_with(device_name,min_set,max_set, m_time )
    # print('Save')
    # save_data(tydata=ty_data, outpath='./')
    return ty_data


if __name__ == '__main__':
    device_name ="Dev2/ai0"
    min_set, max_set, m_time = -1, 1, 2
    tydata1=main1(device_name,min_set,max_set, m_time)
    tydata2=main2(device_name,min_set,max_set, m_time)
    plt.plot(tydata1[:,0],tydata1[:,1])
    plt.plot(tydata2[:,0],tydata2[:,1])
    plt.show()

結果

PC任せに4000個データを取ったときの時間
Open-Read-Closeの場合
0.00053[s] --> 0.5ms
----Summary----
Data length: time: 4000, data: 4000
Real total time: 2.149167060852051, dt: 0.0005372917652130127

withの場合
0.0034[s] --> 3.4ms
----Summary----
Data length: time: 4000, data: 4000
Real total time: 13.97138786315918, dt: 0.003492846965789795

このような結果なので、とにかく時間分解能を上げて取りたいときはStart-read-Closeを使った方がよさそうです。(そんなに速くなくてmsのずれは問題ないということであれば、With構文を使った書き方でもよいと思います。)
公式サンプルには1 Channel N Samples Read: もありますが、よく理解できなかったので使っていません。

数十から数百msの間隔での測定プログラム

数十から数百msの間隔での測定ならば、測定関数の中にWaitを入れることである程度制御することができます。

import datetime
import sys
from pathlib import Path

import time
import numpy as np
from matplotlib import pyplot as plt

import nidaqmx

def save_data(tydata,outpath):
    now = datetime.datetime.now()
    out_path =Path(outpath)
    save_file_name =  f'{out_path}/ai_1c1s_{now.strftime("%Y%m%d_%H%M%S")}.csv'
    print(save_file_name)
    np.savetxt(save_file_name, tydata, delimiter=',')

def ai_1c_1s_wait(device_name, min_set, max_set, m_time, wait=0):

    y_all = []
    t_all = []

    task = nidaqmx.Task()

    task.ai_channels.add_ai_voltage_chan(device_name, min_val=min_set, max_val=max_set)
    task.start()

    if wait == 0 :
        time_rate = 0.0005 

    else:
        time_rate = wait-0.0005

    N = int(m_time/time_rate)    
    print('start')

    start_time = time.perf_counter()
    for k in range(N):
        data = task.read()
        d_time = time.perf_counter()-start_time
        y_all.append(data)
        t_all.append(d_time)
        time.sleep(wait)

    task.stop()
    task.close()   

    print('Finished')
    print('----Summary----')
    print(f'Data length: time: {len(t_all)}, data: {len(y_all)}')
    print(f'Real total time: {t_all[-1]}, dt: {t_all[-1]/len(t_all)}')

    ty_data = np.c_[t_all, y_all]

    return ty_data


def main(device_name, min_set,max_set, m_time, wait ):

    ty_data = ai_1c_1s_wait(device_name,min_set,max_set, m_time, wait)
    # print('Save')
    # save_data(tydata=ty_data, outpath='./')
    return ty_data


if __name__ == '__main__':
    device_name ="Dev2/ai0"
    min_set, max_set, m_time = -1, 1, 2
    tydata1=main(device_name,min_set,max_set, m_time, wait=0.01)
    plt.plot(tydata1[:,0],tydata1[:,1])
    plt.show()

GUI付の測定プログラム

実際には、このADデバイスをできるだけ時間分解能を高くして、ロガーの様に使いたく、また、データ収集開始後、他の操作をできるだけ早く行うために、操作性が良いGUIをつけて見ました。
GUIはPysimpleGUIを用いています。PysimpleGUIモジュールの使い方は、こちらを参照してください。PysimpleGUIで3つの方法でmatplotlibのグラフ表示

import datetime
import io
import sys
from pathlib import Path
import time
import numpy as np
from matplotlib import pyplot as plt
import PySimpleGUI as sg

import nidaqmx


def save_data(tydata,outpath):
    now = datetime.datetime.now()
    out_path =Path(outpath)
    save_file_name =  f'{out_path}/ai_1c1s_{now.strftime("%Y%m%d_%H%M%S")}.csv'
    print(save_file_name)
    np.savetxt(save_file_name, tydata, delimiter=',')


def make_fig(x_data,y_data,make=True):
    fig = plt.figure(figsize=(8,5))

    if make:
        ax = fig.add_subplot(111)
        ax.plot(x_data, y_data, "-o")
        plt.grid()
        return fig

    else:
        return fig


def draw_plot_image(fig):
    item = io.BytesIO()
    plt.savefig(item, format='png')
    plt.clf()
    # plt.close('all')
    return item.getvalue()


font_set = ('IPAゴシック', 24)
layout = [[sg.Text('Standby', 
                   auto_size_text=True,
                   font=font_set, text_color='#ffffff',
                   background_color= 'blue',
                   pad=((10, 10), (10, 10)),
                   border_width=12,
                   key='-sign-')],
          [sg.Button('Start', size=(8, 1), 
                     pad=((10, 10), (10, 10)), font=font_set, key='-start-')],
          [sg.Button('Save',key='-save-'),sg.Cancel()],
          [sg.Text('Device Name & Channel: '), 
           sg.InputText(default_text="Dev2/ai0", size=(8, 1), key='-dev-')],
          [sg.Text('Max Voltage: '), sg.InputText("1", size=(3, 1), key='-max-'),
           sg.Text('Min Voltage: '), sg.InputText(
               "-1", size=(3, 1), key='-min-'),
           sg.Text('Measure Time[s]: '), sg.InputText("2", size=(3, 1), key='-m_time-')],
          [sg.Text('Output Path:',size=(9, 1), ), sg.InputText(
              f'{Path.cwd()}', size=(60, 1), key='-out_path-')],
          [sg.Output(size=(70, 5))],
          [sg.Image(filename='',key='-plot-')]
                                                       ]

sg.theme('Dark Blue 3')
window = sg.Window('DAQ Control', layout,
                   location=(0, 0), alpha_channel=1.0,
                   no_titlebar=False, grab_anywhere=False).Finalize()
while True:

    event, values = window.read(timeout=20)

    if event in (None, 'Cancel'):
        break

    elif event == '-start-':
        y_all = []
        t_all = []
        dev_name = str(values['-dev-'])
        min = float(values['-min-'])
        max = float(values['-max-'])
        act_time = int(values['-m_time-'])
        # https://teratail.com/questions/253502
        options = {}
        options["background_color"] = "red"

        task = nidaqmx.Task()

        task.ai_channels.add_ai_voltage_chan(dev_name, min_val=min, max_val=max)
        task.start()

        time_rate = 0.0005
        N = int(act_time/time_rate)

        window['-sign-'].update(**options)
        window['-sign-'].update('Measure')
        print('start')

        start_time = time.perf_counter()
        for k in range(N):
            data = task.read()
            d_time = time.perf_counter()-start_time
            y_all.append(data)
            t_all.append(d_time)

        task.stop()
        task.close()   

        options["background_color"] = "blue"   
        window['-sign-'].update('Standby')
        window['-sign-'].update(**options)
        print('Finished')
        print('----Summary----')
        print(f'Data length: time: {len(t_all)}, data: {len(y_all)}')
        print(f'Real total time: {t_all[-1]}, dt: {t_all[-1]/len(t_all)}')


        ty_data = np.c_[t_all, y_all]

        fig_ = make_fig(x_data=t_all, y_data=y_all, make=True)
        fig_bytes = draw_plot_image(fig_)
        window['-plot-'].update(data=fig_bytes) 

    elif event == '-save-':
        print('Save')
        save_data(tydata=ty_data, outpath=values['-out_path-'])

window.close()

実行例

立ち上げると以下のような画面が出てきます。

aigui1.PNG

Startボタンを押すと、
測定が開始されると一番下の表示部に'start'と表示されます。また同時に、青地のStanbyが赤字のMeasureに変わります。変わったところで、他の機器の動作を行います。(測定対象物のスイッチを入れるなど)

aigui2.PNG
測定が終了するとグラフが表示されます。

aigui3.PNG

このデータを保存したい場合は、Saveボタンを押します。

まとめ

PCの速度任せで、できるだけ分解能を高くデータを取得するには、With構文を使わない方が速かったです。測定時間が等間隔ではない可能性があるので、後でScipyの線形補完などを利用して等間隔のデータに近似するのもよいかもしれません。

18
16
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
18
16