Computing scores from a LightGBM model without the package

Posted at 2025-09-09

Introduction

I had a situation where I wanted to compute scores for a binary classification model from the model file alone, without using the package.
I tried a few approaches, but it still isn't working...
(the scores don't match the package's model.predict)

Code

Using model_dump.json

Still under testing
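For reference, the model_dump.json used below can be produced from a trained Booster like this (a minimal sketch; the "model.txt" path is an assumption):

import json
import lightgbm as lgb

booster = lgb.Booster(model_file="model.txt")  # or the Booster returned by lgb.train
with open("model_dump.json", "w") as f:
    json.dump(booster.dump_model(), f)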

from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, List, Union
import json
import math
import numpy as np
import pandas as pd


# -------------------- Loader --------------------
def load_lgb_json(path: Path) -> Dict[str, Any]:
    """
    Load LightGBM dump_model() JSON.
    Required keys: "tree_info", "feature_names", "objective", optionally "num_class".
    """
    obj = json.loads(Path(path).read_text())
    if "tree_info" not in obj:
        raise ValueError("Invalid JSON: missing 'tree_info'")
    if "feature_names" not in obj:
        # some versions put names under 'pandas_categorical' or elsewhere; require explicit list for safety
        raise ValueError("Invalid JSON: missing 'feature_names' list")
    obj.setdefault("objective", "regression")
    obj.setdefault("num_class", 1)
    return obj
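# Abridged shape of the dump_model() JSON assumed by the code below
# (an illustrative assumption; field values are made up):
# {
#     "objective": "binary sigmoid:1",
#     "feature_names": ["f0", "f1"],
#     "tree_info": [
#         {"tree_index": 0,
#          "shrinkage": 1.0,
#          "tree_structure": {
#              "split_feature": 0, "decision_type": "<=",
#              "threshold": 0.5, "default_left": True,
#              "left_child":  {"leaf_index": 0, "leaf_value": 0.12},
#              "right_child": {"leaf_index": 1, "leaf_value": -0.34}}}]
# }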


# -------------------- Traversal --------------------
def _is_leaf(node: Dict[str, Any]) -> bool:
    return "leaf_value" in node

def _go_left_numeric(val: float, node: dict) -> bool:
    # numeric-only variant kept for reference; the general _go_left below
    # also handles categorical splits and is the one actually used
    decision = node.get("decision_type", "<=")
    thr = float(node["threshold"])
    default_left = bool(node.get("default_left", True))
    if pd.isna(val):
        return default_left
    if decision == "<=":
        return val <= thr
    if decision == "<":
        return val < thr
    # fallback (numeric only)
    return val <= thr


def _go_left(val, node: dict) -> bool:
    decision = node.get("decision_type", "<=")
    default_left = bool(node.get("default_left", True))

    # missing values follow the default direction
    if pd.isna(val):
        return default_left

    if decision in ("<=", "<"):
        thr = float(node["threshold"])
        return (val <= thr) if decision == "<=" else (val < thr)

    # equality split (categorical or discrete values)
    if decision == "==":
        raw = str(node.get("threshold", ""))
        cats = [c for c in raw.split("||") if c != ""]
        sval = str(val)
        try:
            fval = float(val)
        except Exception:
            return sval in cats

        def _matches(c: str) -> bool:
            # numeric-aware comparison ("2" matches 2.0); also handles negatives
            try:
                return float(c) == fval
            except ValueError:
                return False

        return (sval in cats) or any(_matches(c) for c in cats)

    # set membership (categorical set)
    if decision == "in":
        raw = node.get("threshold")
        # threshold may be an "a||b||c" string or a list
        if isinstance(raw, str):
            cats = [c for c in raw.split("||") if c != ""]
        elif isinstance(raw, list):
            cats = list(raw)  # keep original types for numeric comparison
        else:
            cats = []
        sval = str(val)
        try:
            fval = float(val)
        except Exception:
            return any(str(c) == sval for c in cats)

        def _in_set(c) -> bool:
            if str(c) == sval:
                return True
            try:
                return float(c) == fval
            except (TypeError, ValueError):
                return False

        return any(_in_set(c) for c in cats)  # in the set -> go left

    # anything else is an unsupported split type
    raise ValueError(f"Unsupported decision_type: {decision!r}")
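# Sanity check for the "==" branch (illustrative values; real dumps use
# integer category codes inside the "a||b" threshold string):
#   node = {"decision_type": "==", "threshold": "0||2", "default_left": False}
#   _go_left(2, node)             # True  -> left (2 is in {0, 2})
#   _go_left(1, node)             # False -> right
#   _go_left(float("nan"), node)  # False -> default direction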

def _tree_predict_row(x: np.ndarray, node: dict, feat_index: dict) -> float:
    cur = node
    while True:
        if _is_leaf(cur):
            return float(cur["leaf_value"])
        sf = cur.get("split_feature", cur.get("split_feature_name"))
        if isinstance(sf, str) and sf.isdigit():
            sf = int(sf)
        fi = feat_index[sf]
        cur = cur["left_child"] if _go_left(x[fi], cur) else cur["right_child"]

# -------------------- Predictors --------------------
def _sigmoid(z: float) -> float:
    # stable-ish sigmoid
    if z >= 0:
        ez = math.exp(-z)
        return 1.0 / (1.0 + ez)
    ez = math.exp(z)
    return ez / (1.0 + ez)
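# NOTE (assumption): the objective string in the dump may carry a sigmoid
# coefficient, e.g. "binary sigmoid:1"; LightGBM converts raw scores with
# p = 1 / (1 + exp(-sigma * z)), so with the default sigma = 1 the plain
# _sigmoid above is enough. A sketch for the general case:
def _sigmoid_coef(objective: str) -> float:
    import re
    m = re.search(r"sigmoid:([0-9.]+)", str(objective))
    return float(m.group(1)) if m else 1.0
# usage sketch: _sigmoid(_sigmoid_coef(model["objective"]) * raw)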

def predict_dataframe_from_json(
    df: pd.DataFrame,
    model: Dict[str, Any],
    raw_score: bool = False,
    num_iteration: int | None = None,
) -> pd.Series:
    feat_names: List[str] = list(model["feature_names"])
    # required-column check (stop if even one feature is missing)
    missing = [c for c in feat_names if c not in df.columns]
    if missing:
        raise ValueError(f"DataFrame is missing required features: {missing}")
    X = df.reindex(columns=feat_names)

    # feature index map (name -> position); also supports the integer ids used by older dumps
    feat_index: Dict[Union[int, str], int] = {name: i for i, name in enumerate(feat_names)}
    for i, name in enumerate(feat_names):
        feat_index[i] = i  # some dumps refer by integer id

    trees = model["tree_info"]
    if num_iteration is not None:
        trees = trees[:num_iteration]

    # NOTE: LightGBM stores leaf values with shrinkage (the learning rate)
    # already applied, so the per-tree "shrinkage" field must not be
    # multiplied in again; doing so double-counts the learning rate and is
    # a likely cause of mismatches against model.predict()
    raw_scores: List[float] = []
    for _, row in X.iterrows():
        s = 0.0
        xv = row.values
        for t in trees:
            s += _tree_predict_row(xv, t["tree_structure"], feat_index)
        raw_scores.append(s)

    if raw_score or str(model.get("objective", "")).startswith("regression"):
        return pd.Series(raw_scores, index=df.index)
    # binary
    probs = [_sigmoid(v) for v in raw_scores]
    return pd.Series(probs, index=df.index)


def add_predictions_from_json(
    df: pd.DataFrame,
    model: Dict[str, Any],
    threshold: float = 0.5,
    raw_score: bool = False,
    num_iteration: int | None = None,
) -> pd.DataFrame:
    scores = predict_dataframe_from_json(df, model, raw_score=raw_score, num_iteration=num_iteration)
    out = df.copy()
    if not raw_score and not str(model.get("objective", "")).startswith("regression"):
        out["prediction_score"] = scores
        out["prediction"] = (scores >= threshold).astype(int)
    else:
        out["prediction_score"] = scores
        out["prediction"] = np.nan
    return out

Usage

model = load_lgb_json(Path("model_dump.json"))
new_df = pd.read_csv("new_data.csv")

# column order is aligned automatically once the required-column check passes
df_with_pred = add_predictions_from_json(
    df=new_df,
    model=model,
    threshold=0.42,     # manage the best threshold separately (e.g., in model metadata)
    raw_score=False,
    num_iteration=None,
)
print(df_with_pred.head())
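To check the result against the package, the same DataFrame can be scored with the Booster and compared (a sketch; it assumes lightgbm is installed and "model.txt" is the same model the dump came from):

import lightgbm as lgb
import numpy as np

booster = lgb.Booster(model_file="model.txt")
official = booster.predict(new_df[model["feature_names"]])
mine = predict_dataframe_from_json(new_df, model).values
print(np.abs(official - mine).max())       # should be ~0 if the traversal is faithful
print(np.allclose(official, mine, atol=1e-9))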

[Scrapped] Using model.txt

Computing scores from the txt file written by model.save_model() looks infeasible with the approach below: every row ends up taking the same route. The likely cause is that the parser assumes one node per line, whereas model.txt stores each tree as per-line arrays (see the sample right after this paragraph), so no split lines ever match and every tree degenerates to a single leaf. The code is kept below as a record of the attempt.
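Schematically, a tree block in model.txt is laid out as per-line arrays, roughly like this (values are illustrative):

Tree=0
num_leaves=3
split_feature=1 0
threshold=0.5 1.5
left_child=1 -1
right_child=-2 -3
leaf_value=0.1 -0.2 0.3

Each array line covers all internal nodes of the tree at once, and negative child ids denote leaves (-1 is leaf 0, -2 is leaf 1, and so on), so a parser that expects split_feature= and threshold= on the same per-node line never matches anything.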

from __future__ import annotations

from pathlib import Path
from typing import Any, Dict, List, Tuple

import numpy as np
import pandas as pd
import re
from math import exp
import ast

def _parse_feature_names(raw: str) -> list[str]:
    s = raw.strip()
    # format: ["f1","f2",...]
    if s.startswith("[") and s.endswith("]"):
        try:
            lst = ast.literal_eval(s)
            return [str(x).strip() for x in lst if str(x).strip()]
        except Exception:
            pass
    # format: f1,f2,f3
    if "," in s:
        return [c.strip() for c in s.split(",") if c.strip()]
    # format: f1\tf2\tf3
    if "\t" in s:
        return [c.strip() for c in s.split("\t") if c.strip()]
    # format: f1 f2 f3
    parts = s.split()
    return [p for p in parts if p]

    
def _parse_kv(line: str) -> Dict[str, str]:
    kv: Dict[str, str] = {}
    for tok in line.strip().split():
        if "=" in tok:
            k, v = tok.split("=", 1)
            kv[k] = v
    return kv


def parse_model_txt(path: Path) -> Dict[str, Any]:
    lines = Path(path).read_text().splitlines()

    feature_names: List[str] = []
    feat_index: Dict[str, int] = {}
    learning_rate = 0.1
    objective = "regression"
    num_class = 1

    trees: List[Dict[str, Any]] = []
    cur_splits: Dict[int, Dict[str, Any]] = {}
    cur_leaves: Dict[int, float] = {}
    next_split_idx = 0
    next_leaf_idx = 0

    LEAF_ID_RE = re.compile(r"(?:\bleaf_index\b|\bleaf\b)\s*=\s*([LR]?)(\d+)")

    def close_tree() -> None:
        nonlocal cur_splits, cur_leaves, next_split_idx, next_leaf_idx
        if cur_splits or cur_leaves:
            root_id = min(cur_splits.keys()) if cur_splits else None
            trees.append({
                "splits": dict(cur_splits),
                "leaves": dict(cur_leaves),
                "root": root_id,
            })
        cur_splits, cur_leaves = {}, {}
        next_split_idx = 0
        next_leaf_idx = 0

    for line in lines:
        if line.startswith("feature_names="):
            raw = line.split("=", 1)[1]
            feature_names = _parse_feature_names(raw)
            if not feature_names:
                raise ValueError("feature_names list is empty or unparsable")
            feat_index = {name: i for i, name in enumerate(feature_names)}
            continue

        if line.startswith("learning_rate="):
            learning_rate = float(line.split("=", 1)[1]); continue
        if line.startswith("objective="):
            objective = line.split("=", 1)[1]; continue
        if line.startswith("num_class="):
            num_class = int(line.split("=", 1)[1]); continue

        if line.startswith("Tree="):
            close_tree(); continue

        if "leaf_value=" in line:
            kv = _parse_kv(line)
            # 1) explicit ID, 2) regex, 3) otherwise number by order of appearance
            lid: int
            if "leaf_index" in kv or "leaf" in kv:
                s = kv.get("leaf_index", kv.get("leaf", "0"))
                lid = int(s.lstrip("L").lstrip("R"))
            else:
                m = LEAF_ID_RE.search(line)
                if m:
                    lid = int(m.group(2))
                else:
                    lid = next_leaf_idx
            cur_leaves[lid] = float(kv["leaf_value"])
            next_leaf_idx = max(next_leaf_idx + 1, lid + 1)
            continue

        if "split_feature=" in line and "threshold=" in line:
            kv = _parse_kv(line)

            # node id: use order of appearance when split_index is absent
            nid = int(kv["split_index"]) if "split_index" in kv else next_split_idx
            next_split_idx = max(next_split_idx + 1, nid + 1)

            # feature: integer index or name
            sf = kv["split_feature"]
            if sf.lstrip("-").isdigit():
                feat_id = int(sf)
            else:
                if not feat_index:
                    raise ValueError("feature_names= not found but split_feature is name")
                if sf not in feat_index:
                    raise ValueError(f"unknown split_feature name: {sf}")
                feat_id = feat_index[sf]

            # parse a child reference ("leaf=k" or a node id)
            def parse_child(v: str) -> Tuple[str, int]:
                if v.startswith("leaf="):
                    return "leaf", int(v.split("=", 1)[1])
                v2 = v.lstrip("L").lstrip("R")
                return ("split", int(v2))

            lk, lv = parse_child(kv["left_child"])
            rk, rv = parse_child(kv["right_child"])

            cur_splits[nid] = {
                "feature": feat_id,
                "threshold": float(kv["threshold"]),
                "default_left": kv.get("default_left", "1") == "1",
                "left_kind": lk, "left": lv,
                "right_kind": rk, "right": rv,
            }
            continue

    close_tree()

    if not feature_names:
        raise ValueError("feature_names= not found in model.txt")

    return {
        "learning_rate": learning_rate,
        "objective": objective,
        "num_class": num_class,
        "feature_names": feature_names,
        "trees": trees,
    }


def _sigmoid(x: float) -> float:
    return 1.0 / (1.0 + exp(-x))


def _predict_one_row(x: np.ndarray, tree: Dict[str, Any]) -> float:
    splits, leaves = tree["splits"], tree["leaves"]

    # Case 1: single-leaf tree (no splits)
    if not splits:
        if 0 in leaves:
            return leaves[0]
        return leaves[min(leaves.keys())]

    # Determine root
    nid = tree.get("root")
    if nid is None or nid not in splits:
        nid = min(splits.keys())

    # Traverse
    while True:
        node = splits.get(nid)
        if node is None:
            raise KeyError(f"split node id {nid} not found in tree splits")
        val = x[node["feature"]]
        go_left = node["default_left"] if pd.isna(val) else (val <= node["threshold"])
        kind, cid = (node["left_kind"], node["left"]) if go_left else (node["right_kind"], node["right"])
        if kind == "leaf":
            if cid not in leaves:
                # Fallback: some dumps use sequential leaf ids; try nearest existing id
                return leaves.get(cid, leaves[min(leaves.keys())])
            return leaves[cid]
        nid = cid



def predict_dataframe(
    df: pd.DataFrame,
    model: Dict[str, Any],
    raw_score: bool = False,
    num_iteration: int | None = None,
) -> pd.Series:
    trees = model["trees"] if num_iteration is None else model["trees"][:num_iteration]
    feat_names = model["feature_names"]

    # stop if any required column is missing
    missing = [c for c in feat_names if c not in df.columns]
    if missing:
        raise ValueError(f"DataFrame is missing required features: {missing}")

    # ignore extra columns; only align the order
    X = df.reindex(columns=feat_names)

    # NOTE: leaf values in model.txt already include the learning rate,
    # so it must not be applied again here
    scores: list[float] = []
    for _, row in X.iterrows():
        s = 0.0
        xv = row.values
        for t in trees:
            s += _predict_one_row(xv, t)
        scores.append(s)

    if raw_score:
        return pd.Series(scores, index=df.index)
    if model["objective"].startswith("binary"):
        return pd.Series([_sigmoid(v) for v in scores], index=df.index)
    return pd.Series(scores, index=df.index)


def add_predictions(
    df: pd.DataFrame,
    model: Dict[str, Any],
    threshold: float = 0.5,
    raw_score: bool = False,
    num_iteration: int | None = None,
) -> pd.DataFrame:
    scores = predict_dataframe(df, model, raw_score=raw_score, num_iteration=num_iteration)
    out = df.copy()
    if model["objective"].startswith("binary") and not raw_score:
        out["prediction_score"] = scores
        out["prediction"] = (scores >= threshold).astype(int)
    else:
        out["prediction_score"] = scores
        out["prediction"] = np.nan
    return out

Usage

model = parse_model_txt(Path("model.txt"))
new_df = pd.read_csv("new_data.csv")

df_with_pred = add_predictions(
    df=new_df,
    model=model,
    threshold=0.42,
    raw_score=False,
    num_iteration=None,
)
print(df_with_pred.head())