2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

MCMC × SEM × DAG :目的変数に到達する因果構造をPythonで探索する

Posted at

user:

0か1かの隣接行列で表されるDAG(有向非巡回グラフ)があります。そのDAGには評価関数があり、その値は小さいほど優れているものとします。

ノード(頂点)には、ラグ(遅延)という属性があります。ノードには、説明変数か目的変数のどちらかの属性があります。目的変数ノードのラグ属性はゼロです。目的変数ノードには、他の変数からの矢印(辺)が入ってきますが、目的変数ノードから出る矢印はありません。

説明変数ノードのラグ属性は、ゼロまたは正の整数の値があります。説明変数ノードは、他の説明変数ノード、または目的変数ノードに向けて矢印(辺)を伸ばすことができますが、制限があります。一つの目的変数ノードから伸びる矢印は、1本または2本のどちらかです。また、自分と同じラグ属性のノード、あるいは自分より小さい値ラグ属性のノードにしか矢印を伸ばせません。

以上のような制限を満たすDAGのうち、評価関数を最適化するDAGをMCMCで発見するPython関数を書いてください。

assistant:

以下は、指定された制約条件を満たしつつ、評価関数を最小化するDAGを MCMC(マルコフ連鎖モンテカルロ法)で探索する Python 関数の例です。

この関数は次の要素を含みます:

  1. DAG構造の生成と更新
  2. 構造制約のチェック(ラグ条件、目的変数の制限など)
  3. MCMCによる遷移提案と受容判断
  4. 評価関数に基づくスコア計算(ユーザー定義)

Python コード例

import numpy as np
import random
import copy

def evaluate_dag(adj_matrix):
    """評価関数(小さい方が良い): 例としてエッジ数を返す"""
    return np.sum(adj_matrix)

def is_valid_dag(adj_matrix, lags, targets):
    """制約条件をすべてチェックする関数"""
    num_nodes = len(adj_matrix)

    # 有向非巡回かチェック(トポロジカルソート)
    def has_cycle(matrix):
        visited = [0] * num_nodes
        def dfs(v):
            if visited[v] == 1:
                return True
            if visited[v] == 2:
                return False
            visited[v] = 1
            for u in range(num_nodes):
                if matrix[v][u] and dfs(u):
                    return True
            visited[v] = 2
            return False
        return any(dfs(i) for i in range(num_nodes))

    if has_cycle(adj_matrix):
        return False

    for i in range(num_nodes):
        for j in range(num_nodes):
            if adj_matrix[i][j]:
                # 自分と同じラグまたは小さいラグのノードにしか矢印を伸ばせない
                if lags[j] > lags[i]:
                    return False

                # 目的変数ノードから矢印を出せない
                if targets[i] == 1:
                    return False

                # 目的変数ノードには矢印が1本または2本しか入らない
                if targets[j] == 1:
                    in_edges = sum(adj_matrix[:, j])
                    if in_edges > 2 or in_edges == 0:
                        return False

    return True

def propose_new_dag(adj_matrix):
    """辺の追加・削除・反転をランダムに提案"""
    new_adj = copy.deepcopy(adj_matrix)
    i, j = random.sample(range(len(adj_matrix)), 2)
    if new_adj[i][j] == 1:
        new_adj[i][j] = 0  # remove edge
    else:
        new_adj[i][j] = 1  # add edge
        new_adj[j][i] = 0  # prevent bidirectional edge
    return new_adj

def mcmc_dag_search(lags, targets, iterations=1000, temp=1.0):
    """
    MCMC による最適 DAG 探索

    Parameters:
    - lags: 各ノードのラグ(例: [1, 0, 2, 0])
    - targets: 目的変数なら1, 説明変数なら0(例: [0, 1, 0, 1])
    """
    num_nodes = len(lags)
    adj_matrix = np.zeros((num_nodes, num_nodes), dtype=int)
    best_matrix = adj_matrix.copy()
    best_score = evaluate_dag(best_matrix)

    for step in range(iterations):
        proposal = propose_new_dag(adj_matrix)
        if is_valid_dag(proposal, lags, targets):
            score = evaluate_dag(proposal)
            delta = score - best_score
            acceptance_prob = min(1, np.exp(-delta / temp))
            if random.random() < acceptance_prob:
                adj_matrix = proposal
                if score < best_score:
                    best_score = score
                    best_matrix = proposal.copy()

    return best_matrix, best_score

