以前SPSS ModelerでR拡張ノードやRDBの関数をつかって正規表現を使うという記事を書きました。
SPSS Modelerで正規表現を使う - Qiita
今回は同様のことをPython拡張ノードで行います。
以下のようにNAMEという列の文字列から前株の「株式会社」という文字列を「(株)」に置き換えるNAME_REという列を追加したいと思います。
サンプルストリームは以下です。
テスト環境
- Modeler 18.3
- Windows10 64bit
- Python 3.8.10
#1. Rの拡張ノードを使う
python拡張ノードでstr.replaceのregex=Trueで正規表現を行います。
#ライブラリのインポート
import spss.pyspark.runtime
from pyspark.sql.types import *
import pandas as pd
#コンテキストオブジェクト定義
asContext = spss.pyspark.runtime.getContext()
sqlContext = asContext.getSparkSQLContext()
target_col_name='NAME'
res_col_name=target_col_name+'_RE'
old='^株式会社'
new=r'(株)'
if asContext.isComputeDataModelOnly():
#スキーマ設定
inputSchema = asContext.getSparkInputSchema()
outputSchema = inputSchema
outputSchema.fields.append(StructField(res_col_name, StringType(), nullable=True))
asContext.setSparkOutputSchema(outputSchema)
else:
#データ読込
indf = asContext.getSparkInputData()
df = indf.toPandas()
#正規表現加工
df[res_col_name]=df[target_col_name].str.replace(old, new, regex=True)
#データ書出し
outdf = sqlContext.createDataFrame(df)
asContext.setSparkOutputData(outdf)
少し解説します。
以下は定型的な文です。SPSSとpySparkのライブラリを読み込み、コンテキストオブジェクトを取得しています。
#ライブラリのインポート
import spss.pyspark.runtime
from pyspark.sql.types import *
#コンテキストオブジェクト定義
asContext = spss.pyspark.runtime.getContext()
sqlContext = asContext.getSparkSQLContext()
以下は出力データのスキーマを定義しています。
まず、入力データのスキーマをasContext.getSparkInputSchema()で取得し、
fields.append(StructField(res_col_name, StringType(), nullable=True))
で新たな列を追加しています。
if asContext.isComputeDataModelOnly():
#スキーマ設定
inputSchema = asContext.getSparkInputSchema()
outputSchema = inputSchema
outputSchema.fields.append(StructField(res_col_name, StringType(), nullable=True))
asContext.setSparkOutputSchema(outputSchema)
次にデータの処理をします。
入力データをasContext.getSparkInputData()を取得します。
SparkDataFrameとして取得されるので、toPandas()でSparkDataFrameをPandasのDataFrameに変換します。
正規表現による列追加は以下で行っています。
df[res_col_name]=df[target_col_name].str.replace(old, new, regex=True)
そして、sqlContext.createDataFrame(df)でpandasのDataFrameをSparkDataFrameに変換しなおして、asContext.setSparkOutputData(outdf)で出力データとして設定しています。
else:
#データ読込
indf = asContext.getSparkInputData()
df = indf.toPandas()
#正規表現加工
df[res_col_name]=df[target_col_name].str.replace(old, new, regex=True)
#データ書出し
outdf = sqlContext.createDataFrame(df)
asContext.setSparkOutputData(outdf)