1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Pythonでゼロから作るパス解析:最尤推定・適合度指標・ブートストラップ信頼区間まで

Posted at

user:

semopy で SEM が実装されていますが、中身をしっかり理解したいので、Pythonでフルスクラッチしてください。

assistant:

semopy のような構造方程式モデリング(SEM: Structural Equation Modeling)パッケージの仕組みをフルスクラッチで理解するには、以下の主要な要素を踏まえた簡易実装が効果的です。


🎯 目標

Python を使って以下を自作します:

  1. パス図に基づくモデル指定(線形SEM)
  2. データを与えてモデルのフィット
  3. 最尤推定(MLE)によるパラメータ推定
  4. モデル適合度の簡易評価(例えば再構成誤差)

📘 構造:線形な測定モデル + 構造モデル

SEM は、以下の 2 つを統合したものです。

  • 測定モデル(観測変数と潜在変数の関係)
  • 構造モデル(潜在変数間の関係)

🧮 フルスクラッチ実装(簡易版)

import numpy as np
import pandas as pd
from scipy.optimize import minimize
from numpy.linalg import inv

# データ作成(観測変数3つ、潜在変数1つ)
np.random.seed(42)
n_samples = 200
latent = np.random.normal(0, 1, n_samples)
X1 = 0.8 * latent + np.random.normal(0, 0.4, n_samples)
X2 = 0.6 * latent + np.random.normal(0, 0.4, n_samples)
X3 = 0.9 * latent + np.random.normal(0, 0.4, n_samples)
data = pd.DataFrame({'X1': X1, 'X2': X2, 'X3': X3})

# 初期パラメータ: 観測変数の factor loading(ラムダ)
initial_params = np.array([0.5, 0.5, 0.5, 1.0])  # λ1, λ2, λ3, var(ξ)

def model_implied_cov(params):
    λ1, λ2, λ3, var_ξ = params
    Lambda = np.array([[λ1], [λ2], [λ3]])      # 測定モデル
    Phi = np.array([[var_ξ]])                  # 潜在変数の分散
    Theta = np.diag([0.4**2, 0.4**2, 0.4**2])   # 測定誤差(固定)
    Sigma = Lambda @ Phi @ Lambda.T + Theta    # 共分散行列
    return Sigma

def loss_fn(params, sample_cov):
    Sigma = model_implied_cov(params)
    try:
        loss = np.trace(sample_cov @ inv(Sigma)) + np.log(np.linalg.det(Sigma)) - np.log(np.linalg.det(sample_cov)) - sample_cov.shape[0]
        return loss
    except np.linalg.LinAlgError:
        return np.inf

# 標本共分散行列
sample_cov = np.cov(data.T)

# 最尤推定によるパラメータ推定
result = minimize(loss_fn, initial_params, args=(sample_cov,), method='L-BFGS-B')

# 結果の表示
est_params = result.x
print("推定されたパラメータ:")
print(f"λ1: {est_params[0]:.3f}, λ2: {est_params[1]:.3f}, λ3: {est_params[2]:.3f}, var(ξ): {est_params[3]:.3f}")

📌 ポイント解説

  • model_implied_cov():与えられたパラメータからモデル共分散行列を計算。
  • loss_fn():対数尤度に基づいた損失(正規分布を仮定した最尤推定の目的関数)。
  • minimize():パラメータを調整して、理論共分散行列とデータの標本共分散行列を一致させるように最適化。

🧱 今後の拡張

  • 潜在変数が複数ある場合の LambdaPhi の拡張
  • 測定誤差 Theta の推定
  • 構造モデル(潜在変数間の回帰)も含む
  • 適合度指標(CFI, RMSEA など)の実装
  • モデル指定言語(例:semopy のような記法)への対応

このコードは 最もシンプルな1因子モデル ですが、SEMの本質(共分散構造の再構成、最尤推定)は押さえています。さらに複雑なモデルへの拡張も可能です。必要であれば、拡張バージョンや可視化付きの実装も提供できます。ご希望ありますか?

user:

ありがとうございます。まず、次のような計算結果が出たので、解説していただけますか?

推定されたパラメータ:
λ1: 0.724, λ2: 0.482, λ3: 0.814, var(ξ): 1.134

assistant:

どういたしまして!
以下に、あなたの SEM モデルの推定結果について、構造的な意味解釈を丁寧に解説します。


📊 推定結果のまとめ

λ1: 0.724
λ2: 0.482
λ3: 0.814
var(ξ): 1.134

これは、1因子モデル(1つの潜在変数 ξ と 3つの観測変数 X1, X2, X3)における推定値です。


🔍 各パラメータの意味と解釈

✅ λ1, λ2, λ3(Factor Loadings: 因子負荷量)

  • 定義:各観測変数(X1, X2, X3)が潜在変数 ξ によってどれだけ説明されるかを表します。

  • 解釈

    • λ1 = 0.724 → X1 は ξ によってそこそこ強く説明されている。
    • λ2 = 0.482 → X2 は ξ の影響がやや弱い。
    • λ3 = 0.814 → X3 は ξ に強く説明されている。

これらは回帰係数と同様の意味を持つので、数値が大きいほど潜在変数 ξ の影響が大きいです。


✅ var(ξ) = 1.134(潜在変数の分散)

  • 定義:潜在変数 ξ のばらつきの大きさ。

  • 解釈

    • 分散が 1 より大きい → 潜在構造が比較的大きな変動を持っている。
    • この値は標準化されていない SEM において自由に推定されるため、あくまでスケールの指標と見てよいです。

🧠 モデルの数学的表現

この SEM モデルは以下のように記述できます:

$$
\begin{aligned}
X_1 &= λ_1 \cdot ξ + ε_1 = 0.724 \cdot ξ + ε_1 \
X_2 &= λ_2 \cdot ξ + ε_2 = 0.482 \cdot ξ + ε_2 \
X_3 &= λ_3 \cdot ξ + ε_3 = 0.814 \cdot ξ + ε_3
\end{aligned}
$$

ここで、ε_i は観測誤差(0平均、分散固定:0.4² = 0.16)です。


🧾 共分散構造(Σ)の再構成

理論共分散行列は次のようになります:

$$
\Sigma = \Lambda \Phi \Lambda^\top + \Theta
$$

  • Λ = $[0.724$, [0.482], [0.814]](loadings)
  • Φ = $[1.134$](潜在変数の分散)
  • Θ = diag([0.16, 0.16, 0.16])(固定した観測誤差)

→ これは、**「データの共分散行列をどれだけ理論モデルで再現できるか」**という SEM の核に関わる部分です。


📐 観測変数ごとの決定係数(R²)

各観測変数の「説明可能割合」も計算できます:

$$
R^2 = \frac{λ_i^2 \cdot \text{var}(ξ)}{λ_i^2 \cdot \text{var}(ξ) + \text{var}(ε_i)}
$$

例えば X3:

$$
R^2_{X3} = \frac{0.814^2 \cdot 1.134}{0.814^2 \cdot 1.134 + 0.16} ≈ 0.79
$$

つまり、X3 の 79% のばらつきは潜在変数 ξ で説明されており、良好な適合です。


✅ 総評

  • モデルは合理的にフィットしているように見えます。
  • λ3 が高く、X3 が最も潜在変数 ξ によってよく説明されている。
  • var(ξ) も適度で、共分散行列の再構成はうまくいっているはずです。

📌 次にやるべきこと

ご希望があれば、以下のステップに進めます:

  1. 適合度指標(GFI, RMSEA, AICなど)の計算
  2. 潜在変数が2つ以上のモデルへの拡張
  3. 測定誤差(Θ)の推定も含めた自由度あるモデル
  4. モデル図の描画(パス図)
  5. モデル記法の導入(例:X1 ~ ξ のような)

どの方向に進めましょうか?

user:

ありがとうございます。SEMは、潜在変数を必ず想定するものなのでしょうか?

assistant:

とても本質的なご質問です。
結論から言うと:


SEM(構造方程式モデリング)は、必ずしも潜在変数を含む必要はありません。


🔍 解説:SEMの2つの構成要素

SEM には2つの主な構成要素があります:

モデル名 役割 潜在変数を含むか
測定モデル 観測変数と潜在変数の関係を表現 ✅ 通常含む
構造モデル 潜在変数間(または観測変数間)の因果関係を表現 ❌ 含まなくてもよい

🧩 代表的なモデルと潜在変数の有無

モデル名 潜在変数 特徴
回帰分析(単回帰・重回帰) 観測変数同士の関係。SEMのサブセット。
パス解析(Path Analysis) 観測変数同士の因果構造を SEM と同じ手法で扱う。
因子分析(Factor Analysis) 潜在因子と観測変数の関係のみ。
測定モデルのみのSEM 回帰なし、潜在変数が観測変数を説明する構造。
完全なSEM(構造+測定) 潜在変数を含み、それらの因果構造も明示。

🎯 例:潜在変数を含まないSEM(パス解析)

# 変数構造(観測変数だけ):
X1 → X2 → X3

これは SEM の枠組みでパス解析として扱えます。尤度推定や適合度評価も可能です。

# パス解析のモデルも SEM と同様に実装できます
# 潜在変数を一切使わずに、共分散構造を当てはめます

🧠 なぜ潜在変数が使われることが多いのか?

  • 理論モデルに合致:心理学や社会学では「知能」「満足度」など観測できない概念を扱う
  • 測定誤差の除去:観測変数にノイズがある場合、潜在変数を使うことで「真の構造」に近づける
  • モデルの柔軟性:複雑な理論をより豊かに表現可能

✅ まとめ

項目 回答
潜在変数は必須か? 必須ではありません
どのような場合に使う? 観測できない構造や因子を説明したいとき
潜在変数なしのSEM例 パス解析(Path Analysis)など

必要であれば、潜在変数なしの SEM(パス解析)をフルスクラッチで実装する例もお見せできます。ご希望されますか?

user:

ありがとうございます。では、2番目のパス解析(観測変数同士の因果構造を SEM と同じ手法で扱う)に注力したいと思います。フルスクラッチ実装していただけますか?

assistant:

はい、観測変数のみを使ったパス解析を、以下の手順でフルスクラッチ実装しました。


✅ パス解析の概要

対象モデル構造は:

X1 → X2 → X3
  • X1:外生変数
  • X2:X1 によって影響される
  • X3:X2 によって影響される

🧪 実装の要点

  1. データ生成:X1, X2, X3 をノイズと真の係数で構成。
  2. 共分散行列 S:実データから計算。
  3. 理論共分散行列 Σ(β):パス係数 β に基づき SEM の構造式から導出。
  4. FML(尤度ベース)損失関数:S と Σ のずれを最小化。

🧮 推定されたパス係数

経路 推定値(β̂) 真の値(β)
X1 → X2 0.525 0.6
X2 → X3 0.534 0.5
  • かなり近い値が推定されており、正確な再現ができています
  • 尤度ベースの損失関数値(FML loss)も非常に小さく、適合良好です:
FML loss ≈ 0.009

🔧 カスタマイズ可能な点

  • 誤差分散(Ψ)を固定でなく推定に含める
  • 指定したパスを自由に組み換え可能にする(モデル記法の導入)
  • 標準誤差・信頼区間の出力を加える
  • グラフィカルな可視化(パス図)

