本記事の内容はベータ公開としているので内容が変更される場合があります。
本記事では、教育版 LEGO® MINDSTORMS EV3(以降EV3)とPython環境を利用してライントレースに関する線形回帰を行っていく。
環境構築に関しては前回の記事を参照してほしい。
EV3で機械学習その1 環境構築編:こちら
EV3で機械学習その2 線形回帰編:本記事
EV3で機械学習その3 分類編:こちら
事前学習教材
本記事の内容は以下の書籍を参考にしている。
Pythonを使ったEV3の基本的な制御等については以下に網羅されている。
ロボットではじめるAI入門
上記書籍との内容の関係は以下の通りになる。
・概要
・PC,EV3のセットアップ ⬅ NEW(前回)
・プログラム作成方法 ⬅ NEW(前回)
・モーター,センサーの利用
・ライントレース制御
・ライントレースフィードバック
・線形回帰 ⬅ NEW(本記事)
・分類 ⬅ NEW(次回)
本記事内での環境
-
PC
Windows10
Python 3.7.3
開発環境 VisualStudioCode(以降VSCode) -
EV3
python-ev3dev2(以降ev3dev)
目次
- 線形回帰とは
- やること
- EV3のモデルと走行コース
- 使用するライブラリとインストール
- プログラムの作成
- ソケット通信のIP設定
- プログラムの実行
- 実行結果
- まとめ
線形回帰とは
線形回帰はデータの分布があるとき、そのデータ群に一番当てはまる直線を求める分析手法である。
以下のような2種類のデータ群があったとき、データに対してよく当てはまる線を引くことで所持していないデータに対してもある程度予測をすることで結果を得ることができる。
やること
EV3で線形回帰を行うことが大きな目的だが、EV3の動作と線形回帰による推論を結びつけて行く必要がある。
今回は以下の手順で線形回帰を行いながら、ライントレースをなめらかな走行で行うことを目的とする。
- 楕円形のコースをライントレースで走行させながら、ジャイロセンサーの値を取得する
- 取得したジャイロセンサーの値から、コースを走行した軌跡を描画する
- 元のコースデータと比較し、どれだけブレた走行をしたか周回ごとに誤差を算出する
- 誤差と旋回値をcsvデータに蓄積していきそのデータをもとに線形回帰を行い誤差の少ない旋回値を推論しフィードバックする
要約すると以下のようになる。
データ:ライントレースを行った旋回値,元のコースと走行軌跡の誤差
目的:旋回値と誤差の関係からよりきれいに走れる旋回値を見つけライントレースをする
旋回値について
前項で登場した旋回値について説明する。今回利用するロボットのモデルは車型であるため左右のモーターのスピードを指定して操舵角を決める。今回はそこに旋回値という変数と左右のモータースピードを紐付けている。旋回値を指定することでスピードに依存せずに操舵角を設定できるようにする。イメージとしては車のハンドル操作に近いものとなる。
EV3のモデルと走行コース
今回利用するEV3のモデルは以下のような車型のモデルを利用する。
車体前部にはカラーセンサーが付いており、ライントレース走行を可能にしている。
車体後部にはジャイロセンサーが付いており、車体が向いている角度を逐一取得する。この角度の値を利用して走行経路を描画する。
使用するライブラリとインストール
今回ev3devに加えて追加で利用するライブラリは以下である。
- Numpy
- matplotlib
- Pandas
- Scikit-learn
Numpy
NumpyはPythonで数値計算を行う際に、非常によく使われるライブラリである。
ベクトルや行列計算などの配列計算の処理を容易に行うことができるようになる。
今回は取得したセンサーデータを配列に格納したり、計算をしたりする際に用いる。
インストール手順
- コマンドプロンプトを立ち上げる
- コマンド
pip install numpy
を実行する
matplotlib
matplotlibはPythonでグラフを描画する際によく使われるライブラリである。
今回はジャイロセンサーデータを元に走行経路を描画するのに用いる。
以下インストール手順
- コマンドプロンプトを立ち上げる
(Numpyインストール時に立ち上げたもので良い) - コマンド
pip install matplotlib
を実行する
Pandas
PandasはPythonでデータを効率的に扱うためのライブラリである。
今回はcsvファイルの読み込みに利用する。
以下インストール手順
- コマンドプロンプトを立ち上げる
(Numpyインストール時に立ち上げたもので良い) - コマンド
pip install pandas
を実行する
scikit-learn
scikit-learnはPythonの機械学習ライブラリである。
分類・回帰・クラスタリングなどが比較的簡単に実装することができる。
以下インストール手順
- コマンドプロンプトを立ち上げる
(Numpyインストール時に立ち上げたもので良い) - コマンド
pip install scipy
を実行する - コマンド
pip install scikit-learn
を実行する
プログラムの作成
今回は以下の3つのプログラムを作成する。
- EV3側プログラム
data_get_gyro.py
- 比較用コースデータプログラム
course.py
- PC側プログラム
LinearRegression.py
EV3のスペックには限度があるため、あくまで数値の計算や線形回帰の処理はPC側で行い処理結果の値をEV3に送るような構成にする。
それぞれのプログラムの関係性について以下の図に示す。
EV3側プログラム
EV3側プログラムは、実際にEV3で実行するプログラムである。主にEV3の動作として旋回値を指定したライントレース、ソケット通信を利用したジャイロセンサー値の転送、旋回値のフィードバックの受信を実装する。
以下EV3側プログラムdata_get_gyro.py
をVSCode上のワークスペースで作成する。ワークスペースの作成やEV3へのソースコード転送方法については前回の記事を参照してほしい。
import time
import socket
import sys
from ev3dev2.button import Button
from ev3dev2.motor import LargeMotor, OUTPUT_B, OUTPUT_C
from ev3dev2.sensor import INPUT_2, INPUT_3
from ev3dev2.sensor.lego import GyroSensor, ColorSensor
# power range:-1050 -> 1050, turn_ratio range:-100 -> 100
def linetrace_steer(power, turn_ratio):
global data_cycle
if color.reflected_light_intensity > 30:
ev3_motor_steer(power, turn_ratio*-1)
else:
ev3_motor_steer(power, turn_ratio)
time.sleep(0.1)
data_cycle += 1
def ev3_motor_steer(power, turn_ratio):
if turn_ratio < 0:
lm_b.run_forever(speed_sp=power*(1+turn_ratio/50))
lm_c.run_forever(speed_sp=power)
elif turn_ratio > 0:
lm_b.run_forever(speed_sp=power)
lm_c.run_forever(speed_sp=power*(1-turn_ratio/50))
else:
lm_b.run_forever(speed_sp=power)
lm_c.run_forever(speed_sp=power)
def gyro_reset():
time.sleep(1.0)
gyro.mode = 'GYRO-ANG'
gyro.mode = 'GYRO-RATE'
gyro.mode = 'GYRO-ANG'
time.sleep(1.0)
def dataget():
global fb_steer # feedback steering value
global set_steer
_gyro_data = gyro.value() # gyro data
_gyro_data_str = str(_gyro_data)
s.send(_gyro_data_str.encode())
print(_gyro_data_str)
if ROUND_CHECK < _gyro_data:
while not(button.up):
ev3_motor_steer(0, 0)
if fb_steer is None:
fb_steer = s.recv(1024).decode()
fb_steer_float = float(fb_steer)
print(fb_steer_float)
set_steer = set_steer - fb_steer_float
if button.backspace:
s.close
print('End program')
sys.exit()
print('set_steer = ' + str(set_steer))
# gyro reset
gyro_reset()
fb_steer = None
# sensors&motors definition
button = Button()
color = ColorSensor(INPUT_3)
gyro = GyroSensor(INPUT_2)
lm_b = LargeMotor(OUTPUT_B)
lm_c = LargeMotor(OUTPUT_C)
# gyro initialize
gyro_reset()
# variable initialize
data_cycle = 1 # counter
fb_steer = None # Feedback turn
ROUND_CHECK = 355 # confirm one round
# motor initialize
lm_b.reset()
lm_c.reset()
set_power = 200
set_steer = 70
# get gyrodate and into array
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect(('169.254.207.161', 50010)) # your PC's Bluetooth IP & PORT
while not(button.backspace):
linetrace_steer(set_power, set_steer)
if data_cycle % 4 == 0: # gyro_get_frequency
dataget()
lm_b.stop(stop_action='brake')
lm_c.stop(stop_action='brake')
print('End program')
sys.exit()
※EV3側環境は日本語でコメントアウトすると文字コードの性質上エラーが発生するので注意が必要。
後半に記述しているs.connect(('169.254.207.161', 50010))
は、後ほど環境に応じて書き換える。
比較用コースデータプログラム
比較用コースデータプログラムでは、ライントレースによって描画される軌跡の楕円と比較するために元のコースそっくりのデータを生成する。
本プログラムは直接実行するわけではなく後ほど作成するLinearRegression.pyにインポートするために作成する。
PCのデスクトップにprogram
というフォルダを作成し、その中にcourse.py
をテキストドキュメントとして作成し以下の内容を記述する。
import numpy as np
import matplotlib.pyplot as plt
def original_course(element_cnt, plot_interval):
_element_cnt_f = element_cnt % 10 # element count fraction
_element_cnt_unf = (element_cnt - _element_cnt_f)
_element_cnt_s = _element_cnt_unf / 10 # element count one course section
plot_interval = plot_interval + (_element_cnt_f*(plot_interval/_element_cnt_unf))
_xcount = 1
_ycount = 1
_rcount = 1
global P
P = np.zeros(0)
global Q
Q = np.zeros(0)
# 1
while not _xcount > _element_cnt_s:
_x1 = plot_interval * -1*_xcount
_y1 = 0
P = np.append(P, _x1)
Q = np.append(Q, _y1)
_xcount += 1
# 2
while not _xcount > _element_cnt_s * 2:
_x2 = plot_interval * -1*_xcount
_y2 = 0
P = np.append(P, _x2)
Q = np.append(Q, _y2)
_xcount += 1
# 3 cercle
_rcount = 0
while not _rcount > _element_cnt_s:
_a1 = plot_interval*(_element_cnt_s*2) * -1 # cercle centerX
_b1 = plot_interval*_element_cnt_s - plot_interval # cercle centerY & radius
_x3 = _a1 + _b1*np.cos(np.radians(270-(90 / _element_cnt_s*_rcount)))
_y3 = _b1 + _b1*np.sin(np.radians(270-(90 / _element_cnt_s*_rcount)))
P = np.append(P, _x3)
Q = np.append(Q, _y3)
_rcount += 1
# 4
while not _ycount > _element_cnt_s:
_x4 = _x3
_y4 = plot_interval*_ycount + _y3
P = np.append(P, _x4)
Q = np.append(Q, _y4)
_ycount += 1
# 5 cercle
_rcount = 0
while not _rcount > _element_cnt_s:
_a2 = _a1 # cercle centerX
_b2 = _y4 # cercle centerY
_x5 = _a2 + _b1*np.cos(np.radians(180-(90 / _element_cnt_s*_rcount)))
_y5 = _b2 + _b1*np.sin(np.radians(180-(90 / _element_cnt_s*_rcount)))
P = np.append(P, _x5)
Q = np.append(Q, _y5)
_rcount += 1
# 6
_xcount = 1
while not _xcount > _element_cnt_s:
_x6 = _x5 + plot_interval*_xcount
_y6 = _y5
P = np.append(P, _x6)
Q = np.append(Q, _y6)
_xcount += 1
# 7
_xcount = 1
while not _xcount > _element_cnt_s:
_x7 = _x6 + plot_interval*_xcount
_y7 = _y6
P = np.append(P, _x7)
Q = np.append(Q, _y7)
_xcount += 1
# 8 cercle
_rcount = 0
while not _rcount > _element_cnt_s:
_a3 = 0 # cercle centerX
_b3 = _y4 # cercle centerY
_x8 = _a3 + _b1*np.cos(np.radians(90-(90 / _element_cnt_s*_rcount)))
_y8 = _b3 + _b1*np.sin(np.radians(90-(90 / _element_cnt_s*_rcount)))
P = np.append(P, _x8)
Q = np.append(Q, _y8)
_rcount += 1
# 9
_ycount = 1
while not _ycount > _element_cnt_s:
_x9 = _x8
_y9 = plot_interval*_ycount*-1 + _y8
P = np.append(P, _x9)
Q = np.append(Q, _y9)
_ycount += 1
# 10 cercle
_rcount = 0
while not _rcount > _element_cnt_s:
_a4 = 0 # cercle centerX
_b4 = _b1 # cercle centerY
_x10 = _a4 + _b1*np.cos(np.radians(0-(90 / _element_cnt_s*_rcount)))
_y10 = _b4 + _b1*np.sin(np.radians(0-(90 / _element_cnt_s*_rcount)))
P = np.append(P, _x10)
Q = np.append(Q, _y10)
_rcount += 1
if __name__ == '__main__':
original_course(100, 30)
plt.figure()
plt.plot(P, Q, '-', color='blue')
plt.show()
PC側プログラム
PC側プログラムで、EV3から送られてくるジャイロセンサーのデータから走行経路を描画し、元のコースデータと比較し誤差を算出する。また、算出した誤差と旋回値をCSVファイルに記録し線形回帰による推論の結果として適正な旋回値の値をEV3にフィードバックする。
course.pyと同じようにprogramフォルダ
にLinearRegression.py
をテキストドキュメントとして作成し、以下の内容を記述する。
import socket
import course # course.py
import sys
import csv
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import os.path
from sklearn import linear_model
clf = linear_model.LinearRegression()
# Feedback phase function
def feedback(element_cnt,
plot_interval,
min_steer,
X,
cur_steer,
steers,
errors):
cnt = 1
limit = 100
# from course.py course coordinate point
if not element_cnt == 1:
course.original_course(element_cnt, plot_interval)
_X_abs = np.abs(X) # run coordinate point 'x'
_P_abs = np.abs(course.P) # from course.py course coordinate point 'x'
_X_ave = _X_abs.mean() # average
_P_ave = _P_abs.mean() # average
_point_error = np.abs(_X_ave - _P_ave) # point_average_error
# add feedback_data to csv
writedata = [cur_steer, _point_error]
with open('datafile.csv', 'a', newline='') as f:
writer = csv.writer(f)
writer.writerow(writedata) # データの書き込み
steers = np.append(steers, cur_steer) # append steer data
errors = np.append(errors, _point_error) # append error data
print('steers = {}'.format(steers))
print('errors = {}'.format(errors))
print('len(errors) = {}'.format(len(errors)))
if len(errors) > 1:
if errors[-1] > errors[-2] and steers[-1] > min_steer:
min_steer = steers[-1]
errors = errors.reshape(-1, 1)
clf.fit(errors, steers) # linear regression
while cnt < limit:
ave_error = np.average(errors)
input_error = cnt/(cnt+1) * ave_error
input_error = input_error.reshape(-1, 1)
predict_steer = clf.predict(input_error)
if predict_steer > min_steer:
break
cnt += 1
str_prd_steer = str(predict_steer[0])
print('predict_steer = {}'.format(str_prd_steer))
conn.send(str_prd_steer.encode())
return predict_steer[0], min_steer, steers, errors
else:
cur_steer = cur_steer*2/3
print('next_steer = {}'.format(cur_steer))
conn.send(str(cur_steer).encode())
return cur_steer, min_steer, steers, errors
# variable initialize
gyro = np.zeros(0)
element_cnt = 1 # element count
plot_interval = 30 # plot point interval
X = np.zeros(0)
Y = np.zeros(0)
steers = np.zeros(0)
errors = np.zeros(0)
Lap = 0
ROUND_CHECK = 355 # confirm one round
ini_steer = 70
cur_steer = ini_steer
# generate
if os.path.exists('datafile.csv') == False:
writedata = ['steer', 'error']
f = open('datafile.csv', 'w', newline='') # ファイルを開く
writer = csv.writer(f)
writer.writerow(writedata) # データの書き込み
f.close()
data = pd.read_csv("datafile.csv", sep=",") # csvファイルの読み込み
steer_data = data.loc[:, 'steer'].values # 目的変数に旋回値を設定(steer列のデータを代入)
error_data = data['error'].values # 説明変数に誤差を設定(error列のデータを代入)
min_steer = 0 # 最小値
for cnt, data in np.ndenumerate(steer_data):
if error_data[cnt] < 900:
steers = np.append(steers, data) # コースアウト以外の値を代入
elif data > min_steer:
min_steer = data # 最小値の更新
for data in error_data:
if data < 900:
errors = np.append(errors, data) # コースアウト以外の値を代入
# Main program
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('169.254.207.161', 50010)) # your PC's Bluetooth IP & PORT
s.listen(1)
print('Start program...')
while True:
conn, addr = s.accept()
with conn:
if len(errors) > 1:
values = feedback(element_cnt,
plot_interval,
min_steer,
X,
cur_steer,
steers,
errors)
cur_steer = values[0]
min_steer = values[1]
steers = values[2]
errors = values[3]
else:
conn.send(str(cur_steer).encode())
while True:
gyro_data = conn.recv(1024).decode()
if not gyro_data:
break
gyro_data_float = float(gyro_data) # change type
gyro = np.append(gyro, gyro_data_float) # gyro angle
np.set_printoptions(suppress=True)
cosgy = plot_interval * np.cos(np.radians(gyro)) * -1
singy = plot_interval * np.sin(np.radians(gyro))
X = np.append(X, np.sum(cosgy[0:element_cnt]))
Y = np.append(Y, np.sum(singy[0:element_cnt]))
if ROUND_CHECK < gyro_data_float:
plt.figure()
plt.plot(X, Y, '-', color='blue')
print('Plot file output')
print(str(Lap) + '-plot.png')
plt.savefig(str(Lap) + '-plot.png')
values = feedback(element_cnt,
plot_interval,
min_steer,
X,
cur_steer,
steers,
errors)
cur_steer = values[0]
min_steer = values[1]
steers = values[2]
errors = values[3]
# reset phase
element_cnt = 0
X = np.zeros(0)
Y = np.zeros(0)
gyro = []
plt.clf() # figure clear
plot_interval = 30
Lap += 1
element_cnt = element_cnt + 1 # Element count
# コースアウトしたら誤差1000でcsvに書き込む
if element_cnt > 1:
writedata = [cur_steer, 1000]
f = open('datafile.csv', 'a', newline='')
writer = csv.writer(f)
writer.writerow(writedata)
f.close()
print('End program')
sys.exit()
clf = linear_model.LinearRegression()
で適用する学習のアルゴリズムを設定する
clf.fit(errors, steers)
でデータとラベルを与えて学習を行う
predict_steer = clf.predict(input_error)
で推論を行う
後半に記述しているs.bind(('169.254.207.161', 50010))
はEV3側プログラム同様環境に合わせて以下の手順で変更する。
ソケット通信のIP設定
ジャイロセンサーの値や、旋回地のフィードバックを行うためにEV3側プログラムとPC側プログラムでソケット通信を介してデータをやりとりしているが、プログラム内に記述しているIPアドレスを環境に合わせて変更する必要がある。
PC-EV3間のBluetooth接続に利用しているIPアドレスを調べる
PCとEV3をBluetooth接続すると169.254.XXX.YYY
のリンクローカルアドレスが割り当てられる。以下の手順でIPアドレスを調べる。
- コマンドプロンプトを開く
-
ipconfig
コマンドを実行する - 表示される
イーサネットアダプター Bluetooth ネットワークアドレス
に表示されるIPアドレスをメモする
data_get_gyro.py側設定
data_get_gyro.py
内後半の以下の箇所のIPアドレスをメモしたものに変更する必要がある。
s.connect(('169.254.207.161', 50010)) # your PC's Bluetooth IP & PORT
記述を変更したら、VSCode上でEV3にワークスペースを転送する。
LinearRegression.py側設定
LinearRegression.py
内後半の以下の箇所のIPアドレスをメモしたものに変更する必要がある。
s.bind(('169.254.207.161', 50010)) # your PC's Bluetooth IP & PORT
プログラムの実行
3つのプログラムの作成と、2箇所のIPアドレスの記述変更ができたらプログラムを実行していく。以下が実行手順となる。
- コマンドプロンプトからコマンド
cd Desktop\program
を実行する(\は¥マークと同義)
- EV3をコースのスタート地点に設置する
- VSCode上で接続しているEV3のSSHターミナルを開き
cd ev3workspace/
を実行する - SSHターミナルにて
python3 data_get_gyro.py
を実行する
- 1周するごとにEV3がスタート地点付近で停止するので、インテリジェントブロックの上ボタンを押して次の周回を始める
※以降6を繰り返す
実行結果
6周すると以下のような結果になる。
コマンドプロンプトには周回ごとに旋回値と誤差、データ数(周回数)、次の旋回値を表示する。VSCode上のSSHターミナルにはPC側プログラムに逐一送信しているジャイロセンサーの値が表示されている。
動作としては以下の動画ように、周回ごとにライントレースがなめらかになり周回のスピードも早くなっているのがわかる。
※クリックするとYoutubeに飛びます
データを蓄積しているCSVファイルをエクセルで開くと、以下のようにデータがまとめられている。
プログラム内でグラフ化させることも可能だが、今回はエクセル内で線形回帰のグラフを作成してみた。ある旋回値のときの誤差がある程度予測できているのがわかる。
まとめ
今回はライントレースと、その走行経路から旋回値を求める線形回帰を行った。2つの関連性があるデータを取得できればこのような方法で予測をすることができる。改良したい課題に対してどのようなデータで評価するのかを考え、カスタマイズする必要はあるが応用がきく手法である。
次回はカラーセンサーで取得するRGBの数値データから分類して数種類の色を判定していく。