はじめに
機械学習の代表的な応用例として、競馬の結果を予測する「競馬AI」があります。
今回は競馬のデータを取得し、その結果を予測する機械学習モデルを構築、実際に予測してみるところまでを簡単なコードの解説を交えて紹介できればと思います。
実行環境
・macOS 13.2.1
・Python 3.10.13
もくじ
- STEP1 : データの取得
- 1. スクレイピング先のサイトについて
- 2. スクレイピングを行うコード
- STEP2 : データの確認・前処理
- 1. データ構造の確認
- 2. 目的変数の前処理
- 3. 特徴量エンジニアリング
- STEP3 : モデリング
- 1. モデリング
- STEP4 : 性能評価
- 1. 走破時間の予測性能評価
- 1. 利益・回収率の性能評価
STEP1 : データの取得
1. スクレイピング先のサイトについて
netkeiba.comから過去のレース結果を取得します。
2. スクレイピングを行うコード
参考サイトのコードを用いて、Pythonでデータのスクレイピングを行います。スクレイピングはBeautifulSoupというライブラリを用いることで実現可能です。
今回は2019年~2024年のデータを取得して、2019年~2023年のデータで訓練を行い、2024年のデータで性能検証を行います。
※ 実行するファイルと同じディレクトリのdataフォルダに、年ごとのレース結果がcsvファイルとして保存されます。また、データの取得には1年あたり30分程度かかります。
import requests
from bs4 import BeautifulSoup
import time
import csv
#取得開始年
year_start = 2019
#取得終了年
year_end = 2025
for year in range(year_start, year_end):
race_data_all = []
#取得するデータのヘッダー情報を先に追加しておく
race_data_all.append(['race_id','馬','騎手','馬番','走破時間','オッズ','通過順','着順','体重','体重変化','性','齢','斤量','上がり','人気','レース名','日付','開催','クラス','芝・ダート','距離','回り','馬場','天気','場id','場名'])
List=[]
#競馬場
l=["01","02","03","04","05","06","07","08","09","10"]
for w in range(len(l)):
place = ""
if l[w] == "01":
place = "札幌"
elif l[w] == "02":
place = "函館"
elif l[w] == "03":
place = "福島"
elif l[w] == "04":
place = "新潟"
elif l[w] == "05":
place = "東京"
elif l[w] == "06":
place = "中山"
elif l[w] == "07":
place = "中京"
elif l[w] == "08":
place = "京都"
elif l[w] == "09":
place = "阪神"
elif l[w] == "10":
place = "小倉"
#開催回数分ループ(6回)
for z in range(7):
continueCounter = 0 # 'continue'が実行された回数をカウントするためのカウンターを追加
#開催日数分ループ(12日)
for y in range(13):
race_id = ''
if y<9:
race_id = str(year)+l[w]+"0"+str(z+1)+"0"+str(y+1)
url1="https://db.netkeiba.com/race/"+race_id
else:
race_id = str(year)+l[w]+"0"+str(z+1)+str(y+1)
url1="https://db.netkeiba.com/race/"+race_id
#yの更新をbreakするためのカウンター
yBreakCounter = 0
#レース数分ループ(12R)
for x in range(12):
if x<9:
url=url1+str("0")+str(x+1)
current_race_id = race_id+str("0")+str(x+1)
else:
url=url1+str(x+1)
current_race_id = race_id+str(x+1)
try:
r=requests.get(url)
#リクエストを投げすぎるとエラーになることがあるため
#失敗したら10秒待機してリトライする
except requests.exceptions.RequestException as e:
print(f"Error: {e}")
print("Retrying in 10 seconds...")
time.sleep(10) # 10秒待機
r=requests.get(url)
#バグ対策でdecode
soup = BeautifulSoup(r.content.decode("euc-jp", "ignore"), "html.parser")
soup_span = soup.find_all("span")
#馬の数
allnum=(len(soup_span)-6)/3
#urlにデータがあるか判定
if allnum < 1:
yBreakCounter+=1
print('continue: ' + url)
continue
allnum=int(allnum)
race_data = []
for num in range(allnum):
#馬の情報
soup_txt_l=soup.find_all(class_="txt_l")
soup_txt_r=soup.find_all(class_="txt_r")
#走破時間
runtime=''
try:
runtime=soup_txt_r[2+5*num].contents[0]
except IndexError:
runtime = ''
soup_nowrap = soup.find_all("td",nowrap="nowrap",class_=None)
#通過順
pas = ''
try:
pas = str(soup_nowrap[3*num].contents[0])
except:
pas = ''
weight = 0
weight_dif = 0
#体重
var = soup_nowrap[3*num+1].contents[0]
try:
weight = int(var.split("(")[0])
weight_dif = int(var.split("(")[1][0:-1])
except ValueError:
weight = 0
weight_dif = 0
weight = weight
weight_dif = weight_dif
soup_tet_c = soup.find_all("td",nowrap="nowrap",class_="txt_c")
#上がり
last = ''
try:
last = soup_tet_c[6*num+3].contents[0].contents[0]
except IndexError:
last = ''
#人気
pop = ''
try:
pop = soup_span[3*num+10].contents[0]
except IndexError:
pop = ''
#レースの情報
try:
var = soup_span[8]
sur=str(var).split("/")[0].split(">")[1][0]
rou=str(var).split("/")[0].split(">")[1][1]
dis=str(var).split("/")[0].split(">")[1].split("m")[0][-4:]
con=str(var).split("/")[2].split(":")[1][1]
wed=str(var).split("/")[1].split(":")[1][1]
except IndexError:
try:
var = soup_span[7]
sur=str(var).split("/")[0].split(">")[1][0]
rou=str(var).split("/")[0].split(">")[1][1]
dis=str(var).split("/")[0].split(">")[1].split("m")[0][-4:]
con=str(var).split("/")[2].split(":")[1][1]
wed=str(var).split("/")[1].split(":")[1][1]
except IndexError:
var = soup_span[6]
sur=str(var).split("/")[0].split(">")[1][0]
rou=str(var).split("/")[0].split(">")[1][1]
dis=str(var).split("/")[0].split(">")[1].split("m")[0][-4:]
con=str(var).split("/")[2].split(":")[1][1]
wed=str(var).split("/")[1].split(":")[1][1]
soup_smalltxt = soup.find_all("p",class_="smalltxt")
detail=str(soup_smalltxt).split(">")[1].split(" ")[1]
date=str(soup_smalltxt).split(">")[1].split(" ")[0]
clas=str(soup_smalltxt).split(">")[1].split(" ")[2].replace(u'\xa0', u' ').split(" ")[0]
title=str(soup.find_all("h1")[1]).split(">")[1].split("<")[0]
race_data = [
current_race_id,
soup_txt_l[4*num].contents[1].contents[0],#馬の名前
soup_txt_l[4*num+1].contents[1].contents[0],#騎手の名前
soup_txt_r[1+5*num].contents[0],#馬番
runtime,#走破時間
soup_txt_r[3+5*num].contents[0],#オッズ,
pas,#通過順
num+1,#着順
weight,#体重
weight_dif,#体重変化
soup_tet_c[6*num].contents[0][0],#性
soup_tet_c[6*num].contents[0][1],#齢
soup_tet_c[6*num+1].contents[0],#斤量
last,#上がり
pop,#人気,
title,#レース名
date,#日付
detail,
clas,#クラス
sur,#芝かダートか
dis,#距離
rou,#回り
con,#馬場状態
wed,#天気
w,#場
place]
race_data_all.append(race_data)
print(detail+str(x+1)+"R")#進捗を表示
if yBreakCounter == 12:#12レース全部ない日が検出されたら、その開催中の最後の開催日と考える
break
#1年毎に出力
#出力先とファイル名は修正してください
with open('data/'+str(year)+'.csv', 'w', newline='',encoding="SHIFT-JIS") as f:
csv.writer(f).writerows(race_data_all)
print("終了")
STEP2 : データの確認・前処理
1. データ構造の確認
取得したデータを以下のコードで確認します。
データは指定したディレクトリのdataファイルに、年ごとに保存されているものとします。
# データの読み込み
df_2019 = pd.read_csv("~/data/2019.csv", encoding="shift-jis")
df_2020 = pd.read_csv("~/data/2020.csv", encoding="shift-jis")
df_2021 = pd.read_csv("~/data/2021.csv", encoding="shift-jis")
df_2022 = pd.read_csv("~/data/2022.csv", encoding="shift-jis")
df_2023 = pd.read_csv("~/data/2023.csv", encoding="shift-jis")
df_2024 = pd.read_csv("~/data/2024.csv", encoding="shift-jis")
# データの基本構造が同じことを確認
# 年ごとにレース数は若干異なる
# 2024年は途中のため、サンプル数が少ない
print(df_2019.shape) # (47574)
print(df_2020.shape) # (48282)
print(df_2021.shape) # (47821)
print(df_2022.shape) # (47220)
print(df_2023.shape) # (47672)
print(df_2024.shape) # (22045)
# 2019年のデータの先頭を確認
display(df_2019.head())
以上の最後のコードで以下の図が出力されます。
ひとつのサンプルには一騎を表現しており、race_idを参照することで同じレースの結果のみ確認することも可能です。
目的変数の候補となるカラムには、「走破時間」、「通過順」、「順位」などがあります。
目的変数以外は説明変数の候補となりますが、前述の目的変数候補カラムを用いて特徴量を作成する場合はリークの可能性に気をつける必要があります。
以下全てのカラムをまとめます。
カラム名 | 説明 |
---|---|
race_id | レースごとに与えられるID |
馬 | 馬の名前 |
騎手 | 騎手の名前 |
馬番 | 出走レーンの番号 |
走破時間 | 完走までのタイム |
オッズ | 最終的なオッズ |
通過順 | チェックポイントを通過した際の順位 |
着順 | 最終的な着順 |
体重 | 馬の体重 |
体重変化 | 直近(のレース?)からの体重の変化 |
性 | 馬の性別。牡馬(牡)、牝馬(牝)、騸馬 (セ)のいずれか |
齢 | 馬の年齢 |
斤量 | 馬にかかる負荷 |
上がり | 最後の600mのタイム |
人気 | 馬の人気。ランキング |
レース名 | レース名 |
日付 | レースの開催日 |
開催 | 開催場所、開催日 |
クラス | 開催レースの参加条件 |
芝・ダート | 芝かダートか |
距離 | レースの走行距離 |
回り | 右回り(右)、左回り(左)、直線(直)のいずれか。芝というデータもあるが何を意味するのか不明 |
馬場 | 馬場の状態。良、不、稍、重のいずれか |
天気 | 天気。曇、晴、雨、小、雪など |
場id | 場名を整数値でカテゴリ化したもの |
場名 | レースの会場名。札幌、函館、福島、新潟など10会場 |
2 . 目的変数の前処理
今回、実装するモデルは走破時間を予測するものを考えます。
その走破時間の早いものから順に順位付けしていくことで、あるレースにおける馬の順位を予測します。
元データでは走破時間は例えば「1:03:59」ように表されています。このままでは予測が行えないので、秒表記に直していきます。
# 訓練データセットとテストデータセットを作成する
train_df = pd.concat([df_2019, df_2020, df_2021, df_2022, df_2023], axis=0)
test_df = df_2024
# 走破時間に欠損値がある行を削除(予測変数とのRMSEを出すためtest_dfにおいても変換を実施)
train_df = train_df.dropna(subset=["走破時間"])
test_df = test_df.dropna(subset=["走破時間"])
# 走破時間を秒に変換する関数
def convert_time_to_seconds(time):
time_list = time.split(":")
return float(time_list[0]) * 60 + float(time_list[1])
# 変換を実施する
train_df["走破時間"] = train_df["走破時間"].astype(str).apply(convert_time_to_seconds)
3 . 特徴量エンジニアリング
機械学習を適用するにあたり、元データのカラムを加工して新たな特徴量として利用します。
今回は年ごとに「前回レースの着順」「前回レースからの日数」を特徴量に追加します。各年度最初のレースではこれらの特徴量は取得できないため、その場合は「前回レースの着順」はunknownというデータで置き換え、「前回レースからの日数」は30(日)で置き換えることとします。
# 日付をdatetime型に変換する関数
def convert_date_to_datetime(date):
date = date.replace("年", "-").replace("月", "-").replace("日", "")
return pd.to_datetime(date)
train_df["日付"] = train_df["日付"].apply(convert_date_to_datetime)
test_df["日付"] = test_df["日付"].apply(convert_date_to_datetime)
# 馬の名前ごとにデータを追加
new_train_list = []
for horse in train_df["馬"].unique():
tmp_horse_df = train_df[train_df["馬"] == horse]
# 前回の着順を追加
tmp_horse_df["前回着順"] = tmp_horse_df["着順"].shift(1).fillna("unknown")
# 前回レースからの日数を追加
tmp_horse_df["前回レースからの日数"] = tmp_horse_df["日付"].shift(1)
tmp_horse_df["前回レースからの日数"] = tmp_horse_df["日付"] - tmp_horse_df["前回レースからの日数"]
tmp_horse_df["前回レースからの日数"] = tmp_horse_df["前回レースからの日数"].apply(lambda x: x.days)
tmp_horse_df["前回レースからの日数"] = tmp_horse_df["前回レースからの日数"].fillna(30)
new_train_list.append(tmp_horse_df)
new_train = pd.concat(new_train_list, axis=0)
# 馬の名前ごとにデータを追加
new_test_list = []
for horse in test_df["馬"].unique():
tmp_horse_df = test_df[test_df["馬"] == horse]
# 前回の着順を追加
tmp_horse_df["前回着順"] = tmp_horse_df["着順"].shift(1).fillna("unknown")
# 前回レースからの日数を追加
tmp_horse_df["前回レースからの日数"] = tmp_horse_df["日付"].shift(1)
tmp_horse_df["前回レースからの日数"] = tmp_horse_df["日付"] - tmp_horse_df["前回レースからの日数"]
tmp_horse_df["前回レースからの日数"] = tmp_horse_df["前回レースからの日数"].apply(lambda x: x.days)
tmp_horse_df["前回レースからの日数"] = tmp_horse_df["前回レースからの日数"].fillna(30)
new_test_list.append(tmp_horse_df)
new_test = pd.concat(new_test_list, axis=0)
またレースに参加している馬の頭数も特徴量としてデータに追加します。
# 各レースの情報を追加
new_train_list = []
for race_id in new_train["race_id"].unique():
tmp_df = new_train[new_train["race_id"] == race_id]
tmp_df["レース参加頭数"] = len(tmp_df)
new_train_list.append(tmp_df)
new_train = pd.concat(new_train_list, axis=0)
# 各レースの情報を追加
new_test_list = []
for race_id in new_test["race_id"].unique():
tmp_df = new_test[new_test["race_id"] == race_id]
tmp_df["レース参加頭数"] = len(tmp_df)
new_test_list.append(tmp_df)
new_test = pd.concat(new_test_list, axis=0)
最後にレース名を簡略化します。具体的にはレース名から「GI」、「GII」、「GII」などのより大きな区分を示す文字列に置き換えます。
# レース名を変換する関数
def race_name_comverter(race_name):
if race_name.find("(") == -1:
if race_name.find("未勝利") != -1:
return "未勝利"
elif race_name.find("オープン") != -1:
return "オープン"
elif race_name.find("1勝") != -1:
return "1勝"
elif race_name.find("2勝") != -1:
return "2勝"
elif race_name.find("3勝") != -1:
return "3勝"
elif race_name.find("500万") != -1:
return "500万"
elif race_name.find("1000万") != -1:
return "1000万"
elif race_name.find("新馬") != -1:
return "新馬"
else:
start = race_name.find("(")
end = race_name.find(")")
race_type = race_name[start+1:end]
if race_type=="秋)(GI":
race_type = "GI"
elif race_type=="春)(GI ":
race_type = "GI"
return race_type
# 変換を実施する
new_train["レース名"] = new_train["レース名"].apply(race_name_comverter)
new_test["レース名"] = new_test["レース名"].apply(race_name_comverter)
モデリング
1. モデリング
加工したデータで機械学習をしていきます。
学習に使わないカラムは削除し、数値でない特徴量はcategory型に変換することで学習を行えるようにします。
# 訓練用、テスト用にデータを加工する
ignore_columns = ["race_id","通過順","着順","場名","日付","人気","上がり","開催"]
train_df = new_train.drop(columns=ignore_columns)
test_df = new_test.drop(columns=ignore_columns)
# カテゴリー変数に設定
for column in train_df.columns:
if train_df[column].dtype == "object":
train_df[column] = train_df[column].astype("category")
test_df[column] = test_df[column].astype("category")
# データを確認する
display(train_df.head())
display(test_df.head())
学習を行います。lightgbmを学習器として用います。本来はoptunaなどでパラメータフィッティングを行うべきですが、今回は簡単のためにパラメータを決めうちで行います。
# データの定義
X_train = train_df.drop(columns="走破時間")
y_train = train_df["走破時間"]
X_test = test_df.drop(columns="走破時間")
y_test = test_df["走破時間"]
# モジュールのインポート
import lightgbm as lgb
import shap
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from matplotlib import pyplot as plt
# データセットを作成
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=0)
# LightGBMのデータセットを作成
train_data = lgb.Dataset(X_train, label=y_train)
val_data = lgb.Dataset(X_val, label=y_val)
# モデルのパラメータ
params = {
'objective': 'regression',
'metric': 'rmse',
'boosting_type': 'gbdt',
'num_leaves': 150,
'min_data_in_leaf': 50,
'seed': 0,
'learning_rate': 0.1,
'feature_fraction': 0.8,
'bagging_fraction': 0.8,
'early_stopping_round': 100
}
# モデルのトレーニング
gbm = lgb.train(params, train_data, num_boost_round=10000, valid_sets=[val_data])
以上のコードを実行することで学習が完了します。
性能検証
訓練したモデルの評価を行います。
評価指標として、「目的変数のRMSE」と「予測した順位で単勝馬券を購入した際の回収率」を見てみます。
1 . 走破時間の性能検証評価
以下のコードで確認します。
# 予測値と正解のプロット
# テストデータに対する予測
y_pred = gbm.predict(X_test)
rmse = np.sqrt(np.mean((y_pred - y_test)**2))
plt.scatter(y_pred, y_test)
plt.plot([0, 300], [0, 300], color='red')
plt.title(f"RMSE: {rmse}")
plt.xlabel('y_pred')
plt.ylabel('y_true')
plt.show()
結果は以下のようになります。
RMSEが1.75程度なので、およそ1.75秒の誤差で予測ができているというこになります。よさげに見えますが、レースにおいて勝敗がコンマ数秒の差で決まることを考えるとあまり良くない結果でしょう。
2 . 利益・回収率の性能評価
次に「予測した順位で単勝馬券を購入した際の回収率」を確認します。
元データに存在している単勝オッズを使用することで、シュミレーションをしれみましょう。
簡単のために1~3着で予測した馬の100円分の単勝馬券を買い続けた場合を考えます。
# データの用意
# 一度新たに2024年度のデータを読み込む
df_2024 = pd.read_csv("~/data/2024.csv", encoding="shift-jis")
# 馬の名前ごとにデータを追加
new_df_2024_list = []
# 走破時間に欠損値がある行を削除
df_2024 = df_2024.dropna(subset=["走破時間"])
df_2024["日付"] = df_2024["日付"].apply(convert_date_to_datetime)
df_2024["走破時間"] = df_2024["走破時間"].astype(str).apply(convert_time_to_seconds)
for horse in df_2024["馬"].unique():
tmp_horse_df = df_2024[df_2024["馬"] == horse].sort_values("日付")
if len(tmp_horse_df) == 1:
tmp_horse_df["前回着順"] = "unknown"
tmp_horse_df["前回レースからの日数"] = 30
else:
# 前回の着順を追加
tmp_horse_df["前回着順"] = tmp_horse_df["着順"].shift(1).fillna("unknown")
# 前回レースからの日数を追加
tmp_horse_df["前回レースからの日数"] = tmp_horse_df["日付"].shift(1)
tmp_horse_df["前回レースからの日数"] = tmp_horse_df["日付"] - tmp_horse_df["前回レースからの日数"]
tmp_horse_df["前回レースからの日数"] = tmp_horse_df["前回レースからの日数"].apply(lambda x: x.days).fillna(30)
new_df_2024_list.append(tmp_horse_df)
new_df_2024 = pd.concat(new_df_2024_list, axis=0)
new_df_2024["レース名"] = new_df_2024["レース名"].apply(race_name_comverter)
new_df_2024["オッズ"] = new_df_2024["オッズ"].astype("float")
def predict_test(seed,iteration):
np.random.seed(seed)
count = 0
benefit = 0
benefit_list = [0]
benefit_rate_list = [0]
# df_2024のレースから iterationと同じ数だけランダムに選んで、そのレースの結果を予測する
race_list = np.random.choice(new_df_2024["race_id"].unique(), iteration)
for race_id in race_list:
count += 1
# 予測対象のレースのデータを取得
test_predict = new_df_2024[new_df_2024["race_id"]== race_id]
# 参加頭数の情報を追加
test_predict["レース参加頭数"] = len(test_predict)
# カテゴリー変数に設定
for column in test_predict.columns:
if test_predict[column].dtype == "object":
test_predict[column] = test_predict[column].astype("category")
# 予測対象のレースのデータを取得
test_predict["前回着順"] = test_predict["前回着順"].astype("category")
# 予測して、順位に変換する
test_predict["predict"] = gbm.predict(test_predict.drop(columns=ignore_columns + ["走破時間"]))
test_predict["predict_rank"] = test_predict["predict"].rank()
test_predict["actual"] = test_predict["着順"]
# 利益を計算
benefit -= 300
for i in range(1, 4):
# 1~3位と予測した馬が実際に1位だった場合、オッズをかけて利益を計算
if test_predict[test_predict["predict_rank"] == i]["actual"].values == 1:
benefit += 100 * float(test_predict[test_predict["predict_rank"] == i]["オッズ"].values)
benefit_rate = ((300 * count) + benefit) / (300 * count)
benefit_list.append(benefit)
benefit_rate_list.append(benefit_rate)
plt.figure(figsize=(10, 5))
plt.title("利益の推移")
plt.xlabel("レース数")
plt.ylabel("利益")
plt.plot(benefit_list)
plt.show()
plt.figure(figsize=(10, 5))
plt.title("利益率の推移")
plt.xlabel("レース数")
plt.ylabel("利益率")
plt.plot(benefit_rate_list)
plt.show()
return benefit_list, benefit
以上のようにデータと関数を設定することで、「テストデータからレースをランダムに取り出し」→「順位を予測」→「1,2,3着と予測された馬の単勝馬券を購入」→「利益を計算」という流れを好きな回数趣味レーションすることができます。出力として、利益と回収率の推移を見ることができるようにしています。
ここでは例として200レースの結果を示します。
predict_test(0, 200)
上図が利益の推移、下図が利益率の推移です。結果としては負けてしまっています(泣)。
また今回は30レース目近くで穴場券を当てることができており、最終的な利益率が80%程度と悪くない数値ですが、もしもそれがなければ、回収率は基本的に減少していってしまいそうであることが見て取れます。
非常に残念な結果です。
まとめ
今回はスクレイピングした過去のレース結果をもとに、競馬の結果を予測する簡単なモデルを構築・性能検証を行いました。特徴量エンジニアリング、パラメータ最適化、さまざまな馬券の購入方法があることなどを考慮すると、モデル改善の余地はまだまだありそうです。
今後も時間があるときに、さらにモデルを拡張していきたいです!
まずは単勝以外のオッズを取得するところからやりたいと思います。
ここまでお付き合いいただきありがとうございました!
コードの不具合、ご意見・ご感想ありましたら遠慮なくお願いいたします。
ではでは。