記事内容
本記事では、Lexを使用して取得したユーザーの入力値をLambda関数で保持し、その値を前回作成したSPSS Modelerのストリーム実行用バッチファイルを呼び出すLambda関数にパラメータとして渡す仕組みについて解説します。
※前回の記事内容については、以下をご参照ください。
目次
1. Amazon Lexの設定
2. AWS Lambdaの設定
3. Lex・Lambda連携
4. Lex・Lambda連携テスト実行
5. あとがき
1. Amazon Lexの設定
はじめに、Lexでチャットボットを作成します。
今回、Lexではユーザーから以下の情報を取得するチャットボットを作成します:
- 店舗名
- 集計開始年月日
- 集計終了年月日
Lexのサービスにアクセスし、「ボットを作成」から日本語で新規ボットを作成します。
※今回はLexでのボット作成時の詳細設定は割愛します。
ボットの作成が完了すると、インテント作成ページに移ります。
はじめに、「インテントの詳細」で任意のインテント名を入力します。
次に、サンプル発話からインテントを呼び出すためのワードとして「店舗売上を集計したいです」を追加します。
続けて、ユーザーが回答として入力する、売上集計時に必要となる情報をスロットに登録します。
今回は「ShopName・StratDate・EndDate・Confirmation」の4つを登録しています。
各値に対しては、適切なスロットタイプを選択する必要があります。
最後に、コードフックオプションで「初期化と検証にLambda関数を使用」を選択します。
2. AWS Lambdaの設定
Lexでのボットの作成が完了したら、ボットから呼び出されるLambda関数を作成します。
新規でLambda関数を作成し、下記コードを記載後「Deploy」をクリックします。
import json
import boto3
# Lambdaクライアントを作成
lambda_client = boto3.client('lambda')
# インスタンスID
instance_id = 'i-XXXXXXXXXXXXXXXXX'
def lambda_handler(event, context):
# デバッグ用ログ出力
print("Received event:", json.dumps(event, indent=2))
# lex設定情報取得
intent_name = event['sessionState']['intent']['name']
slots = event['sessionState']['intent']['slots']
user_input = event['inputTranscript']
# スロットが未入力の場合、対応する質問を実施
if not slots or not get_slot_value(slots, 'ShopName'):
return elicit_slot_response(intent_name, slots, 'ShopName', "売上データの集計を実施します。集計対象の店舗名を教えてください。")
shop_name = get_slot_value(slots, 'ShopName')
if not get_slot_value(slots, 'StartDate'):
return elicit_slot_response(intent_name, slots, 'StartDate', "集計開始日付を教えてください。")
start_date = get_slot_value(slots, 'StartDate')
if not get_slot_value(slots, 'EndDate'):
return elicit_slot_response(intent_name, slots, 'EndDate', "集計終了日付を教えてください。")
end_date = get_slot_value(slots, 'EndDate')
if not get_slot_value(slots, 'Confirmation'):
message = f"{shop_name}店の{start_date}から{end_date}までの売上の集計でよろしいですか?"
return elicit_slot_response(intent_name, slots, 'Confirmation', message)
confirmation = get_slot_value(slots, 'Confirmation').lower()
# test-kick-bat呼び出し
if confirmation in ['はい', 'yes', 'よいです']:
try:
payload = {'shop_name': shop_name, 'start_date': start_date, 'end_date': end_date}
response = lambda_client.invoke(
FunctionName='test-kick-bat',
InvocationType='RequestResponse',
Payload=json.dumps(payload)
)
raw_response = response['Payload'].read().decode('utf-8')
response_payload = json.loads(raw_response)
# レスポンス確認
if 'body' in response_payload:
body = json.loads(response_payload['body'])
status = body.get('status')
if status == 'success':
response_text = f"{shop_name}店の{start_date}から{end_date}までの売上データを集計しました。集計結果のcsvファイルは、SPSSサーバ上の 'D:\\test\\apl\\output' 配下に作成されています。"
else:
response_text = f"集計中にエラーが発生しました: {body.get('message')}"
else:
response_text = "集計処理中に予期しないエラーが発生しました。"
except Exception as e:
print("Lambda呼び出しエラー:", str(e))
response_text = "売上データの集計中にエラーが発生しました。"
else:
response_text = "売上データの集計をキャンセルします。"
return close_response(intent_name, slots, response_text, 'Failed')
return close_response(intent_name, slots, response_text, 'Fulfilled')
# スロット値を取得
def get_slot_value(slots, slot_name):
if slots is None:
return None
if slot_name in slots and slots[slot_name] and slots[slot_name].get('value'):
return slots[slot_name]['value']['interpretedValue']
return None
# スロット値入力を促すレスポンスを生成
def elicit_slot_response(intent_name, slots, slot_to_elicit, message):
return {
'sessionState': {
'dialogAction': {
'type': 'ElicitSlot',
'slotToElicit': slot_to_elicit
},
'intent': {
'name': intent_name,
'slots': slots,
'state': 'InProgress'
}
},
'messages': [{'contentType': 'PlainText', 'content': message}]
}inputTranscript
# 対話を終了するレスポンスを生成
def close_response(intent_name, slots, message, fulfillment_state):
return {
'sessionState': {
'dialogAction': {
'type': 'Close'
},
'intent': {
'name': intent_name,
'slots': slots,
'state': fulfillment_state
},
'fulfillmentState': fulfillment_state
},
'messages': [{'contentType': 'PlainText', 'content': message}]
}
実装したコードの内、ポイントとなる箇所について以降で解説します。
Lambda呼び出しと集計処理
以下の部分で、前回作成したSPSS Modelerのストリーム実行を行うLambda関数(test-kick-bat)の呼び出しを実施しています。
今回のストリームは実行にそれほど時間がかからないため、InvocationTypeをRequestResponseに指定して、同期実行を実施しています。
呼び出したLambda関数に関しては、実行結果のステータスを取得し、成功した場合は集計処理によって作成されたファイルパスを出力し、失敗した場合はエラーメッセージを出力しています。
# test-kick-bat呼び出し
if confirmation in ['はい', 'yes', 'よいです']:
try:
payload = {'shop_name': shop_name, 'start_date': start_date, 'end_date': end_date}
response = lambda_client.invoke(
FunctionName='test-kick-bat',
InvocationType='RequestResponse',
Payload=json.dumps(payload)
)
raw_response = response['Payload'].read().decode('utf-8')
response_payload = json.loads(raw_response)
if 'body' in response_payload:
body = json.loads(response_payload['body'])
status = body.get('status')
if status == 'success':
response_text = f"{shop_name}店の{start_date}から{end_date}までの売上データを集計しました。集計結果のcsvファイルは、SPSSサーバ上の 'D:\\test\\apl\\output' 配下に作成されています。"
else:
response_text = f"集計中にエラーが発生しました。: {body.get('message')}"
else:
response_text = "集計処理中に予期しないエラーが発生しました。"
except Exception as e:
print("Lambda呼び出しエラー:", str(e))
response_text = "売上データの集計中にエラーが発生しました。"
3. Lex・Lambda連携
作成したLambda関数をLexのチャットボットに設定します。
Lexの「ボット」>「test-bot(作成ボット名)」>「エイリアス」>「TestBotAlias(作成エイリアス名)」>「Japanese (Japan)」から、LexとLambdaを紐づけることが可能です。
4. Lex・Lambda連携テスト実行
最後に、Lexから一連の処理が正しく稼働することを確認します。
作成したインテントにアクセスし、「テスト」をクリック後、サンプル発話として登録したセンテンスを入力して、意図した流れでチャットが進むことを確認します。
Lexでのテスト実施後、チャットの返信に記載されたEC2上のパスにストリーム実行結果のファイルが作成されていることを確認します。
5. あとがき
LambdaからSSMを使用してModeler Batchを実行する仕組みに関しては、ストリーム実行の汎用性を高めることができ、AWS上でSPSS Modelerを使用した分析環境を構築する際にはとても実用的だと感じています。
今回はLexのチャットボットを通じてユーザーパラメータの入力を取得しましたが、他のAWSサービスを活用して入力方法や連携プロセスをさらに柔軟に拡張することで、よりユーザーのニーズに応えた多様な分析環境の実現が可能になると思います。