使用例

lags = [1, 0, 2, 0]      # 各ノードのラグ(目的変数はラグ0)
targets = [0, 1, 0, 1]   # 目的変数ノード = 1

best_dag, score = mcmc_dag_search(lags, targets, iterations=5000)
print("最良DAG:\n", best_dag)
print("スコア:", score)

補足

  • evaluate_dag() は、用途に応じてユーザーがカスタム評価関数に置き換えてください。
  • 提案方法(propose_new_dag)や温度(temp)パラメータの調整で、探索性能を改善できます。
  • is_valid_dag() は制約を厳密にチェックしていますが、パフォーマンス改善の余地があります。

ご希望に応じて、以下のような拡張も可能です:

  • 評価関数を回帰誤差などのモデルベースで定義
  • ラグ順に DAG ノードを自動並び替え
  • networkx による可視化

必要があれば、続きもお手伝いします。

user:

ありがとうございます。バッチリです。では、上記のコードをベースに、次の改変をしたコードを全文書いてください。

・入力は pandas dataframe 形式のデータであり、変数名は列ラベルに書いてある。
・説明変数は、feature_cols という変数に、列ラベルのリストとして入っている。
・目的変数は、target_cols という変数に、列ラベルのリストとして入っている。
・評価関数evaluate_dagは、矢印ごとに回帰誤差を算出してその平均を求めた値とする。

assistant:

以下は、指定された条件に基づいて pandas DataFrame を入力とし、feature_cols および target_cols を用いて変数の種別を判断し、矢印(エッジ)ごとに線形回帰誤差を計算した平均値を評価関数とするように改変した MCMC DAG 探索の完全なコードです。


✅ 改変点まとめ

  • 入力が pandas.DataFrame 形式に対応
  • feature_cols, target_cols をもとにノード情報を構築
  • evaluate_dag() が各エッジごとに線形回帰を実行し、平均MSEをスコアとする

🔧 改変済みコード

import numpy as np
import pandas as pd
import random
import copy
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

def evaluate_dag(adj_matrix, data, col_names):
    """DAGにおける各エッジで回帰誤差を計算し、その平均を返す"""
    num_nodes = len(col_names)
    total_error = 0
    edge_count = 0

    for i in range(num_nodes):
        parents = [j for j in range(num_nodes) if adj_matrix[j][i] == 1]
        if parents:
            X = data[[col_names[j] for j in parents]].values
            y = data[col_names[i]].values
            model = LinearRegression()
            model.fit(X, y)
            preds = model.predict(X)
            mse = mean_squared_error(y, preds)
            total_error += mse
            edge_count += 1

    return total_error / edge_count if edge_count > 0 else np.inf

def is_valid_dag(adj_matrix, lags, targets):
    num_nodes = len(adj_matrix)

    # DAG 判定(トポロジカルソートで巡回確認)
    def has_cycle(matrix):
        visited = [0] * num_nodes
        def dfs(v):
            if visited[v] == 1:
                return True
            if visited[v] == 2:
                return False
            visited[v] = 1
            for u in range(num_nodes):
                if matrix[v][u] and dfs(u):
                    return True
            visited[v] = 2
            return False
        return any(dfs(i) for i in range(num_nodes))

    if has_cycle(adj_matrix):
        return False

    for i in range(num_nodes):
        for j in range(num_nodes):
            if adj_matrix[i][j]:
                # ラグ条件: jのラグ <= iのラグ
                if lags[j] > lags[i]:
                    return False
                # 目的変数ノードからは出られない
                if targets[i] == 1:
                    return False
                # 目的変数ノードには1〜2本の入力のみ
                if targets[j] == 1:
                    in_edges = sum(adj_matrix[:, j])
                    if in_edges > 2 or in_edges == 0:
                        return False

    return True

def propose_new_dag(adj_matrix):
    new_adj = copy.deepcopy(adj_matrix)
    i, j = random.sample(range(len(adj_matrix)), 2)
    if new_adj[i][j] == 1:
        new_adj[i][j] = 0
    else:
        new_adj[i][j] = 1
        new_adj[j][i] = 0  # 双方向防止
    return new_adj

