目的
今回作成した処理では、エンドユーザーがModeler Clientを直接操作することなく、チャット形式の対話で必要なパラメータを取得し、そのパラメータを使ってストリームを自動実行して、結果を取得できるようにすることを目的としています。
具体的には、Amazon Lexを活用したチャットボットを通じてユーザーから必要な入力をAWS Lambdaで受け取り、取得した入力値をパラメータとして、EC2上で稼働しているSPSS Modelerに引き渡し、ストリームを動的に実行する仕組みとなっています。
対象読者
本記事の内容は、以下の方を対象としています:
- AWS: LexやLambda、EC2、IAMの基本的な使用方法を理解している人
- SPSS Modeler: パラメータを用いたストリームの作成やModeler Batchの使用方法を理解している人
構成図
今回作成した環境の構成図は以下の通りです。
EC2はWindows Server2019、Modeler Server・Modeler Batchは18.3を使用しています。
記事内容
今回作成した処理では、Lexのチャットから対象店舗名および集計期間(開始年月日・終了年月日)を取得し、その値をLambdaからEC2上のSPSS Modelerに引数として渡すことで、対象店舗の特定期間の売上集計結果を自動的に出力する形となっています。
今回の処理では、以下の内容を実施しています:
- LexのLambda連携
- Lexでの入力情報をLambdaでパラメータとして保持
- LambdaからEC2上のSPSS Modelerのストリームを引数付きで実行
本処理に関する記事は、前後編の2部構成となっています。
本記事では、
- LambdaからEC2上のSPSS Modelerのストリームを引数付きで実行
に関して解説していきます。
目次
1. 環境について
2. SPSS Modelerストリーム・バッチ作成
3. Lambda関数作成
4. Lambda関数テスト実行
1. 環境について
今回の環境では、LambdaからEC2上のModeler Batchを実行する際に、AWS Systems Managerを使用します。
そのため、事前にEC2へSSM Agentをインストールし、SSMコマンドが使用可能な状態としておく必要があります。
また、今回使用するAWSサービスへのIAMポリシーの設定については、事前に完了している状態となります。
2. SPSS Modelerストリーム・バッチ作成
はじめに、Modeler Clientで売上集計を実施するストリームを作成します。
ここでは、ストリームのパラメータとして「shop_name」「start_date」「end_date」を作成し、集計結果はcsvとして所定のフォルダ配下に出力するように設定しています。
あわせて、作成したストリームをModeler Batchで実行するためのバッチファイルを作成します。
今回作成したバッチファイルの内容は、以下の通りです。
ストリーム内で設定したパラメータの値に関しては、外部から引数として渡す形での実行を想定しています。
rem ***Login***
set USERNAME=Administrator
set PASSWORD=XXXXXXXXXXX
set HOSTNAME=XXX.XX.XX.XXX
rem ***Parameter***
set SHOP_NAME=%~1
set START_DATE=%~2
set END_DATE=%~3
rem ***LogFile***
set LOGFILE="D:\test\apl\log\testbat.log"
rem ****SPSSModelerSetting***
set STR="D:\test\apl\str\sample_stream.str"
set EXE_CMD="D:\SPSS\modelerbatch_install_folder\bin\clemb.exe"
set SRVCONN= -hostname %HOSTNAME% -password "%PASSWORD%" -username %USERNAME% -port 28060
rem ****StreamExecution***
%EXE_CMD% %SRVCONN% -stream %STR% -execute -appendlog -log %LOGFILE% -Pshop_name=%SHOP_NAME% -Pstart_date=%START_DATE% -Pend_date=%END_DATE%
3. Lambda関数作成
続いて、2. SPSS Modelerストリーム・バッチ作成で作成したバッチファイルを呼び出すためのLambda関数を作成します。
新規でLambda関数を作成し、下記コードを記載後「Deploy」をクリックします。
import boto3
import json
import time
from datetime import datetime
# SSMクライアント作成
ssm = boto3.client('ssm')
# インスタンスID指定
instance_id = 'i-XXXXXXXXXXXXXXX'
def lambda_handler(event, context):
# イベントデータのログ出力(デバッグ用)
print("Received event:", json.dumps(event, indent=2))
# lex入力情報取得
shop_name = event.get('shop_name')
start_date = event.get('start_date')
end_date = event.get('end_date')
# 引数チェック
if not (shop_name and start_date and end_date):
return {
'statusCode': 400,
'body': json.dumps('必要な引数が不足しています。')
}
try:
# 実行するコマンドを文字列として作成
command_string = r'D:\test\apl\bat\sample_str_exe.bat {0} {1} {2}'.format(start_date, end_date)
# バッチ実行用のSSMコマンド送信
response = ssm.send_command(
InstanceIds=[instance_id],
DocumentName='AWS-RunPowerShellScript',
Parameters={
'commands': [command_string],
'workingDirectory': [r'C:\Users\Administrator\temp']
}
)
# バッチ実行完了待機
command_id = response['Command']['CommandId']
for _ in range(30):
time.sleep(1)
invocation_response = ssm.get_command_invocation(
CommandId=command_id,
InstanceId=instance_id
)
status = invocation_response['Status']
# 実行ステータスを確認
if status == 'Success':
return {
"statusCode": 200,
"body": json.dumps({
"status": "success",
"message": invocation_response['StandardOutputContent']
}, default=str)
}
elif status in ['Failed', 'Cancelled']:
return {
"statusCode": 500,
"body": json.dumps({
"status": "failure",
"message": invocation_response['StandardErrorContent']
}, default=str)
}
# タイムアウトした場合
return {
"statusCode": 500,
"body": json.dumps({"error": "指定した待機時間内にコマンドが完了しませんでした。"})
}
except Exception as e:
return {
"statusCode": 500,
"body": json.dumps({"error": str(e)})
}
実装したコードの内、ポイントとなる箇所について以降で解説します。
ストリームパラメータの取得
バッチ実行時の引数として使用するLexでのユーザ入力値を取得します。
ユーザ入力値を確認し、不足している場合はエラーを返すようにしています。
# lex入力情報取得
shop_name = event.get('shop_name')
start_date = event.get('start_date')
end_date = event.get('end_date')
# 引数チェック
if not (shop_name and start_date and end_date):
return {
'statusCode': 400,
'body': json.dumps('必要な引数が不足しています。')
}
実行コマンドの作成
「2. SPSS Modelerストリーム・バッチ作成」で作成したsample_str_exe.batを実行するためのコマンドを文字列として作成します。
「SSMコマンドの送信」のcommandsに直接記載しても問題ないと思うのですが、外出ししないとエラーになったので事前に文字列として作成しています。
# 実行するコマンドを文字列として作成
command_string = r'D:\test\apl\bat\sample_str_exe.bat {0} {1} {2}'.format(shop_name, start_date, end_date)
SSMコマンドの送信
SSMコマンドを使用して、EC2インスタンス上のバッチを実行します。
※SSMコマンドを実行するためには、事前に使用ポートの解放、SSM実行のアクセス許可付与、SSM Agentのインストールを実施する必要があります。
# バッチ実行用のSSMコマンド送信
response = ssm.send_command(
InstanceIds=[instance_id],
DocumentName='AWS-RunPowerShellScript',
Parameters={
'commands': [command_string],
'workingDirectory': [r'C:\Users\Administrator\temp']
}
)
実行・結果確認
ssm.get_command_invocationで、コマンドの実行結果を取得します。
今回のストリームは10秒程度で処理が終わるため、コマンド終了までの最大待機時間は30秒で設定しています。
成功時にはHTTPステータスコード200を返し、失敗時には500とエラーメッセージを返すようにしています。
# バッチ実行完了待機
command_id = response['Command']['CommandId']
for _ in range(30):
time.sleep(1)
invocation_response = ssm.get_command_invocation(
CommandId=command_id,
InstanceId=instance_id
)
status = invocation_response['Status']
# 実行ステータスを確認
if status == 'Success':
return {
"statusCode": 200,
"body": json.dumps({
"status": "success",
"message": invocation_response['StandardOutputContent']
}, default=str)
}
elif status in ['Failed', 'Cancelled', 'TimedOut']:
return {
"statusCode": 500,
"body": json.dumps({
"status": "failure",
"message": invocation_response['StandardErrorContent']
}, default=str)
}
# タイムアウトした場合
return {
"statusCode": 500,
"body": json.dumps({"error": "指定した待機時間内にコマンドが完了しませんでした。"})
}
4. Lambda関数テスト実行
デプロイが完了したら、Lambda関数のテストタブで「shop_name」「start_date」「end_date」を指定して「テスト」をクリックし、成功することを確認します。
Lambda関数の実行完了を確認後、ストリーム実行結果のファイルが指定したフォルダに作成されていることを確認します。
以上で、SPSS Modelerで作成したストリームを実行するバッチファイルに対し、Lambdaからパラメータを渡して実行する処理が完了しました。
次回の記事では、今回作成したLambda関数に、Lexと連携して取得したユーザー入力値を渡す仕組みについて解説します。