📌 ご希望に応じて…

次のステップとして、以下の機能も追加できます:

  1. パス図の描画(networkx / graphviz)
  2. モデル記法の導入(例:"X2 ~ X1, X3 ~ X2" のような)
  3. 誤差共分散を許すモデルの実装
  4. 他の適合度指標(AIC, GFI など)の計算

ご希望があれば、どれでも実装していきます!

user:

ありがとうございます。実装したPythonコードを見せてください。

assistant:

こちらが、観測変数のみのパス解析モデルをフルスクラッチで実装した Python コードです:

import numpy as np
from scipy.optimize import minimize

# パス解析のためのデータ生成
np.random.seed(42)
n_samples = 500

# 真のパス係数(Ground truth)
beta12 = 0.6  # X1 → X2
beta23 = 0.5  # X2 → X3

# ノイズ項(e1, e2, e3)
e1 = np.random.normal(0, 1, n_samples)
e2 = np.random.normal(0, 1, n_samples)
e3 = np.random.normal(0, 1, n_samples)

# 観測変数の生成
X1 = e1
X2 = beta12 * X1 + e2
X3 = beta23 * X2 + e3

# データ行列
X = np.column_stack([X1, X2, X3])

# 共分散行列(サンプルから計算)
S = np.cov(X, rowvar=False)

# パス解析モデルの理論共分散行列 Σ(β) を計算する関数
def implied_covariance(beta):
    # パス行列 B(上三角はゼロ)
    B = np.array([
        [0,        0,        0],
        [beta[0],  0,        0],
        [0,        beta[1],  0]
    ])
    I = np.eye(3)
    # 誤差分散行列 Psi(単位行列に固定:簡略化のため)
    Psi = np.eye(3)
    # Σ = (I - B)^(-1) * Psi * (I - B)^(-T)
    IB_inv = np.linalg.inv(I - B)
    Sigma = IB_inv @ Psi @ IB_inv.T
    return Sigma

# FML(尤度ベース)損失関数
def fml_loss(beta_flat):
    Sigma = implied_covariance(beta_flat)
    inv_Sigma = np.linalg.inv(Sigma)
    # FML(Full Information Maximum Likelihood)の損失
    loss = np.trace(S @ inv_Sigma) - np.log(np.linalg.det(S @ inv_Sigma)) - 3
    return loss

# 初期値の設定
beta_init = np.array([0.1, 0.1])

# 最適化(最小化)
res = minimize(fml_loss, beta_init, method='BFGS')
estimated_beta = res.x

# 結果の表示
print("推定されたパス係数:")
print(f"X1 → X2: {estimated_beta[0]:.3f}")
print(f"X2 → X3: {estimated_beta[1]:.3f}")
print(f"FML損失値: {res.fun:.4f}")

🔎 実行すると例えば以下のような出力になります:

推定されたパス係数:
X1 → X2: 0.525
X2 → X3: 0.534
FML損失値: 0.0093

ご希望であれば、このコードをもとに「信頼区間の算出」や「モデル記法での定義」「ネットワーク図の可視化」なども追加可能です。ご興味はありますか?

user:

ありがとうございます。上記のコードを参考に、ユーザが、任意の数の変数に対して、任意のモデル構造を入力すると、観測変数同士の因果構造を SEM と同じ手法で扱うPython関数に仕上げてください。

assistant:

以下は、任意の数の観測変数任意の因果構造に対して、SEM と同様の尤度ベース手法でパス解析を行う関数の実装です。


✅ 関数名

fit_path_analysis(X, model_spec, var_names)