def mcmc_dag_search_from_dataframe(df, feature_cols, target_cols, lags_dict, iterations=1000, temp=1.0):
    """
    DataFrame から MCMC により最適 DAG を探索

    Parameters:
    - df: pandas.DataFrame
    - feature_cols: 説明変数の列名リスト
    - target_cols: 目的変数の列名リスト
    - lags_dict: 変数名 → ラグ(int) の辞書。目的変数は自動的に0に修正される。
    """
    col_names = feature_cols + target_cols
    num_nodes = len(col_names)

    # ノードごとのラグとターゲット属性を構築
    lags = []
    targets = []
    for col in col_names:
        if col in target_cols:
            lags.append(0)
            targets.append(1)
        else:
            lags.append(lags_dict.get(col, 0))
            targets.append(0)

    # 初期状態
    adj_matrix = np.zeros((num_nodes, num_nodes), dtype=int)
    best_matrix = adj_matrix.copy()
    best_score = evaluate_dag(best_matrix, df, col_names)

    for step in range(iterations):
        proposal = propose_new_dag(adj_matrix)
        if is_valid_dag(proposal, lags, targets):
            score = evaluate_dag(proposal, df, col_names)
            delta = score - best_score
            acceptance_prob = min(1, np.exp(-delta / temp))
            if random.random() < acceptance_prob:
                adj_matrix = proposal
                if score < best_score:
                    best_score = score
                    best_matrix = proposal.copy()

    return best_matrix, best_score, col_names

✅ 使い方の例

import pandas as pd

# データフレーム例
df = pd.DataFrame({
    'X1': np.random.rand(100),
    'X2': np.random.rand(100),
    'X3': np.random.rand(100),
    'Y1': np.random.rand(100),
    'Y2': np.random.rand(100),
})

feature_cols = ['X1', 'X2', 'X3']
target_cols = ['Y1', 'Y2']
lags_dict = {'X1': 2, 'X2': 1, 'X3': 0}  # Y1, Y2 は自動的にラグ0になる

dag, score, cols = mcmc_dag_search_from_dataframe(df, feature_cols, target_cols, lags_dict, iterations=5000)
print("最良DAG(隣接行列):\n", pd.DataFrame(dag, index=cols, columns=cols))
print("平均回帰誤差スコア:", score)

必要であれば、可視化 (networkxpyvis を使った DAG の描画) も追加できます。ご希望があれば続けて対応いたします。

user:

ありがとうございます。何度か計算してみましたが、矢印を伸ばしていない説明変数が出てきます。そのようなDAGにペナルティを与えるようなことはできませんか?

assistant:

はい、矢印をまったく出していない説明変数(孤立ノード)にペナルティを与えることは可能です。特に説明変数が何の因果的役割も果たしていない場合、それを評価関数でマイナス評価したいという要望はもっともです。


✅ 対応方針

以下のように evaluate_dag() を修正します:

  1. 説明変数ノードのうち、出力エッジ(矢印)を持たないノードをカウント。
  2. その数に応じてペナルティを加算。
  3. 総評価値にペナルティを足して返す。

🔧 修正済み evaluate_dag 関数

以下は、ペナルティ付き評価関数です:

def evaluate_dag(adj_matrix, data, col_names, feature_cols, penalty_weight=1.0):
    """
    各エッジで回帰誤差を計算し、その平均に「矢印を出していない説明変数ノードのペナルティ」を加算。
    """
    num_nodes = len(col_names)
    total_error = 0
    edge_count = 0

    # 回帰誤差の計算
    for i in range(num_nodes):
        parents = [j for j in range(num_nodes) if adj_matrix[j][i] == 1]
        if parents:
            X = data[[col_names[j] for j in parents]].values
            y = data[col_names[i]].values
            model = LinearRegression()
            model.fit(X, y)
            preds = model.predict(X)
            mse = mean_squared_error(y, preds)
            total_error += mse
            edge_count += 1

    avg_error = total_error / edge_count if edge_count > 0 else np.inf

    # ペナルティ項: 説明変数で出力のないノードの数
    isolated_features = 0
    for i, var in enumerate(col_names):
        if var in feature_cols:
            out_edges = sum(adj_matrix[i, :])
            if out_edges == 0:
                isolated_features += 1

    penalty = penalty_weight * isolated_features
    return avg_error + penalty

