この記事の目的
LabStreamingLayer(LSL)というデータ送受信のためのシステムがあります。
最近の脳波計やアイトラッカーといった計測装置は、このLSLに対応しているものが増えているようです。
これを使うと、PythonやMATLAB上で、データをリアルタイムに受け取ることが簡単にできます。
計測した生体情報をオンラインで解析してフィードバックするといった用途に便利です。
しかし、オンライン解析しているはずが何故か数十秒単位のタイムラグが生じたりと、苦戦していました。
最近ようやくその原因が(多分)わかりました。
この機会に、LSLを使ってPythonで計測データを受け取る方法についてまとめます。
必要なものを揃える
LSLでデータを送ってくれる測定装置
LSLでデータを送るのも簡単なのですが、今回は受け取る方に解説を絞ります。
そこで今回は、データをLSLで送ってくれる都合のいいやつとして、pupil labsという装置を使います。
非常に軽量なアイトラッカーです。
pupil labsから瞳孔サイズをリアルタイムに受け取り表示することを、記事の最終的な目標とします。
pupil labsを起動して、LSLに情報を流している状態を前提として進めます。
ちなみに、pupil labsをPythonから操作する方法についての記事も書いていますので、ご興味があればどうぞ。
(24年9月30日追記)Pupil Captureでキャリブレーションを済ませていないと、LSLにデータが流れないようです。ご注意ください。
LSLを使うためのPythonパッケージ
pylsl
というパッケージをインストールすることで、PythonからLSLを使うことができます。
インストールはpip
で一発です。
pip install pylsl
LSLでデータにアクセスする
PythonからLSLにアクセスしましょう。
from pylsl import StreamInlet, resolve_streams
streams = resolve_streams(wait_time=3.)
wait_time
はタイムアウトまでの秒数です。
これで、現在LSLに流れている情報の一覧がstreams
に取得されます。
リスト型変数であるstreams
の中身を見ると7つのstreamが入っています。
1つのstreamには、pupil labsから流れる信号(瞳孔のサイズや、位置など)と、
その信号についての情報(そのサンプリングレートなど)が束ねられています。
pupil labsは7つのstream(左目用カメラの情報, 右目用カメラの情報, etc.)を同時に流しています。
もしほかの装置を同時にLSLに繋げていると、さらに多くのstreamが検出されます。
どれかひとつを、StreamInlet()
の引数に与えて、inlet
というインスタンスを作ります。
inlet = StreamInlet(streams[0])
一本のstreamを引いてきたわけですね。
inletのメソッドは以下の通りです。
- info()
- open_stream(timeout=FOREVER)
- close_stream()
- time_correction(timeout=FOREVER)
- pull_sample(timeout=FOREVER, sample=None)
- pull_chunk(timeout=0.0, max_sample=1024, dest_obj=None)
- samples_available()
- was_clock_reset()
streamの情報を調べる
.info()
を使うことで、inletに捉えたstreamに含まれている情報を調べられます。
一本目のstreamの中身は、Eye Camera 1(右目用カメラ)で捉えた瞳孔のデータのようです。
ちなみに、streamの順番は繋ぐたびに変わります。
pupil labsから出されるstreamの.type()
はすべてPupil Capture
となっています。
streamには5つのチャンネルが含まれているようです。
つまり、「Pupil Primitive Data - Eye 1」というstreamの中に、5種類の信号が流れているということです。
サンプリングレートは0.0とあります。
どうもpupil labsは、サンプリングレートが動的に変動するようです。
なので情報が与えられていないのでしょうね。
5つのチャンネルの名前を取得するには、.info().desc()
を使います。
しかし、返ってくるのはXMLElementという形式のデータなので、読み解くのに一手間かかります。
ここでは詳しい説明は省略して、チャンネル名を取得するために作った関数を紹介します。
def pick_ch_names(info):
ch_xml = info.desc().child('channels').child('channel')
ch_names = []
for _ in range(info.channel_count()):
ch_names.append(ch_xml.child_value('label'))
ch_xml = ch_xml.next_sibling()
return ch_names
inlet.info()
から使えるメソッドはほかにもありますが、省略します。
print(inlet.info().channel_format())
print(inlet.info().source_id())
print(inlet.info().version())
print(inlet.info().created_at())
print(inlet.info().uid())
print(inlet.info().session_id())
print(inlet.info().hostname())
また、すべての情報をXML形式で取得する方法もあります。
これをテキストファイルにコピペして、拡張子を.xml
に変えると、XMLファイルができます。
後はここを参考にエクセルファイルに読み込むと、見やすくなります。
一回試してみてもいいかもしれません。
信号を受信する
ではいよいよ、pupil labsからの信号をリアルタイムに取得しましょう。
ここでは、Eye Camera 0の瞳孔サイズを取得します。
必要な情報の入ったstreamを探す
7本もstreamが流れているので、必要なstreamを探しましょう。
streamの順番は毎回変わるので、stream名から探します。
まず、stream名の一覧を作ります。
stream_names = []
for stream in streams:
inlet = StreamInlet(stream)
stream_names.append(inlet.info().name())
これで、必要なstreamだけを指定することができます。
import numpy as np
idx = np.where(np.array(stream_names)=='Pupil Primitive Data - Eye 0')[0][0]
inlet = StreamInlet(streams[idx])
ここまでを、関数にまとめましょう。
def inlet_specific_stream(stream_name):
import numpy as np
streams = resolve_streams(wait_time=3.)
stream_names = []
for stream in streams:
inlet = StreamInlet(stream)
stream_names.append(inlet.info().name())
idx = np.where(np.array(stream_names)==stream_name)[0][0]
inlet = StreamInlet(streams[idx])
return inlet
サンプルを取得する
信号を受信するためにはまず、inlet.open_stream()
をしてやります。
これにより、瞳孔サイズなどの信号がバッファ保存されはじめます。
streamに流れているデータのうち1サンプルを受信するには、inlet.pull_sample()
を使います。
データが1サンプル分取得されました。
ch_names
を見るに左から、瞳孔径、信頼度、タイムスタンプ、瞳孔位置(X軸)、瞳孔位置(Y軸)のようです。
信頼度は、きちんと瞳孔を検出しているかの指標ですね。
なお、初めにinlet.open_stream()
をしていない状態でpull_sample()
すると、自動でopen_stream()
してくれます。
なので実は、pull_sample()
するときにopen_stream()
をしてやる必要はありません。
しかしどの時点からバッファ保存が開始しているか明示的になるよう、きちんとopen_stream()
をした方がいいのかもしれません。
チャンクを取得する
1サンプルごとではなく、ある程度の塊でデータを受信したい場合は、pull_chunk()
を使います。
1024サンプルx5チャンネルのデータが取得されました。
1024というのは、pull_chunk()
で引っ張られるサンプル数の最大値です。
これを変えるためには、pull_chunk(max_samples=1024)
としてやります。
open_stream()
前にpull_chunk()
を使うと、pull_sample()
と同様、自動でopen_stream()
してくれます。
しかしこの場合、バッファにデータがたまっていないためか、返り値は空っぽになります。
ちなみにバッファの中身を消してやるには、inlet.close_stream()
してやればいいのかと思ったのですが、なぜかできませんでした。
なので再度inletを作ることでバッファをクリアしてやっています。
(24年9月30日追記)inlet.flush()
でもバッファがクリアできます。
pull_chunk(timeout=1.)
としてやると、open_stream()
の後1秒待ってからpull_chunk()
してくれるようです。
なので、1秒分のサンプルが返ってきます。
pull_chunk()の性質について
pull_chunk()
やpull_sample()
をすると、バッファにあるデータを引っ張ってくる(pullする)ことができます。
pullされたデータは、バッファからはなくなります。
コピーするのでなく、移動させるわけですね。(多分)
なので、バッファへデータが送られる(pushされる)以上のペースでpullしてやると、バッファが空っぽになります。
ところで、データ取り出しの形式には、2種類あるそうです。
キュー型(先入れ先出し)とスタック型(先入れ後出し)です。(参考)
LSLにおけるバッファ保存やpullは、どちらのタイプなのでしょうか。
要するに、pullされたデータは最新のサンプルやチャンクなのか(スタック型)、
あるいはopen_stream()
された直後の古いサンプル・チャンクなのか(キュー型)、という問いです。
調べてみてもよくわからなかったので、こんな実験をしてみました。
*この問いは非常に重要と思われますが、以降の検証が正しい確証はないのでご注意ください
while True: # 条件を満たすまで何度でも繰り返す
d, _ = inlet.pull_chunk() # pullする
d = np.array(d) # 扱いやすいようnumpy.array形式にする
if d.shape[0]==0: break # 引っ張ってきたデータが空っぽ(=バッファが空)だったら終了
print(d[-1, 2]) # pullで引っ張ってきたデータ(のうち最も古いもの)のタイムスタンプを表示
先ほどと同じように、バッファが空っぽになるまでpullしまくります。
この時、pullしたデータのタイムスタンプを確認します。
もしバッファがキュー型(先入れ先出し)であれば、タイムスタンプが古いものから順に取り出されます。
なので、printされる数値はどんどん大きくなるはずです。
逆に、スタック型(先入れ後出し)であれば、タイムスタンプが新しいものから順に取り出されます。
なので、printされる数値はどんどん小さくなるはずです。
さて、結果は…
数値がどんどん大きくなっています。
これはつまり、pull_chunk()
は古いものから順にデータを取り出している(キュー型)ということです。
よって、リアルタイムにデータ解析しようとしても、pull_chunk()して得られたのは古いデータである可能性があるということです。
一方、pull_sample()
については検証できませんでした。
どうもバッファに1サンプル溜まるのを待ってからpullしているようで、バッファを空っぽにすることができませんでした。
リアルタイム解析してみる
解析、というほどのことではありませんが、瞳孔のサイズをリアルタイムに円の大きさで表現するスクリプトを書きました。
from time import sleep
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import patches
from pylsl import StreamInfo, StreamInlet, resolve_streams
def inlet_specific_stream(stream_name):
import numpy as np
streams = resolve_streams(wait_time=3.)
stream_names = []
for stream in streams:
inlet = StreamInlet(stream)
stream_names.append(inlet.info().name())
idx = np.where(np.array(stream_names)==stream_name)[0][0]
inlet = StreamInlet(streams[idx])
return inlet
stream_name = 'Pupil Primitive Data - Eye 0'
fig = plt.figure()
ax = fig.add_subplot(111)
inlet = inlet_specific_stream(stream_name)
inlet.open_stream() # バッファ開始
sleep(.1) # バッファにある程度データをためる
print('START!')
while True:
# データ取得
if True: # 正しい例
d, _ = inlet.pull_chunk(max_samples=1024) # バッファにあるデータを全部取る
assert(len(d) < 1024) # 念のため、全部取り切れていることを確認する
try:
diameter = np.array(d)[-1, 0] # とってきたデータの最後の部分を使う
except: # サンプリングレートが落ちてバッファが空になることもあるので...
pass # その時はpassしてごまかす
if False: # 悪い例(1)!!!
d, _ = inlet.pull_chunk(max_samples=1) # バッファから1サンプルだけ取得する
diameter = np.array(d)[0, 0]
if False: # 悪い例(2)!!!
d, _ = inlet.pull_sample(timeout=1.) # pull_sampleを使って1サンプルだけ取得する
diameter = d[0]
# 取得した瞳孔サイズをリアルタイムに表示
plt.cla()
c = patches.Circle(xy=(0,0), radius=diameter/2)
ax.add_patch(c)
plt.xlim([-30, 30])
plt.ylim([-30, 30])
plt.pause(.1)
これを実行すると、このようになります(GIF)。
右はpupil labsのアプリです。
目を瞑ると、瞳孔の検出が切れて円の大きさが0になります。
途中、光を当てて瞳孔を収縮させると、円も一緒に小さくなっています。
若干のラグがありますが、おおむねきちんとシンクロしています。
スクリプトには途中、悪い例を二つ書いています。
悪い例(1)は、pull_chunk()
で引っ張ってこれるのが最新のデータだと思い込んでいると犯してしまうミスです。
実際は先ほど検証した通り、open_stream()
直後のデータから順に取得します。
これを0.1秒間隔で順に表示するので、大きなタイムラグが生じてしまいます。
悪い例(2)では、先ほどは検証できなかった、pull_sample()
を使っています。
この例を使うと、悪い例(1)と同様にラグが生じます。
このことから、pull_sample()
も、open_stream()
直後の古いデータを取得していることがわかりました。
まとめ
今回はpupil labsという装置を例にしましたが、LSLに対応している様々な装置で同じようにデータを受け取れるはずです。
やり方さえわかってしまえば楽にデータを飛ばせます。
ただしオンライン解析をする場合には、取得したデータが古いものになっていないか、ご注意ください。