0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ChatGPTと作る競馬予想AI (6)

Posted at

プログラミング初心者がChatGPTを使って競馬予想AIを作ることで、生成AIとプログラミングについて学んでいく企画の第6回です。

前回の記事はこちらから

前回はレース結果ページのスクレイピングを行い、約2万件のレース結果を取得することができました。いよいよここから機械学習モデルへ入力し、予測を行っていきます。

どのくらい完成に近づいているの?

現状、この競馬AIがどこまで来ているのかを料理(カレー作り)で例えてみましょう。
カレーを作る流れと、それに対応する競馬予想AIの作成手順です。

カレー作り 競馬予想AI
食材の調達 データのスクレイピング
食材の下処理 データの前処理
鍋に入れて煮込む AIモデルの学習
お皿の用意 検証データの用意
実食 モデル評価

この表に従うとまだ食材の調達(=データのスクレイピング) が済んだ段階です。
まだまだ道のりは長いですね…

したがってこれからは食材の下処理(=データの前処理) に移ります。

データの前処理

前回pickleファイルで保存した、レース結果がまとまったrace_result_table.pklは以下の通りです。

image.png

このデータを機械学習モデルに入力するには次の8つの処理が必要です。

  1. 着順の目的変数化
  2. 馬名のhorse_id化
  3. 性齢の分割
  4. 騎手のjockey_id化
  5. タイムを秒数に変換
  6. 厩舎のtrainer_id化
  7. 馬体重の分割
  8. 全データの数値化(to_numeric())

多いですね…
たくさん挙げていますが、要するに「すべてのデータは数値型で入力してあげてね」という話です。
どうしても数値に直せないものは適当な数値でラベリングしてあげます(これをエンコーディングといいます)。
さて、以降では関数を作りtableに対して上記の処理を施しましょう。

テーブルの処理

horse_idやjocky_idはhtmlから取得しなければいけないため、関数を修正してテーブルを作り直しちゃいましょう。
取得の段階では"2, 4, 6"を行い、追加する前に"1, 3, 5, 7, 8"を行いましょう。

HTMLからスクレイピングする関数(parse_race_html)を"horse_id, jockey_id, trainer_id"が得られるように改良します。

# what: HTMLを解析してレース結果テーブルをDataFrame化する関数
# for:  AIモデルの入力形式に合わせる
# in:   取得したhtml(.bin)
# out:  レース結果テーブル(DataFrame)
def parse_race_html(html_text):
    soup = BeautifulSoup(html_text, "html.parser")
    # result_table = soup.find("table", id="All_Result_Table")
    # result_table = soup.find("table", class_="RaceCommon_Table ResultRefund")
    result_table = soup.find("table", class_="RaceTable01")
    
    if not result_table:
        raise ValueError("レース結果テーブルが見つかりません。")

    rows = result_table.find_all("tr")[1:]  # ヘッダを除外
    race_data = []

    for row in rows:
        cols = row.find_all("td")
        if len(cols) < 15:
            continue

        # --- 馬IDを取得 ---
        horse_tag = row.find("a", href=re.compile(r"/horse/(\d+)"))
        horse_id = re.search(r"/horse/(\d+)", horse_tag["href"]).group(1) if horse_tag else None

        # --- 騎手IDを取得 ---
        jockey_tag = row.find("a", href=re.compile(r"/jockey/result/recent/(\d+)/"))
        jockey_id = re.search(r"/jockey/result/recent/(\d+)/", jockey_tag["href"]).group(1) if jockey_tag else None

        # --- 調教師IDを取得 ---
        trainer_tag = row.find("a", href=re.compile(r"/trainer/result/recent/(\d+)/"))
        trainer_id = re.search(r"/trainer/result/recent/(\d+)/", trainer_tag["href"]).group(1) if trainer_tag else None

        race_data.append([
            cols[0].get_text(strip=True),  # 着順
            horse_id,                      # 馬ID
            cols[4].get_text(strip=True),  # 性齢
            cols[5].get_text(strip=True),  # 斤量
            jockey_id,                     # 騎手ID
            cols[7].get_text(strip=True),  # タイム
            cols[9].get_text(strip=True),  # 人気
            cols[10].get_text(strip=True), # オッズ
            cols[11].get_text(strip=True), # 後3F
            trainer_id,                    # 厩舎ID
            cols[14].get_text(strip=True), # 馬体重
        ])

    df = pd.DataFrame(race_data, columns=[
        "rank", "horse_id", "sex&age", "weight_carried",
        "jockey_id", "time", "popularity", "odds", "last3f",
        "trainer_id", "body_weight"])
    # df = pd.DataFrame(race_data, columns=["着順", "馬名", "性齢", "斤量", "騎手", "タイム", "人気", "オッズ", "後3F", "厩舎", "馬体重"])
    return df

