目的
今回作成した処理では、SPSS Modelerで実行したデータ分析の結果を自動でGoogle Chatに通知する仕組みを構築することを目的としています。
具体的には、SPSS Modelerの処理結果をAWS Lambdaで受け取り、LambdaからGoogle ChatのWebhook機能を使用してチャットへ通知を行う仕組みとなっています。
対象読者
本記事の内容は、以下の方を対象としています:
- AWS: Lambda、IAMの基本的な使用方法を理解している人
- SPSS Modeler: python拡張ノードの使用やパラメータを用いたストリームの作成をしたことがある人
目次
1. 事前準備
2. Google Chatの設定
3. Lambda関数の作成
4. SPSS Modelerストリームの作成
5. テスト実行
6. あとがき
1. 事前準備
今回の環境では、SPSS Modelerの拡張ノード内でLambda関数の呼び出しを行います。
そのため、事前にローカル環境のSPSS ModelerにLamda関数呼び出し用のライブラリとしてboto3をインストールしています。
Modeler Client環境へのboto3のインストール方法に関しては、下記記事「boto3を使用したSPSS Modeler上でのS3への出力方法について」の「2章 SPSS Modeler環境の設定内容について」内の「各種ライブラリのインストール」をご参照ください。
2. Google Chatの設定
初めに、Webhookを使用してGoogle Chatへ通知を行うための設定を行います。
Google Chatで通知を受け取りたいスペースを開き、スペース名横のドロップダウンリストを展開して、「アプリと統合」を選択します。
画面右下の「Webhookを追加」をクリックすると、Webhook作成のポップアップが表示されるため、任意の名前を入力して「保存」をクリックします。(URLは自動生成されるため、入力しなくても問題ありません。)
Webhook用のURLが作成されたことを確認します。ここで作成しURLに、LambdaからHTTP経由でPOSTリクエストを送信する形になります。
3. Lambda関数の作成
次に、ストリーム実行結果の受け取りとGoogle Chatへの通知を行うためのLambda関数を作成します。
新規でLambda関数を作成し、下記コードを記載後「Deploy」をクリックします。
import json
import os
import requests
def lambda_handler(event, context):
# イベント内容のログ出力
print("Received event:", json.dumps(event, indent=4, ensure_ascii=False))
# Webhook URLを取得
webhook_url = os.getenv('GOOGLE_CHAT_WEBHOOK_URL')
if not webhook_url:
return {
"statusCode": 500,
"body": json.dumps({"error": "GOOGLE_CHAT_WEBHOOK_URL is not set"})
}
if "body" in event:
body = json.loads(event["body"])
else:
body = event
# SPSS Modelerからの分析結果の取得
spss_result = body.get("message")
model_results = body.get("model_results")
print("spss_result:", spss_result)
print("model_results:", model_results)
# 出力用データ加工
table_header = "顧客番号\t$R-成約フラグ\t$RRP-成約フラグ\n" + "-" * 40
table_rows = "\n".join([f"{row['customerNo']}\t{row['R-constractedFlag']}\t{row['RRP-constractedFlag']:.3f}" for row in model_results])
# 送信データ作成
payload = {
"text": f"📊 **SPSS Modeler 処理完了通知**\n\n{spss_result}\n\n```\n{table_header}\n{table_rows}\n```"
}
# Google Chatへリクエスト送信
headers = {"Content-Type": "application/json"}
response = requests.post(webhook_url, data=json.dumps(payload), headers=headers)
# レスポンス確認
if response.status_code == 200:
return {
"statusCode": 200,
"body": json.dumps({"message": "通知を送信しました"})
}
else:
return {
"statusCode": response.status_code,
"body": json.dumps({"error": "Google Chatへの送信に失敗しました", "details": response.text})
}
上記コードではWebhook URLを環境変数から取得する形としているため、環境変数の設定を行います。 設定タブから「環境変数」にアクセスし、「編集」をクリックします。
「環境変数の追加」をクリックして、キーに「GOOGLE_CHAT_WEBHOOK_URL」、値に2. Google Chatの設定で作成したWebhook URLを入力後、「保存」をクリックします。
続けて、作成したLambda関数をSPSS Modelerから呼び出す際に使用するLambda関数のURLを発行します。
設定タブの「関数URL」から「関数URLを作成」をクリックします。
認証タイプで「AWS_IAM」を選択して「保存」をクリックし、関数URLが作成されたことを確認します。
SPSS ModelerからLambda関数を呼び出す際には、このURLへIAM認証を使用してアクセスする形となります。
4. SPSS Modelerストリームの作成
最後に、SPSS ModelerでLambda関数を呼び出す処理を作成します。
今回は既存のスコアリング用ストリームの最後に、3. Lambda関数の作成で作成したLambda関数を呼び出し、ストリームのスコアリング結果を作成したLamda関数へ渡す処理を追加しています。
Lambda関数のURLへのアクセスではAWS_IAMを選択しているため、ストリームのパラメータとして「access_key」「secret_access_key」「lambda_url」を設定し、拡張ノードでLamda関数を呼び出す際の認証情報を外部から実行時に入力する形で作成します。
拡張の出力ノードでは「Python」を選択し、以下のコードを記載します。
※Lambda関数のリージョンに関してはコード内に直書きしているため、必要に応じて変更してください。
import json
import requests
from requests_aws4auth import AWS4Auth
import boto3
import modelerpy
# 分析結果の取得
modelerData = modelerpy.readPandasDataframe()
customer_data = modelerData[["customerNo","$R-constractedFlag","$RRP-constractedFlag"]]
customer_data = customer_data.rename(columns={
"$R-constractedFlag": "R-constractedFlag",
"$RRP-constractedFlag": "RRP-constractedFlag"
}).to_dict(orient="records")
#print(customer_data)
# パラメータ入力値の取得
try:
access_key = modelerData.loc[0, "access_key"]
secret_access_key = modelerData.loc[0, "secret_access_key"]
lambda_url = modelerData.loc[0, "lambda_url"]
if not access_key or not secret_access_key or not lambda_url:
raise ValueError("AWS認証情報が取得できませんでした。")
except (KeyError, IndexError, ValueError) as e:
print(f"Error: {e}")
# 認証オブジェクト作成
aws_region = "us-east-1"
aws_service = "lambda"
auth = AWS4Auth(access_key, secret_access_key, aws_region, aws_service, session_token=None)
# 送信データ作成
payload = {
"message": "SPSS Modeler の処理が完了しました。成約率の高い上位5人は以下の通りです。",
"model_results": customer_data
}
headers = {"Content-Type": "application/json"}
# Lambda関数URLへリクエスト送信
response = requests.post(lambda_url, auth=auth, json=payload, headers=headers)
# レスポンス確認
print("Status Code:", response.status_code)
print("Response:", response.text)
5. テスト実行
作成したストリームを実行して、一連の処理に関する稼働確認を行います。
拡張の出力ノード実行時にストリームのパラメータとして「access_key」「secret_access_key」「lambda_url」を入力し、「OK」を押下して実行します。
※Lambda関数実行権限のある「access_key」「secret_access_key」を入力してください。
処理完了後に、Google Chatの該当スペースにスコアリング結果の通知が届いていることを確認します。
6. あとがき
今回構築した仕組みでは、SPSS Modelerの処理完了通知をリアルタイムで複数人へ送ることができるため、これによって分析結果共有の迅速化が期待できるかと思います。
Webhookの機能自体はSlackやTeamsなどの様々なツールで実装されているため、個々の環境に合わせたカスタマイズが可能であり、分析結果の展開の自動化や業務効率化の一つとしては有効だと感じました。
また、Lambda関数の処理部分をカスタマイズすることで、通知フォーマットの変更や、特定の条件を満たした場合にのみ通知するフィルタリング機能を追加することも可能であるため、結果通知以外にも様々な活用方法を見出していけるのではないかと思います。