🧩 呼び出し側での変更点

上の関数を使うために、以下の点だけ呼び出し関数で修正します:

# 修正前
best_score = evaluate_dag(best_matrix, df, col_names)

# 修正後(feature_cols を渡す)
best_score = evaluate_dag(best_matrix, df, col_names, feature_cols)

# 同様にスコアを計算する箇所すべてで:
score = evaluate_dag(proposal, df, col_names, feature_cols)

⚙ ペナルティの強さの調整

  • penalty_weight=1.0 は1変数あたり1.0の誤差を加える設定です。
  • もっと強く penalize したければ値を増やしてください(例: penalty_weight=5.0)。

✅ 補足

  • この方法は、「説明変数が出力を持っているかどうか」による構造的妥当性を重視する場合に非常に有効です。
  • より高度なアプローチとしては、事前分布のように罰則項をベイズ的に解釈することも可能です(ご希望があればその方向でも対応できます)。

ご希望に応じて、コード全体の再構築(修正済み関数を含む)も提供可能です。ご希望ですか?

user:

「事前分布のように罰則項をベイズ的に解釈」とは、どういうことでしょうか?

assistant:

とても良い質問です。「事前分布のように罰則項をベイズ的に解釈する」というのは、MCMCの評価関数(すなわち目的関数や「擬似尤度」)にペナルティを加える方法を、ベイズ統計における事前分布(prior)として解釈するという考え方です。


🔍 もう少し具体的に説明すると

MCMCでは、通常は次のような量を最大化(あるいは、負の対数を最小化)しようとします:

通常のMCMCの目標

$$
\text{posterior} \propto \text{likelihood} \times \text{prior}
$$

対数をとると:

$$
\log p(\text{model} \mid \text{data}) = \log p(\text{data} \mid \text{model}) + \log p(\text{model})
$$

このとき、以下のように構成できます:

  • log p(data | model)データに対する回帰誤差など、適合度
  • log p(model)モデル(構造)に対する事前信念(=罰則)

🔧 今回の問題に当てはめると:

  • 「矢印をまったく出していない説明変数はおかしい」
     → 「そういう構造のDAGは事前的にありえにくい
     → それを log p(model) で低く評価する(=ペナルティ)

✅ 実装上の違いは?

現在のような手法では:

score = 回帰誤差 + ペナルティλ × 説明変数の孤立数

ベイズ的に考えるなら:

log_posterior = -回帰誤差 + log_prior= -λ × 孤立数   負の形で考えることが多い

