0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

線形回帰モデルの実装手順

Posted at

概要

PythonのScikit-learnで線形回帰モデルの実装手順とサンプルコードを共有する

手順

1️⃣ データの読み込み
2️⃣ 特徴量エンジニアリング
3️⃣ 説明変数 X & 目的変数 y に分割
4️⃣ train/test に分割
5️⃣ PolynomialFeatures 適用(必要に応じて)
6️⃣ StandardScaler で標準化(必要に応じて)
7️⃣ モデルの学習
8️⃣ モデルの評価
9️⃣ 可視化
🔟 モデルを保存

サンプル

サンプルコードを示す
今回は車速とアクセル開度とブレーキ開度の時系列データを与えモデルを作成した
未知の車速が与えられたときに、アクセルとブレーキ開度を予測する運転モデルを作った

sample.py
# ライブラリ
import pandas as pd
import numpy as np

from sklearn.preprocessing import PolynomialFeatures
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
import joblib

import shap
import matplotlib.pyplot as plt


# 関数
def add_acceleration(df):
    """加速度の特徴量を追加"""
    df['Acceleration[m/s2]'] = df['Speed[km/h]'].diff().fillna(0) / 3.6 / df['Time[s]'].diff().fillna(0.1)

    return df

def add_feature(df, cnt):
    """数秒先の情報を特徴量に追加"""
    for i in range(1, cnt):  # 0.1秒, 0.2秒, 0.3秒先...
        # df[f"Speed_future_{i}[km/h]"] = df['Speed[km/h]'].shift(-i)
        df[f"Acceleration_future_{i}[m/s2]"] = df["Acceleration[m/s2]"].shift(-i)

    return df

# 予測結果を補正
def correct_prediction(y_pred_throttle, y_pred_brake, X_test):
    """予測結果を補正"""
    speed_value = X_test["Speed[km/h]"].values

    # 配列をコピーして元データを変更しないようにする
    y_pred_throttle_corrected = np.copy(y_pred_throttle)
    y_pred_brake_corrected = np.copy(y_pred_brake)

    # Speed[km/h] == 0 の場合、Throttle を 0 に補正
    y_pred_throttle_corrected[speed_value == 0] = 0

    # Throttle と Brake の負の値を 0 に補正
    y_pred_throttle_corrected = np.maximum(y_pred_throttle_corrected, 0)
    y_pred_brake_corrected = np.maximum(y_pred_brake_corrected, 0)
    
    # `Speed[km/h] == 0` かつ `y_pred_brake < 15%` の場合、中央値(22.78%)に補正
    y_pred_brake_corrected[(speed_value == 0) & (y_pred_brake_corrected < 15)] = 22.78

    # アクセルとブレーキが同時に踏まれている場合、どちらか大きい方を残し、小さい方を 0 にする
    mask = (y_pred_throttle_corrected > 0) & (y_pred_brake_corrected > 0)
    y_pred_throttle_corrected[mask] = np.where(y_pred_throttle_corrected[mask] >= y_pred_brake_corrected[mask], y_pred_throttle_corrected[mask], 0)
    y_pred_brake_corrected[mask] = np.where(y_pred_brake_corrected[mask] > y_pred_throttle_corrected[mask], y_pred_brake_corrected[mask], 0)
    
    return y_pred_throttle_corrected, y_pred_brake_corrected

# 1️⃣ データの読み込み
df = pd.read_csv("Actual_Data.csv")

# 2️⃣ 特徴量エンジニアリング
df = add_acceleration(df)
# 未来を追加
feature_cnt = 3
df = add_feature(df, feature_cnt)
# アクセルとブレーキが同時に踏まれているデータの行数をカウント
same_time_count = ((df["Throttle[%]"] > 0) & (df["Brake[%]"] > 0)).sum()
zero_speed_throttle_count = ((df["Speed[km/h]"] == 0) & (df["Throttle[%]"] > 0)).sum()
# print(f"アクセルとブレーキが同時に踏まれている行数: {same_time_count}")
# print(f"速度0でアクセルを踏んでいる行数: {zero_speed_throttle_count}")
# データを削除
# print(f"削除前の行数: {len(df)}")
df = df[~((df["Throttle[%]"] > 0) & (df["Brake[%]"] > 0))]
df = df[~((df["Speed[km/h]"] == 0) & (df["Throttle[%]"] > 0))]
# 確認
same_time_count = ((df["Throttle[%]"] > 0) & (df["Brake[%]"] > 0)).sum()
zero_speed_throttle_count = ((df["Speed[km/h]"] == 0) & (df["Throttle[%]"] > 0)).sum()
# print(f"アクセルとブレーキが同時に踏まれている行数: {same_time_count}")
# print(f"速度0でアクセルを踏んでいる行数: {zero_speed_throttle_count}")
# NaNを削除
num_nan_rows = df.isna().any(axis=1).sum()
# print(f"NaN を含む行の数: {num_nan_rows}")
df = df.dropna()
# 確認
num_nan_rows = df.isna().any(axis=1).sum()
# print(f"NaN を含む行の数: {num_nan_rows}")
# print(f"削除後の行数: {len(df)}")