この関数をもちいてHTMLの解析→テーブル化→pickleファイルに保存を行います(この関数は過去記事参照)。
pickleファイルから出来上がったテーブルを確認します。

image.png

それぞれidとして得られていますね!

続いて以下の項目について取り掛かっていきましょう。

  1. 着順の目的変数化
  2. 性齢の分割
  3. タイムを秒数に変換
  4. 馬体重の分割
  5. 全データの数値化(to_numeric())

まず3, 5, 7については関数として定義してあげましょう。

# what: レース結果テーブルの前処理をする関数
# for:  AIモデルがうけつけられるようにする
# in:   レース結果テーブル(.pkl)
# out:  レース結果テーブル(.pkl)

def parse_sex_age(sexage):
    # 例: "牡4" -> ("牡", 4)
    if pd.isna(sexage): return (np.nan, np.nan)
    s = str(sexage)
    sex = s[0]
    try:
        age = int(s[1:])
    except:
        age = np.nan
    return sex, age

def parse_bodyweight(bw):
    # "494(-4)" -> weight=494, diff=-4
    try:
        s = str(bw)
        if "(" in s:
            w = int(s.split("(")[0])
            diff = int(s.split("(")[1].rstrip(")"))
        else:
            w = int(s)
            diff = np.nan
        return w, diff
    except:
        return (np.nan, np.nan)
    
def time_to_seconds(tstr):
    # "1:51.3" -> seconds float
    try:
        if pd.isna(tstr): return np.nan
        if ":" in str(tstr):
            mm, ss = str(tstr).split(":")
            return int(mm) * 60 + float(ss)
        else:
            return float(tstr)
    except:
        return np.nan

そしてこの関数を用いて全ての列を修正します。
取得したテーブルに前処理を施す関数を用いて、さらに既にある関数に結合する処理をしてあげます。

# what: 各レースのbinファイルからレース結果を抽出し、1つのテーブルに結合しpickleで保存する関数
# for:  特徴量の抽出用
# in:   取得したrace_id_list(.csv)とhtml(.bin)
# out:  結合されたresult_table(.pickle)
csv_path = r"C:\Users\yasak\Desktop\mykeibaAI_ver1p0\data\race_id_list_2310_2402.csv"
result_table_path = r"C:\Users\yasak\Desktop\mykeibaAI_ver1p0\data\race_result_table.pkl"
bin_dir = r"C:\Users\yasak\Desktop\mykeibaAI_ver1p0\data\race_result_html"
new_result_df = pd.read_csv(csv_path)

# 既存pickleのrace_idを確認
existing_df = pd.read_pickle(result_table_path)
existing_ids = set(existing_df["race_id"].astype(str))

# 新しく解析するrace_idだけを抽出
target_ids = [str(rid) for rid in df["race_id"] if str(rid) not in existing_ids]

