LoginSignup
2
2

More than 1 year has passed since last update.

LEGO Mindstorms×AI 機械学習その2 線形回帰編

Last updated at Posted at 2020-10-27

本記事の内容はベータ公開としているので内容が変更される場合があります。


本記事では、教育版 LEGO® MINDSTORMS EV3(以降EV3)とPython環境を利用してライントレースに関する線形回帰を行っていく。

環境構築に関しては前回の記事を参照してほしい。

EV3で機械学習その1 環境構築編:こちら
EV3で機械学習その2 線形回帰編:本記事
EV3で機械学習その3 分類編:こちら

事前学習教材

本記事の内容は以下の書籍を参考にしている。
Pythonを使ったEV3の基本的な制御等については以下に網羅されている。

ロボットではじめるAI入門
上記書籍との内容の関係は以下の通りになる。

・概要
・PC,EV3のセットアップ ⬅ NEW(前回)
・プログラム作成方法 ⬅ NEW(前回)
・モーター,センサーの利用
・ライントレース制御
・ライントレースフィードバック
・線形回帰 ⬅ NEW(本記事)
・分類 ⬅ NEW(次回)

本記事内での環境

  • PC
    Windows10
    Python 3.7.3
    開発環境 VisualStudioCode(以降VSCode)

  • EV3
    python-ev3dev2(以降ev3dev)

目次

  1. 線形回帰とは
  2. やること
  3. EV3のモデルと走行コース
  4. 使用するライブラリとインストール
  5. プログラムの作成
  6. ソケット通信のIP設定
  7. プログラムの実行
  8. 実行結果
  9. まとめ

線形回帰とは

線形回帰はデータの分布があるとき、そのデータ群に一番当てはまる直線を求める分析手法である。

以下のような2種類のデータ群があったとき、データに対してよく当てはまる線を引くことで所持していないデータに対してもある程度予測をすることで結果を得ることができる。
19.png        19-1.png

やること

EV3で線形回帰を行うことが大きな目的だが、EV3の動作と線形回帰による推論を結びつけて行く必要がある。 
今回は以下の手順で線形回帰を行いながら、ライントレースをなめらかな走行で行うことを目的とする。

1. 楕円形のコースをライントレースで走行させながら、ジャイロセンサーの値を取得する
2. 取得したジャイロセンサーの値から、コースを走行した軌跡を描画する
3. 元のコースデータと比較し、どれだけブレた走行をしたか周回ごとに誤差を算出する
4. 誤差と旋回値をcsvデータに蓄積していきそのデータをもとに線形回帰を行い誤差の少ない旋回値を推論しフィードバックする

要約すると以下のようになる。
データ:ライントレースを行った旋回値,元のコースと走行軌跡の誤差
目的:旋回値と誤差の関係からよりきれいに走れる旋回値を見つけライントレースをする

旋回値について

前項で登場した旋回値について説明する。今回利用するロボットのモデルは車型であるため左右のモーターのスピードを指定して操舵角を決める。今回はそこに旋回値という変数と左右のモータースピードを紐付けている。旋回値を指定することでスピードに依存せずに操舵角を設定できるようにする。イメージとしては車のハンドル操作に近いものとなる。
20.png
21.png

EV3のモデルと走行コース

今回利用するEV3のモデルは以下のような車型のモデルを利用する。
80.png    88.png
車体前部にはカラーセンサーが付いており、ライントレース走行を可能にしている。
車体後部にはジャイロセンサーが付いており、車体が向いている角度を逐一取得する。この角度の値を利用して走行経路を描画する。

今回走行する楕円コースは以下のようなコースを利用する。

20.png

使用するライブラリとインストール

今回ev3devに加えて追加で利用するライブラリは以下である。

  • Numpy
  • matplotlib
  • Pandas
  • Scikit-learn

Numpy

NumpyはPythonで数値計算を行う際に、非常によく使われるライブラリである。
ベクトルや行列計算などの配列計算の処理を容易に行うことができるようになる。
今回は取得したセンサーデータを配列に格納したり、計算をしたりする際に用いる。

