はじめに
パッケージを使わずにmodelファイルだけで、二値分類モデルのスコアを算出したい場面がありました。
そのためいろいろためしたのですが、まだうまくいってません・・
(パッケージのmodel.predictと比較してスコアが一致しない)
コード
model_dump.json を使う
テスト中
from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, List, Union
import json
import math
import numpy as np
import pandas as pd
# -------------------- Loader --------------------
def load_lgb_json(path: Path) -> Dict[str, Any]:
"""
Load LightGBM dump_model() JSON.
Required keys: "tree_info", "feature_names", "objective", optionally "num_class".
"""
obj = json.loads(Path(path).read_text())
if "tree_info" not in obj:
raise ValueError("Invalid JSON: missing 'tree_info'")
if "feature_names" not in obj:
# some versions put names under 'pandas_categorical' or elsewhere; require explicit list for safety
raise ValueError("Invalid JSON: missing 'feature_names' list")
obj.setdefault("objective", "regression")
obj.setdefault("num_class", 1)
return obj
# -------------------- Traversal --------------------
def _is_leaf(node: Dict[str, Any]) -> bool:
return "leaf_value" in node
def _go_left_numeric(val: float, node: dict) -> bool:
decision = node.get("decision_type", "<=")
thr = float(node["threshold"])
default_left = bool(node.get("default_left", True))
if pd.isna(val):
return default_left
if decision == "<=":
return val <= thr
if decision == "<":
return val < thr
# fallback (numeric only)
return val <= thr
def _go_left(val, node: dict) -> bool:
decision = node.get("decision_type", "<=")
default_left = bool(node.get("default_left", True))
# 欠損は既定方向へ
if pd.isna(val):
return default_left
if decision in ("<=", "<"):
thr = float(node["threshold"])
return (val <= thr) if decision == "<=" else (val < thr)
# 等値(カテゴリ or 離散値)
if decision == "==":
raw = str(node.get("threshold", ""))
cats = [c for c in raw.split("||") if c != ""]
sval = str(val)
try:
fval = float(val)
return (sval in cats) or any(_c.replace(".", "", 1).isdigit() and float(_c) == fval for _c in cats)
except Exception:
return sval in cats
# 集合包含(カテゴリ集合)
if decision == "in":
raw = node.get("threshold")
# threshold が "a||b||c" 文字列 or 配列の両対応
if isinstance(raw, str):
cats = [c for c in raw.split("||") if c != ""]
elif isinstance(raw, list):
cats = [str(x) for x in raw]
else:
cats = []
sval = str(val)
try:
fval = float(val)
in_set = (sval in cats) or any(str(x) == sval or (isinstance(x, (int, float)) and float(x) == fval) for x in cats)
except Exception:
in_set = (sval in cats)
return in_set # 属していれば左へ
# ここに来たら未対応の分割タイプ
raise ValueError(f"Unsupported decision_type: {decision!r}")
def _tree_predict_row(x: np.ndarray, node: dict, feat_index: dict) -> float:
cur = node
while True:
if "leaf_value" in cur:
return float(cur["leaf_value"])
sf = cur.get("split_feature", cur.get("split_feature_name"))
if isinstance(sf, str) and sf.isdigit():
sf = int(sf)
fi = feat_index[sf]
go_left = _go_left(x[fi], cur)
cur = cur["left_child"] if go_left else cur["right_child"]
# -------------------- Predictors --------------------
def _sigmoid(z: float) -> float:
# stable-ish sigmoid
if z >= 0:
ez = math.exp(-z)
return 1.0 / (1.0 + ez)
ez = math.exp(z)
return ez / (1.0 + ez)
def predict_dataframe_from_json(
df: pd.DataFrame,
model: Dict[str, Any],
raw_score: bool = False,
num_iteration: int | None = None,
) -> pd.Series:
feat_names: List[str] = list(model["feature_names"])
# 必須列チェック(1つでも欠けてたら止める)
missing = [c for c in feat_names if c not in df.columns]
if missing:
raise ValueError(f"DataFrame is missing required features: {missing}")
X = df.reindex(columns=feat_names)
# feature index map (name -> position) 兼 旧JSONの数値indexもサポート
feat_index: Dict[Union[int, str], int] = {name: i for i, name in enumerate(feat_names)}
for i, name in enumerate(feat_names):
feat_index[i] = i # some dumps refer by integer id
trees = model["tree_info"]
if num_iteration is not None:
trees = trees[:num_iteration]
# learning rateは各ツリーの "shrinkage" に入っている(ない場合は1.0相当)
raw_scores: List[float] = []
for _, row in X.iterrows():
s = 0.0
xv = row.values
for t in trees:
shrink = float(t.get("shrinkage", 1.0))
s += shrink * _tree_predict_row(xv, t["tree_structure"], feat_index)
raw_scores.append(s)
if raw_score or str(model.get("objective", "")).startswith("regression"):
return pd.Series(raw_scores, index=df.index)
# binary
probs = [_sigmoid(v) for v in raw_scores]
return pd.Series(probs, index=df.index)
def add_predictions_from_json(
df: pd.DataFrame,
model: Dict[str, Any],
threshold: float = 0.5,
raw_score: bool = False,
num_iteration: int | None = None,
) -> pd.DataFrame:
scores = predict_dataframe_from_json(df, model, raw_score=raw_score, num_iteration=num_iteration)
out = df.copy()
if not raw_score and not str(model.get("objective", "")).startswith("regression"):
out["prediction_score"] = scores
out["prediction"] = (scores >= threshold).astype(int)
else:
out["prediction_score"] = scores
out["prediction"] = np.nan
return out
使い方
model = load_lgb_json(Path("model_dump.json"))
# 必須列チェックを通ったら順序を自動で合わせます
df_with_pred = add_predictions_from_json(
df=new_df,
model=model,
threshold=0.42, # best threshold は別管理(meta)推奨
raw_score=False,
num_iteration=None,
)
print(df_with_pred.head())
【没】 model.txt を使う
model.save_model()で出したtxtファイルからスコア算出するのは厳しそう
辿るルートが全部同じになってしまう・・
以下は検討の記録
from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, List, Tuple
import numpy as np
import pandas as pd
import re
from math import exp
import ast
def _parse_feature_names(raw: str) -> list[str]:
s = raw.strip()
# 形式: ["f1","f2",...]
if s.startswith("[") and s.endswith("]"):
try:
lst = ast.literal_eval(s)
return [str(x).strip() for x in lst if str(x).strip()]
except Exception:
pass
# 形式: f1,f2,f3
if "," in s:
return [c.strip() for c in s.split(",") if c.strip()]
# 形式: f1\tf2\tf3
if "\t" in s:
return [c.strip() for c in s.split("\t") if c.strip()]
# 形式: f1 f2 f3
parts = s.split()
return [p for p in parts if p]
def _parse_kv(line: str) -> Dict[str, str]:
kv: Dict[str, str] = {}
for tok in line.strip().split():
if "=" in tok:
k, v = tok.split("=", 1)
kv[k] = v
return kv
def parse_model_txt(path: Path) -> Dict[str, Any]:
lines = Path(path).read_text().splitlines()
feature_names: List[str] = []
feat_index: Dict[str, int] = {}
learning_rate = 0.1
objective = "regression"
num_class = 1
trees: List[Dict[str, Any]] = []
cur_splits: Dict[int, Dict[str, Any]] = {}
cur_leaves: Dict[int, float] = {}
next_split_idx = 0
next_leaf_idx = 0
LEAF_ID_RE = re.compile(r"(?:\bleaf_index\b|\bleaf\b)\s*=\s*([LR]?)(\d+)")
def close_tree() -> None:
nonlocal cur_splits, cur_leaves, next_split_idx, next_leaf_idx
if cur_splits or cur_leaves:
root_id = min(cur_splits.keys()) if cur_splits else None
trees.append({
"splits": dict(cur_splits),
"leaves": dict(cur_leaves),
"root": root_id,
})
cur_splits, cur_leaves = {}, {}
next_split_idx = 0
next_leaf_idx = 0
for line in lines:
if line.startswith("feature_names="):
raw = line.split("=", 1)[1]
feature_names = _parse_feature_names(raw)
if not feature_names:
raise ValueError("feature_names list is empty or unparsable")
feat_index = {name: i for i, name in enumerate(feature_names)}
continue
if line.startswith("learning_rate="):
learning_rate = float(line.split("=", 1)[1]); continue
if line.startswith("objective="):
objective = line.split("=", 1)[1]; continue
if line.startswith("num_class="):
num_class = int(line.split("=", 1)[1]); continue
if line.startswith("Tree="):
close_tree(); continue
if "leaf_value=" in line:
kv = _parse_kv(line)
# 1) 明示ID, 2) 正規表現, 3) 無ければ出現順で採番
lid: int
if "leaf_index" in kv or "leaf" in kv:
s = kv.get("leaf_index", kv.get("leaf", "0"))
lid = int(s.lstrip("L").lstrip("R"))
else:
m = LEAF_ID_RE.search(line)
if m:
lid = int(m.group(2))
else:
lid = next_leaf_idx
cur_leaves[lid] = float(kv["leaf_value"])
next_leaf_idx = max(next_leaf_idx + 1, lid + 1)
continue
if "split_feature=" in line and "threshold=" in line:
kv = _parse_kv(line)
# node id: split_index が無ければ出現順
nid = int(kv["split_index"]) if "split_index" in kv else next_split_idx
next_split_idx = max(next_split_idx + 1, nid + 1)
# feature: int か name
sf = kv["split_feature"]
if sf.lstrip("-").isdigit():
feat_id = int(sf)
else:
if not feat_index:
raise ValueError("feature_names= not found but split_feature is name")
if sf not in feat_index:
raise ValueError(f"unknown split_feature name: {sf}")
feat_id = feat_index[sf]
# child 解析
def parse_child(v: str) -> Tuple[str, int]:
if v.startswith("leaf="):
return "leaf", int(v.split("=", 1)[1])
v2 = v.lstrip("L").lstrip("R")
return ("split", int(v2))
lk, lv = parse_child(kv["left_child"])
rk, rv = parse_child(kv["right_child"])
cur_splits[nid] = {
"feature": feat_id,
"threshold": float(kv["threshold"]),
"default_left": kv.get("default_left", "1") == "1",
"left_kind": lk, "left": lv,
"right_kind": rk, "right": rv,
}
continue
close_tree()
if not feature_names:
raise ValueError("feature_names= not found in model.txt")
return {
"learning_rate": learning_rate,
"objective": objective,
"num_class": num_class,
"feature_names": feature_names,
"trees": trees,
}
def _sigmoid(x: float) -> float:
return 1.0 / (1.0 + exp(-x))
def _predict_one_row(x: np.ndarray, tree: Dict[str, Any]) -> float:
splits, leaves = tree["splits"], tree["leaves"]
# Case 1: single-leaf tree (no splits)
if not splits:
if 0 in leaves:
return leaves[0]
return leaves[min(leaves.keys())]
# Determine root
nid = tree.get("root")
if nid is None or nid not in splits:
nid = min(splits.keys())
# Traverse
while True:
node = splits.get(nid)
if node is None:
raise KeyError(f"split node id {nid} not found in tree splits")
val = x[node["feature"]]
go_left = node["default_left"] if pd.isna(val) else (val <= node["threshold"])
kind, cid = (node["left_kind"], node["left"]) if go_left else (node["right_kind"], node["right"])
if kind == "leaf":
if cid not in leaves:
# Fallback: some dumps use sequential leaf ids; try nearest existing id
return leaves.get(cid, leaves[min(leaves.keys())])
return leaves[cid]
nid = cid
def predict_dataframe(
df: pd.DataFrame,
model: Dict[str, Any],
raw_score: bool = False,
num_iteration: int | None = None,
) -> pd.Series:
trees = model["trees"] if num_iteration is None else model["trees"][:num_iteration]
feat_names = model["feature_names"]
# 欠損列があれば止める
missing = [c for c in feat_names if c not in df.columns]
if missing:
raise ValueError(f"DataFrame is missing required features: {missing}")
# 余計な列は無視、順序だけ合わせる
X = df.reindex(columns=feat_names)
scores: list[float] = []
for _, row in X.iterrows():
s = 0.0
for t in trees:
s += model["learning_rate"] * _predict_one_row(row.values, t)
scores.append(s)
if raw_score:
return pd.Series(scores, index=df.index)
if model["objective"].startswith("binary"):
return pd.Series([_sigmoid(v) for v in scores], index=df.index)
return pd.Series(scores, index=df.index)
def add_predictions(
df: pd.DataFrame,
model: Dict[str, Any],
threshold: float = 0.5,
raw_score: bool = False,
num_iteration: int | None = None,
) -> pd.DataFrame:
scores = predict_dataframe(df, model, raw_score=raw_score, num_iteration=num_iteration)
out = df.copy()
if model["objective"].startswith("binary") and not raw_score:
out["prediction_score"] = scores
out["prediction"] = (scores >= threshold).astype(int)
else:
out["prediction_score"] = scores
out["prediction"] = np.nan
return out
使い方
model = parse_model_txt(Path("model.txt"))
new_df = pd.read_csv("new_data.csv")
df_with_pred = add_predictions(
df=new_df,
model=model,
threshold=0.42,
raw_score=False,
num_iteration=None,
)
print(df_with_pred.head())