new_dfs = []
for race_id in tqdm(target_ids, total=len(target_ids)):
    bin_path = os.path.join(bin_dir, f"{race_id}.bin")
    if not os.path.exists(bin_path):
        print(f"Missing bin file: {race_id}")
        continue
    try:
        # --- 展開 ---
        with open(bin_path, "rb") as f:
            html_text = f.read().decode("EUC-JP", errors="ignore")
        # --- HTML解析 ---
        df_race = parse_race_html(html_text)
        df_race.insert(0, "race_id", race_id)
        new_dfs.append(df_race)
    except Exception as e:
        print(f"Error fetching {race_id}: {e}")
new_result_df = pd.concat(new_dfs, ignore_index=True)

# 追加するテーブルに前処理をしておく
new_result_df.insert(2, "is_win", (new_result_df["rank"].astype(str).str.strip() == "1").astype(int)) # ターゲットエンコーディング(1着のみ1)
new_result_df[["sex","age"]] = new_result_df["sex&age"].apply(lambda x: pd.Series(parse_sex_age(x)))
new_result_df["age"] = new_result_df["age"].astype(float)
new_result_df["weight_carried"] = pd.to_numeric(new_result_df["weight_carried"], errors="coerce")
new_result_df["time_sec"] = new_result_df["time"].apply(time_to_seconds)
new_result_df["last3f"] = pd.to_numeric(new_result_df["last3f"], errors="coerce")
new_result_df[["body_weight","body_diff"]] = new_result_df["body_weight"].apply(lambda x: pd.Series(parse_bodyweight(x)))
new_result_df["odds"] = pd.to_numeric(new_result_df["odds"], errors="coerce")
new_result_df["popularity"] = pd.to_numeric(new_result_df["popularity"], errors="coerce")
new_result_df = new_result_df.drop(columns=["sex&age", "time"])

new_result_df = pd.concat([existing_df, new_result_df], ignore_index=True)
print(f"✅ 新規{len(new_result_df)}件を追加しました(合計 {len(result_df)} 件)")
print(result_df[:5])
result_table = result_df.to_pickle(result_table_path)

"# 追加するテーブルに前処理をしておく"以降の処理が新規に追加したものになりますが、1, 5については一行のプログラムで何とかなってしまいます。

さあ、これで必要なテーブルは完成しました。
これでようやく予測ができる…かと思いきや、まだ馬や騎手ごとの成績をまとめなければなりませんし、その際のエンコーディング方法や予測結果の評価指標など、検討することは盛りだくさんです。
最後に今後の方針について、ChatGPTに聞いておきました。
参考として以下に残しておきます。

参考:ChatGPTの指南

いいですね — データ量(約2万件)があれば LightGBM で「単勝(1着)を当てる二値確率予測」は現実的にチャレンジできます。
ここでは 実践で使える手順 + コード例(前処理→特徴量生成→エンコーダ→LightGBM学習→評価) を示します。
要点・落とし穴・改善案も最後にまとめます。

前提:あなたの result_df は既に以下の列を持っている想定です(例)
race_id, 着順, 馬名, 性齢, 斤量, 騎手, タイム, 人気, オッズ, 後3F, 厩舎, 馬体重
race_id の先頭8桁は YYYYMMDD(例: 20230904...)という前提で日付順ソートできます。違えば日付列を用意してください。


全体の方針(要点)

  1. 目的変数(target)を作る:is_win = (着順 == 1)(binary)
  2. データを時系列順にソートして、過去のみの情報で特徴量を作る(情報漏洩を防ぐ)
  3. 馬・騎手・厩舎ごとの**累積統計(過去の出走数・勝率・複勝率・平均着順・直近成績)**を生成
  4. テキスト列(騎手・厩舎・馬名など)には**frequency encoding / target-encoding(CV内)**を利用
  5. 数値列(オッズ、後3F、斤量、馬体重)を数値化(型変換)
  6. 学習は 時系列分割(学習は過去→検証は未来) で行う
  7. LightGBM(binary + predict_proba)で確率を出力。評価は log loss / AUC / top-kのHit率 など