インストール手順
1. コマンドプロンプトを立ち上げる
2. コマンドpip install numpyを実行する
1.png

matplotlib

matplotlibはPythonでグラフを描画する際によく使われるライブラリである。
今回はジャイロセンサーデータを元に走行経路を描画するのに用いる。

以下インストール手順
1. コマンドプロンプトを立ち上げる
(Numpyインストール時に立ち上げたもので良い)
2. コマンドpip install matplotlibを実行する

2.png

Pandas

PandasはPythonでデータを効率的に扱うためのライブラリである。
今回はcsvファイルの読み込みに利用する。

以下インストール手順
1. コマンドプロンプトを立ち上げる
(Numpyインストール時に立ち上げたもので良い)
2. コマンドpip install pandasを実行する
3.png

scikit-learn

scikit-learnはPythonの機械学習ライブラリである。
分類・回帰・クラスタリングなどが比較的簡単に実装することができる。

以下インストール手順
1. コマンドプロンプトを立ち上げる
(Numpyインストール時に立ち上げたもので良い)
2. コマンドpip install scipyを実行する
3. コマンドpip install scikit-learnを実行する

4.png
5.png

プログラムの作成

今回は以下の3つのプログラムを作成する。

  • EV3側プログラム data_get_gyro.py
  • 比較用コースデータプログラム course.py
  • PC側プログラム LinearRegression.py

EV3のスペックには限度があるため、あくまで数値の計算や線形回帰の処理はPC側で行い処理結果の値をEV3に送るような構成にする。
それぞれのプログラムの関係性について以下の図に示す。

18-2.png

EV3側プログラム

EV3側プログラムは、実際にEV3で実行するプログラムである。主にEV3の動作として旋回値を指定したライントレース、ソケット通信を利用したジャイロセンサー値の転送、旋回値のフィードバックの受信を実装する。

以下EV3側プログラムdata_get_gyro.pyをVSCode上のワークスペースで作成する。ワークスペースの作成やEV3へのソースコード転送方法については前回の記事を参照してほしい。

data_get_gyro.py
import time
import socket
import sys
from ev3dev2.button import Button
from ev3dev2.motor import LargeMotor, OUTPUT_B, OUTPUT_C
from ev3dev2.sensor import INPUT_2, INPUT_3
from ev3dev2.sensor.lego import GyroSensor, ColorSensor


# power range:-1050 -> 1050, turn_ratio range:-100 -> 100
def linetrace_steer(power, turn_ratio):
    global data_cycle
    if color.reflected_light_intensity > 30:
        ev3_motor_steer(power, turn_ratio*-1)
    else:
        ev3_motor_steer(power, turn_ratio)
    time.sleep(0.1)
    data_cycle += 1


def ev3_motor_steer(power, turn_ratio):
    if turn_ratio < 0:
        lm_b.run_forever(speed_sp=power*(1+turn_ratio/50))
        lm_c.run_forever(speed_sp=power)
    elif turn_ratio > 0:
        lm_b.run_forever(speed_sp=power)
        lm_c.run_forever(speed_sp=power*(1-turn_ratio/50))
    else:
        lm_b.run_forever(speed_sp=power)
        lm_c.run_forever(speed_sp=power)


def gyro_reset():
    time.sleep(1.0)
    gyro.mode = 'GYRO-ANG'
    gyro.mode = 'GYRO-RATE'
    gyro.mode = 'GYRO-ANG'
    time.sleep(1.0)


def dataget():
    global fb_steer     # feedback steering value
    global set_steer

    _gyro_data = gyro.value()  # gyro data
    _gyro_data_str = str(_gyro_data)
    s.send(_gyro_data_str.encode())
    print(_gyro_data_str)

    if ROUND_CHECK < _gyro_data:
        while not(button.up):
            ev3_motor_steer(0, 0)
            if fb_steer is None:
                fb_steer = s.recv(1024).decode()
                fb_steer_float = float(fb_steer)
                print(fb_steer_float)
                set_steer = set_steer - fb_steer_float
            if button.backspace:
                s.close
                print('End program')
                sys.exit()
        print('set_steer = ' + str(set_steer))
        # gyro reset
        gyro_reset()
        fb_steer = None


