v18.5以外のユーザー向け-Python for Sparkのシンタックスを書いてみる
前回で最後の予定でしたが、v18.5のユーザーさんしか、今のところネイティブPythonでのシンタックスは使えません、v18.4以前のユーザーさんにも参考になるようにPython for Spark(PySpark)のシンタックスも紹介しようと思います。
本記事では、v18.5で実施していますがv18.4以前でも同様の手順で可能です。
Pythonの対応バージョン等が異なりますので別途確認してください。
2024/10月時点 - IBM System Requirements
・IBM SPSS Modeler
- Supported Sofrware - Compilers and Languages の項目に詳細が記載されています。
PythonとRのバージョンについては"以上"としている通り、「Prerequisite Minimum」となります。
サポート対象は、下記バージョンからの「Future Fix Packs」が対象となります。
「Future Versions, Releases」はサポート対象外となります。
Modeler | Python | R |
---|---|---|
18.2.1 | 2.7.0 以上 | 3.3.0 以上 |
18.2.2 | 3.7.7 以上 | 3.5.1 以上 |
18.3 | 3.8.6 以上 | 4.4.0 以上 |
18.4 | 3.8.6 以上 | 4.4.0 以上 |
18.5 | 3.10.0 以上 | 4.4.0 以上 |
うーん。v18.3以上ではRが4.4.0以上になっていますね。リリース当初より結構サポートバージョンが上がっています。
※.前回の記事においてコレスポンデンス分析でRは"4.2.3"を使っていましたがサポート対象のバージョンがリリース時より上がっていました。m(_ _)m
※.記事は動作確認を行い、Rを4.4.1に変更しております。
1. 準備
Python for Sparkのシンタックスを使う場合は、ModelerのデフォルトPython環境ではなく、ユーザーが用意したPython環境をできるだけ利用しましょう。(必須ではありませんが、アクセス権などで融通が利くので)
①.Pythonの導入
以下のサイトから対応するPythonをダウンロードしてインストールしてください。
インストールパスはModelerとは異なる場所にインストールしましょう。
ここでは、v18.5がv3.10.X系をサポートしているので、v3.10.11を「C:\Python\310」へインストールしました。
②.Modelerから利用するPythonのパス設定
Modelerに同梱されているPythonも使えますが、別環境にインストールしたPythonを使う設定をします。
Modelerのインストールフォルダにあるconfigフォルダ内の「options.cfg」ファイルをテキストエディタで編集します。(アクセス権が必要です。)
デフォルトでは、以下のフォルダになりす。
"C:\Program Files\IBM\SPSS\Modeler\18.5\config"
options.cfgファイルを開き、「eas_pyspark_python_path」に、インストールしたpython.exeへのフルパスを記述します。
以下は、"C:\Python\310"へインストールした場合の例です。
"\"は"\\"のように2つ続けて記載してください。
例 : eas_pyspark_python_path, "C:\\Python\\310\\python.exe"
ファイルを更新したら、Modelerは再起動してください。
③.各種ライブラリやアプリケーションの導入
準備編に記載しているライブラリやアプリケーションをPython環境に導入してください。
2. 動作確認用シンタックス
準備編の動作確認用のシンタックスをPython for Spark用に修正します。
拡張ノードで利用するシンタックスは、"Python for Spark"を選択してください。
①.形態素解析動作確認用シンタックス
Mecabを使った形態素解析のシンタックスについては、動作確認用は修正の必要はありません。
ネイティブPythonのシンタックスと同じ内容で動作します。
import MeCab
mecab = MeCab.Tagger()
sample = "私は幸せです。"
print(mecab.parse(sample ))
②.感情分析動作確認用シンタックス
こちらもほとんど修正は必要ありません。唯一修正したのは、
"import modelerpy"の記述をコメントアウトしたことのみです。
modelerpyはネイティブPython用のライブラリのため不要なためです。
#-------------------------------------------------------
# ライブラリインポート
#-------------------------------------------------------
# Modeler用ライブラリ
#import modelerpy
# モデル構築用 Piplineタスク
from transformers import pipeline
# モデルロード用
from transformers import AutoModelForSequenceClassification
# Bert Tokenizer
from transformers import BertJapaneseTokenizer
#-------------------------------------------------------
# 感情分析モデル前処理
#-------------------------------------------------------
# daigo/bert-base-japanese-sentiment は非公開になっている
# koheiduck/bert-japanese-finetuned-sentimentを使います
model = AutoModelForSequenceClassification.from_pretrained('koheiduck/bert-japanese-finetuned-sentiment')
# 東北大学の乾研究室が作成したtokenizerを使いますす
tokenizer = BertJapaneseTokenizer.from_pretrained('cl-tohoku/bert-base-japanese-whole-word-masking')
# 感情分析のPipline定義
nlp = pipeline("sentiment-analysis", model=model, tokenizer=tokenizer)
sample = "私は、幸せです。"
result = nlp(sample)
# 結果を出力
print(result)
3. 感情分析用シンタックス - Python for Spark
さて続いて、感情分析用のシンタックスをPython for Spark用に修正します。
変更箇所は、大きく分けて以下の通りです。
Pythonであることに変わりはないので、Modelerとやり取りをする部分に修正が入るイメージです。
①. Modeler専用ライブラリ - Python for Spark用のライブラリを使います。
②. データモデル定義(スキーマ定義) - Python for Spark用に書き換えます。
③. データ入出力 - Python for Spark用に書き換えます。
①.感情分析用 Python for Spark全体
#-------------------------------------------------------
# ライブラリインポート
#-------------------------------------------------------
# Modeler用ライブラリ
import spss.pyspark.runtime
# SparkSQL用ライブラリ
from pyspark.sql.types import *
# モデル構築用 Piplineタスク
from transformers import pipeline
# モデルロード用
from transformers import AutoModelForSequenceClassification
# Bert Tokenizer
from transformers import BertJapaneseTokenizer
# Pandas
import pandas as pd
#-------------------------------------------------------
# Sparkコンテキスト生成
#-------------------------------------------------------
#Sparkコンテキスト生成
asCxt = spss.pyspark.runtime.getContext()
#-------------------------------------------------------
# 出力用データ定義処理
#-------------------------------------------------------
#スキーマ取得
outputSchema = asCxt.getSparkInputSchema()
#--------------------------------------------------------------
#出力用変数フィールドを追加
#--------------------------------------------------------------
#データモデルにフィールドを追加
field_name_1 = "label"
field_name_2 = "score"
#フィールド追加
outputSchema.fields.append(StructField(field_name_1,StringType(), nullable=True))
outputSchema.fields.append(StructField(field_name_2,FloatType(), nullable=True))
#スキーマをセット
asCxt.setSparkOutputSchema(outputSchema)
#-------------------------------------------------------
# 感情分析モデル作成
#-------------------------------------------------------
if not asCxt.isComputeDataModelOnly():
#---------------------------------------------------------
#入力データの取得
#---------------------------------------------------------
#スコアリングデータをSparkデータフレームで読み込み
modelerSPDF = asCxt.getSparkInputData()
#Pandasに変換
modelerPDF = modelerSPDF.toPandas()
#-------------------------------------------------------
# 感情分析モデル前処理
#-------------------------------------------------------
# daigo/bert-base-japanese-sentiment は非公開になっている
# koheiduck/bert-japanese-finetuned-sentimentを使います
model = AutoModelForSequenceClassification.from_pretrained('koheiduck/bert-japanese-finetuned-sentiment')
# 東北大学の乾研究室が作成したtokenizerを使いますす
tokenizer = BertJapaneseTokenizer.from_pretrained('cl-tohoku/bert-base-japanese-whole-word-masking')
# 感情分析のPipline定義
nlp = pipeline("sentiment-analysis", model=model, tokenizer=tokenizer)
#-------------------------------------------------------
# 感情分析処理
#-------------------------------------------------------
# 感情分析実行 - Sentenceフィールドを分析
result = modelerPDF['Sentence'].apply(lambda x: nlp(x)[0])
#-------------------------------------------------------
# 感情分析結果まとめ処理
#-------------------------------------------------------
#入力データに結果フィールドを追加
#field_name_1 = "label"
#field_name_2 = "score"
# label と score のフィールドに分けてデータフレームに変換
df_result = pd.DataFrame(result.tolist(), columns=[field_name_1, field_name_2])
# 結果を結合
outputPDF = pd.concat([modelerPDF, df_result], axis=1)
#-------------------------------------------------------
# Modelerに戻す
#-------------------------------------------------------
#SparkSQLコンテキストを生成
sqlCtx = asCxt.getSparkSQLContext()
#出力スキーマに基づきSparkデータフレーム形式にデータを変換
outSPDF = sqlCtx.createDataFrame( outputPDF, schema = outputSchema )
#データをSPSSに戻す
asCxt.setSparkOutputData( outSPDF )
②.変更点
では、変更点をみていきます。
a. ライブラリインポート部分
ライブラリは、Python for Spark用のライブラリを読み込みます。
# Modeler用ライブラリ
import spss.pyspark.runtime
# SparkSQL用ライブラリ
from pyspark.sql.types import *
b. Sparkオブジェクトの作成
Modelerとやり取りするためのコンテキストを生成する必要があります。
#Sparkコンテキスト生成
asCxt = spss.pyspark.runtime.getContext()
c. データモデル定義
データモデル定義もPython for Spark用に変更します。ただ文法が違うだけで、やっていることは同じです。
#-------------------------------------------------------
# 出力用データ定義処理
#-------------------------------------------------------
#スキーマ取得
outputSchema = asCxt.getSparkInputSchema()
#--------------------------------------------------------------
#出力用変数フィールドを追加
#--------------------------------------------------------------
#データモデルにフィールドを追加
field_name_1 = "label"
field_name_2 = "score"
#フィールド追加
outputSchema.fields.append(StructField(field_name_1,StringType(), nullable=True))
outputSchema.fields.append(StructField(field_name_2,FloatType(), nullable=True))
#スキーマをセット
asCxt.setSparkOutputSchema(outputSchema)
大きく異なる点は、このデータモデル定義処理を記述する場所です。
一番最後にModelerに分析結果データを戻すのですが、そこでスキーマ定義を参照する必要があるので、「if not asCxt.isComputeDataModelOnly():」のif文の外側で処理を記述している点です。
本来なら、以下のように記述するところを
ーーーーーーーーーーーーーーーーーーーーーーー
if asCxt.isComputeDataModelOnly():
[データモデル定義処理]
else:
[感情分析処理]
ーーーーーーーーーーーーーーーーーーーーーーー
以下のように変えています。(条件も not が入っていることに注意してください)
ーーーーーーーーーーーーーーーーーーーーーーー
[データモデル定義処理]
if not asCxt.isComputeDataModelOnly():
[感情分析処理]
ーーーーーーーーーーーーーーーーーーーーーーー
d. データ入力処理
Sparkデータフレームでの入力となるため、入力後にPandasへ変換します。
#---------------------------------------------------------
#入力データの取得
#---------------------------------------------------------
#スコアリングデータをSparkデータフレームで読み込み
modelerSPDF = asCxt.getSparkInputData()
#Pandasに変換
modelerPDF = modelerSPDF.toPandas()
e. データ出力処理
Modelerへデータを戻すためにPandasからSparkデータフレーム変換しています。
#-------------------------------------------------------
# Modelerに戻す
#-------------------------------------------------------
#SparkSQLコンテキストを生成
sqlCtx = asCxt.getSparkSQLContext()
#出力スキーマに基づきSparkデータフレーム形式にデータを変換
outSPDF = sqlCtx.createDataFrame( outputPDF, schema = outputSchema )
#データをSPSSに戻す
asCxt.setSparkOutputData( outSPDF )
4. 形態素解析用シンタックス - Python for Spark
形態素解析用のシンタックスをPython for Spark用に修正します。
変更箇所は、感情分析の場合と同じでModelerとのやり取り部分です。そのため詳細は省略します。
シンタックス全体は以下の通りです。
#------------------------------------------------------
# ライブラリ定義
#------------------------------------------------------
# Modeler用ライブラリ
import spss.pyspark.runtime
# SparkSQL用ライブラリ
from pyspark.sql.types import *
# 形態素解析用 MeCab - 事前にインストールしておくこと
import MeCab
# Pandas - 事前にインストールしておくこと
import pandas as pd
# MeCabのTaggerを作成(chasen辞書を指定)
mecab = MeCab.Tagger("-Ochasen")
#-------------------------------------------------------
# Sparkコンテキスト生成
#-------------------------------------------------------
#Sparkコンテキスト生成
asCxt = spss.pyspark.runtime.getContext()
#-------------------------------------------------------
# 出力用データ定義処理
#-------------------------------------------------------
#スキーマ取得
outputSchema = asCxt.getSparkInputSchema()
#--------------------------------------------------------------
#出力用変数フィールドを追加
#--------------------------------------------------------------
#データモデルにフィールドを追加
#形態素解析アウトプット用のフィール定義
field_name_1 = "表層形"
field_name_2 = "原形"
field_name_3 = "品詞"
#フィールド追加
outputSchema.fields.append(StructField(field_name_1,StringType(), nullable=True))
outputSchema.fields.append(StructField(field_name_2,StringType(), nullable=True))
outputSchema.fields.append(StructField(field_name_3,StringType(), nullable=True))
#スキーマをセット
asCxt.setSparkOutputSchema(outputSchema)
#----------------------------------------------------
# 形態素解析用関数
#----------------------------------------------------
def mecab_parse(text):
# mecabで形態素解析を実行
parsed_text = mecab.parse(text)
# 結果を行ごとに分割
lines = parsed_text.splitlines()
# 最後のEOF行を除外
lines = [line for line in lines if line != 'EOS']
# 結果をリストに変換
results = []
for line in lines:
# 各行をタブで分割してリストに変換
parts = line.split("\t")
# タブ区切りの要素が正しく含まれているか確認
if len(parts) >= 4:
surface = parts[0] # 表層形
base = parts[2] # 原形
pos = parts[3] # 品詞
results.append((surface, base, pos))
#結果を戻す
return results
#-------------------------------------------------------
# 形態素解析処理
#-------------------------------------------------------
if not asCxt.isComputeDataModelOnly():
#---------------------------------------------------------
#入力データの取得
#---------------------------------------------------------
#スコアリングデータをSparkデータフレームで読み込み
modelerSPDF = asCxt.getSparkInputData()
#Pandasに変換
modelerPDF = modelerSPDF.toPandas()
#-------------------------------------------------------
# 形態素解析処理実行
#-------------------------------------------------------
# DataFrameの「Sentence」カラムに対して形態素解析を実行し、解析結果を追加
modelerPDF['parsed'] = modelerPDF['Sentence'].apply(mecab_parse)
#-------------------------------------------------------
# 形態素解析処理結果をまとめる
#-------------------------------------------------------
# parsed列を縦持ちにする(リストを展開する)
modelerPDF_exploded = modelerPDF.explode('parsed')
#field_name_1 = "表層形"
#field_name_2 = "原形"
#field_name_3 = "品詞"
# parsed列を「表層形」「原形」「品詞」に分割
modelerPDF_exploded[[ field_name_1, field_name_2, field_name_3]] = pd.DataFrame(modelerPDF_exploded['parsed'].tolist(), index=modelerPDF_exploded.index)
# 不要なカラムを削除
modelerPDF_exploded = modelerPDF_exploded.drop(columns=['parsed'])
#-------------------------------------------------------
# Modelerに戻す
#-------------------------------------------------------
#SparkSQLコンテキストを生成
sqlCtx = asCxt.getSparkSQLContext()
#出力スキーマに基づきデータフレーム形式にデータを変換
outSPDF = sqlCtx.createDataFrame( modelerPDF_exploded, schema = outputSchema )
#データをSPSSに戻す
asCxt.setSparkOutputData( outSPDF )
5. コレスポンデンス分析用シンタックス - Python for Spark
ついでに、コレスポンデンス分析用のシンタックスもRからPython for Spark用に変更してみます。
前回の記事ではRで記述しました。
コレスポンデンス分析は拡張の出力ノードに記載します。
前提として、コレスポンデンス分析ではmcaを使います。
必要なライブラリは、以下の2つになります。それぞれ事前に導入をお願いします。
・mca
・matplotlib
C:\Python\310> python.exe -m pip install mca matplotlib
以下が、シンタックス全体になります。
#-------------------------------------------------------
# ライブライインポート
#-------------------------------------------------------
# Modeler用ライブラリ
import spss.pyspark.runtime
# コレスポンデンス分析用 MCA
import mca
# 描画用
import matplotlib.pyplot as plt
# フォント表示用
import matplotlib.font_manager as fm
# Pandas
import pandas as pd
#-------------------------------------------------------
# データの入力
#-------------------------------------------------------
#Sparkコンテキスト生成
asCxt = spss.pyspark.runtime.getContext()
#データをSparkデータフレームで読み込み
modelerSPDF = asCxt.getSparkInputData()
#Pandasに変換
modelerPDF = modelerSPDF.toPandas()
#-------------------------------------------------------
# データ加工
#-------------------------------------------------------
# 'label' 列をインデックスに設定
modelerPDF.set_index('label', inplace=True)
#-------------------------------------------------------
# コレスポンデンス分析実行
#-------------------------------------------------------
# コレスポンデンス分析の実行
mca_model = mca.MCA(modelerPDF)
# 主成分得点の取得
rows = mca_model.fs_r(N=2) # 行ラベルの座標
cols = mca_model.fs_c(N=2) # 列ラベルの座標
#-------------------------------------------------------
# 描画処理
#-------------------------------------------------------
# 日本語フォントを指定(Windows環境の例)
plt.rcParams['font.family'] = 'MS Gothic' # または 'Meiryo' や 'Yu Gothic'
# 結果の可視化 (バイプロット: 行と列の関係を可視化)
plt.figure(figsize=(10, 7))
# 行のプロット(ラベル)
plt.scatter(rows[:, 0], rows[:, 1], color='blue', label='Rows')
for i, txt in enumerate(modelerPDF.index):
plt.annotate(txt, (rows[i, 0], rows[i, 1]), color='blue')
# 列のプロット(特徴)
plt.scatter(cols[:, 0], cols[:, 1], color='red', label='Columns')
for i, txt in enumerate(modelerPDF.columns):
plt.annotate(txt, (cols[i, 0], cols[i, 1]), color='red')
plt.title('MCA Biplot')
plt.xlabel('Dimension 1')
plt.ylabel('Dimension 2')
plt.legend()
plt.grid(True)
plt.show()
上記シンタックスを実行すると下記画面にグラフが表示されます。
"Python for Sparkではサポートされていません"と表示されていますが、一応、拡張出力結果でグラフは表示できています。
Rの場合はグラフの表示範囲が狭かったのかもしれませんね。若干描画内容が異なります。
おまけ ネイティブPythonの場合
ネイティブPythonで記述した時は、下記のように別画面でグラフが表示されます。(Modelerの拡張出力結果画面ではありません。)
ここで、別画面で表示されたグラフを閉じないと、Modelerがずっと実行中となる(実行中ウィンドウが消えない)ので確認できたら、グラフは閉じましょう。
でネイティブPythonの場合は、データ入力部分のみ以下のように変更することで動くようになります。
#データ入力を以下にようにかえてください。
#Pandasで入力できるので、いちいち変換はいりません。
import modelerpy
modelerPDF = modelerpy.readPandasDataframe()
5. まとめ
Modeler v18.4以前のバージョンでも参考になるようにPython for Spark用のシンタックスを紹介しました。コレスポンデンス分析についてもPythonで記述できるように追加で紹介しました。
Modelerの標準機能では実現が難しい場合でも、拡張ノードを使えば補完が可能です。ぜひ参考にして様々な分析に挑戦してください!!
参考
SPSS Modeler ノードリファレンス目次
SPSS Modeler 逆引きストリーム集
SPSS funさん記事集
SPSS連載ブログバックナンバー
SPSSヒモトクブログ