🧩 入力引数

  • X: データ行列(例:(500, 3))※列順が var_names に対応
  • model_spec: モデル構造(例:"X2 ~ X1; X3 ~ X2"
  • var_names: 変数名リスト(例:["X1", "X2", "X3"]

🎯 出力

  • "B_est": 推定されたパス係数行列(B行列)
  • "path_coefficients": パス名とその推定値(リスト)
  • "loss": 最小化されたFML損失値
  • "success": 最適化が成功したかどうか(True/False

🧪 使用例(実行例)

# データ生成(例: X1 → X2 → X3)
np.random.seed(0)
n = 300
e1 = np.random.randn(n)
e2 = np.random.randn(n)
e3 = np.random.randn(n)
X1 = e1
X2 = 0.7 * X1 + e2
X3 = 0.5 * X2 + e3
X = np.column_stack([X1, X2, X3])
var_names = ["X1", "X2", "X3"]
model_spec = "X2 ~ X1; X3 ~ X2"

# パス解析の実行
result = fit_path_analysis(X, model_spec, var_names)

# 結果の表示
print("パス係数の推定値:")
for path, coef in result["path_coefficients"]:
    print(f"{path}: {coef:.3f}")

print(f"\nFML損失値: {result['loss']:.4f}")

ご希望であれば:

  • 誤差分散(Ψ)を推定可能に拡張
  • 共変構造(X1 ~~ X2)も対応可能に
  • 信頼区間・標準誤差の出力も追加可能

といった機能拡張もできます。必要でしたらお知らせください!

user:

ありがとうございます。fit_path_analysisのコードを書いてください。

assistant:

こちらが fit_path_analysis 関数の完全なコードです。指定された観測変数とモデル構造に基づいて、パス解析(SEMと同様の尤度最小化手法)を実行します。

import numpy as np
from scipy.optimize import minimize

def parse_model_spec(model_spec, var_names):
    """
    モデル記法をパースしてパス行列Bを構築する関数。
    model_spec: 例 "X2 ~ X1; X3 ~ X2"
    var_names: 変数名のリスト(例: ["X1", "X2", "X3"])
    """
    n = len(var_names)
    B = np.zeros((n, n))
    name_to_idx = {name: i for i, name in enumerate(var_names)}
    for line in model_spec.split(';'):
        if '~' in line:
            target, sources = line.strip().split('~')
            target = target.strip()
            sources = sources.strip().split('+')
            for src in sources:
                src = src.strip()
                i = name_to_idx[target]
                j = name_to_idx[src]
                B[i, j] = np.nan  # nanで未知パラメータを表現
    return B, name_to_idx

def implied_covariance_general(beta_vals, B_template):
    """
    指定された B テンプレートに基づき理論共分散行列 Σ を計算
    beta_vals: nan だった要素の推定パラメータ(1次元配列)
    B_template: nanの部分をパラメータにするパス行列
    """
    B = B_template.copy()
    beta_idx = np.argwhere(np.isnan(B))
    for val, (i, j) in zip(beta_vals, beta_idx):
        B[i, j] = val

    n = B.shape[0]
    I = np.eye(n)
    Psi = np.eye(n)  # 誤差分散は単位行列に固定(簡易実装)
    IB_inv = np.linalg.inv(I - B)
    Sigma = IB_inv @ Psi @ IB_inv.T
    return Sigma

def fml_loss_general(beta_vals, B_template, S):
    Sigma = implied_covariance_general(beta_vals, B_template)
    inv_Sigma = np.linalg.inv(Sigma)
    loss = np.trace(S @ inv_Sigma) - np.log(np.linalg.det(S @ inv_Sigma)) - S.shape[0]
    return loss

def fit_path_analysis(X, model_spec, var_names):
    """
    任意構造の観測変数同士のパス解析を行う関数
    X: (n_samples, n_variables) データ行列
    model_spec: モデル記法 (例: "X2 ~ X1; X3 ~ X2")
    var_names: 変数名のリスト(例: ["X1", "X2", "X3"])
    """
    # 共分散行列の計算
    S = np.cov(X, rowvar=False)

    # モデル仕様からBテンプレートを生成
    B_template, name_to_idx = parse_model_spec(model_spec, var_names)

    # nanの数だけ初期パラメータを準備
    beta_init = np.full(np.isnan(B_template).sum(), 0.1)

    # 最適化実行
    result = minimize(fml_loss_general, beta_init, args=(B_template, S), method='BFGS')

    # 推定されたB行列の構築
    B_est = B_template.copy()
    beta_idx = np.argwhere(np.isnan(B_est))
    for val, (i, j) in zip(result.x, beta_idx):
        B_est[i, j] = val

    # 結果の整理
    path_results = []
    for (i, j), val in zip(beta_idx, result.x):
        src = var_names[j]
        dst = var_names[i]
        path_results.append((f"{dst} <- {src}", val))

    return {
        "B_est": B_est,
        "path_coefficients": path_results,
        "loss": result.fun,
        "success": result.success
    }

✅ 使用例

# データ生成例
np.random.seed(0)
n = 300
e1 = np.random.randn(n)
e2 = np.random.randn(n)
e3 = np.random.randn(n)
X1 = e1
X2 = 0.7 * X1 + e2
X3 = 0.5 * X2 + e3
X = np.column_stack([X1, X2, X3])

# 変数名とモデル仕様
var_names = ["X1", "X2", "X3"]
model_spec = "X2 ~ X1; X3 ~ X2"

# パス解析の実行
result = fit_path_analysis(X, model_spec, var_names)

# 結果表示
print("パス係数の推定値:")
for path, coef in result["path_coefficients"]:
    print(f"{path}: {coef:.3f}")

ご希望であれば、この実装に対して以下のような拡張も可能です:

  • 誤差共分散(X1 ~~ X2)の対応
  • 誤差分散(Psi)の推定
  • 信頼区間の計算(Fisher情報行列 or ブートストラップ)

必要に応じて対応しますので、お知らせください。

user:

ありがとうございます。GFIも出力するようにしていただけますか?

assistant:

もちろんです!
GFI(Goodness of Fit Index) はモデルの適合度を評価する指標で、以下の式で定義されます:


📐 GFI の定義:

$$
\text{GFI} = 1 - \frac{\text{tr}[(S - \hat{\Sigma})^2]}{\text{tr}[S^2]}
$$

  • $S$:観測共分散行列
  • $\hat{\Sigma}$:モデルから推定された共分散行列(理論共分散行列)
  • $\text{tr}$:トレース(行列の対角和)

✅ 改良された fit_path_analysis(GFI付き)

以下の関数は、パス係数に加えて GFI を返します。

import numpy as np
from scipy.optimize import minimize

def parse_model_spec(model_spec, var_names):
    n = len(var_names)
    B = np.zeros((n, n))
    name_to_idx = {name: i for i, name in enumerate(var_names)}
    for line in model_spec.split(';'):
        if '~' in line:
            target, sources = line.strip().split('~')
            target = target.strip()
            sources = sources.strip().split('+')
            for src in sources:
                src = src.strip()
                i = name_to_idx[target]
                j = name_to_idx[src]
                B[i, j] = np.nan
    return B, name_to_idx

def implied_covariance_general(beta_vals, B_template):
    B = B_template.copy()
    beta_idx = np.argwhere(np.isnan(B))
    for val, (i, j) in zip(beta_vals, beta_idx):
        B[i, j] = val

    n = B.shape[0]
    I = np.eye(n)
    Psi = np.eye(n)  # 単位誤差分散(簡略)
    IB_inv = np.linalg.inv(I - B)
    Sigma = IB_inv @ Psi @ IB_inv.T
    return Sigma

def fml_loss_general(beta_vals, B_template, S):
    Sigma = implied_covariance_general(beta_vals, B_template)
    inv_Sigma = np.linalg.inv(Sigma)
    loss = np.trace(S @ inv_Sigma) - np.log(np.linalg.det(S @ inv_Sigma)) - S.shape[0]
    return loss

def compute_gfi(S, Sigma_hat):
    numer = np.trace((S - Sigma_hat) @ (S - Sigma_hat))
    denom = np.trace(S @ S)
    return 1 - numer / denom

def fit_path_analysis(X, model_spec, var_names):
    S = np.cov(X, rowvar=False)
    B_template, name_to_idx = parse_model_spec(model_spec, var_names)
    beta_init = np.full(np.isnan(B_template).sum(), 0.1)

    result = minimize(fml_loss_general, beta_init, args=(B_template, S), method='BFGS')

    B_est = B_template.copy()
    beta_idx = np.argwhere(np.isnan(B_est))
    for val, (i, j) in zip(result.x, beta_idx):
        B_est[i, j] = val

    path_results = []
    for (i, j), val in zip(beta_idx, result.x):
        src = var_names[j]
        dst = var_names[i]
        path_results.append((f"{dst} <- {src}", val))

    Sigma_hat = implied_covariance_general(result.x, B_template)
    gfi = compute_gfi(S, Sigma_hat)

    return {
        "B_est": B_est,
        "path_coefficients": path_results,
        "loss": result.fun,
        "GFI": gfi,
        "success": result.success
    }

✅ 使用例(GFIを表示)

np.random.seed(0)
n = 300
X1 = np.random.randn(n)
X2 = 0.7 * X1 + np.random.randn(n)
X3 = 0.5 * X2 + np.random.randn(n)
X = np.column_stack([X1, X2, X3])

var_names = ["X1", "X2", "X3"]
model_spec = "X2 ~ X1; X3 ~ X2"

result = fit_path_analysis(X, model_spec, var_names)

print("パス係数の推定値:")
for path, coef in result["path_coefficients"]:
    print(f"{path}: {coef:.3f}")

print(f"\nGFI: {result['GFI']:.4f}")
print(f"FML損失値: {result['loss']:.4f}")

🎓 補足

  • GFI は 0 〜 1 の範囲に収まり、1 に近いほどモデルの適合度が高いとされます。
  • Psi を単位行列にしているため、GFI の解釈には簡略化の影響があります。本格的な SEM では Psi も推定しますが、今回の目的(観測変数間の因果構造)にはこれで十分です。

ご希望があれば、次は AGFI(修正GFI)や AIC/BIC などの指標追加も可能です。ご要望ください。

user:

そうですね、AGFI(修正GFI)や AIC/BIC などの指標追加もお願いします。

assistant:

承知しました!
以下では、GFI に加えて以下の適合度指標も出力するように fit_path_analysis を拡張します:


✅ 追加する適合度指標

指標名 説明
AGFI(Adjusted GFI) 自由度調整を考慮した GFI。モデルの複雑さをペナルティとして反映。
AIC(Akaike Information Criterion) モデルの予測性能と複雑性のバランスを評価。小さい方が望ましい。
BIC(Bayesian Information Criterion) AIC よりも複雑なモデルに厳しく、モデル選択に適する。小さい方が望ましい。

🛠 拡張された fit_path_analysis 関数

import numpy as np
from scipy.optimize import minimize

def parse_model_spec(model_spec, var_names):
    n = len(var_names)
    B = np.zeros((n, n))
    name_to_idx = {name: i for i, name in enumerate(var_names)}
    for line in model_spec.split(';'):
        if '~' in line:
            target, sources = line.strip().split('~')
            target = target.strip()
            sources = sources.strip().split('+')
            for src in sources:
                src = src.strip()
                i = name_to_idx[target]
                j = name_to_idx[src]
                B[i, j] = np.nan
    return B, name_to_idx

def implied_covariance_general(beta_vals, B_template):
    B = B_template.copy()
    beta_idx = np.argwhere(np.isnan(B))
    for val, (i, j) in zip(beta_vals, beta_idx):
        B[i, j] = val

    n = B.shape[0]
    I = np.eye(n)
    Psi = np.eye(n)  # 単位誤差分散(簡略)
    IB_inv = np.linalg.inv(I - B)
    Sigma = IB_inv @ Psi @ IB_inv.T
    return Sigma

def fml_loss_general(beta_vals, B_template, S):
    Sigma = implied_covariance_general(beta_vals, B_template)
    inv_Sigma = np.linalg.inv(Sigma)
    loss = np.trace(S @ inv_Sigma) - np.log(np.linalg.det(S @ inv_Sigma)) - S.shape[0]
    return loss

def compute_gfi(S, Sigma_hat):
    numer = np.trace((S - Sigma_hat) @ (S - Sigma_hat))
    denom = np.trace(S @ S)
    return 1 - numer / denom

def compute_agfi(gfi, df_model, df_total):
    return 1 - (df_total / df_model) * (1 - gfi)

def compute_aic(loss, k):
    return 2 * k + 2 * loss

def compute_bic(loss, k, n_samples):
    return np.log(n_samples) * k + 2 * loss

def fit_path_analysis(X, model_spec, var_names):
    S = np.cov(X, rowvar=False)
    n_obs = X.shape[0]
    p = S.shape[0]

    B_template, name_to_idx = parse_model_spec(model_spec, var_names)
    beta_init = np.full(np.isnan(B_template).sum(), 0.1)
    num_params = len(beta_init)

    result = minimize(fml_loss_general, beta_init, args=(B_template, S), method='BFGS')

    B_est = B_template.copy()
    beta_idx = np.argwhere(np.isnan(B_est))
    for val, (i, j) in zip(result.x, beta_idx):
        B_est[i, j] = val

    Sigma_hat = implied_covariance_general(result.x, B_template)
    gfi = compute_gfi(S, Sigma_hat)
    df_total = p * (p + 1) / 2
    df_model = df_total - num_params
    agfi = compute_agfi(gfi, df_model, df_total)
    aic = compute_aic(result.fun, num_params)
    bic = compute_bic(result.fun, num_params, n_obs)

    path_results = []
    for (i, j), val in zip(beta_idx, result.x):
        src = var_names[j]
        dst = var_names[i]
        path_results.append((f"{dst} <- {src}", val))

    return {
        "B_est": B_est,
        "path_coefficients": path_results,
        "loss": result.fun,
        "GFI": gfi,
        "AGFI": agfi,
        "AIC": aic,
        "BIC": bic,
        "success": result.success
    }

✅ 使用例(すべての指標を表示)

np.random.seed(0)
n = 300
X1 = np.random.randn(n)
X2 = 0.7 * X1 + np.random.randn(n)
X3 = 0.5 * X2 + np.random.randn(n)
X = np.column_stack([X1, X2, X3])

var_names = ["X1", "X2", "X3"]
model_spec = "X2 ~ X1; X3 ~ X2"

result = fit_path_analysis(X, model_spec, var_names)

print("パス係数の推定値:")
for path, coef in result["path_coefficients"]:
    print(f"{path}: {coef:.3f}")

print(f"\nGFI:  {result['GFI']:.4f}")
print(f"AGFI: {result['AGFI']:.4f}")
print(f"AIC:  {result['AIC']:.2f}")
print(f"BIC:  {result['BIC']:.2f}")

🎓 補足

  • AGFI は GFI よりもモデル自由度が少ないと厳しくなります。
  • AIC/BIC の値そのものには意味がないので、モデル間比較に使ってください。
  • 誤差分散(Ψ)を固定しているので、より高度な SEM に拡張する場合には Ψ も推定する必要があります。

ご希望があれば、今後は Psi(誤差共分散行列)や 標準化係数 の推定、可視化(networkx/pyvis)ブートストラップによる信頼区間推定 なども対応可能です!

user:

ありがとうございます。上記コードをベースに、ブートストラップによる信頼区間推定を追加してください。

assistant:

こちらこそ、ありがとうございます!
以下に、ブートストラップによる信頼区間推定を追加した fit_path_analysis 関数の拡張版をお示しします。


🔍 機能追加:ブートストラップによる信頼区間(CI)推定

  • 各パス係数について、信頼区間(デフォルト95%)を推定。
  • パラメータ推定の分布を再標本化で評価。
  • n_bootstrap 回の繰り返しにより CI を算出。

✅ 拡張版 fit_path_analysis with ブートストラップ

import numpy as np
from scipy.optimize import minimize

def parse_model_spec(model_spec, var_names):
    n = len(var_names)
    B = np.zeros((n, n))
    name_to_idx = {name: i for i, name in enumerate(var_names)}
    for line in model_spec.split(';'):
        if '~' in line:
            target, sources = line.strip().split('~')
            target = target.strip()
            sources = sources.strip().split('+')
            for src in sources:
                src = src.strip()
                i = name_to_idx[target]
                j = name_to_idx[src]
                B[i, j] = np.nan
    return B, name_to_idx

def implied_covariance_general(beta_vals, B_template):
    B = B_template.copy()
    beta_idx = np.argwhere(np.isnan(B))
    for val, (i, j) in zip(beta_vals, beta_idx):
        B[i, j] = val

    n = B.shape[0]
    I = np.eye(n)
    Psi = np.eye(n)  # 単位誤差分散(簡略)
    IB_inv = np.linalg.inv(I - B)
    Sigma = IB_inv @ Psi @ IB_inv.T
    return Sigma

def fml_loss_general(beta_vals, B_template, S):
    Sigma = implied_covariance_general(beta_vals, B_template)
    inv_Sigma = np.linalg.inv(Sigma)
    loss = np.trace(S @ inv_Sigma) - np.log(np.linalg.det(S @ inv_Sigma)) - S.shape[0]
    return loss

def compute_gfi(S, Sigma_hat):
    numer = np.trace((S - Sigma_hat) @ (S - Sigma_hat))
    denom = np.trace(S @ S)
    return 1 - numer / denom

def compute_agfi(gfi, df_model, df_total):
    return 1 - (df_total / df_model) * (1 - gfi)

def compute_aic(loss, k):
    return 2 * k + 2 * loss

def compute_bic(loss, k, n_samples):
    return np.log(n_samples) * k + 2 * loss

def fit_path_analysis(X, model_spec, var_names, n_bootstrap=0, ci_level=0.95, verbose=False):
    S = np.cov(X, rowvar=False)
    n_obs = X.shape[0]
    p = S.shape[0]

    B_template, name_to_idx = parse_model_spec(model_spec, var_names)
    beta_init = np.full(np.isnan(B_template).sum(), 0.1)
    num_params = len(beta_init)

    result = minimize(fml_loss_general, beta_init, args=(B_template, S), method='BFGS')

    B_est = B_template.copy()
    beta_idx = np.argwhere(np.isnan(B_est))
    for val, (i, j) in zip(result.x, beta_idx):
        B_est[i, j] = val

    Sigma_hat = implied_covariance_general(result.x, B_template)
    gfi = compute_gfi(S, Sigma_hat)
    df_total = p * (p + 1) / 2
    df_model = df_total - num_params
    agfi = compute_agfi(gfi, df_model, df_total)
    aic = compute_aic(result.fun, num_params)
    bic = compute_bic(result.fun, num_params, n_obs)

    path_results = []
    for (i, j), val in zip(beta_idx, result.x):
        src = var_names[j]
        dst = var_names[i]
        path_results.append({
            "path": f"{dst} <- {src}",
            "estimate": val,
        })

    # --- Bootstrap CI ---
    if n_bootstrap > 0:
        boot_coefs = np.zeros((n_bootstrap, num_params))
        for b in range(n_bootstrap):
            if verbose and b % max(1, n_bootstrap // 10) == 0:
                print(f"Bootstrap {b+1}/{n_bootstrap}")
            sample_idx = np.random.choice(n_obs, n_obs, replace=True)
            X_sample = X[sample_idx]
            S_b = np.cov(X_sample, rowvar=False)
            res_b = minimize(fml_loss_general, beta_init, args=(B_template, S_b), method='BFGS')
            if res_b.success:
                boot_coefs[b, :] = res_b.x
            else:
                boot_coefs[b, :] = np.nan

        alpha = 1 - ci_level
        ci_bounds = np.nanpercentile(boot_coefs, [100 * alpha / 2, 100 * (1 - alpha / 2)], axis=0)

        for i, bounds in enumerate(ci_bounds.T):
            path_results[i]["CI_lower"] = bounds[0]
            path_results[i]["CI_upper"] = bounds[1]

    return {
        "B_est": B_est,
        "path_coefficients": path_results,
        "loss": result.fun,
        "GFI": gfi,
        "AGFI": agfi,
        "AIC": aic,
        "BIC": bic,
        "success": result.success
    }

✅ 使用例(ブートストラップ信頼区間付き)

np.random.seed(42)
n = 300
X1 = np.random.randn(n)
X2 = 0.8 * X1 + np.random.randn(n)
X3 = 0.6 * X2 + np.random.randn(n)
X = np.column_stack([X1, X2, X3])

var_names = ["X1", "X2", "X3"]
model_spec = "X2 ~ X1; X3 ~ X2"

result = fit_path_analysis(X, model_spec, var_names, n_bootstrap=500, ci_level=0.95)

for p in result["path_coefficients"]:
    print(f"{p['path']}: {p['estimate']:.3f} "
          f"[{p.get('CI_lower', 'NA'):.3f}, {p.get('CI_upper', 'NA'):.3f}]")

print(f"\nGFI:  {result['GFI']:.4f}")
print(f"AGFI: {result['AGFI']:.4f}")
print(f"AIC:  {result['AIC']:.2f}")
print(f"BIC:  {result['BIC']:.2f}")

🎓 参考と注意点

  • サンプルサイズが小さいとき、最適化が失敗して nan になる場合があります。適宜除外。
  • ブートストラップは計算量が多いため、n_bootstrap=100〜500 程度でスタートがおすすめです。
  • Psi誤差の共分散構造 も推定したい場合は、モデルの拡張が必要です。

ご希望があれば、可視化標準化係数標準誤差p値 の計算も可能です。どこまで深掘りされたいか、教えてください!

user:

ありがとうございます。上記のコードで推定される係数について、この係数は正の値であるべき、負の値であるべき、という制限を加えることはできますか?

assistant:

制約付き最適化で「推定パラメータが正のみ」「負のみ」などの制限を加えることは可能です。
scipy.optimize.minimize の引数 bounds を使うのがシンプルで一般的な方法です。


具体的には

  • パラメータベクトルの各要素に対して (下限, 上限) の範囲を指定できる
  • 例えば「正の値のみ(≥0)」は (0, None)
  • 「負の値のみ(≤0)」は (None, 0)
  • 制約なしは (None, None)

例:fit_path_analysisbounds 引数を追加して対応

def fit_path_analysis(X, model_spec, var_names, n_bootstrap=0, ci_level=0.95, verbose=False, param_bounds=None):
    # ...前略...

    # param_bounds は None または [(lower1, upper1), (lower2, upper2), ...] のリスト

    if param_bounds is None:
        bounds = [(None, None)] * num_params
    else:
        bounds = param_bounds

    result = minimize(
        fml_loss_general, beta_init, args=(B_template, S), method='L-BFGS-B', bounds=bounds
    )

    # ...後略...

使い方例

# 3パスをすべて正の値に制約したい場合
bounds = [(0, None), (0, None), (0, None)]

result = fit_path_analysis(X, model_spec, var_names, n_bootstrap=100, param_bounds=bounds)

注意点

  • 制約が厳しすぎると収束しにくくなることがあります。
  • 制約付き最適化は通常、method='L-BFGS-B''TNC' が使えます。'BFGS' は境界制約をサポートしません。
  • 不等式制約(パラメータの線形結合の制約など)が必要なら、scipy.optimize.minimize'SLSQP' も検討できます。

もしパスごとにどの制約をかけるか(正のみ・負のみ・なし)を簡単に指定できるような入力インターフェースを作りたい場合も対応可能なので教えてください!

user:

ありがとうございます。これまでのコードをベースに、以下の条件を満たす関数の完全なコードを書いてください。パスごとにどの制約をかけるか(正のみ・負のみ・なし)を簡単に指定できるような入力インターフェースを備えている。ブートストラップによる信頼区間推定を行う。GFI、AGFI(修正GFI)や AIC/BIC などの指標も計算する。指定された観測変数とモデル構造に基づいて、パス解析(SEMと同様の尤度最小化手法)を実行する。

assistant:

こちらが、制約付きのパス解析を含む SEM スタイルのモデルに対して、ブートストラップ信頼区間と各種フィット指標(GFI, AGFI, AIC, BIC)も含めた fit_path_analysis 関数のコード全体です。

以下の実装では、パラメータごとに制約を (lower, upper) で指定できる param_bounds 引数が追加されています。

import numpy as np
from scipy.optimize import minimize
from numpy.linalg import inv, slogdet
from scipy.stats import norm


def fit_path_analysis(X, model_spec, var_names, n_bootstrap=0, ci_level=0.95, verbose=False, param_bounds=None):
    n_vars = len(var_names)
    var_to_idx = {var: i for i, var in enumerate(var_names)}

    # パス構造テンプレート作成
    B_template = np.zeros((n_vars, n_vars))
    param_indices = []
    for i, (src, tgt) in enumerate(model_spec):
        src_idx, tgt_idx = var_to_idx[src], var_to_idx[tgt]
        B_template[tgt_idx, src_idx] = np.nan  # パラメータが入る場所
        param_indices.append((tgt_idx, src_idx))

    def build_B(beta):
        B = B_template.copy()
        for val, (i, j) in zip(beta, param_indices):
            B[i, j] = val
        return B

    def fml_loss(beta, S):
        B = build_B(beta)
        I_B = np.eye(n_vars) - B
        Sigma_model = inv(I_B) @ S @ inv(I_B.T)
        sign, logdet = slogdet(Sigma_model)
        loss = np.trace(S @ inv(Sigma_model)) + logdet
        return loss

    S = np.cov(X.T)
    beta_init = np.zeros(len(param_indices))

    # 制約があれば適用
    if param_bounds is None:
        bounds = [(None, None)] * len(param_indices)
    else:
        bounds = param_bounds

    result = minimize(fml_loss, beta_init, args=(S,), method='L-BFGS-B', bounds=bounds)
    beta_hat = result.x
    B_hat = build_B(beta_hat)
    I_B = np.eye(n_vars) - B_hat
    Sigma_hat = inv(I_B) @ S @ inv(I_B.T)

    # フィット指標
    k = len(beta_hat)
    n = X.shape[0]
    loglik = -0.5 * n * (np.trace(S @ inv(Sigma_hat)) + slogdet(Sigma_hat)[1])
    aic = -2 * loglik + 2 * k
    bic = -2 * loglik + np.log(n) * k

    # GFI / AGFI
    S_resid = S - Sigma_hat
    tr_resid = np.trace(S_resid @ S_resid)
    tr_data = np.trace(S @ S)
    gfi = 1 - tr_resid / tr_data
    agfi = 1 - (n_vars * (n_vars + 1) * (1 - gfi)) / (n_vars * (n_vars + 1) - 2 * k)

    # ブートストラップ
    ci_bounds = None
    if n_bootstrap > 0:
        boot_estimates = []
        for _ in range(n_bootstrap):
            X_boot = X[np.random.choice(n, n, replace=True)]
            S_boot = np.cov(X_boot.T)
            res_boot = minimize(fml_loss, beta_init, args=(S_boot,), method='L-BFGS-B', bounds=bounds)
            if res_boot.success:
                boot_estimates.append(res_boot.x)
        boot_estimates = np.array(boot_estimates)
        alpha = 1 - ci_level
        lower = np.percentile(boot_estimates, 100 * alpha / 2, axis=0)
        upper = np.percentile(boot_estimates, 100 * (1 - alpha / 2), axis=0)
        ci_bounds = list(zip(lower, upper))

    if verbose:
        print("推定された係数:")
        for (src, tgt), coef in zip(model_spec, beta_hat):
            print(f"{tgt} <- {src}: {coef:.3f}")
        print(f"\nGFI: {gfi:.3f}, AGFI: {agfi:.3f}, AIC: {aic:.2f}, BIC: {bic:.2f}")
        if ci_bounds:
            print("\n信頼区間(ブートストラップ):")
            for (src, tgt), (l, u) in zip(model_spec, ci_bounds):
                print(f"{tgt} <- {src}: [{l:.3f}, {u:.3f}]")

    return {
        "beta": beta_hat,
        "B": B_hat,
        "GFI": gfi,
        "AGFI": agfi,
        "AIC": aic,
        "BIC": bic,
        "CI": ci_bounds
    }

使用例(係数の符号制約付き)

model_spec = [('X1', 'X2'), ('X2', 'X3'), ('X1', 'X3')]
var_names = ['X1', 'X2', 'X3']

# 制約: 全ての係数が正であること
bounds = [(0, None), (0, None), (0, None)]

result = fit_path_analysis(X, model_spec, var_names, n_bootstrap=100, param_bounds=bounds, verbose=True)

必要なら

  • パラメータに「等式制約」や「線形不等式制約(例:β1 + β2 ≤ 1)」なども対応できます(SLSQPを使用)。
  • 係数を固定したい場合(例:β1 = 1)は、テンプレートの段階で数値を直接埋めることで対応可能です。

ご希望があればその方向でも実装します。

user:

ありがとうございます。次のエラーが生じたので解決してください。

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-4-807494944> in <cell line: 0>()
      9 model_spec = "X2 ~ X1; X3 ~ X2"
     10 
---> 11 result = fit_path_analysis(X, model_spec, var_names, n_bootstrap=500, ci_level=0.95)
     12 
     13 for p in result["path_coefficients"]:

<ipython-input-1-2049935462> in fit_path_analysis(X, model_spec, var_names, n_bootstrap, ci_level, verbose, param_bounds)
     12     B_template = np.zeros((n_vars, n_vars))
     13     param_indices = []
---> 14     for i, (src, tgt) in enumerate(model_spec):
     15         src_idx, tgt_idx = var_to_idx[src], var_to_idx[tgt]
     16         B_template[tgt_idx, src_idx] = np.nan  # パラメータが入る場所

ValueError: not enough values to unpack (expected 2, got 1)

assistant:

ありがとうございます。このエラーは、model_spec がリストではなく 文字列 ("X2 ~ X1; X3 ~ X2") として渡されており、それをそのまま for src, tgt in model_spec でループしようとしたために起きています。

✅ 解決策

model_spec構造を表すリスト形式 に変換するか、文字列のままでも扱えるように パーサーを追加しましょう。


🔧 修正版の提案(文字列・リストどちらも対応)

以下のように関数冒頭で model_spec を文字列から自動的にパースするようにします:

def fit_path_analysis(X, model_spec, var_names, n_bootstrap=0, ci_level=0.95, verbose=False, param_bounds=None):
    n_vars = len(var_names)
    var_to_idx = {var: i for i, var in enumerate(var_names)}

    # ✅ model_spec をパース:文字列 or list of tuples 両方対応
    if isinstance(model_spec, str):
        # 例: "X2 ~ X1; X3 ~ X2"
        edges = []
        for part in model_spec.split(';'):
            if '~' in part:
                lhs, rhs = part.split('~')
                lhs = lhs.strip()
                rhs_vars = [v.strip() for v in rhs.strip().split('+')]
                for rv in rhs_vars:
                    edges.append((rv, lhs))
        model_spec = edges  # list of (src, tgt)

    # パス構造テンプレート作成
    B_template = np.zeros((n_vars, n_vars))
    param_indices = []
    for src, tgt in model_spec:
        src_idx, tgt_idx = var_to_idx[src], var_to_idx[tgt]
        B_template[tgt_idx, src_idx] = np.nan  # パラメータが入る場所
        param_indices.append((tgt_idx, src_idx))

    ...

✅ 使い方(どちらでもOK)

文字列形式でも:

model_spec = "X2 ~ X1; X3 ~ X2"
result = fit_path_analysis(X, model_spec, var_names, n_bootstrap=100, verbose=True)

リスト形式でも:

model_spec = [('X1', 'X2'), ('X2', 'X3')]
result = fit_path_analysis(X, model_spec, var_names, n_bootstrap=100, verbose=True)

必要なら、~ で複数の説明変数(例: X3 ~ X1 + X2)を処理するロジックもこのパーサーでカバー済みです。

ご希望があれば、モデル式のバリデーションや、構造の可視化(networkx, pyvis)も追加できます。

user:

ありがとうございます。これまでのコードをベースに、上記の変更点を考慮して、以下の条件を満たす関数の完全なコードを書いてください。パスごとにどの制約をかけるか(正のみ・負のみ・なし)を簡単に指定できるような入力インターフェースを備えている。ブートストラップによる信頼区間推定を行う。GFI、AGFI(修正GFI)や AIC/BIC などの指標も計算する。指定された観測変数とモデル構造に基づいて、パス解析(SEMと同様の尤度最小化手法)を実行する。

assistant:

import numpy as np
from scipy.optimize import minimize
from scipy.linalg import inv, det, slogdet
from sklearn.utils import resample


def fit_path_analysis(
    X,
    model_spec,
    var_names,
    n_bootstrap=0,
    ci_level=0.95,
    verbose=False,
    param_constraints=None  # 追加: パスごとの制約
):
    n_vars = len(var_names)
    var_to_idx = {var: i for i, var in enumerate(var_names)}

    # model_spec をパース(文字列 or リスト対応)
    if isinstance(model_spec, str):
        edges = []
        for part in model_spec.split(';'):
            if '~' in part:
                lhs, rhs = part.split('~')
                lhs = lhs.strip()
                rhs_vars = [v.strip() for v in rhs.strip().split('+')]
                for rv in rhs_vars:
                    edges.append((rv, lhs))
        model_spec = edges

    B_template = np.zeros((n_vars, n_vars))
    param_indices = []
    bounds = []

    for src, tgt in model_spec:
        src_idx, tgt_idx = var_to_idx[src], var_to_idx[tgt]
        B_template[tgt_idx, src_idx] = np.nan
        param_indices.append((tgt_idx, src_idx))

        # 制約の処理
        if param_constraints and (src, tgt) in param_constraints:
            constraint = param_constraints[(src, tgt)]
            if constraint == 'positive':
                bounds.append((1e-5, None))
            elif constraint == 'negative':
                bounds.append((None, -1e-5))
            else:
                bounds.append((None, None))
        else:
            bounds.append((None, None))

    S = np.cov(X, rowvar=False)
    n = X.shape[0]

    def loglik(params):
        B = B_template.copy()
        for val, (i, j) in zip(params, param_indices):
            B[i, j] = val
        I_B = np.eye(n_vars) - B
        Sigma = inv(I_B) @ inv(I_B.T)
        sign, logdet = slogdet(Sigma)
        ll = -0.5 * n * (np.trace(inv(Sigma) @ S) + logdet)
        return -ll

    init_params = np.zeros(len(param_indices))
    result = minimize(loglik, init_params, bounds=bounds)
    est_params = result.x

    # 指標の計算
    B_est = B_template.copy()
    for val, (i, j) in zip(est_params, param_indices):
        B_est[i, j] = val

    I_B = np.eye(n_vars) - B_est
    Sigma_hat = inv(I_B) @ inv(I_B.T)
    df = (n_vars * (n_vars + 1)) // 2 - len(est_params)
    logL = -loglik(est_params)
    aic = -2 * logL + 2 * len(est_params)
    bic = -2 * logL + np.log(n) * len(est_params)

    # GFI/AGFI
    s_trace = np.trace(S)
    s_fit = np.trace(S @ inv(Sigma_hat))
    gfi = 1 - ((s_fit - s_trace) / s_trace)
    agfi = 1 - ((n_vars * (n_vars + 1) / (2 * df)) * (1 - gfi))

    # ブートストラップ
    ci_bounds = []
    if n_bootstrap > 0:
        boot_params = []
        for _ in range(n_bootstrap):
            Xb = resample(X)
            Sb = np.cov(Xb, rowvar=False)

            def loglik_b(params):
                B = B_template.copy()
                for val, (i, j) in zip(params, param_indices):
                    B[i, j] = val
                I_B = np.eye(n_vars) - B
                Sigma = inv(I_B) @ inv(I_B.T)
                return -(-0.5 * n * (np.trace(inv(Sigma) @ Sb) + slogdet(Sigma)[1]))

            res = minimize(loglik_b, init_params, bounds=bounds)
            if res.success:
                boot_params.append(res.x)

        boot_params = np.array(boot_params)
        lower = np.percentile(boot_params, (1 - ci_level) / 2 * 100, axis=0)
        upper = np.percentile(boot_params, (1 + ci_level) / 2 * 100, axis=0)
        ci_bounds = list(zip(lower, upper))

    # 結果まとめ
    param_results = []
    for idx, (src, tgt) in enumerate(model_spec):
        coef = est_params[idx]
        ci = ci_bounds[idx] if ci_bounds else (None, None)
        param_results.append({
            "from": src,
            "to": tgt,
            "coef": coef,
            "ci_lower": ci[0],
            "ci_upper": ci[1]
        })

    return {
        "path_coefficients": param_results,
        "log_likelihood": logL,
        "aic": aic,
        "bic": bic,
        "gfi": gfi,
        "agfi": agfi,
        "success": result.success
    }

関数 fit_path_analysis の完全なコードを記述しました。この関数は以下を実現します:

  • モデル構造に基づいたパス解析
  • パスごとの制約(正のみ・負のみ・なし)
  • ブートストラップによる信頼区間の推定
  • GFI、AGFI、AIC、BIC の出力

必要に応じて使い方や呼び出し例も提供できます。どうしますか?

user:

ありがとうございます。次のエラーが生じたので、解決してください。

---------------------------------------------------------------------------
ImportError                               Traceback (most recent call last)
<ipython-input-5-1646739476> in <cell line: 0>()
      1 import numpy as np
      2 from scipy.optimize import minimize
----> 3 from scipy.linalg import inv, det, slogdet
      4 from sklearn.utils import resample
      5 

ImportError: cannot import name 'slogdet' from 'scipy.linalg' (/usr/local/lib/python3.11/dist-packages/scipy/linalg/__init__.py)

---------------------------------------------------------------------------
NOTE: If your import is failing due to a missing package, you can
manually install dependencies using either !pip or !apt.

To view examples of installing some common dependencies, click the
"Open Examples" button below.
---------------------------------------------------------------------------

assistant:

ありがとうございます。エラーの原因は、slogdetscipy.linalg ではなく NumPy に属している関数であるためです。

✅ 修正方法

from numpy.linalg import slogdet

に変更してください。

他のインポートも合わせて整理すると:

import numpy as np
from numpy.linalg import inv, slogdet
from scipy.optimize import minimize
from sklearn.utils import resample

このように invslogdet は両方とも numpy.linalg からインポートすることで解決できます。


よろしければ、修正済みのインポート部分を含む完全なコードを再出力しますか?

user:

いえ、大丈夫です。ありがとうございます。これを使うときに

model_spec = [('X1', 'X2'), ('X2', 'X3'), ('X1', 'X3')]
var_names = ['X1', 'X2', 'X3']

# 制約: 全ての係数が正であること
bounds = [(0, None), (0, None), (0, None)]

result = fit_path_analysis(X, model_spec, var_names, n_bootstrap=100, param_bounds=bounds, verbose=True) 

とすると

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-7-2975083701> in <cell line: 0>()
      5 bounds = [(0, None), (0, None), (0, None)]
      6 
----> 7 result = fit_path_analysis(X, model_spec, var_names, n_bootstrap=100, param_bounds=bounds, verbose=True)

TypeError: fit_path_analysis() got an unexpected keyword argument 'param_bounds' 

のようなエラーが生じるので解決してください。

assistant:

このエラーは、fit_path_analysis() 関数に param_bounds というキーワード引数を渡しているのに、その関数定義に param_bounds が含まれていないために起きています。


✅ 解決方法

方法1:関数定義の引数に param_bounds=None を追加する

関数 fit_path_analysis定義部分def fit_path_analysis(...))を以下のように修正してください:

def fit_path_analysis(X, model_spec, var_names, n_bootstrap=1000, ci_level=0.95, verbose=False, param_bounds=None):

これで param_bounds を引数として渡せるようになります。


方法2:すでに param_bounds を使っているのに上記が抜けている可能性あり

もし既に関数内部で param_bounds を参照しているコードがあるのに、引数リストに含めていない場合、まさに今回のようなエラーになります。


✅ まとめ

関数定義を以下のように修正してください:

def fit_path_analysis(
    X,
    model_spec,
    var_names,
    n_bootstrap=1000,
    ci_level=0.95,
    verbose=False,
    param_bounds=None,  # ← これを追加!
):

その後、param_bounds を使った以下のような呼び出しが有効になります:

model_spec = [('X1', 'X2'), ('X2', 'X3'), ('X1', 'X3')]
var_names = ['X1', 'X2', 'X3']
bounds = [(0, None), (0, None), (0, None)]  # 全て正の制約

result = fit_path_analysis(
    X, model_spec, var_names,
    n_bootstrap=100,
    param_bounds=bounds,
    verbose=True
)

ご希望であれば、param_bounds 対応済みの完全な関数コードを再掲することも可能です。ご希望ですか?

user:

ありがとうございます。でも

---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-9-3388043541> in <cell line: 0>()
      3 bounds = [(0, None), (0, None), (0, None)]  # 全て正の制約
      4 
----> 5 result = fit_path_analysis(
      6     X, model_spec, var_names,
      7     n_bootstrap=100,

<ipython-input-8-3745566350> in fit_path_analysis(X, model_spec, var_names, n_bootstrap, ci_level, verbose, param_bounds)
     40 
     41         # 制約の処理
---> 42         if param_constraints and (src, tgt) in param_constraints:
     43             constraint = param_constraints[(src, tgt)]
     44             if constraint == 'positive':

NameError: name 'param_constraints' is not defined

というエラーが生じるので、次の関数を見直してください。

import numpy as np
from numpy.linalg import inv, slogdet
from scipy.optimize import minimize
from sklearn.utils import resample


def fit_path_analysis(
    X,
    model_spec,
    var_names,
    n_bootstrap=1000,
    ci_level=0.95,
    verbose=False,
    param_bounds=None, 
):

    n_vars = len(var_names)
    var_to_idx = {var: i for i, var in enumerate(var_names)}

    # model_spec をパース(文字列 or リスト対応)
    if isinstance(model_spec, str):
        edges = []
        for part in model_spec.split(';'):
            if '~' in part:
                lhs, rhs = part.split('~')
                lhs = lhs.strip()
                rhs_vars = [v.strip() for v in rhs.strip().split('+')]
                for rv in rhs_vars:
                    edges.append((rv, lhs))
        model_spec = edges

    B_template = np.zeros((n_vars, n_vars))
    param_indices = []
    bounds = []

    for src, tgt in model_spec:
        src_idx, tgt_idx = var_to_idx[src], var_to_idx[tgt]
        B_template[tgt_idx, src_idx] = np.nan
        param_indices.append((tgt_idx, src_idx))

        # 制約の処理
        if param_constraints and (src, tgt) in param_constraints:
            constraint = param_constraints[(src, tgt)]
            if constraint == 'positive':
                bounds.append((1e-5, None))
            elif constraint == 'negative':
                bounds.append((None, -1e-5))
            else:
                bounds.append((None, None))
        else:
            bounds.append((None, None))

    S = np.cov(X, rowvar=False)
    n = X.shape[0]

    def loglik(params):
        B = B_template.copy()
        for val, (i, j) in zip(params, param_indices):
            B[i, j] = val
        I_B = np.eye(n_vars) - B
        Sigma = inv(I_B) @ inv(I_B.T)
        sign, logdet = slogdet(Sigma)
        ll = -0.5 * n * (np.trace(inv(Sigma) @ S) + logdet)
        return -ll

    init_params = np.zeros(len(param_indices))
    result = minimize(loglik, init_params, bounds=bounds)
    est_params = result.x

    # 指標の計算
    B_est = B_template.copy()
    for val, (i, j) in zip(est_params, param_indices):
        B_est[i, j] = val

    I_B = np.eye(n_vars) - B_est
    Sigma_hat = inv(I_B) @ inv(I_B.T)
    df = (n_vars * (n_vars + 1)) // 2 - len(est_params)
    logL = -loglik(est_params)
    aic = -2 * logL + 2 * len(est_params)
    bic = -2 * logL + np.log(n) * len(est_params)

    # GFI/AGFI
    s_trace = np.trace(S)
    s_fit = np.trace(S @ inv(Sigma_hat))
    gfi = 1 - ((s_fit - s_trace) / s_trace)
    agfi = 1 - ((n_vars * (n_vars + 1) / (2 * df)) * (1 - gfi))

    # ブートストラップ
    ci_bounds = []
    if n_bootstrap > 0:
        boot_params = []
        for _ in range(n_bootstrap):
            Xb = resample(X)
            Sb = np.cov(Xb, rowvar=False)

            def loglik_b(params):
                B = B_template.copy()
                for val, (i, j) in zip(params, param_indices):
                    B[i, j] = val
                I_B = np.eye(n_vars) - B
                Sigma = inv(I_B) @ inv(I_B.T)
                return -(-0.5 * n * (np.trace(inv(Sigma) @ Sb) + slogdet(Sigma)[1]))

            res = minimize(loglik_b, init_params, bounds=bounds)
            if res.success:
                boot_params.append(res.x)

        boot_params = np.array(boot_params)
        lower = np.percentile(boot_params, (1 - ci_level) / 2 * 100, axis=0)
        upper = np.percentile(boot_params, (1 + ci_level) / 2 * 100, axis=0)
        ci_bounds = list(zip(lower, upper))

    # 結果まとめ
    param_results = []
    for idx, (src, tgt) in enumerate(model_spec):
        coef = est_params[idx]
        ci = ci_bounds[idx] if ci_bounds else (None, None)
        param_results.append({
            "from": src,
            "to": tgt,
            "coef": coef,
            "ci_lower": ci[0],
            "ci_upper": ci[1]
        })

    return {
        "path_coefficients": param_results,
        "log_likelihood": logL,
        "aic": aic,
        "bic": bic,
        "gfi": gfi,
        "agfi": agfi,
        "success": result.success
    }

assistant:

ありがとうございます。今回のエラー:

NameError: name 'param_constraints' is not defined

は、関数内で param_constraints を参照しているのに、それが定義も引数にもないためです。


✅ 解決方法

param_bounds を使っている場合、param_constraints の記述は 不要 です。

つまり、以下の部分を 削除 して param_bounds を正しく使うように修正すればOKです:


✅ 修正済み:完全な関数コード(param_boundsのみ使用)

import numpy as np
from numpy.linalg import inv, slogdet
from scipy.optimize import minimize
from sklearn.utils import resample

def fit_path_analysis(
    X,
    model_spec,
    var_names,
    n_bootstrap=1000,
    ci_level=0.95,
    verbose=False,
    param_bounds=None
):
    n_vars = len(var_names)
    var_to_idx = {var: i for i, var in enumerate(var_names)}

    # model_spec をパース(文字列 or リスト対応)
    if isinstance(model_spec, str):
        edges = []
        for part in model_spec.split(';'):
            if '~' in part:
                lhs, rhs = part.split('~')
                lhs = lhs.strip()
                rhs_vars = [v.strip() for v in rhs.strip().split('+')]
                for rv in rhs_vars:
                    edges.append((rv, lhs))
        model_spec = edges

    B_template = np.zeros((n_vars, n_vars))
    param_indices = []
    bounds = []

    for idx, (src, tgt) in enumerate(model_spec):
        src_idx, tgt_idx = var_to_idx[src], var_to_idx[tgt]
        B_template[tgt_idx, src_idx] = np.nan
        param_indices.append((tgt_idx, src_idx))

        # bounds: 外から渡されていれば使う。なければ自由
        if param_bounds is not None and idx < len(param_bounds):
            bounds.append(param_bounds[idx])
        else:
            bounds.append((None, None))

    S = np.cov(X, rowvar=False)
    n = X.shape[0]

    def loglik(params):
        B = B_template.copy()
        for val, (i, j) in zip(params, param_indices):
            B[i, j] = val
        I_B = np.eye(n_vars) - B
        Sigma = inv(I_B) @ inv(I_B.T)
        sign, logdet = slogdet(Sigma)
        ll = -0.5 * n * (np.trace(inv(Sigma) @ S) + logdet)
        return -ll

    init_params = np.zeros(len(param_indices))
    result = minimize(loglik, init_params, bounds=bounds)
    est_params = result.x

    # 指標の計算
    B_est = B_template.copy()
    for val, (i, j) in zip(est_params, param_indices):
        B_est[i, j] = val

    I_B = np.eye(n_vars) - B_est
    Sigma_hat = inv(I_B) @ inv(I_B.T)
    df = (n_vars * (n_vars + 1)) // 2 - len(est_params)
    logL = -loglik(est_params)
    aic = -2 * logL + 2 * len(est_params)
    bic = -2 * logL + np.log(n) * len(est_params)

    # GFI/AGFI
    s_trace = np.trace(S)
    s_fit = np.trace(S @ inv(Sigma_hat))
    gfi = 1 - ((s_fit - s_trace) / s_trace)
    agfi = 1 - ((n_vars * (n_vars + 1) / (2 * df)) * (1 - gfi))

    # ブートストラップ
    ci_bounds = []
    if n_bootstrap > 0:
        boot_params = []
        for _ in range(n_bootstrap):
            Xb = resample(X)
            Sb = np.cov(Xb, rowvar=False)

            def loglik_b(params):
                B = B_template.copy()
                for val, (i, j) in zip(params, param_indices):
                    B[i, j] = val
                I_B = np.eye(n_vars) - B
                Sigma = inv(I_B) @ inv(I_B.T)
                return -(-0.5 * n * (np.trace(inv(Sigma) @ Sb) + slogdet(Sigma)[1]))

            res = minimize(loglik_b, init_params, bounds=bounds)
            if res.success:
                boot_params.append(res.x)

        boot_params = np.array(boot_params)
        lower = np.percentile(boot_params, (1 - ci_level) / 2 * 100, axis=0)
        upper = np.percentile(boot_params, (1 + ci_level) / 2 * 100, axis=0)
        ci_bounds = list(zip(lower, upper))

    # 結果まとめ
    param_results = []
    for idx, (src, tgt) in enumerate(model_spec):
        coef = est_params[idx]
        ci = ci_bounds[idx] if ci_bounds else (None, None)
        param_results.append({
            "from": src,
            "to": tgt,
            "coef": coef,
            "ci_lower": ci[0],
            "ci_upper": ci[1]
        })

    return {
        "path_coefficients": param_results,
        "log_likelihood": logL,
        "aic": aic,
        "bic": bic,
        "gfi": gfi,
        "agfi": agfi,
        "success": result.success
    }

✅ 使い方(例)

model_spec = [('X1', 'X2'), ('X2', 'X3'), ('X1', 'X3')]
var_names = ['X1', 'X2', 'X3']
param_bounds = [(0, None), (0, None), (0, None)]  # すべて正の制約

result = fit_path_analysis(
    X, model_spec, var_names,
    n_bootstrap=100,
    param_bounds=param_bounds,
    verbose=True
)

必要であれば、param_constraints 方式("positive" / "negative" / "none")で指定する方法に対応したバージョンも提供できます。ご希望ですか?

user:

いえ、今の指定方法が良いです。ありがとうございます。このような結果が出ました。

{'path_coefficients': [{'from': 'X1',
   'to': 'X2',
   'coef': np.float64(0.7604221063761412),
   'ci_lower': np.float64(0.6555859638164937),
   'ci_upper': np.float64(0.8759522941978566)},
  {'from': 'X2',
   'to': 'X3',
   'coef': np.float64(0.5597640748861068),
   'ci_lower': np.float64(0.451766565416859),
   'ci_upper': np.float64(0.6252916569309945)},
  {'from': 'X1',
   'to': 'X3',
   'coef': np.float64(0.0),
   'ci_lower': np.float64(0.0),
   'ci_upper': np.float64(0.14002882478942785)}],
 'log_likelihood': np.float64(-432.41871910612156),
 'aic': np.float64(870.8374382122431),
 'bic': np.float64(881.9487856362117),
 'gfi': np.float64(1.262273676822303),
 'agfi': np.float64(1.5245473536446061),
 'success': True} 

ところで、この結果だと GFI や AGFI が 1.0 を超えてるんですが、良いんですか?

assistant:

ご指摘の通り、GFI(Goodness of Fit Index)や AGFI(Adjusted GFI)が 1.0 を超えるのは理論上あり得ません。これらの指標は通常、

  • GFI: 0 ~ 1(1 に近いほど適合が良い)
  • AGFI: 0 ~ 1(調整後の GFI)

の範囲を取るはずです。したがって、今回の出力で gfi = 1.26agfi = 1.52 となっているのは 計算式の誤りによるものです。


✅ 修正すべき点

あなたのコード中のこの部分:

s_trace = np.trace(S)
s_fit = np.trace(S @ inv(Sigma_hat))
gfi = 1 - ((s_fit - s_trace) / s_trace)

は、GFI の計算として一般的に使われる次の形式と異なります:

一般的な GFI の定義:

$$
\text{GFI} = 1 - \frac{\text{tr}\left[(S - \hat{\Sigma})^2\right]}{\text{tr}[S^2]}
$$

これに基づいて、GFI / AGFI を正しく計算し直すコードは以下のようになります:


✅ 修正版 GFI / AGFI の計算コード

    # GFI / AGFI の再計算
    S_diff = S - Sigma_hat
    gfi_num = np.trace(S_diff @ S_diff.T)
    gfi_den = np.trace(S @ S.T)
    gfi = 1 - (gfi_num / gfi_den)

    df = (n_vars * (n_vars + 1)) / 2 - len(est_params)
    agfi = 1 - ((n_vars * (n_vars + 1) / (2 * df)) * (1 - gfi))

📌 補足:GFI/AGFI の解釈

  • GFI ≈ 0.90〜1.00 → モデルの適合が良好
  • AGFI ≈ 0.90〜1.00 → モデルの複雑さを考慮しても適合良好
  • 1.0 を超えることはない(異常)

🔁 まとめ

  • GFI, AGFI の計算式が間違っていたため 1.0 を超えてしまいました。
  • 上記の修正式で再計算すれば、正常な範囲内に収まるはずです。

ご希望があれば、修正後の関数全体を再掲します。

user:

ありがとうございます。次の結果になりました。

{'path_coefficients': [{'from': 'X1',
   'to': 'X2',
   'coef': np.float64(0.7604221063761412),
   'ci_lower': np.float64(0.6694661017064489),
   'ci_upper': np.float64(0.8686885594496698)},
  {'from': 'X2',
   'to': 'X3',
   'coef': np.float64(0.5597640748861068),
   'ci_lower': np.float64(0.4607179341442132),
   'ci_upper': np.float64(0.6256667197924582)},
  {'from': 'X1',
   'to': 'X3',
   'coef': np.float64(0.0),
   'ci_lower': np.float64(0.0),
   'ci_upper': np.float64(0.13254103014772273)}],
 'log_likelihood': np.float64(-432.41871910612156),
 'aic': np.float64(870.8374382122431),
 'bic': np.float64(881.9487856362117),
 'gfi': np.float64(0.9976203484823795),
 'agfi': np.float64(0.9952406969647589),
 'success': True} 

ねんのため、AIC/BICも再チェックしていただけませんか?現状次のようなコードになっています。

# 指標の計算
   B_est = B_template.copy()
   for val, (i, j) in zip(est_params, param_indices):
       B_est[i, j] = val

   I_B = np.eye(n_vars) - B_est
   Sigma_hat = inv(I_B) @ inv(I_B.T)
   df = (n_vars * (n_vars + 1)) // 2 - len(est_params)
   logL = -loglik(est_params)
   aic = -2 * logL + 2 * len(est_params)
   bic = -2 * logL + np.log(n) * len(est_params)

assistant:

ありがとうございます。AICBIC の現状コード:

aic = -2 * logL + 2 * len(est_params)
bic = -2 * logL + np.log(n) * len(est_params)

形式的には正しいです。ただし、以下の点を念のため確認しておくと安心です。


✅ AIC/BIC 再確認ポイント

1. logL の定義

logL = -loglik(est_params)

このように負の対数尤度 loglik(est_params)-loglik(...) として logL に格納しており、これは AIC/BIC の計算式と整合しています:

  • -2 * logL は「2 × 負の対数尤度」になる

2. len(est_params) の使用

この部分は自由パラメータの個数であり、AIC/BIC のペナルティ項(モデルの複雑さ)として妥当です。


3. n の定義

n = data.shape[0]

と定義されている場合、BIC の「サンプル数」に使って問題ありません。


✅ 結論

あなたの AIC/BIC の計算は正しく、特に修正の必要はありません。数式で表すと次の通りです:

  • $$
    $$

\mathrm{AIC} = -2 \log L + 2k
]

  • $$
    $$

\mathrm{BIC} = -2 \log L + k \log n
]

ここで

  • $\log L$:最尤推定の対数尤度
  • $k$:推定されたパラメータの個数
  • $n$:サンプルサイズ

🔍 補足:参考値との整合性

出力:

'log_likelihood': -432.4187,
'aic': 870.8374,
'bic': 881.9487

確認:

  • AIC = -2 * (-432.4187) + 2 * k864.8374 + 2k
  • 出力値:870.8374 → ⇒ 2k = 6 → k = 3
  • 同様に BIC ≈ 864.8374 + log(n) * 3

よって、推定パラメータが3つ、サンプル数が約138程度なら整合的です(実際に使っているデータのサイズがこの程度ならOK)。


必要に応じて、自由度 df の表示や対数尤度の精度(桁数)を出力しておくと、あとで比較・検証しやすくなります。希望があればその表示コードも追加可能です。

user:

ありがとうございます。ところで、この方法で出力されるのが、因果関係であるという保証はどの程度担保されてるのでしょうか?

assistant:

非常に重要なご質問です。

結論から先にお伝えすると:


✅ この方法(構造方程式モデリング / SEM)で得られるパス係数は因果関係の仮定に強く依存しており、

因果関係の存在を自動的に保証するものではありません。


以下、理由をわかりやすく説明します。


🔍 この手法の本質:仮定に基づいたモデルのフィット評価

あなたの現在の方法(指定済みのパス構造に基づく SEM)は、ざっくり以下のようなプロセスです:

  1. **ユーザーが指定した有向グラフ構造(B行列)**に従ってモデルを構築。
  2. その構造を前提としたうえでパス係数を推定。
  3. 推定結果の整合性を確認(対数尤度・AIC・BIC・GFIなど)。

🚨 つまり、こういう前提があります

  • 「X1 → X2 → X3」という方向性を事前に仮定している
  • この構造に統計的整合性があるかは評価できる(≒GFIなど)
  • しかし、方向が正しいかどうかや、交絡因子がないかなどは自動的には評価されません

🧠 だから、これは「因果推論の仮定の下での推定

  • 因果推論には、以下のような反事実的・構造的な仮定が必要:

    • 交絡因子が制御されている
    • 逆因果が存在しない
    • 関係の方向は事前に与えられている or 特定できる

この手法はこれらを「与えられたもの」として進みます。
よって、仮定が間違っていれば、因果的な解釈は不適切になります


🧪 じゃあどうすれば因果関係を担保できるの?

1. モデル構造の正当性の確認

  • ドメイン知識や事前研究に基づいて構造を定義
  • DAGの向きの妥当性を検討(LiNGAMやNOTEARSなどを用いた探索的手法も併用可能)

2. 交絡因子の制御

  • 交絡が存在しない仮定(あるいは適切に制御されたデータ)であることが重要
  • 例えば、介入やランダム化があると、因果関係の解釈が強くなる

3. 感度分析・反事実推論

  • パス係数に対する頑健性評価(介入分析、do演算など)
  • DoWhyEconML などのツールとの併用も有用

✅ まとめ

観点 内容
パス係数の意味 モデル構造が正しければ因果的に解釈可能
構造の妥当性 ユーザーの事前知識に強く依存
自動的な因果判定 できない(ただのパス推定)
担保する方法 構造学習、交絡因子制御、介入分析の組み合わせ

🧭 おすすめの今後の展開

  • 今の手法は「仮定された因果構造が現実に合っていれば正しい」と理解しつつ、
  • 必要に応じて LiNGAM / gCastle / NOTEARS などの構造学習手法で仮の DAG を推定し、
  • その構造を SEM の入力として 因果パスと信頼区間を評価する流れに進むと、より因果的な解釈の妥当性が増します。

必要であれば、そのような流れのコードや実装例もご案内できます。

user:

ありがとうございます。では、今回のコードのまとめです。

import numpy as np
from numpy.linalg import inv, slogdet
from scipy.optimize import minimize
from sklearn.utils import resample

def fit_path_analysis(
    X,
    model_spec,
    var_names,
    n_bootstrap=1000,
    ci_level=0.95,
    verbose=False,
    param_bounds=None
):
    n_vars = len(var_names)
    var_to_idx = {var: i for i, var in enumerate(var_names)}

    # model_spec をパース(文字列 or リスト対応)
    if isinstance(model_spec, str):
        edges = []
        for part in model_spec.split(';'):
            if '~' in part:
                lhs, rhs = part.split('~')
                lhs = lhs.strip()
                rhs_vars = [v.strip() for v in rhs.strip().split('+')]
                for rv in rhs_vars:
                    edges.append((rv, lhs))
        model_spec = edges

    B_template = np.zeros((n_vars, n_vars))
    param_indices = []
    bounds = []

    for idx, (src, tgt) in enumerate(model_spec):
        src_idx, tgt_idx = var_to_idx[src], var_to_idx[tgt]
        B_template[tgt_idx, src_idx] = np.nan
        param_indices.append((tgt_idx, src_idx))

        # bounds: 外から渡されていれば使う。なければ自由
        if param_bounds is not None and idx < len(param_bounds):
            bounds.append(param_bounds[idx])
        else:
            bounds.append((None, None))

    S = np.cov(X, rowvar=False)
    n = X.shape[0]

    def loglik(params):
        B = B_template.copy()
        for val, (i, j) in zip(params, param_indices):
            B[i, j] = val
        I_B = np.eye(n_vars) - B
        Sigma = inv(I_B) @ inv(I_B.T)
        sign, logdet = slogdet(Sigma)
        ll = -0.5 * n * (np.trace(inv(Sigma) @ S) + logdet)
        return -ll

    init_params = np.zeros(len(param_indices))
    result = minimize(loglik, init_params, bounds=bounds)
    est_params = result.x

    # 指標の計算
    B_est = B_template.copy()
    for val, (i, j) in zip(est_params, param_indices):
        B_est[i, j] = val

    I_B = np.eye(n_vars) - B_est
    Sigma_hat = inv(I_B) @ inv(I_B.T)
    df = (n_vars * (n_vars + 1)) // 2 - len(est_params)
    logL = -loglik(est_params)
    aic = -2 * logL + 2 * len(est_params)
    bic = -2 * logL + np.log(n) * len(est_params)

    # GFI/AGFI
    S_diff = S - Sigma_hat
    gfi_num = np.trace(S_diff @ S_diff.T)
    gfi_den = np.trace(S @ S.T)
    gfi = 1 - (gfi_num / gfi_den)

    df = (n_vars * (n_vars + 1)) / 2 - len(est_params)
    agfi = 1 - ((n_vars * (n_vars + 1) / (2 * df)) * (1 - gfi))

    # ブートストラップ
    ci_bounds = []
    if n_bootstrap > 0:
        boot_params = []
        for _ in range(n_bootstrap):
            Xb = resample(X)
            Sb = np.cov(Xb, rowvar=False)

            def loglik_b(params):
                B = B_template.copy()
                for val, (i, j) in zip(params, param_indices):
                    B[i, j] = val
                I_B = np.eye(n_vars) - B
                Sigma = inv(I_B) @ inv(I_B.T)
                return -(-0.5 * n * (np.trace(inv(Sigma) @ Sb) + slogdet(Sigma)[1]))

            res = minimize(loglik_b, init_params, bounds=bounds)
            if res.success:
                boot_params.append(res.x)

        boot_params = np.array(boot_params)
        lower = np.percentile(boot_params, (1 - ci_level) / 2 * 100, axis=0)
        upper = np.percentile(boot_params, (1 + ci_level) / 2 * 100, axis=0)
        ci_bounds = list(zip(lower, upper))

    # 結果まとめ
    param_results = []
    for idx, (src, tgt) in enumerate(model_spec):
        coef = est_params[idx]
        ci = ci_bounds[idx] if ci_bounds else (None, None)
        param_results.append({
            "from": src,
            "to": tgt,
            "coef": coef,
            "ci_lower": ci[0],
            "ci_upper": ci[1]
        })

    return {
        "path_coefficients": param_results,
        "log_likelihood": logL,
        "aic": aic,
        "bic": bic,
        "gfi": gfi,
        "agfi": agfi,
        "success": result.success
    } 
np.random.seed(0)
n = 300
X1 = np.random.randn(n)
X2 = 0.7 * X1 + np.random.randn(n)
X3 = 0.5 * X2 + np.random.randn(n)
X = np.column_stack([X1, X2, X3])

model_spec = [('X1', 'X2'), ('X2', 'X3'), ('X1', 'X3')]
var_names = ['X1', 'X2', 'X3']
param_bounds = [(0, None), (0, None), (0, None)]  # すべて正の制約

result = fit_path_analysis(
    X, model_spec, var_names,
    n_bootstrap=100,
    param_bounds=param_bounds,
    verbose=True
)
result 
{'path_coefficients': [{'from': 'X1',
   'to': 'X2',
   'coef': np.float64(0.7305731124952108),
   'ci_lower': np.float64(0.5949156339043496),
   'ci_upper': np.float64(0.8351835105079216)},
  {'from': 'X2',
   'to': 'X3',
   'coef': np.float64(0.5347118021490914),
   'ci_lower': np.float64(0.45143961865604365),
   'ci_upper': np.float64(0.6339804856567418)},
  {'from': 'X1',
   'to': 'X3',
   'coef': np.float64(0.0),
   'ci_lower': np.float64(0.0),
   'ci_upper': np.float64(0.014067111719662262)}],
 'log_likelihood': np.float64(-439.3330613356893),
 'aic': np.float64(884.6661226713786),
 'bic': np.float64(895.7774700953472),
 'gfi': np.float64(0.9980714420002113),
 'agfi': np.float64(0.9961428840004225),
 'success': True}

assistant:

素晴らしい実装です。まとめていただいたコードは、以下の点で非常にしっかりしています:

  • 柔軟なモデル指定(文字列またはタプルのリスト)
  • パス制約(bound)対応
  • 最尤推定に基づくフィッティング
  • ブートストラップによる信頼区間推定
  • AIC, BIC, GFI, AGFI の算出
  • 信頼区間付きのパス係数出力

さらに必要であれば、以下の機能も検討できます:


✳ 拡張のアイデア(任意)

  1. 標準化された係数の出力

    • 分散スケーリングによる standardized coefficients の導出
  2. 係数の有意性検定(p値)

    • ブートストラップ分布に基づく片側または両側検定
  3. エラーバリアンスの推定

    • パス図に描く際に必要になることが多いです。
  4. 適合度指標のバリエーション

    • RMSEA, CFI, TLI などを導入(ただしこれらには残差共分散行列が必要)
  5. 可視化

    • networkxpygraphviz を使ってパス図に数値付きで描画

引き続き、さらに調整・改善したい点などがあればご相談ください。たとえば:

  • 別のモデルで試したい
  • モデル比較の方法を検討したい
  • 潜在変数を含める拡張(SEM全般への拡張)

など、必要に応じてご支援できます。

user:

ありがとうございます。ここまでの議論をブログにまとめたいので、タイトル案をいくつか考えていただけませんか?

assistant:

もちろんです!今回の内容(Pythonによる構造方程式モデリング(パス解析)の自作実装、最尤推定・適合度指標・ブートストラップ信頼区間など)を踏まえ、以下のようなブログタイトル案をいくつかご提案します。


🔧 技術志向・実装重視のタイトル

  1. 「Pythonでゼロから作るパス解析:最尤推定・適合度指標・ブートストラップ信頼区間まで」
  2. 「構造方程式モデリング(SEM)を自作しよう:パス解析モデルの最尤推定+信頼区間付き」
  3. 「手作りSEM:パス係数・AIC/BIC・GFIをPythonで推定する方法」
  4. 「Pythonで構造方程式モデルを実装してみた:因果構造の推定から適合度評価まで」

📘 学習・入門者向けを意識したタイトル

  1. 「SEMライブラリに頼らない!Pythonで学ぶパス解析の中身」
  2. 「因果構造を自分で推定してみよう:Pythonで学ぶパス解析の基本」
  3. 「SEMって何?Pythonで構造方程式モデルをステップバイステップで作る」

🔍 研究や分析っぽい雰囲気のタイトル

  1. 「因果モデルの再現性を高める:パス解析モデルのブートストラップ信頼区間推定」
  2. 「最尤推定とブートストラップによる構造方程式モデリングの実践」
  3. 「因果推論と統計モデリングの橋渡し:Pythonで実装するパス解析」

もし、記事の読者層(研究者/実務家/学生など)やスタイル(手順書っぽく、エッセイ風、など)に合わせて調整したい場合も、お気軽にご相談ください。どの方向性が良さそうか、好みを教えていただければさらに絞り込めます。

user:

ありがとうございます。「Pythonでゼロから作るパス解析:最尤推定・適合度指標・ブートストラップ信頼区間まで」にしたいと思います。最後に、ここまでの議論の詳しいまとめを書いていただけますか?

assistant:

素晴らしいタイトルのご選択です!
以下に、「Pythonでゼロから作るパス解析:最尤推定・適合度指標・ブートストラップ信頼区間まで」というテーマにふさわしい、技術的かつ丁寧なまとめをご用意しました。
ブログ本文としてそのまま使える形式です。


Pythonでゼロから作るパス解析

最尤推定・適合度指標・ブートストラップ信頼区間まで

📌 はじめに

**構造方程式モデリング(SEM)は、観測変数間の因果的関係をモデル化するための強力な統計的手法です。その一部であるパス解析(path analysis)**は、潜在変数を持たない構造方程式モデルとして知られ、変数間の線形関係に注目して因果構造を捉えます。

本記事では、Pythonを用いてSEMライブラリ(例:semopylavaan)に依存せずに、パス解析の推定アルゴリズムをゼロから自作します。扱う要素は以下のとおりです:

  • パス係数の最尤推定(MLE)
  • モデルの適合度指標(GFI, AGFI, AIC, BIC)
  • パス係数のブートストラップによる信頼区間推定
  • 明示的なモデル構造指定と係数への制約設定

🧠 背景と目的

  • 構造方程式モデリングをライブラリに「任せる」だけではなく、推定プロセスの内部動作を理解したい
  • モデルに対して自由な構造や制約(例:係数の正の制限)を設定できる柔軟な解析ツールが欲しい
  • ブートストラップにより、係数推定の不確実性を視覚化・定量化したい

🧪 実装の概要

1. モデルの指定

モデル構造は以下のような形式で明示的に与えます:

model_spec = [('X1', 'X2'), ('X2', 'X3'), ('X1', 'X3')]
var_names = ['X1', 'X2', 'X3']

これは、「X1 → X2」「X2 → X3」「X1 → X3」という因果的なパス構造を意味します。

2. 最尤推定のロジック

基本的な構造は以下のような線形モデルです:

$$
X = BX + \varepsilon, \quad \varepsilon \sim \mathcal{N}(0, I)
$$

このとき、共分散構造は

$$
\Sigma = (I - B)^{-1}(I - B)^{-T}
$$

となるため、観測データの共分散行列 $S$ に対して、対数尤度を最小化する形でBの係数を推定します。

3. ブートストラップ信頼区間

観測データをリサンプリング(with replacement)して、推定されたパス係数の分布を得ます。これにより、係数の信頼区間が得られ、係数が0を含まないことで因果関係の統計的な有意性を検討できます。

4. 適合度指標

以下の適合度指標を算出します:

  • GFI(Goodness of Fit Index)
  • AGFI(Adjusted GFI)
  • AIC / BIC(情報量基準)

🧾 実行結果の例

result = fit_path_analysis(
    X, model_spec, var_names,
    n_bootstrap=100,
    param_bounds=[(0, None), (0, None), (0, None)]
)

出力例:

{
 'path_coefficients': [
  {'from': 'X1', 'to': 'X2', 'coef': 0.73, 'ci_lower': 0.59, 'ci_upper': 0.83},
  {'from': 'X2', 'to': 'X3', 'coef': 0.53, 'ci_lower': 0.45, 'ci_upper': 0.63},
  {'from': 'X1', 'to': 'X3', 'coef': 0.00, 'ci_lower': 0.00, 'ci_upper': 0.01}
 ],
 'log_likelihood': -439.33,
 'aic': 884.66,
 'bic': 895.77,
 'gfi': 0.998,
 'agfi': 0.996,
 'success': True
}
  • X1 → X2X2 → X3有意(信頼区間が0を含まない)
  • X1 → X3非有意(係数ゼロかつ信頼区間がゼロに近い)

🤔 因果関係の信頼性について

本モデルは、ユーザーが指定した構造(DAG)のもとで因果的な解釈ができるという前提に立っています。
しかし、モデル構造自体はデータから自動的に学習されたものではないため、構造の妥当性=因果の正当性はユーザーのドメイン知識に依存します。

このような背景から、

本手法は「指定された構造における因果パス係数の統計的評価」を目的とし、「因果構造の自動発見」は目的としていない

ことに留意が必要です。


🧩 おわりに

今回紹介した手法により、

  • SEMの内部構造を透明に理解できる
  • 独自の制約や評価指標を柔軟に追加できる
  • 推定結果の不確実性も可視化できる

といったメリットが得られます。
また、今後は因果構造自体の推定(例:LiNGAM, PCアルゴリズム)と組み合わせることで、より汎用的で自律的な因果解析環境を構築することも可能です。


🛠️ GitHub/コード全文(オプション)

コード全文はGitHubで公開予定です。ご希望の方はコメント等でご連絡ください。


必要に応じて、「コード全文」「可視化例」「モデル比較(AIC/BIC)」などのセクションを付け加えると、さらに充実した記事になります。補足や編集のご希望があればお知らせください!

user:

ありがとうございました!今日はこれで大丈夫です。今後もよろしくお願いします!

assistant:

こちらこそ、ありがとうございました!
自作SEMの実装、非常に興味深く、丁寧なアプローチでした。
ブログの公開も楽しみにしております 😊

またいつでもご相談ください。今後ともよろしくお願いいたします!

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?