# sensors&motors definition
button = Button()
color = ColorSensor(INPUT_3)
gyro = GyroSensor(INPUT_2)
lm_b = LargeMotor(OUTPUT_B)
lm_c = LargeMotor(OUTPUT_C)

# gyro initialize
gyro_reset()

# variable initialize
data_cycle = 1           # counter
fb_steer = None    # Feedback turn
ROUND_CHECK = 355   # confirm one round

# motor initialize
lm_b.reset()
lm_c.reset()
set_power = 200
set_steer = 70

# get gyrodate and into array
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect(('169.254.207.161', 50010))    # your PC's Bluetooth IP & PORT
    while not(button.backspace):
        linetrace_steer(set_power, set_steer)
        if data_cycle % 4 == 0:    # gyro_get_frequency
            dataget()

lm_b.stop(stop_action='brake')
lm_c.stop(stop_action='brake')
print('End program')
sys.exit()

※EV3側環境は日本語でコメントアウトすると文字コードの性質上エラーが発生するので注意が必要。

後半に記述しているs.connect(('169.254.207.161', 50010))は、後ほど環境に応じて書き換える。

比較用コースデータプログラム

比較用コースデータプログラムでは、ライントレースによって描画される軌跡の楕円と比較するために元のコースそっくりのデータを生成する。

本プログラムは直接実行するわけではなく後ほど作成するLinearRegression.pyにインポートするために作成する。
PCのデスクトップにprogramというフォルダを作成し、その中にcourse.pyをテキストドキュメントとして作成し以下の内容を記述する。

course.py
import numpy as np
import matplotlib.pyplot as plt


def original_course(element_cnt, plot_interval):
    _element_cnt_f = element_cnt % 10    # element count fraction
    _element_cnt_unf = (element_cnt - _element_cnt_f)
    _element_cnt_s = _element_cnt_unf / 10  # element count one course section
    plot_interval = plot_interval + (_element_cnt_f*(plot_interval/_element_cnt_unf))

    _xcount = 1
    _ycount = 1
    _rcount = 1
    global P
    P = np.zeros(0)
    global Q
    Q = np.zeros(0)

    # 1
    while not _xcount > _element_cnt_s:
        _x1 = plot_interval * -1*_xcount
        _y1 = 0
        P = np.append(P, _x1)
        Q = np.append(Q, _y1)
        _xcount += 1

    # 2
    while not _xcount > _element_cnt_s * 2:
        _x2 = plot_interval * -1*_xcount
        _y2 = 0
        P = np.append(P, _x2)
        Q = np.append(Q, _y2)
        _xcount += 1

    # 3 cercle
    _rcount = 0
    while not _rcount > _element_cnt_s:
        _a1 = plot_interval*(_element_cnt_s*2) * -1    # cercle centerX
        _b1 = plot_interval*_element_cnt_s - plot_interval  # cercle centerY & radius
        _x3 = _a1 + _b1*np.cos(np.radians(270-(90 / _element_cnt_s*_rcount)))
        _y3 = _b1 + _b1*np.sin(np.radians(270-(90 / _element_cnt_s*_rcount)))
        P = np.append(P, _x3)
        Q = np.append(Q, _y3)
        _rcount += 1

    # 4
    while not _ycount > _element_cnt_s:
        _x4 = _x3
        _y4 = plot_interval*_ycount + _y3
        P = np.append(P, _x4)
        Q = np.append(Q, _y4)
        _ycount += 1

    # 5 cercle
    _rcount = 0
    while not _rcount > _element_cnt_s:
        _a2 = _a1    # cercle centerX
        _b2 = _y4    # cercle centerY
        _x5 = _a2 + _b1*np.cos(np.radians(180-(90 / _element_cnt_s*_rcount)))
        _y5 = _b2 + _b1*np.sin(np.radians(180-(90 / _element_cnt_s*_rcount)))
        P = np.append(P, _x5)
        Q = np.append(Q, _y5)
        _rcount += 1

    # 6
    _xcount = 1
    while not _xcount > _element_cnt_s:
        _x6 = _x5 + plot_interval*_xcount
        _y6 = _y5
        P = np.append(P, _x6)
        Q = np.append(Q, _y6)
        _xcount += 1

    # 7
    _xcount = 1
    while not _xcount > _element_cnt_s:
        _x7 = _x6 + plot_interval*_xcount
        _y7 = _y6
        P = np.append(P, _x7)
        Q = np.append(Q, _y7)
        _xcount += 1

    # 8 cercle
    _rcount = 0
    while not _rcount > _element_cnt_s:
        _a3 = 0     # cercle centerX
        _b3 = _y4    # cercle centerY
        _x8 = _a3 + _b1*np.cos(np.radians(90-(90 / _element_cnt_s*_rcount)))
        _y8 = _b3 + _b1*np.sin(np.radians(90-(90 / _element_cnt_s*_rcount)))
        P = np.append(P, _x8)
        Q = np.append(Q, _y8)
        _rcount += 1

    # 9
    _ycount = 1
    while not _ycount > _element_cnt_s:
        _x9 = _x8
        _y9 = plot_interval*_ycount*-1 + _y8
        P = np.append(P, _x9)
        Q = np.append(Q, _y9)
        _ycount += 1

    # 10 cercle
    _rcount = 0
    while not _rcount > _element_cnt_s:
        _a4 = 0     # cercle centerX
        _b4 = _b1     # cercle centerY
        _x10 = _a4 + _b1*np.cos(np.radians(0-(90 / _element_cnt_s*_rcount)))
        _y10 = _b4 + _b1*np.sin(np.radians(0-(90 / _element_cnt_s*_rcount)))
        P = np.append(P, _x10)
        Q = np.append(Q, _y10)
        _rcount += 1