累積統計(past-only features)作成 — 重要!

過去データだけを使うように、レース日でソートして groupby().expanding().agg() 形式で累積値を作ります。

# 日付でソート(同一日ならrace_id順)
df = df.sort_values(["race_date","race_id"]).reset_index(drop=True)

# horseごとの累積統計(出走前の成績を取得するためにshift)
horse_stats = []
for name, g in df.groupby("馬名"):
    g = g.sort_values(["race_date","race_id"]).copy()
    g["horse_starts"] = np.arange(len(g))
    g["horse_wins"] = g["is_win"].shift(1).fillna(0).cumsum()  # wins up to previous races
    g["horse_places"] = g["is_win"].shift(1).fillna(0).cumsum() # adjust if place desired
    g["horse_win_rate"] = g["horse_wins"] / g["horse_starts"].replace(0, np.nan)
    horse_stats.append(g)
df = pd.concat(horse_stats).sort_index()

# 注意: 上の処理は簡易。より正確には expanding().sum().shift(1) を使う。
# 例 (効率良い方法):
df = df.sort_values(["race_date","race_id"])
grp = df.groupby("馬名", sort=False)
df["horse_starts"] = grp.cumcount()
df["horse_wins"] = grp["is_win"].apply(lambda x: x.shift(1).fillna(0).cumsum())
df["horse_win_rate"] = df["horse_wins"] / df["horse_starts"].replace(0, np.nan)

ポイント:shift(1) を使って「そのレースの結果は含めない」ようにすること(情報漏洩防止)。

同様に騎手・厩舎の累積成績も作ります:

# jockey stats
grp = df.groupby("騎手", sort=False)
df["jockey_starts"] = grp.cumcount()
df["jockey_wins"] = grp["is_win"].apply(lambda x: x.shift(1).fillna(0).cumsum())
df["jockey_win_rate"] = df["jockey_wins"] / df["jockey_starts"].replace(0, np.nan)

# trainer stats
grp = df.groupby("厩舎", sort=False)
df["trainer_starts"] = grp.cumcount()
df["trainer_wins"] = grp["is_win"].apply(lambda x: x.shift(1).fillna(0).cumsum())
df["trainer_win_rate"] = df["trainer_wins"] / df["trainer_starts"].replace(0, np.nan)

カテゴリ変数のエンコーディング方針

  • 低カーディナリティ(sex, age_bin) → one-hot(pd.get_dummies)でも可
  • 高カーディナリティ(馬名、騎手、厩舎) → frequency encoding(出現回数)や mean-target encoding(K-foldでCV内部)
  • 今回はまず安全な frequency encoding として実装、その後 target-encoding を CV 内で導入するとよい。
# frequency encoding
for col in ["馬名","騎手","厩舎"]:
    freq = df[col].value_counts(dropna=False)
    df[f"{col}_freq"] = df[col].map(freq).fillna(0)

# 簡単カテゴリ→one-hot 例
df = pd.get_dummies(df, columns=["sex"], prefix="sex", dummy_na=True)

target encoding(注意:学習データでリークさせない)

簡単な方法:学習用のfoldごとに mean(target) を計算して代入する。
(sklearn の KFold と手作りで実装するか package を利用)


特徴量一覧(候補)

  • 馬関連: horse_starts, horse_wins, horse_win_rate, last_n_positions (直近3走の平均着順)
  • 騎手関連: jockey_starts, jockey_win_rate, jockey_freq
  • 厩舎関連: trainer_starts, trainer_win_rate, trainer_freq
  • レース/馬券関連: odds, popularity, weight_carried, body_weight, body_diff, time_sec, last3f
  • コース・距離・馬場(取れるなら): course, distance, going
  • レース毎のダミー:出走頭数(field_size)、枠順(もしあるなら)

学習(LightGBM)コード例