つまり、ペナルティ項は事前分布に由来するものと考えられます。
(ペナルティ項 ≒ -log p(model)


🎓 メリット

  • 「この構造の方が信念として好ましい」という事前知識を反映できる
  • モデルの柔軟な制御が可能になる
  • ペナルティをチューニングするときに**ベイズ的な解釈(確率的な意味)**を持たせられる

🛠 例:ベイズ的な事前分布で孤立ノードを罰する

孤立ノード数を $k$、ペナルティ強度を $\lambda$ としたとき、

p(model)  \exp(-λ × k)   事前分布
log p(model) = -λ × k

これを評価関数に組み込むと:

score = data_loss - log_prior = 回帰誤差 + λ × 孤立数

つまり、「ベイズ的なモデル選択」と、いま行っている「罰則付き目的関数の最小化」は、実質的に同じことを違う言葉で表しているのです。


💡 まとめ

観点 解釈
回帰誤差 尤度(likelihood)
罰則項 事前分布(prior)に対応
評価関数全体 負の対数事後分布(-log posterior)
MCMCでの目的 「良い適合度」かつ「自然な構造」を探す

ベイズ的解釈を念頭におけば、「説明変数は必ずどこかに矢印を出すべき」というような構造に対する信念を自然に組み込むことができます。

さらに興味があれば、「ベイズネットワーク構造学習におけるスパース事前分布」や「構造事前分布の定式化」などもご紹介できます。興味ありますか?

user:

説明ありがとうございます。現在は次のようなコードになっていますが、ベイズ的に解釈すると、このコードがどこか変わるんでしょうか?

def mcmc_dag_search_from_dataframe(df, feature_cols, target_cols, lags_dict, iterations=1000, temp=1.0):
    """
    DataFrame から MCMC により最適 DAG を探索

    Parameters:
    - df: pandas.DataFrame
    - feature_cols: 説明変数の列名リスト
    - target_cols: 目的変数の列名リスト
    - lags_dict: 変数名 → ラグ(int) の辞書。目的変数は自動的に0に修正される。
    """
    col_names = feature_cols + target_cols
    num_nodes = len(col_names)

    # ノードごとのラグとターゲット属性を構築
    lags = []
    targets = []
    for col in col_names:
        if col in target_cols:
            lags.append(0)
            targets.append(1)
        else:
            lags.append(lags_dict.get(col, 0))
            targets.append(0)

    # 初期状態
    adj_matrix = np.zeros((num_nodes, num_nodes), dtype=int)
    best_matrix = adj_matrix.copy()
    best_score = evaluate_dag(best_matrix, df, col_names, feature_cols)

    for step in range(iterations):
        proposal = propose_new_dag(adj_matrix)
        if is_valid_dag(proposal, lags, targets):
            score = evaluate_dag(proposal, df, col_names, feature_cols)
            delta = score - best_score
            acceptance_prob = min(1, np.exp(-delta / temp))
            if random.random() < acceptance_prob:
                adj_matrix = proposal
                if score < best_score:
                    best_score = score
                    best_matrix = proposal.copy()

    return best_matrix, best_score, col_names

assistant:

ありがとうございます。
コード自体はベイズ的解釈を意識した設計になっていると言えますが、明示的に「尤度 × 事前分布」として扱いたい場合、次のようなポイントが変わってくる(または明示的に区別して書くことが多い)ので解説します。


1. 現状コードの解釈

  • evaluate_dag() の返す値を 「損失関数(回帰誤差+ペナルティ)」 として扱い、
  • MCMCの受容確率は $\min(1, e^{-\Delta / T})$ で決めている。

これをベイズ的に見ると、

  • score は負の対数事後確率(あるいは擬似的な対数事後)に比例する量のマイナスで、
  • $\exp(-\Delta / T)$ は事後確率の比(= $\frac{p(\text{model}|\text{data})}{p(\text{model}_{\text{old}}|\text{data})}$)に対応しています。

2. ベイズ的に「尤度」と「事前分布」を分けて書く例

例えば、評価関数を

def log_likelihood(adj_matrix, data, col_names, feature_cols):
    # 回帰誤差の負の対数尤度部分を計算して返す
    pass

def log_prior(adj_matrix, feature_cols):
    # 孤立ノード数などから事前のlog確率を計算して返す
    pass

def log_posterior(adj_matrix, data, col_names, feature_cols, lambda_prior=1.0):
    return log_likelihood(adj_matrix, data, col_names, feature_cols) + \
           lambda_prior * log_prior(adj_matrix, feature_cols)

として、

score = -log_posterior(...)  # MCMCの目的関数は負の対数事後を最小化

のように分けて書きます。


3. MCMC受容判定の例(現状とほぼ同じ)

delta = score_proposal - score_current
acceptance_prob = min(1, np.exp(-delta / temp))

この形は負の対数事後に基づくメトロポリス法の典型形で変わりません。


4. まとめていうと

  • 現状のコードは、負の対数事後を「損失関数(回帰誤差+ペナルティ)」としてまとめて計算しているので、
  • 明示的に尤度と事前を分けているだけで、アルゴリズムや受容判定の計算式は変わりません。

5. 補足

もし以下のような利点を得たい場合は、

  • 事前の強さ(ペナルティ重み)をパラメータとして明示的に調整したい
  • 複数の種類の事前分布(ペナルティ)を混在させたい
  • 尤度関数や事前関数を別々に解析・デバッグしたい

このように 尤度・事前を分離 してコードを書くことが推奨されます。


もし、分離した形のサンプルコードもご希望でしたらご連絡ください!

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?