if __name__ == '__main__':
    original_course(100, 30)
    plt.figure()
    plt.plot(P, Q, '-', color='blue')
    plt.show()

PC側プログラム

PC側プログラムで、EV3から送られてくるジャイロセンサーのデータから走行経路を描画し、元のコースデータと比較し誤差を算出する。また、算出した誤差と旋回値をCSVファイルに記録し線形回帰による推論の結果として適正な旋回値の値をEV3にフィードバックする。

course.pyと同じようにprogramフォルダLinearRegression.pyをテキストドキュメントとして作成し、以下の内容を記述する。

LinearRegression.py
import socket
import course   # course.py
import sys
import csv
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import os.path
from sklearn import linear_model
clf = linear_model.LinearRegression()


# Feedback phase function
def feedback(element_cnt,
             plot_interval,
             min_steer,
             X,
             cur_steer,
             steers,
             errors):
    cnt = 1
    limit = 100

    # from course.py course coordinate point
    if not element_cnt == 1:
        course.original_course(element_cnt, plot_interval)

        _X_abs = np.abs(X)          # run coordinate point 'x'
        _P_abs = np.abs(course.P)   # from course.py course coordinate point 'x'
        _X_ave = _X_abs.mean()      # average
        _P_ave = _P_abs.mean()      # average
        _point_error = np.abs(_X_ave - _P_ave)    # point_average_error

        # add feedback_data to csv
        writedata = [cur_steer, _point_error]
        with open('datafile.csv', 'a', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(writedata)          # データの書き込み

        steers = np.append(steers, cur_steer)  # append steer data
        errors = np.append(errors, _point_error)       # append error data
    print('steers = {}'.format(steers))
    print('errors = {}'.format(errors))
    print('len(errors) = {}'.format(len(errors)))

    if len(errors) > 1:
        if errors[-1] > errors[-2] and steers[-1] > min_steer:
            min_steer = steers[-1]
        errors = errors.reshape(-1, 1)

        clf.fit(errors, steers)    # linear regression

        while cnt < limit:
            ave_error = np.average(errors)
            input_error = cnt/(cnt+1) * ave_error
            input_error = input_error.reshape(-1, 1)
            predict_steer = clf.predict(input_error)
            if predict_steer > min_steer:
                break
            cnt += 1
        str_prd_steer = str(predict_steer[0])
        print('predict_steer = {}'.format(str_prd_steer))
        conn.send(str_prd_steer.encode())
        return predict_steer[0], min_steer, steers, errors
    else:
        cur_steer = cur_steer*2/3
        print('next_steer = {}'.format(cur_steer))
        conn.send(str(cur_steer).encode())
        return cur_steer, min_steer, steers, errors


# variable initialize
gyro = np.zeros(0)
element_cnt = 1   # element count
plot_interval = 30   # plot point interval
X = np.zeros(0)
Y = np.zeros(0)
steers = np.zeros(0)
errors = np.zeros(0)
Lap = 0
ROUND_CHECK = 355   # confirm one round
ini_steer = 70
cur_steer = ini_steer

# generate
if os.path.exists('datafile.csv') == False:
    writedata = ['steer', 'error']
    f = open('datafile.csv', 'w', newline='')  # ファイルを開く
    writer = csv.writer(f)
    writer.writerow(writedata)          # データの書き込み
    f.close()
data = pd.read_csv("datafile.csv", sep=",")  # csvファイルの読み込み
steer_data = data.loc[:, 'steer'].values          # 目的変数に旋回値を設定(steer列のデータを代入)
error_data = data['error'].values                # 説明変数に誤差を設定(error列のデータを代入)
min_steer = 0        # 最小値

for cnt, data in np.ndenumerate(steer_data):
    if error_data[cnt] < 900:
        steers = np.append(steers, data)               # コースアウト以外の値を代入
    elif data > min_steer:
        min_steer = data        # 最小値の更新
for data in error_data:
    if data < 900:
        errors = np.append(errors, data)          # コースアウト以外の値を代入

# Main program
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind(('169.254.207.161', 50010))  # your PC's Bluetooth IP & PORT
    s.listen(1)
    print('Start program...')
    while True:
        conn, addr = s.accept()
        with conn:
            if len(errors) > 1:
                values = feedback(element_cnt,
                                  plot_interval,
                                  min_steer,
                                  X,
                                  cur_steer,
                                  steers,
                                  errors)
                cur_steer = values[0]
                min_steer = values[1]
                steers = values[2]
                errors = values[3]
            else:
                conn.send(str(cur_steer).encode())
            while True:
                gyro_data = conn.recv(1024).decode()
                if not gyro_data:
                    break
                gyro_data_float = float(gyro_data)  # change type
                gyro = np.append(gyro, gyro_data_float)    # gyro angle
                np.set_printoptions(suppress=True)

                cosgy = plot_interval * np.cos(np.radians(gyro)) * -1
                singy = plot_interval * np.sin(np.radians(gyro))
                X = np.append(X, np.sum(cosgy[0:element_cnt]))
                Y = np.append(Y, np.sum(singy[0:element_cnt]))

                if ROUND_CHECK < gyro_data_float:
                    plt.figure()
                    plt.plot(X, Y, '-', color='blue')
                    print('Plot file output')
                    print(str(Lap) + '-plot.png')
                    plt.savefig(str(Lap) + '-plot.png')
                    values = feedback(element_cnt,
                                      plot_interval,
                                      min_steer,
                                      X,
                                      cur_steer,
                                      steers,
                                      errors)
                    cur_steer = values[0]
                    min_steer = values[1]
                    steers = values[2]
                    errors = values[3]

                    # reset phase
                    element_cnt = 0
                    X = np.zeros(0)
                    Y = np.zeros(0)
                    gyro = []
                    plt.clf()   # figure clear
                    plot_interval = 30
                    Lap += 1

                element_cnt = element_cnt + 1   # Element count

            # コースアウトしたら誤差1000でcsvに書き込む
            if element_cnt > 1:
                writedata = [cur_steer, 1000]
                f = open('datafile.csv', 'a', newline='')
                writer = csv.writer(f)
                writer.writerow(writedata)
                f.close()
            print('End program')
            sys.exit()

clf = linear_model.LinearRegression()で適用する学習のアルゴリズムを設定する
clf.fit(errors, steers)でデータとラベルを与えて学習を行う
predict_steer = clf.predict(input_error)で推論を行う

後半に記述しているs.bind(('169.254.207.161', 50010))はEV3側プログラム同様環境に合わせて以下の手順で変更する。

ソケット通信のIP設定

ジャイロセンサーの値や、旋回地のフィードバックを行うためにEV3側プログラムとPC側プログラムでソケット通信を介してデータをやりとりしているが、プログラム内に記述しているIPアドレスを環境に合わせて変更する必要がある。

PC-EV3間のBluetooth接続に利用しているIPアドレスを調べる

PCとEV3をBluetooth接続すると169.254.XXX.YYYのリンクローカルアドレスが割り当てられる。以下の手順でIPアドレスを調べる。

  1. コマンドプロンプトを開く
  2. ipconfigコマンドを実行する
  3. 表示されるイーサネットアダプター Bluetooth ネットワークアドレスに表示されるIPアドレスをメモする
    6.png

data_get_gyro.py側設定

data_get_gyro.py内後半の以下の箇所のIPアドレスをメモしたものに変更する必要がある。

data_get_gyro.py
    s.connect(('169.254.207.161', 50010))    # your PC's Bluetooth IP & PORT

実際のプログラム編集としては以下のようになる。
8.png

記述を変更したら、VSCode上でEV3にワークスペースを転送する。

LinearRegression.py側設定

LinearRegression.py内後半の以下の箇所のIPアドレスをメモしたものに変更する必要がある。

LinearRegression.py
    s.bind(('169.254.207.161', 50010))  # your PC's Bluetooth IP & PORT

実際のプログラムの編集は以下のようになる。
9.png

プログラムの実行

3つのプログラムの作成と、2箇所のIPアドレスの記述変更ができたらプログラムを実行していく。以下が実行手順となる。

  1. コマンドプロンプトからコマンドcd Desktop\programを実行する(\は¥マークと同義) 10.png
  2. コマンドプロンプトにてコマンドpython LinearRegression.pyを実行する
    ※実行後Start program...と表示され待機状態になる。
    11.png
    以下のようなポップアップがでたらアクセスを許可する。
    11-1.png

  3. EV3をコースのスタート地点に設置する
    85.png

  4. VSCode上で接続しているEV3のSSHターミナルを開きcd ev3workspace/を実行する

  5. SSHターミナルにてpython3 data_get_gyro.pyを実行する
    12.png

  6. 1周するごとにEV3がスタート地点付近で停止するので、インテリジェントブロックの上ボタンを押して次の周回を始める
    ※以降6を繰り返す

実行結果

6周すると以下のような結果になる。
コマンドプロンプトには周回ごとに旋回値と誤差、データ数(周回数)、次の旋回値を表示する。VSCode上のSSHターミナルにはPC側プログラムに逐一送信しているジャイロセンサーの値が表示されている。

12-1-3.png
14.png

動作としては以下の動画ように、周回ごとにライントレースがなめらかになり周回のスピードも早くなっているのがわかる。
※クリックするとYoutubeに飛びます
IMAGE ALT TEXT HERE

データを蓄積しているCSVファイルをエクセルで開くと、以下のようにデータがまとめられている。
17.png

プログラム内でグラフ化させることも可能だが、今回はエクセル内で線形回帰のグラフを作成してみた。ある旋回値のときの誤差がある程度予測できているのがわかる。
18.png

まとめ

今回はライントレースと、その走行経路から旋回値を求める線形回帰を行った。2つの関連性があるデータを取得できればこのような方法で予測をすることができる。改良したい課題に対してどのようなデータで評価するのかを考え、カスタマイズする必要はあるが応用がきく手法である。

次回はカラーセンサーで取得するRGBの数値データから分類して数種類の色を判定していく。

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2