はじめに
効果検証入門 ~正しい比較のための因果推論/計量経済学の基礎内のソースコードをPythonで再現します。
既に素晴らしい先人の実装例がありますが、自分の勉強用のメモとして残しておきます。
この記事では、3章について記載します。
コードは、githubにも掲載しています。
なお、変数名や処理内容は、基本的に書籍内に寄せて実装します。
ロジスティック回帰
ロジスティック回帰の実装は、scikit-learnでもstatsmodelsでも可能。
scikit-learn
scikit-learnは、ダミー化の必要あり。
from sklearn.linear_model import LogisticRegression
X = pd.get_dummies(biased_data[['recency', 'history', 'channel']]) # channelをダミー変数化
treatment = biased_data['treatment']
model = LogisticRegression().fit(X, treatment)
statsmodels
glm
もしくはlogit
クラスを使う。
statsmodelsはダミー化の必要はないが、モデル内で一部のカテゴリ値が使われないことがある様子。(以下の例ではchannel=Multichannel
が使われない。原因はよく分からない。)
気になる場合は、ダミー化してやればsklearnの結果と一致する。
from statsmodels.formula.api import glm, logit
model = glm('treatment ~ history + recency + channel', data=biased_data, family=sm.families.Binomial()).fit()
# model = logit('treatment ~ history + recency + channel', data=biased_data).fit() # 上と同様の結果
傾向スコアを用いた推定
傾向スコアを用いた推定方法としてとりあげられている「傾向スコアマッチング」と「IPW」は、DoWhyで実装できる。
各分析を行う前に、共通の因果推論用のインスタンスを定義する。
その際に、介入変数をbool化する必要があることに注意。
from dowhy import CausalModel
biased_data = biased_data.astype({"treatment":'bool'}, copy=False) # treatmentをbool化
model=CausalModel(
data = biased_data,
treatment='treatment',
outcome='spend',
common_causes=['recency', 'history', 'channel']
)
傾向スコアマッチング
最近傍マッチング
最近傍マッチングは、書籍内で説明されている介入有サンプルと介入無サンプルを傾向スコアが近いもの同士で1:1にマッチングする手法だ。
本書ではATTの推定のみ行っているが、DoWhyではATEの推定も可能。デフォルトはATEなので注意。内部の実装で両方計算しているから両方出してくれればいいのだが・・・。
なお、このATEは、ATTとATC(介入無サンプルへの介入効果の期待値)の2つから算出しているみたい。
nearest_ate = model.estimate_effect(
identified_estimand,
method_name="backdoor.propensity_score_matching",
target_units='ate',
)
nearest_att = model.estimate_effect(
identified_estimand,
method_name="backdoor.propensity_score_matching",
target_units='att',
)
print(f'ATE: {nearest_ate.value}')
print(f'ATT: {nearest_att.value}')
層別マッチング
本書内で登場していないが、層別マッチングも実装されている。
ソースコードを確認すると、データを同数件ずつにnum_strata
層に分割して、推定するようだ。
clipping_threshold
は、各層で介入有/無それぞれの件数を比較し、この値以下の層を除外する。(恐らく極端にどちらかの介入データが少ない層を除外して推定するため)
stratification_ate = model.estimate_effect(
identified_estimand,
method_name="backdoor.propensity_score_stratification",
target_units='ate',
method_params={'num_strata':50, 'clipping_threshold':5},
)
stratification_att = model.estimate_effect(
identified_estimand,
method_name="backdoor.propensity_score_stratification",
target_units='att',
method_params={'num_strata':50, 'clipping_threshold':5},
)
print(f'ATE: {stratification_ate.value}')
print(f'ATT: {stratification_att.value}')
IPW
ipw_estimate = model.estimate_effect(
identified_estimand,
method_name="backdoor.propensity_score_weighting",
target_units='ate',
)
print(f'ATE: {ipw_estimate.value}')
(おまけ)回帰分析
2章で行った回帰分析も可能である。
estimate = model.estimate_effect(
identified_estimand,
method_name="backdoor.linear_regression", #test_significance=True
)
print('Causal Estimate is ' + str(estimate.value))
標準化平均差の可視化
共変量のバランスを確認するための標準化平均差の可視化は対応していない。
そのため、手間であるが、自前で算出する。
傾向スコアマッチング
傾向スコアマッチングの対応関係を参照できないようなので、自分でもマッチングを実装する。
実装はこの辺りを参考にしている。
distance
の求め方は分からず適当に算出したので、あまり参考にしないように。
from sklearn.neighbors import NearestNeighbors
# ASAMの算出
def calculate_asam(data, treatment_column, columns):
treated_index = data[treatment_column]
data = pd.get_dummies(data[columns])
asam = data.apply(lambda c: abs(c[treated_index].mean() - c[~treated_index].mean()) / c.std())
asam['distance'] = np.sqrt(np.sum(asam**2)) # 定義が分からん
return asam
# 傾向スコアマッチング
def get_matching_data(data, treatment_column, propensity_score):
data['ps'] = propensity_score
treated = data[data[treatment_column] == 1]
control = data[data[treatment_column] == 0]
# 介入有サンプルのマッチング
control_neighbors = NearestNeighbors(n_neighbors=1, algorithm='ball_tree').fit(control['ps'].values.reshape(-1, 1))
distances, indices = control_neighbors.kneighbors(treated['ps'].values.reshape(-1, 1))
# distancesでマッチングの閾値を設定できる(未対応)
matching_data = pd.concat([treated, control.iloc[indices.flatten()]])
return matching_data
matching_data = get_matching_data(biased_data[['treatment', 'recency', 'channel', 'history']], 'treatment', nearest_ate.propensity_scores)
unadjusted_asam = calculate_asam(biased_data, 'treatment', ['recency', 'history', 'channel'])
matching_adjusted_asam = calculate_asam(matching_data, 'treatment', ['recency', 'history', 'channel'])
IPW
DescrStatsW
を使うと、重み付きの統計値が簡単に算出できる。が、重みをしてない時の値が、numpyの結果と若干ずれる?ので扱うときは注意。
from statsmodels.stats.weightstats import DescrStatsW
def calculate_asam_weighted(data, treatment_column, columns, propensity_score):
data = pd.get_dummies(data[[treatment_column] + columns])
data['ps'] = propensity_score
data['weight'] = data[[treatment_column, 'ps']].apply(lambda x: 1 / x['ps'] if x[treatment_column] else 1 / (1 - x['ps']), axis=1)
asam_dict = dict()
for column_name, column_value in data.drop([treatment_column, 'ps', 'weight'], axis=1).iteritems():
treated_stats = DescrStatsW(column_value[data[treatment_column]], weights=data[data[treatment_column]]['weight'])
control_stats = DescrStatsW(column_value[~data[treatment_column]], weights=data[~data[treatment_column]]['weight'])
asam_dict[column_name] = abs(treated_stats.mean - control_stats.mean) / treated_stats.std
asam = pd.Series(asam_dict)
asam['distance'] = np.sqrt(np.sum(asam**2)) # 定義が分からん
return asam
ipw_adjusted_asam = calculate_asam_weighted(biased_data, 'treatment', ['recency', 'history', 'channel'], ipw_estimate.propensity_scores)
可視化
%matplotlib inline
import matplotlib.pyplot as plt
plt.scatter(unadjusted_asam, unadjusted_asam.index, label='unadjusted')
plt.scatter(matching_adjusted_asam, matching_adjusted_asam.index, label='adjusted(matching)')
plt.scatter(ipw_adjusted_asam, ipw_adjusted_asam.index, label='adjusted(ipw)')
plt.vlines(0.1, 0, len(unadjusted_asam)-1, linestyles='dashed')
plt.legend()
plt.show()
なんとなくIPWの方が良さげだが、恐らく傾向スコアマッチングでは最近傍マッチングでマッチングするサンプルの重複を許容していて、一部のサンプルに偏っているためだと思われる。