目的
SPSS Modelerで作成したCHAIDモデルについて、それぞれのレコードの分岐条件とスコアを出力する方法をご紹介します。
SPSS Modeler18.5から追加されたネイティブPythonの機能を活用してXMLファイルの読み込み、読み込んだ情報とナゲットの結合方法についてまとめています。
part2では分岐条件を1列にまとめて、ナゲットと結合して出力する方法をご紹介します。
対象読者
・日常業務でSPSS Modeler 18.5を使用しているデータ分析初級者~中級者の方
・今後SPSS Modeler18.5の導入を検討されている方
・作成したCHAIDモデルの現場への活用方法を検討されている方
前提
下記記事「CHAIDモデルのルール識別子毎の分岐条件出力方法について part1」の続きになります。
目次
1章 分岐条件の集約について
前回のpart1ではPMMLファイルから決定木のそれぞれのノードの分岐条件を抽出しました。
今回は階層列・親ノード列の値を使用して、それぞれの親ノードを辿る形で各階層の条件を列として追加するスクリプトを作成します。
2章 スクリプト作成方法について
データ操作のためのpandasと配列操作のためのnumpyをライブラリとしてインポートします。
import pandas as pd
import numpy as np
part1で作成した「_nodes.csv」を読み込みます。
# CSVファイルを読み込む
df = pd.read_csv(r'D:\adult\flag_nodes.csv', encoding ="shift-jis")
読み込んだデータの階層列について最大の深さの値を取得して、その値に応じて列を作成します。
各行について、親ノードを参照しながら、分岐条件を列に格納していき、根まで条件を取得していく関数「build_path_conditions」を作成します。
# 最大階層数を取得
max_hierarchy = df['階層'].max()
# 各ノードの親子関係を追跡して条件を構築する関数
def build_path_conditions(node_id):
path_conditions = [None] * max_hierarchy # 各階層の条件を格納する配列
# 現在のノードから根まで遡る
current_id = node_id
while current_id != '-':
node_row = df[df['ノードID'] == int(current_id)]
if len(node_row) == 0:
break
level = node_row['階層'].values[0]
condition = node_row['分岐条件'].values[0]
# 階層に対応する位置に条件を設定(階層は1から始まるので、インデックスは-1する)
path_conditions[level-1] = condition
# 親ノードへ移動
current_id = node_row['親ノード'].values[0]
return path_conditions
子ノードを持たないノードをリーフノードと定義した上で、リーフノードについてデータフレーム形式でデータをまとめます。
ルールIDには自身のノードID、スコアについても自身の値を保持します。
分岐条件については先ほど作成したbuild_path_conditions関数を使用して、それぞれの階層の位置の分岐条件を列として取得してデータフレーム形式にします。
ノードによっては必ずしも最大の分岐条件数を持つわけではないため、分岐条件のない階層の値については空文字に変換しています。
# リーフノードを特定(他のノードの親になっていないノード)
all_node_ids = set(df['ノードID'])
parent_node_ids = set(df['親ノード']) - {'-'} # ルートノードを除外
leaf_node_ids = all_node_ids - parent_node_ids
# 結果を格納するための新しいデータフレーム
result_rows = []
# 各リーフノードについて処理を行う
for leaf_id in leaf_node_ids:
leaf_row = df[df['ノードID'] == leaf_id]
# リーフノードの情報を取得
rule_id = leaf_id
score = leaf_row['スコア (成約確率)'].values[0]
# パス上の全条件を取得
conditions = build_path_conditions(leaf_id)
# 結果行を作成
result_row = {
'ルールID': rule_id,
'スコア': score
}
# 各階層の条件を列として追加(0から始める)
for i in range(max_hierarchy):
result_row[f'条件_階層{i}'] = conditions[i]
result_rows.append(result_row)
# 結果のデータフレームを作成
result_df = pd.DataFrame(result_rows)
# 条件がNoneの場合は空文字列に置き換え
result_df = result_df.fillna('')
作成したresult_dfをcsvファイルとして出力します。
ここではpart1で「_nodes.csv」を出力したフォルダに出力しています。
また、result_dfにはルートノードの情報のみを含んだ条件_階層0という列が含まれているため、この列を含むファイルと含まないファイルの二つを出力しています。
# すべての列を含むCSVファイルとして保存
result_df.to_csv(r'D:\adult\flag.csv', index=False)
# まず全ての列名を取得
all_columns = list(result_df.columns)
# 「条件_階層0」を除外
columns_without_level0 = [col for col in all_columns if col != '条件_階層0']
# 指定した列だけを含むデータフレームを作成
filtered_df = result_df[columns_without_level0]
# フィルタリングした結果をCSVファイルとして保存
filtered_df.to_csv(r'D:\adult\flag_filter.csv', index=False)
3章 SPSS Modelerでの操作について
作成したスクリプトを実行して、分岐条件をまとめたcsvファイルを出力してみます。
入力パレットのユーザー入力ノードをキャンバスに配置して、ダミーの入力ノードを作成します。
出力パレットの拡張の出力ノードをキャンバスに配置して、ユーザー入力ノードからリンクします。
pythonを選択し、2章で作成したスクリプトを貼り付けて実行します。
dfにはpart1で作成した「_nodes.csv」を保存したパスを指定してください。
(作成したスクリプトは下記です。)
付録(2章のスクリプト)
result_df.to_csvとfiltered_df.to_csvには、作成するファイルを保存するパスを指定してください。
実行すると分岐条件をまとめたcsvファイルが作成されます。
作成したファイルを読み込んでみましょう。
ここではルートノードを除外したfilterd_dfを読み込みます。
エンコードは「utf-8」を指定してください。
テーブルノードを実行すると各ノードのルートノードまでの分岐条件が1行になって出力されます。
次にナゲットと作成したファイルを結合します。
ナゲットをダブルクリックして設定タブで「ルール識別子」にチェックを入れます。
キーフィールドとなる「$RI-flag」のフィールド名を変更します。
ナゲットからフィルターノードへリンクし、「$RI-flag」を「ルールID」へ変更してください。
データの結合を行います。
レコード結合ノードをキャンバスに配置して、フィルターノードと「_filter.csv」からリンクしてください。
結合方法にキーを選択し、結合キーにルールIDを選択してください。
レコード結合ノードからテーブルノードへリンクして、実行してください。
各ルールID(ルール識別子)毎の分岐条件を出力に追加することが出来ます。
ここまでがルール識別子毎の分岐条件を表形式で出力する方法になります。
次の章では2つの拡張の出力ノードの処理を一つにまとめてみたいと思います。
4章 処理のまとめかたについて
この章では拡張の入力ノードを使用して処理を1つにまとめる方法をご紹介します。
1つにまとめる事で次回以降XMLファイルのパスを指定するだけで、SPSS Modelerで分岐条件を読み込めるようになります。
スクリプトをご紹介します。
2つの処理で使用したライブラリを記載します。
後程使用するSPSS Modeler上でPythonを使用するためのmodelerpyというライブラリもインポートします。
import xml.etree.ElementTree as ET
import pandas as pd
import numpy as np
import modelerpy
PMMLファイルからモデルの分岐条件を取得する部分については変更点はありません。
# XMLファイルを読み込む
def parse_decision_tree(xml_file_path):
# XMLファイルを解析
tree = ET.parse(xml_file_path)
root = tree.getroot()
# 名前空間を処理
ns = {'pmml': 'http://www.dmg.org/PMML-4_3'}
# 重要度情報を取得
importance_dict = {}
mining_fields = root.findall('.//pmml:MiningField', ns)
for field in mining_fields:
name = field.get('name')
importance = field.get('importance')
if importance:
importance_dict[name] = float(importance)
# 述語(条件)をテキストに変換する関数
def get_predicate_text(predicate):
if predicate is None:
return "条件なし"
# 複合条件(CompoundPredicate)の処理
if predicate.tag.endswith('CompoundPredicate'):
# まずは直下のCompoundPredicateのbooleanOperatorを取得
boolean_operator = predicate.get('booleanOperator', 'and')
# surrogateの場合、内側のCompoundPredicateのbooleanOperatorを優先
if boolean_operator == 'surrogate':
inner_predicates = predicate.findall('.//pmml:CompoundPredicate', ns)
if inner_predicates:
# 最初に見つかった内側のCompoundPredicateのbooleanOperatorを使用
boolean_operator = inner_predicates[0].get('booleanOperator', 'and')
# 条件を格納するリスト
conditions = []
# 内部のSimplePredicateを取得
for simple_predicate in predicate.findall('.//pmml:SimplePredicate', ns):
conditions.append(get_simple_predicate_text(simple_predicate))
return f" {boolean_operator} ".join(conditions)
# 単一条件(SimplePredicate)の処理
else:
return get_simple_predicate_text(predicate)
# 単純な述語をテキストに変換
def get_simple_predicate_text(predicate):
field = predicate.get('field')
operator = predicate.get('operator')
value = predicate.get('value')
# Shift-JIS対応の演算子にマッピング
op_map = {
'equal': '=',
'lessOrEqual': '≦', # ≤ → ≦
'greaterThan': '>',
'greaterOrEqual': '≧', # ≥ → ≧
'lessThan': '<',
'notEqual': '≠'
}
op_symbol = op_map.get(operator, operator)
return f"{field} {op_symbol} {value}"
# ノード情報を取得するための関数
def get_node_info(node, parent_id=None, level=1):
node_id = node.get('id')
score = node.get('score')
record_count = node.get('recordCount')
# 分岐条件を取得
predicates = node.findall('.//pmml:CompoundPredicate', ns) + node.findall('.//pmml:SimplePredicate', ns)
condition = "ルートノード" if parent_id is None else get_predicate_text(predicates[0] if predicates else None)
# スコア分布(確率)を取得
score_distributions = node.findall('.//pmml:ScoreDistribution', ns)
probability = next((sd.get('probability') for sd in score_distributions if sd.get('value') == '1'), "0")
return {
'ノードID': node_id,
'階層': level,
'親ノード': parent_id if parent_id is not None else '-',
'分岐条件': condition,
'スコア (成約確率)': f"{score} ({float(probability) * 100:.1f}%)"
}
# 再帰的にノードを処理してデータフレームを作成
def process_nodes(node, parent_id=None, level=1):
result = [get_node_info(node, parent_id, level)]
# 子ノードを処理
for child in node.findall('./pmml:Node', ns):
result.extend(process_nodes(child, node.get('id'), level + 1))
return result
# ルートノードを取得
root_node = root.find('.//pmml:Node', ns)
# 全ノードの情報を取得
node_data = process_nodes(root_node)
# 重要度ランキングのデータフレームを作成
importance_data = [{'変数名': k, '重要度': v} for k, v in importance_dict.items()]
importance_df = pd.DataFrame(importance_data).sort_values('重要度', ascending=False)
return pd.DataFrame(node_data), importance_df
「#出力形式を選択してファイルを作成」という部分は削除し、読み込むPMMLファイルを指定して、作成した関数を実行する形へ変更します。
また、3行目については後半の処理内容と合わせるために「nodes_df」の部分を「df」へ変更しています。
ここまでが1つ目の拡張の出力ノードの処理内容になります。
# 実行例
xml_file_path = r'D:/adult/flag.xml'
df, importance_df = parse_decision_tree(xml_file_path)
前半部分で作成した「df」を使用するため、csvファイルの読み込みは削除しています。
また、「#現在のノードから根まで遡る」の部分の「node_row = df[df['ノードID'] == (current_id)]」について(current_id)の前のintを削除しています。
xml.etree.ElementTreeで読み込むと数値であっても値は文字列として読み込まれます。
先ほどまでは1度csvファイルに落としてから読み込んでいたため数値へ変換しましたが、今回は不要です。
# 最大階層数を取得
max_hierarchy = df['階層'].max()
# 各ノードの親子関係を追跡して条件を構築する関数
def build_path_conditions(node_id):
path_conditions = [None] * max_hierarchy # 各階層の条件を格納する配列
# 現在のノードから根まで遡る
current_id = node_id
while current_id != '-':
node_row = df[df['ノードID'] == (current_id)]
if len(node_row) == 0:
break
level = node_row['階層'].values[0]
condition = node_row['分岐条件'].values[0]
# 階層に対応する位置に条件を設定(階層は1から始まるので、インデックスは-1する)
path_conditions[level-1] = condition
# 親ノードへ移動
current_id = node_row['親ノード'].values[0]
return path_conditions
後半も処理内容については変更点はありません。
# リーフノードを特定(他のノードの親になっていないノード)
all_node_ids = set(df['ノードID'])
parent_node_ids = set(df['親ノード']) - {'-'} # ルートノードを除外
leaf_node_ids = all_node_ids - parent_node_ids
# 結果を格納するための新しいデータフレーム
result_rows = []
# 各リーフノードについて処理を行う
for leaf_id in leaf_node_ids:
leaf_row = df[df['ノードID'] == leaf_id]
# リーフノードの情報を取得
rule_id = leaf_id
score = leaf_row['スコア (成約確率)'].values[0]
# パス上の全条件を取得
conditions = build_path_conditions(leaf_id)
# 結果行を作成
result_row = {
'ルールID': rule_id,
'スコア': score
}
# 各階層の条件を列として追加(0から始める)
for i in range(max_hierarchy):
result_row[f'条件_階層{i}'] = conditions[i]
result_rows.append(result_row)
# 結果のデータフレームを作成
result_df = pd.DataFrame(result_rows)
# 条件がNoneの場合は空文字列に置き換え
result_df = result_df.fillna('')
# 条件_階層0を除外した列のみを含むデータフレームを作成
# まず全ての列名を取得
all_columns = list(result_df.columns)
# 「条件_階層0」を除外
columns_without_level0 = [col for col in all_columns if col != '条件_階層0']
# 指定した列だけを含むデータフレームを作成
filtered_df = result_df[columns_without_level0]
拡張の入力ノードを使用してデータを読み込む際にはデータモデルを作成する必要があります。
データモデルについては以下の記事にまとめていますので、ご興味のある方は参考にしてください。
# 修正された変換表を作成
mapping_data = {
'dtypes': ['int64', 'float64', 'object', 'bool', 'datetime64[ns]'],
'StorageType': ['integer', 'real', 'string', 'integer', 'timestamp'],
'Measurement': ['ordinal', 'continuous', 'nominal', 'flag', 'continuous']
}
mapping_df = pd.DataFrame(mapping_data)
# データモデル生成関数
def generate_data_model(df, mapping):
# Convert mapping DataFrame to a dictionary for easier lookup
type_mapping = mapping.set_index('dtypes').to_dict(orient='index')
# Generate modelerpy.Field entries
field_entries = []
for column in df.columns:
dtype = str(df[column].dtypes)
if dtype in type_mapping:
storage_type = type_mapping[dtype]['StorageType']
measurement = type_mapping[dtype]['Measurement']
field_entries.append(
f"modelerpy.Field('{column}', '{storage_type}', '{measurement}')"
)
else:
print(f"Warning: No mapping found for dtype '{dtype}' in column '{column}'")
# Create the dataModel string
data_model = "modelerpy.DataModel([\n " + ",\n ".join(field_entries) + "\n])"
return data_model
data_model_script = generate_data_model(filtered_df , mapping_df)
# データモデルの参照時
if modelerpy.isComputeDataModelOnly():
outputDataModel = None
### Compute output data model here
### 出力データモデルの設定
eval_data_model = eval(data_model_script) # 文字列を評価してオブジェクトとして扱う
modelerpy.setOutputDataModel(eval_data_model)
# データ出力時
else:
outputData = None
### ファイルからデータの読み込み
outputData = filtered_df
### データの出力
modelerpy.writePandasDataframe(filtered_df)
以上が処理を一つにまとめたスクリプトです。
SPSS Modelerで拡張の入力ノードをキャンバスに配置し、Pythonを選択した上でスクリプトを貼り付けてください。
(作成したスクリプトは下記です。)
付録(4章のスクリプト)
ご自身の環境に合わせて「xml_file_path」にPMMLファイルの保存しているパスを指定してください。
データ型ノードを配置して値を読み込んでください。
ここでフィールド名が表示されない場合にはデータの読み込み時にエラーが出ています。
XMLファイルのパスまたはデータモデル作成時の変数名が合っているかを確認してください。
置換ノードを配置して「ルールID」列の値を整数へ変換します。
xml.etree.ElementTreeで読み込むと数値であっても値は文字列として読み込まれます。
ナゲット側のルールIDのデータ型と揃えるために変換しています。
データ加工の作業は以上になります。
3章と同じようにレコード結合ノードを使用して、フィルターノードと結合します。
結合方法はキーを選択し、結合キーにはルールIDを選択します。
テーブルノードで出力すると3章と同じ結果になります。
あとがき
2回に渡ってCHAIDモデルのルール識別子を使用して、分岐条件を出力する方法をご紹介いたしました。今後はPMMLファイルの出力についてもスクリプトで操作し、より自動化に近づけていきたいと思っています。
記事の内容につきましてご不明点、ご指摘ございましたらお気軽にコメントください。
拙い文章ではございますが、最後までお付き合いいただきありがとうございました。
付録(2章のスクリプト)
import pandas as pd
import numpy as np
# CSVファイルを読み込む
df = pd.read_csv(r'D:\adult\flag_nodes.csv', encoding ="shift-jis")
# 最大階層数を取得
max_hierarchy = df['階層'].max()
# 各ノードの親子関係を追跡して条件を構築する関数
def build_path_conditions(node_id):
path_conditions = [None] * max_hierarchy # 各階層の条件を格納する配列
# 現在のノードから根まで遡る
current_id = node_id
while current_id != '-':
node_row = df[df['ノードID'] == int(current_id)]
if len(node_row) == 0:
break
level = node_row['階層'].values[0]
condition = node_row['分岐条件'].values[0]
# 階層に対応する位置に条件を設定(階層は1から始まるので、インデックスは-1する)
path_conditions[level-1] = condition
# 親ノードへ移動
current_id = node_row['親ノード'].values[0]
return path_conditions
# リーフノードを特定(他のノードの親になっていないノード)
all_node_ids = set(df['ノードID'])
parent_node_ids = set(df['親ノード']) - {'-'} # ルートノードを除外
leaf_node_ids = all_node_ids - parent_node_ids
# 結果を格納するための新しいデータフレーム
result_rows = []
# 各リーフノードについて処理を行う
for leaf_id in leaf_node_ids:
leaf_row = df[df['ノードID'] == leaf_id]
# リーフノードの情報を取得
rule_id = leaf_id
score = leaf_row['スコア (成約確率)'].values[0]
# パス上の全条件を取得
conditions = build_path_conditions(leaf_id)
# 結果行を作成
result_row = {
'ルールID': rule_id,
'スコア': score
}
# 各階層の条件を列として追加(0から始める)
for i in range(max_hierarchy):
result_row[f'条件_階層{i}'] = conditions[i]
result_rows.append(result_row)
# 結果のデータフレームを作成
result_df = pd.DataFrame(result_rows)
# 条件がNoneの場合は空文字列に置き換え
result_df = result_df.fillna('')
# すべての列を含むCSVファイルとして保存
result_df.to_csv(r'D:\adult\flag.csv', index=False)
# まず全ての列名を取得
all_columns = list(result_df.columns)
# 「条件_階層0」を除外
columns_without_level0 = [col for col in all_columns if col != '条件_階層0']
# 指定した列だけを含むデータフレームを作成
filtered_df = result_df[columns_without_level0]
# フィルタリングした結果をCSVファイルとして保存
filtered_df.to_csv(r'D:\adult\flag_filter.csv', index=False)
付録(4章のスクリプト)
import xml.etree.ElementTree as ET
import pandas as pd
import numpy as np
import modelerpy
# XMLファイルを読み込む
def parse_decision_tree(xml_file_path):
# XMLファイルを解析
tree = ET.parse(xml_file_path)
root = tree.getroot()
# 名前空間を処理
ns = {'pmml': 'http://www.dmg.org/PMML-4_3'}
# 重要度情報を取得
importance_dict = {}
mining_fields = root.findall('.//pmml:MiningField', ns)
for field in mining_fields:
name = field.get('name')
importance = field.get('importance')
if importance:
importance_dict[name] = float(importance)
# 述語(条件)をテキストに変換する関数
def get_predicate_text(predicate):
if predicate is None:
return "条件なし"
# 複合条件(CompoundPredicate)の処理
if predicate.tag.endswith('CompoundPredicate'):
# まずは直下のCompoundPredicateのbooleanOperatorを取得
boolean_operator = predicate.get('booleanOperator', 'and')
# surrogateの場合、内側のCompoundPredicateのbooleanOperatorを優先
if boolean_operator == 'surrogate':
inner_predicates = predicate.findall('.//pmml:CompoundPredicate', ns)
if inner_predicates:
# 最初に見つかった内側のCompoundPredicateのbooleanOperatorを使用
boolean_operator = inner_predicates[0].get('booleanOperator', 'and')
# 条件を格納するリスト
conditions = []
# 内部のSimplePredicateを取得
for simple_predicate in predicate.findall('.//pmml:SimplePredicate', ns):
conditions.append(get_simple_predicate_text(simple_predicate))
return f" {boolean_operator} ".join(conditions)
# 単一条件(SimplePredicate)の処理
else:
return get_simple_predicate_text(predicate)
# 単純な述語をテキストに変換
def get_simple_predicate_text(predicate):
field = predicate.get('field')
operator = predicate.get('operator')
value = predicate.get('value')
# Shift-JIS対応の演算子にマッピング
op_map = {
'equal': '=',
'lessOrEqual': '≦', # ≤ → ≦
'greaterThan': '>',
'greaterOrEqual': '≧', # ≥ → ≧
'lessThan': '<',
'notEqual': '≠'
}
op_symbol = op_map.get(operator, operator)
return f"{field} {op_symbol} {value}"
# ノード情報を取得するための関数
def get_node_info(node, parent_id=None, level=1):
node_id = node.get('id')
score = node.get('score')
record_count = node.get('recordCount')
# 分岐条件を取得
predicates = node.findall('.//pmml:CompoundPredicate', ns) + node.findall('.//pmml:SimplePredicate', ns)
condition = "ルートノード" if parent_id is None else get_predicate_text(predicates[0] if predicates else None)
# スコア分布(確率)を取得
score_distributions = node.findall('.//pmml:ScoreDistribution', ns)
probability = next((sd.get('probability') for sd in score_distributions if sd.get('value') == '1'), "0")
return {
'ノードID': node_id,
'階層': level,
'親ノード': parent_id if parent_id is not None else '-',
'分岐条件': condition,
'スコア (成約確率)': f"{score} ({float(probability) * 100:.1f}%)"
}
# 再帰的にノードを処理してデータフレームを作成
def process_nodes(node, parent_id=None, level=1):
result = [get_node_info(node, parent_id, level)]
# 子ノードを処理
for child in node.findall('./pmml:Node', ns):
result.extend(process_nodes(child, node.get('id'), level + 1))
return result
# ルートノードを取得
root_node = root.find('.//pmml:Node', ns)
# 全ノードの情報を取得
node_data = process_nodes(root_node)
# 重要度ランキングのデータフレームを作成
importance_data = [{'変数名': k, '重要度': v} for k, v in importance_dict.items()]
importance_df = pd.DataFrame(importance_data).sort_values('重要度', ascending=False)
return pd.DataFrame(node_data), importance_df
# 実行例
xml_file_path = r'D:/adult/flag.xml'
df, importance_df = parse_decision_tree(xml_file_path)
# 最大階層数を取得
max_hierarchy = df['階層'].max()
# 各ノードの親子関係を追跡して条件を構築する関数
def build_path_conditions(node_id):
path_conditions = [None] * max_hierarchy # 各階層の条件を格納する配列
# 現在のノードから根まで遡る
current_id = node_id
while current_id != '-':
node_row = df[df['ノードID'] == (current_id)]
if len(node_row) == 0:
break
level = node_row['階層'].values[0]
condition = node_row['分岐条件'].values[0]
# 階層に対応する位置に条件を設定(階層は1から始まるので、インデックスは-1する)
path_conditions[level-1] = condition
# 親ノードへ移動
current_id = node_row['親ノード'].values[0]
return path_conditions
# リーフノードを特定(他のノードの親になっていないノード)
all_node_ids = set(df['ノードID'])
parent_node_ids = set(df['親ノード']) - {'-'} # ルートノードを除外
leaf_node_ids = all_node_ids - parent_node_ids
# 結果を格納するための新しいデータフレーム
result_rows = []
# 各リーフノードについて処理を行う
for leaf_id in leaf_node_ids:
leaf_row = df[df['ノードID'] == leaf_id]
# リーフノードの情報を取得
rule_id = leaf_id
score = leaf_row['スコア (成約確率)'].values[0]
# パス上の全条件を取得
conditions = build_path_conditions(leaf_id)
# 結果行を作成
result_row = {
'ルールID': rule_id,
'スコア': score
}
# 各階層の条件を列として追加(0から始める)
for i in range(max_hierarchy):
result_row[f'条件_階層{i}'] = conditions[i]
result_rows.append(result_row)
# 結果のデータフレームを作成
result_df = pd.DataFrame(result_rows)
# 条件がNoneの場合は空文字列に置き換え
result_df = result_df.fillna('')
# 条件_階層0を除外した列のみを含むデータフレームを作成
# まず全ての列名を取得
all_columns = list(result_df.columns)
# 「条件_階層0」を除外
columns_without_level0 = [col for col in all_columns if col != '条件_階層0']
# 指定した列だけを含むデータフレームを作成
filtered_df = result_df[columns_without_level0]
# 修正された変換表を作成
mapping_data = {
'dtypes': ['int64', 'float64', 'object', 'bool', 'datetime64[ns]'],
'StorageType': ['integer', 'real', 'string', 'integer', 'timestamp'],
'Measurement': ['ordinal', 'continuous', 'nominal', 'flag', 'continuous']
}
mapping_df = pd.DataFrame(mapping_data)
# データモデル生成関数
def generate_data_model(df, mapping):
# Convert mapping DataFrame to a dictionary for easier lookup
type_mapping = mapping.set_index('dtypes').to_dict(orient='index')
# Generate modelerpy.Field entries
field_entries = []
for column in df.columns:
dtype = str(df[column].dtypes)
if dtype in type_mapping:
storage_type = type_mapping[dtype]['StorageType']
measurement = type_mapping[dtype]['Measurement']
field_entries.append(
f"modelerpy.Field('{column}', '{storage_type}', '{measurement}')"
)
else:
print(f"Warning: No mapping found for dtype '{dtype}' in column '{column}'")
# Create the dataModel string
data_model = "modelerpy.DataModel([\n " + ",\n ".join(field_entries) + "\n])"
return data_model
data_model_script = generate_data_model(filtered_df , mapping_df)
# データモデルの参照時
if modelerpy.isComputeDataModelOnly():
outputDataModel = None
### Compute output data model here
### 出力データモデルの設定
eval_data_model = eval(data_model_script) # 文字列を評価してオブジェクトとして扱う
modelerpy.setOutputDataModel(eval_data_model)
# データ出力時
else:
outputData = None
### ファイルからデータの読み込み
outputData = filtered_df
### データの出力
modelerpy.writePandasDataframe(filtered_df)