# 3️⃣ 説明変数 X & 目的変数 y に分割
xcol = ['Speed[km/h]', 'Acceleration[m/s2]', 'Acceleration_future_2[m/s2]']
X = df[xcol]
y = y = df[['Throttle[%]', 'Brake[%]']]

# 4️⃣ train/test に分割(trainが学習用、testが評価用、xが説明変数、yが目的変数)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 5️⃣ PolynomialFeatures 適用(必要に応じて)
poly = PolynomialFeatures(degree=3, include_bias=False)
# PolynomialFeatures を適用(train のみ fit)
X_train_poly = poly.fit_transform(X_train)
X_test_poly = poly.transform(X_test)

# 6️⃣ StandardScaler で標準化(必要に応じて)
scaler = StandardScaler()
# StandardScaler を適用(train のみ fit)
X_train_scaled = scaler.fit_transform(X_train_poly)
X_test_scaled = scaler.transform(X_test_poly)

# 7️⃣ モデルの学習
model_throttle = LinearRegression()  # 切片なしならfit_intercept=False
model_brake = LinearRegression()

model_throttle.fit(X_train_scaled, y_train["Throttle[%]"])
model_brake.fit(X_train_scaled, y_train["Brake[%]"])

# 8️⃣ モデルの評価
y_pred_throttle = model_throttle.predict(X_test_scaled)
y_pred_brake = model_brake.predict(X_test_scaled)
# 補正
y_pred_throttle_corrected, y_pred_brake_corrected = correct_prediction(y_pred_throttle, y_pred_brake, X_test)
# 評価(R^2)
throttle_score = r2_score(y_test["Throttle[%]"], y_pred_throttle_corrected)
brake_score = r2_score(y_test["Brake[%]"], y_pred_brake_corrected)
# 評価(RMSE)
rmse_throttle = np.sqrt(mean_squared_error(y_test["Throttle[%]"], y_pred_throttle_corrected))
rmse_brake = np.sqrt(mean_squared_error(y_test["Brake[%]"], y_pred_brake_corrected))
print(f"Throttle R^2 Score: {throttle_score}")
print(f"Brake R^2 Score: {brake_score}")
print(f"Throttle RMSE: {rmse_throttle}")
print(f"Brake RMSE: {rmse_brake}")

# 9️⃣ 可視化
# 特徴量の重要度を SHAP などで可視化
explainer = shap.Explainer(model_throttle, X_test_scaled)
shap_values = explainer(X_test_scaled)
shap.summary_plot(shap_values, X_test_scaled)
# 回帰式の係数と切片をdfに格納
df_coef = pd.DataFrame()
feature_names = poly.get_feature_names_out(list(X.columns))
df_coef["Feature"] = feature_names
df_coef["Throttle"] = model_throttle.coef_
df_coef["Brake"] = model_brake.coef_
# csvに出力
df_coef.to_csv("coef.csv", index=False)

print("回帰式")
print(f"切片:Throttle {model_throttle.intercept_}")
print(f"切片:Brake    {model_brake.intercept_}")
df_coef
# 回帰式の係数の分析
# 大きい順にソート
df_coef_throttle = df_coef.sort_values(by="Throttle", ascending=False)
df_coef_brake = df_coef.sort_values(by="Brake", ascending=False)

# グラフ表示
# グラフのサイズを設定
plt.figure(figsize=(8, 5))
# 横棒グラフを作成
plt.barh(df_coef_throttle["Feature"], df_coef_throttle["Throttle"], color="blue", edgecolor="black")
plt.title("Throttle")
# グリッドを表示(X軸方向のみ)
plt.grid(axis="x", linestyle="--", alpha=0.7)
plt.show()

# 横棒グラフを作成
plt.barh(df_coef_brake["Feature"], df_coef_brake["Brake"], color="red", edgecolor="black")
plt.title("Brake")
# グリッドを表示(X軸方向のみ)
plt.grid(axis="x", linestyle="--", alpha=0.7)
plt.show()

# 🔟 モデルを保存
# モデルを保存(Throttle と Brake)
joblib.dump(model_throttle, "model_throttle.pkl")
joblib.dump(model_brake, "model_brake.pkl")
print("モデルが保存されました")
# モデルを読み込む
model_throttle_loaded = joblib.load("model_throttle.pkl")
model_brake_loaded = joblib.load("model_brake.pkl")
print("モデルが読み込まれました")

結果

Throttle R^2 Score: 0.9906606414122686
Brake R^2 Score: 0.9508386434187767
Throttle RMSE: 0.7252065372965398
Brake RMSE: 1.8993579634510172
回帰式
切片:Throttle 10.95503148550699
切片:Brake    4.419940406452666
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?