SPSS ModelerではJSONファイルをJSONファイルノードで読み込むことができます。しかし、階層が深くなっていたり、繰り返し項目があったり、読み出しルートを変えたいということがあると、JSONファイルノードでは、読み込みたい形式で読めないことがあります。
そのため、ここではPythonの拡張ノードを使って、以下の4つのパターンのJSONファイルを読み込んでみます。
①シンプルな構造のJSON
②ネスト構造のJSON
③ネスト構造とリストを含むJSON
④ルートが読み出しルートではないJSON
■環境
Modeler 18.4
python 3.8.10
pandas 1.5.2
json 2.0.9
Windows 11
サンプルストリーム
サンプルデータ
事前作業
以下の記事を参考にSPSSの拡張ノードでPythonを利用できるようにしておきます。
SPSS Modelerの拡張ノードでPythonを利用する(ミニマム設定) - Qiita
パッケージの導入
個人フォルダに作っている場合は普通にpip installできますが、All Userで利用できるPythonを"C:\Program Files\Python37"に導入しているので、管理者権限でpip installを行ってください。
python -m pip install pandas
python -m pip install json
拡張のインポートノードの作成
拡張のインポートノードをキャンバスに配置して、「シンタックス」タブで「Python for Spark」を選び、Pythonシンタックスに以下のコードをコピーペーストで貼り付けてください。
このスクリプトで4パターンのJSONファイルを読むことができます。
#入力JSONファイル名
jsonfile='C:/temp/json/01simple.json'
#df化したい辞書リストのルート要素
rowsroot = ""
#列名をフラット化する際の区切り文字
sep = '.'
#--------------------------------------
#JSON読込関数
import pandas as pd
import json
# フラット化
def flatten(d, parent_key='', sep='.'):
items = []
for k, v in d.items():
# 列名の生成
new_key = parent_key + sep + k if parent_key else k
# 辞書型項目のフラット化
if isinstance(v, dict):
items.extend(flatten(v, new_key, sep=sep).items())
# リスト項目のフラット化
elif isinstance(v, list):
new_key_tmp = new_key
for i, elm in enumerate(v):
new_key = new_key_tmp + sep + str(i)
# リストの中の辞書
if isinstance(elm, dict):
items.extend(flatten(elm, new_key, sep=sep).items())
# 単なるリスト
else:
items.append((new_key, elm))
# 値追加
else:
items.append((new_key, v))
return dict(items)
def flattenJsonFile(jsonfile, rowsroot, sep='.'):
"""
JSONファイルを読み込み2次元のpandas DataFrameに変換する
Parameters
----------
jsonfile : string
JSONファイルパス
rowsroot : string
フラット化するルートエレメント名。トップからでいい場合は空文字を入力する
sep : string
ノーマライズされていないエレメントを区切る文字
Returns
-------
df : pandas.DataFrame
フラット化されたpandas DataFrame
"""
# JSONファイルを読込
with open(jsonfile, encoding='utf-8') as f:
d = json.load(f)
# df化したい辞書リストのルート項目を指定
if rowsroot != '':
d = d[rowsroot]
# フラット化
dlist = []
for di in d:
dlist.append(flatten(di, sep=sep))
# フラット化された辞書をpandasデータフレームに変換
return pd.DataFrame.from_dict(dlist)
#--------------------------------------
#Modelerで使用
import spss.pyspark.runtime
from pyspark.sql.types import *
asContext = spss.pyspark.runtime.getContext()
sqlContext = asContext.getSparkSQLContext()
# フラット化
df = flattenJsonFile(jsonfile, rowsroot, sep)
#Booleanはsparkdfに変換できないのでobjectはstrに変換
for col in df.columns:
if df[col].dtype.name=='object':
df[col]=df[col].astype(str)
# pandasdfからsparkdfに変換
outdf = sqlContext.createDataFrame(df)
outputSchema = outdf.schema
#print(type(outputSchema))
#print(outputSchema)
asContext.setSparkOutputSchema(outputSchema)
#SparkのDataFrameをModelerに返却
if not asContext.isComputeDataModelOnly():
asContext.setSparkOutputData(outdf)
JSONファイルの読み込み
それぞれのパターンのファイルを読み込んでみます。
①シンプルな構造のJSONの読込
以下のようなシンプルなJSONはJSONファイルノードでも読むことができます。
[
{
"id":1000,
"UP_TIME":0,
"POWER":948,
"TEMP":250
},
{
"id":1000,
"UP_TIME":1,
"POWER":945,
"TEMP":251,
"ERR_CD":1
}
]
JSONファイルノードで読み込む
拡張のインポートノードで読み込む
拡張のインポートノードでも読んでみます。
Pythonシンタックスで読み込むファイルを指定します。2行目のファイル名を「01simple.json」に設定してください。
#入力JSONファイル名
jsonfile='C:/temp/json/01simple.json'
JSONファイルノードと同様に読み込むことができますが、NULLの形式が異なります。
以下の置換ノードでSparkのNULLを「to_string(@FIELD)='nan'」で判定し、ModelerのNULL(undef)に置き換えます。
②ネスト構造を持つJSONの読込
次のデータはidの下にM_CDとUP_TIMEという2つのデータがぶら下がっています。
[
{
"id":{
"M_CD":1000,
"UP_TIME":0
},
"sensor":{"POWER":948,
"TEMP":250
}
},
{
"id":{
"M_CD":1000,
"UP_TIME":1
},
"sensor":{"POWER":945,
"TEMP":251
},
"code":{
"ERR_CD":1
}
}
]
JSONファイルノードで読み込む
idの下にぶら下がっている、M_CDやUP_TIMEが一つの列の中に入ってしまいます。
拡張のインポートノードで読み込む
Pythonシンタックスで読み込むファイルを指定します。2行目のファイル名を「02notnormal.json」に設定してください。
#入力JSONファイル名
jsonfile='C:/temp/json/02notnormal.json'
やはり置換ノードでSparkのNULLを「to_string(@FIELD)='nan'」で判定し、ModelerのNULL(undef)に置き換えます。
「id」をid.M_CDとid.UP_TIMEの2列に切り出して読み込めました。
③ネスト構造とリストを含むJSON
次のデータは、ネスト化された辞書があるだけではなく、さらにcodeの下にERR_CDとMESSAGEのデータがリストで複数含まれています。
[
{
"id":{
"M_CD":1000,
"UP_TIME":0
},
"sensor":{"POWER":948,
"TEMP":250
}
},
{
"id":{
"M_CD":1000,
"UP_TIME":1
},
"sensor":{"POWER":945,
"TEMP":251
},
"code":[
{
"ERR_CD":1,
"MESSAGE":"part1"
}
]
},
{
"id":{
"M_CD":1000,
"UP_TIME":2
},
"sensor":{"POWER":943,
"TEMP":255
},
"code":[
{
"ERR_CD":2,
"MESSAGE":"part2"
},
{
"ERR_CD":3,
"MESSAGE":"part3"
}
]
}
]
JSONファイルノードで読み込む
拡張のインポートノードで読み込む
Pythonシンタックスで読み込むファイルを指定します。2行目のファイル名を「03list.json」に設定してください。
#入力JSONファイル名
jsonfile='C:/temp/json/03list.json'
やはり置換ノードでSparkのNULLを「to_string(@FIELD)='nan'」で判定し、ModelerのNULL(undef)に置き換えます。
code列が、'code.0.ERR_CD', 'code.0.MESSAGE', 'code.1.ERR_CD', 'code.1.MESSAGE'の4つの列としてフラット化されました。
④ルートが読み出しルートではないJSONの読込
特に、REST APIでJSONデータを取得した場合の多くは、今まで見てきたようなリスト化されたJSONデータだけではなく、ヘッダーのような情報と組み合わせた一つの辞書型構造のデータであることが多いと思います。
例えば以下のようなJSONです。total_rowsという文書全体の属性をあらわすヘッダー的な項目があり、実際のデータはrowsの中のリストとして含まれています。
{"total_rows":3,"rows":
[
{
"id":{
"M_CD":1000,
"UP_TIME":0
},
"sensor":{"POWER":948,
"TEMP":250
}
},
{
"id":{
"M_CD":1000,
"UP_TIME":1
},
"sensor":{"POWER":945,
"TEMP":251
},
"code":[
{
"ERR_CD":1,
"MESSAGE":"part1"
}
]
},
{
"id":{
"M_CD":1000,
"UP_TIME":2
},
"sensor":{"POWER":943,
"TEMP":255
},
"code":[
{
"ERR_CD":2,
"MESSAGE":"part2"
},
{
"ERR_CD":3,
"MESSAGE":"part3"
}
]
}
]
}
JSONファイルノードで読み込む
拡張のインポートノードで読み込む
Pythonシンタックスで読み込むファイルを指定します。2行目のファイル名を「04notroot.json」に設定してください。また、4行目の「rowsroot = "rows"」で"rows"
をルートとして読み込むという設定をします。
#入力JSONファイル名
jsonfile='C:/temp/json/04notroot.json'
#df化したい辞書リストのルート要素
rowsroot = "rows"
やはり置換ノードでSparkのNULLを「to_string(@FIELD)='nan'」で判定し、ModelerのNULL(undef)に置き換えます。
スクリプトの解説
以下で基本設定をしています。
jsonfileで入力ファイル名を指定しています。
rowsrootで読み出し始めるルートの要素を指定しています。
sepでネストした列を接続する際の区切り文字を指定しています。
#入力JSONファイル名
jsonfile='C:/temp/json/01simple.json'
#df化したい辞書リストのルート要素
rowsroot = ""
#列名をフラット化する際の区切り文字
sep = '.'
以下でflattenとflattenJsonFileという関数を定義しています。この解説はこのページを参考にしてください。
#--------------------------------------
#JSON読込関数
---省略----
以下で定義したflattenJsonFile関数でJSONファイルをフラット化してPandasのDataFrameに変換しています。
# フラット化
df = flattenJsonFile(jsonfile, rowsroot, sep)
PandasのBoolean型はSparkのDataFrameに変換できないのでstr型に変換しています。
#Booleanはsparkdfに変換できないのでobjectはstrに変換
for col in df.columns:
if df[col].dtype.name=='object':
df[col]=df[col].astype(str)
PandasのDataFrameからSparkのDataFrameに変換しています。
outdf = sqlContext.createDataFrame(df)
outputSchema = outdf.schema
asContext.setSparkOutputSchema(outputSchema)
SparkのDataFrameをModelerに返しています。
if not asContext.isComputeDataModelOnly():
asContext.setSparkOutputData(outdf)
参考
SPSS ModelerのPython拡張ノード関連記事へのリンク - Qiita
JSONファイルをフラットなpandasデータフレームに変換 - Qiita
Python for Spark を使用したスクリプト