# 学習に使う列を決める(例)
features = [
    "odds","popularity","time_sec","last3f","weight_carried","body_weight","body_diff",
    "horse_starts","horse_wins","horse_win_rate",
    "jockey_starts","jockey_win_rate",
    "trainer_starts","trainer_win_rate",
    "馬名_freq","騎手_freq","厩舎_freq",
    "sex_牡","sex_牝","sex_セ"  # get_dummiesの例
]
# 欠損補完
X = df[features].fillna(-999)
y = df["is_win"]

# 時系列分割(直近をバリデーション)
train_mask = df["race_date"] < "2023-01-01"  # 例:日付で分ける
valid_mask = df["race_date"] >= "2023-01-01"
X_train, y_train = X[train_mask], y[train_mask]
X_val, y_val = X[valid_mask], y[valid_mask]

lgb_train = lgb.Dataset(X_train, label=y_train)
lgb_val = lgb.Dataset(X_val, label=y_val, reference=lgb_train)

params = {
    "objective":"binary",
    "metric":["binary_logloss","auc"],
    "learning_rate":0.05,
    "num_leaves":64,
    "feature_fraction":0.8,
    "bagging_fraction":0.8,
    "bagging_freq":5,
    "seed":42,
    "verbosity":-1
}

model = lgb.train(params, lgb_train, valid_sets=[lgb_train,lgb_val], num_boost_round=2000,
                  early_stopping_rounds=100, verbose_eval=50)

# 予測・評価
y_pred = model.predict(X_val, num_iteration=model.best_iteration)
print("logloss:", log_loss(y_val, y_pred))
print("AUC:", roc_auc_score(y_val, y_pred))

評価指標(競馬向けの考え方)

  • binary_logloss:確率予測の品質(重要)
  • AUC:順序性(参考)
  • top-k hit rate:各レースで上位k頭を確率順に取り、実際に勝った馬がk内に入る割合(現実的指標)
  • 期待回収率(回収率):実際のオッズとモデル推薦を使ってシミュレーション(最終目標)

改善ポイント・注意点(重要)

  1. 情報漏洩に注意:累積統計は必ず過去のみで計算(shift(1) など)。学習/検証の分割も時系列で。
  2. 馬の寿命問題:馬の成績は急に変わる。直近重視の特徴(直近3走の平均着順、trend)を入れると効果的。
  3. カテゴリの扱い:騎手・厩舎は多数のカテゴリがある。frequency + target-encoding(CV内) が有効。
  4. オッズの利用:オッズ(市場の期待)は強力な説明変数だが、予測目的によっては使わない(ベッティング戦略次第)。
  5. 不均衡対策:1着は1/出走頭数程度で極めて少ない。scale_pos_weight を使うか、適切な評価指標でモデルを選ぶ。
  6. ハイパーパラメータ探索:Optuna などで num_leaves, learning_rate, min_data_in_leaf をチューニング。
  7. Feature importance確認:SHAPで解釈性を確認し、不要特徴を削除する。
  8. Time-based CV:ランダムCVは不可。必ず時系列分割で性能評価。

次にやるべき実作業(優先度順)

  1. race_date を確実に作成(race_idから or レースページから)し、時系列ソートを行う。
  2. 馬・騎手・厩舎の**累積統計(shift付き)**を作る(上のコード参照)。
  3. 欠損・文字列パース(体重、タイム)を完了させる。
  4. 簡単なモデル(上の LightGBM)でベースラインを作る(logloss/AUC/top-kを計測)。
  5. カテゴリのエンコード(freq→target)・ハイパーパラメータ調整・SHAP分析 を順次実施。

もしよければ、次はどれを一緒にやりますか?

  • A: 「累積統計(馬・騎手・厩舎)の正確な実装コード(shift付き、効率的)」
  • B: 「target-encoding を時系列安全に使う実装 + LightGBM 学習フルパイプライン」
  • C: 「モデル評価:top-kヒット率と回収率シミュレーター」

やりたいものを選んでください。どれでも作ります。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?