この記事はKCS Advent Calendar 2024の21日目の記事です.
この記事
はじめに
初めまして!KCS(Keio Computer Society)所属,慶應義塾大学理工学部2年の しの です.アドベントカレンダーの告知がサークル内であったとき,誕生日(12/21)に何か書いてみよう!と思ったところからこの記事は始まりました.
いろいろ調べてみると,明日(12/22)が有馬記念でした.このレースは年の瀬の風物詩で,あの林修先生が
有馬記念に参加しないのは国民の義務を果たしていない
と仰ったほどです.
機械学習について勉強していたので,有馬記念を機械学習で予想したら面白そうだなと思い,実際に競馬予想AIを作ってみることにしました.
0. 設計の流れ
有馬記念を予想する機械学習モデルを作る上で,まず全体のプロジェクトの大まかな設計を決めました.
- スクレイピングプログラムを作り,netkeiba様からレース情報を取ってきてjson形式でまとめる.
- 機械学習モデルを作成し,学習を行う.
- 実際に有馬記念を予想する.
この流れに従って,競馬予想AIを作成していきました.有馬記念の予想だけ気になる方は,3から読んでいただければと思います.
1. スクレイピングプログラムの作成
機械学習モデルを作る上で,学習に必要なデータの収集は欠かせません.そこで,まず最初に過去レースのデータを収集するプログラムを作成しました.
スクレイピングプログラムを走らせると,最終的に次のような形式のjsonデータ(1レース分)がスクレイピングされたレースの数だけ入ったjsonデータが作成されます.
{
"racecourse": "racecourse",
"field_type": "field_type",
"distance": "distance",
"field_condition": "field_condition",
"race_date": "race_date",
"horse_links": "horse_links",
"race_link": "url",
"horses_data": [
{
"horse_link": "horse_link",
"past_race_data": [
{
"racecourse": "racecourse",
"field_type": "field_type",
"distance": "distance",
"field_condition": "field_condition",
"race_date": "race_date",
"time": "time",
"first_3f_time": "first_3f_time",
"last_3f_time": "last_3f_time"
}
],
"past_race_label": [
{
"placement": "placement"
}
]
}
],
"horse_placement_data": [
{
"placement": "placement"
}
]
}
大まかには,horses_data
内のpast_race_data
, past_race_label
にそれぞれの馬のレースの戦績が格納されています(説明変数と目的変数).また,上の方にあるracecourse
などは,このレースの距離や馬場状態,開催競馬場などが入っています.horse_placement_data
には,このレースでのそれぞれの馬の着順が入っています(最終的な目的変数).
実際のスクレイピングは,scrape_race_data
, scrape_horse_data
, create_race_data_and_horse_data
という3つの関数で行いました.scrape_race_data
であるレースの情報とそのレースに出走した馬のページへのリンクを取得して,それをcreate_race_data_and_horse_data
の引数として渡し,その内部でscrape_horse_data
を呼び出して目的のjsonデータを作る,といった感じです.以下にプログラムの一部を抜粋して示します.
def create_race_data_and_horse_data(race_data):
"""
race_dataのhorse_linksを基に, 各馬の過去データを取得して統合する関数。
input: race_data (辞書形式)
output: 完全なレースデータ(各馬のデータを含む辞書)
complete_race_data = {
"racecourse": racecourse,
"field_type": field_type,
"distance": distance,
"field_condition": field_condition,
"race_date": race_date,
"horse_links": horse_links,
"race_link": url,
"horses_data" = [
"horse_link": horse_link,
"past_race_data": {[
"racecourse": racecourse,
"field_type": field_type,
"distance": distance,
"field_condition": field_condition,
"race_date": race_date,
"time": time,
"first_3f_time": first_3f_time,
"last_3f_time": last_3f_time,],
},
"past_race_label": [
{"placement": placement},
],
],
"horse_placement_data" = [
{"placement": placement},
],
}
"""
# 元のrace_dataをディープコピー
complete_race_data = copy.deepcopy(race_data)
# 出走馬のリンク
horse_links = race_data["horse_links"]
# 各馬のデータを格納するリスト
horses_data = []
horses_placement_data = []
# そのレースでの着順を保存するようにする
placement = 1
for horse_link in horse_links:
try:
# 各馬のデータをスクレイピング
past_race_data, past_race_label = scrape_horse_data(horse_link)
# 馬データを辞書にまとめる
horse_data = {
"horse_link": horse_link,
"past_race_data": past_race_data,
"past_race_label": past_race_label,
}
horse_placement_data = {
"placement": placement
}
# jsonで保存するため,datetimeを文字列に変換
horse_data = convert_datetime_to_string(horse_data)
# リストに追加
horses_data.append(horse_data)
horses_placement_data.append(horse_placement_data)
placement += 1
# 待機
time.sleep(1)
except Exception as e:
print(f"Error processing horse link {horse_link}: {e}")
continue
# 結合してシャッフル
combined_data = list(zip(horses_data, horses_placement_data))
random.shuffle(combined_data)
# 再分離
horses_data, horses_placement_data = zip(*combined_data)
# 完全なレースデータに出走馬のデータを追加
complete_race_data["horses_data"] = list(horses_data)
complete_race_data["horses_placement_data"] = list(horses_placement_data)
# complete_race_dataのdatetimeを文字列に変換
complete_race_data = convert_datetime_to_string(complete_race_data)
print(f"Successfully created complete data for {complete_race_data["race_link"]}")
return complete_race_data
if __name__ == "__main__":
year = "2023"
raw_race_data_file = "raw_race_data.json"
# 既存データの読み込み(ファイルがあれば)
try:
with open(raw_race_data_file, "r", encoding="utf-8") as file:
raw_race_data = json.load(file)
except FileNotFoundError:
raw_race_data = {}
# レースデータのスクレイピング
for racecourse_id in range(5, 6): # was 1, 11 (時間がかかるので東京(5)と中山(6)だけ)
for event_count in range(3, 8):
for event_day in range(1, 13):
for race_number in range(9, 13):
# レースデータを取得
race_data = scrape_race_data(
year,
f"{racecourse_id:02}",
f"{event_count:02}",
f"{event_day:02}",
f"{race_number:02}",
)
if race_data:
try:
# レースデータに出走馬の過去データを追加
complete_race_data = create_race_data_and_horse_data(race_data)
# レースURLをキーとして辞書に追加
race_url = complete_race_data["race_link"]
raw_race_data[race_url] = complete_race_data
# JSONファイルに保存
with open(raw_race_data_file, "w", encoding="utf-8") as file:
json.dump(raw_race_data, file, ensure_ascii=False, indent=4)
print(f"Saved data for race: {race_url}")
except Exception as e:
print(f"Error processing complete race data: {e}")
time.sleep(1) # 待機
本当は2023年の全データを取得したかったのですが,あまりにも時間がかかったので2023年の東京,中山開催の9レース~12レースのデータを取得することにしました.最終的なサンプル数は360程度です(少ない).
これで,学習させるレースのデータをraw_race_data.json
として保存できました.次にいよいよモデル構築に移ります.
2. 競馬を予想する機械学習モデルの作成
いよいよ,肝となるモデル構築に移ります.結論から言うと,あまり上手くいきませんでした.大まかにはLinear層とLSTM層で構成されていますが,LSTMについてよくわからないままLSTMを使っているため,正しい使い方ができていないかもしれません.また,モデル内でのデータの流れが適切でない可能性もあります.ただ,一応形にはなり,予想タスクも遂行できたので,簡単に流れだけ解説します.
まず,モデルのコードを示します.
# モデルの定義
class LSTM_FNN_Model(nn.Module):
def __init__(self, horse_input_dim, race_meta_input_dim, lstm_hidden_dim, lstm_layers, fnn_hidden_dim, max_past_races=5):
super(LSTM_FNN_Model, self).__init__()
self.max_past_races = max_past_races
# LSTM: 各馬の過去データを独立して処理
self.lstm = nn.LSTM(horse_input_dim, lstm_hidden_dim, lstm_layers, batch_first=True)
# 馬ごとの埋め込みを変換
self.horse_fc = nn.Linear(lstm_hidden_dim, fnn_hidden_dim)
# レースメタデータの処理
self.race_meta_fc = nn.Sequential(
nn.Linear(race_meta_input_dim, fnn_hidden_dim),
nn.ReLU()
)
# 結合後の最終予測
self.final_fc = nn.Sequential(
nn.Linear(fnn_hidden_dim * 2, 1) # LSTM出力 + メタデータ
)
# 初期化
self._initialize_weights()
def _initialize_weights(self):
# 重みを適切に初期化する
# LSTM
for name, param in self.lstm.named_parameters():
if "weight_ih" in name: # 入力-隠れ層の重み
nn.init.xavier_uniform_(param.data) # Xavier初期化
elif "weight_hh" in name: # 隠れ層-隠れ層の重み
nn.init.orthogonal_(param.data) # 正則初期化
elif "bias" in name: # バイアス
nn.init.constant_(param.data, 0) # 0で初期化
# 馬の埋め込み用全結合層
nn.init.xavier_uniform_(self.horse_fc.weight)
nn.init.constant_(self.horse_fc.bias, 0)
# レースメタデータ用全結合層
for layer in self.race_meta_fc:
if isinstance(layer, nn.Linear):
nn.init.xavier_uniform_(layer.weight)
nn.init.constant_(layer.bias, 0)
# 最終結合層
for layer in self.final_fc:
if isinstance(layer, nn.Linear):
nn.init.xavier_uniform_(layer.weight)
nn.init.constant_(layer.bias, 0)
def forward(self, past_race_data, race_meta):
"""
past_race_data: [batch_size, max_horses=18, max_past_races=5, horse_input_dim=7]
race_meta: [batch_size, race_meta_input_dim]
"""
batch_size, max_horses = past_race_data.size(0), past_race_data.size(1)
horse_embeddings = []
for i in range(max_horses):
horse_data = past_race_data[:, i, :, :] # lstm入力データ [batch_size, max_past_races, horse_input_dim]
lstm_out, _ = self.lstm(horse_data) # # lstm出力データ(raw) [batch_size, max_past_races, lstm_hidden_dim]
lstm_last_step = lstm_out[:, -1, :] # lstmの最終出力 [batch_size, lstm_hidden_dim]
horse_emb = self.horse_fc(lstm_last_step) # lstmの最終出力をlinearで結合しやすくする [batch_size, fnn_hidden_dim]
horse_embeddings.append(horse_emb)
horse_embeddings = torch.stack(horse_embeddings, dim=1) # 馬全頭分の埋め込み [batch_size, max_horses, fnn_hidden_dim]
race_emb = self.race_meta_fc(race_meta).unsqueeze(1).expand(-1, max_horses, -1) # raec_metaをmax_horsesに拡張する [batch_size, max_horses, fnn_hidden_dim]
combined = torch.cat((horse_embeddings, race_emb), dim=2) # [batch_size, max_horses, fnn_hidden_dim * 2]
predictions = self.final_fc(combined).squeeze(-1) # 最終予想 [batch_size, max_horses]
return predictions
重みの初期化関数_initialize_weights
はとりあえずいいとして,モデルの流れforward
を見ていきます.
まず,馬の過去5走のデータをLSTMとLinearでいい感じの(?)埋め込みベクトルに変換しています.
horse_data = past_race_data[:, i, :, :] # lstm入力データ [batch_size, max_past_races, horse_input_dim]
lstm_out, _ = self.lstm(horse_data) # # lstm出力データ(raw) [batch_size, max_past_races, lstm_hidden_dim]
lstm_last_step = lstm_out[:, -1, :] # lstmの最終出力 [batch_size, lstm_hidden_dim]
horse_emb = self.horse_fc(lstm_last_step) # lstmの最終出力をlinearで結合しやすくする [batch_size, fnn_hidden_dim]
これをhorse_embeddings
に積んでいき,予想に必要な馬の埋め込みベクトルを合体させたテンソルhorse_embeddings
を作ります.
次に,予想するレースの馬場状態や競馬場,芝かダートか,何m走るのかなどのrace_meta
を埋め込みベクトルに変換します.このままだとhorse_embeddings
との間で形状の一貫性が取れないので,いい感じに拡張してrace_emb
とします.
race_emb = self.race_meta_fc(race_meta).unsqueeze(1).expand(-1, max_horses, -1) # raec_metaをmax_horsesに拡張する [batch_size, max_horses, fnn_hidden_dim]
これらを結合して,最終的な予想の1つ前のcombined
テンソルを作ります.
combined = torch.cat((horse_embeddings, race_emb), dim=2) # [batch_size, max_horses, fnn_hidden_dim * 2]
最後にこれをLinearに突っ込んで,最終的な予想predictions
を得ます.今回,predictions
はレースに出走する馬の着順の予想値としました.例えば,3頭の馬が出走するレースがあるとしたら,[2.5, 1.2, 0.8]
のようになります.
モデルの定義は終わったので,いよいよ学習させていきます.学習のロジックとして,次のような関数train
を定義しました.
# 学習と関数
def train(model, dataloader, criterion, optimizer, device, penalty_weight=100):
model.train()
total_loss = 0
for past_race_data, race_meta, past_labels, current_labels in dataloader:
past_race_data = normalize_features(past_race_data)
race_meta = normalize_features(race_meta)
past_race_data = past_race_data.to(device) # [batch_size, num_horses, max_past_races, horse_input_dim]
race_meta = race_meta.to(device) # [batch_size, race_meta_input_dim]
current_labels = current_labels.to(device) # [batch_size, num_horses]
optimizer.zero_grad()
predictions = model(past_race_data, race_meta) # [batch_size, num_horses]
# 損失計算
main_loss = criterion(predictions, current_labels)
# ペナルティ計算(同じような値になるのを防ぐ) (うまくいかなかった)
# batch_variance = torch.var(predictions, dim=1) # 各バッチ内の馬ごとの分散
# penalty = penalty_weight * torch.mean(1.0 / (batch_variance + 1e-6)) # ペナルティの計算
loss = main_loss
loss.backward()
optimizer.step()
total_loss += loss.item()
return total_loss / len(dataloader)
normalize_features
は,学習に使うjsonデータを正規化するための関数です.また,損失関数として次のranknet_loss
関数を用いました.
def ranknet_loss(predictions, labels):
"""
predictions: [batch_size, max_horses]
labels: [batch_size, max_horses]
"""
pairwise_diff = predictions.unsqueeze(2) - predictions.unsqueeze(1) # [batch_size, max_horses, num_horses]
pairwise_labels = (labels.unsqueeze(2) < labels.unsqueeze(1)).float() # [batch_size, max_horses, num_horses]
pairwise_loss = torch.log1p(torch.exp(-pairwise_diff * pairwise_labels))
return torch.mean(pairwise_loss)
このモデルを実際に学習させて,僕だけの最強競馬予想AIを作りました.学習はこんな感じです.
if __name__ == "__main__":
# ハイパーパラメータ
horse_input_dim = 7
race_meta_input_dim = 4
lstm_hidden_dim = 64
lstm_layers = 2
fnn_hidden_dim = 32
max_past_races = 5
max_horses = 18
batch_size = 8 # was 8
epochs = 10 # was 10
learning_rate = 0.001 # was 0.001
# データセットとデータローダー
dataset = HorseRaceDataset("raw_race_data.json", max_past_races=max_past_races, max_horses=max_horses)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
# データ形状の確認
for past_race_data, race_meta, past_race_labels, placement in dataloader:
print("past_race_data:", past_race_data.shape)
print("race_meta:", race_meta.shape)
print("past_race_labels:", past_race_labels.shape)
print("placement:", placement.shape)
break
# モデル, 損失関数, 最適化
model = LSTM_FNN_Model(horse_input_dim, race_meta_input_dim, lstm_hidden_dim, lstm_layers, fnn_hidden_dim, max_past_races).to(device)
criterion = ranknet_loss
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# 学習と評価
for epoch in range(epochs):
train_loss = train(model, dataloader, criterion, optimizer, device)
val_loss = evaluate(model, dataloader, criterion, device)
print(f"Epoch {epoch + 1}/{epochs} - Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")
# モデル保存
torch.save(model.state_dict(), "horse_race_model.pth")
3. 実際に有馬記念予想してみた
モデルができたので,実際に有馬記念を予想してみました.具体的には,予想したい,まだ行われていないレースの情報を取得するscrape_target_race.py
を作成し,target_race_data.json
を作成しました.このデータを,先程学習させたモデルhorse_race_model.pth
に通して,予想された着順が若かった順に出力させました.
気になるAIの予想結果は...
予測結果:
1: スタニングローズ
2: ドウデュース
3: プログノーシス
4: アーバンシック
5: ジャスティンパレス
6: ハヤヤッコ
7: ブローザホーン
8: スターズオンアース
9: ダノンベルーガ
10: シャフリヤール
11: シュトルーヴェ
12: ダノンデサイル
13: ディープボンド
14: ベラジオオペラ
15: ローシャムパーク
16: レガレイラ
こうなりました.思ったよりいい感じに予想できていますね!当日の結果が楽しみです.
ということで,この予想結果をもとにして,ChatGPTに買い目を考えてもらいました.
予測結果:
1: スタニングローズ
2: ドウデュース
3: プログノーシス
4: アーバンシック
5: ジャスティンパレス
6: ハヤヤッコ
7: ブローザホーン
8: スターズオンアース
9: ダノンベルーガ
10: シャフリヤール
11: シュトルーヴェ
12: ダノンデサイル
13: ディープボンド
14: ベラジオオペラ
15: ローシャムパーク
16: レガレイラ
このように,有馬記念2024の馬の力関係を考察しました.上の馬ほど高評価です.この評価をもとに,予算3000円で有馬記念の買い目を考えてください.
予算3000円をもとに、有馬記念2024の馬券購入プランを提案します。この予測結果では上位馬を軸に、リスク分散しつつ高配当を狙える買い方を組み立てます。
買い目の提案
1. 馬連(2頭が1着・2着に入れば的中)
上位人気馬の組み合わせを中心にする。
- 1-2(スタニングローズ × ドウデュース)
購入額:500円- 1-3(スタニングローズ × プログノーシス)
購入額:500円- 2-3(ドウデュース × プログノーシス)
購入額:500円2. ワイド(2頭が3着以内に入れば的中)
堅実に配当を得るための買い方。
- 1-2(スタニングローズ × ドウデュース)
購入額:500円- 1-4(スタニングローズ × アーバンシック)
購入額:500円3. 三連複(3頭が着順関係なく3着以内に入れば的中)
上位3頭を中心に、高配当を狙う。
- 1-2-3(スタニングローズ × ドウデュース × プログノーシス)
購入額:500円
ChatGPT攻めますね!これで行きましょう.これで,100%AI産の予想と買い目ができました.
参考までに,僕の予想と買い目も書いておきます.是非参考にしてください.
◎ドウデュース
〇 アーバンシック
▲ スタニングローズ
△ シャフリヤール
☆ プログノーシス単勝 ドウデュース 1800円
3連単 ◎ - 〇▲△☆ - 〇▲△☆ 各100円 1200円
蛙の子は蛙なのか,似たような予想になってしまいました.どっちかでも当たると嬉しいな
おわりに
ちょっとした思い付きで始まった今回のプロジェクトですが,私にとってとても学びの多いものでした.機械学習をKCSや大学の講義で勉強していたので,簡単に作れるだろ!と舐めてかかりましたが,まずスクレイピングがとても大変でした.データセットのありがたみを感じる...
また,LSTMの使い方が間違っている気がするし,モデルの構造も間違っている気がします.学校の講義を聞くと完全に理解した気分になりますが,実際にプロジェクトを進めていく中で難しいところや,詰まるところもわかってきました.また来年,強くなった自分で有馬記念を予想したいです.
ここまでお読みくださり,ありがとうございました!
2024/12/21追記
ドウデュース出走取消らしいです😭 ここ勝って秋古馬3冠してほしかったので,とても残念です.産駒に期待したいです。