機能
障害発生時間帯、障害発生内容を入力すると指定した時間帯のログを分析して結果を表示してくれる。
アーキテクチャ
EC2でStreamlitを起動。サーバやネットワークのログをCloudWatchLogsに取得しておく。
2つの処理があり1つ目の処理でCloudWatchLogsから指定した時間帯のログを取得する。2つ目の処理で取得したログ情報と障害発生内容をBedrockにインプットして分析結果を取得する。といった流れです。
Lambdaのコード
import boto3
import json
from datetime import datetime, timedelta
def lambda_handler(event, context):
# CloudWatchのクライアントを作成
logs_client = boto3.client('logs')
# イベントから日時を取得
evt_start_date = event["evt_start_date"]
evt_end_date = event["evt_end_date"]
# UTCに9時間ずらして設定
start_time = int((datetime.strptime(evt_start_date, '%Y-%m-%d %H:%M:%S') - timedelta(hours=9)).timestamp() * 1000)
end_time = int((datetime.strptime(evt_end_date, '%Y-%m-%d %H:%M:%S') - timedelta(hours=9)).timestamp() * 1000)
# ロググループ名を指定
log_group_name = 'linux-syslog'
try:
# ログイベントを取得
response = logs_client.filter_log_events(
logGroupName=log_group_name,
startTime=start_time,
endTime=end_time,
limit=30 # 必要に応じて調整
)
# ログメッセージを抽出
log_events = [
{
'timestamp': event['timestamp'],
'message': event['message'],
'logStreamName': event['logStreamName']
}
for event in response['events']
]
return {
'statusCode': 200,
'body': json.dumps({
'logEvents': log_events,
'startTime': evt_start_date,
'endTime': evt_end_date
}),
'headers': {
'Content-Type': 'application/json'
}
}
except Exception as e:
return {
'statusCode': 500,
'body': json.dumps({
'error': str(e)
}),
'headers': {
'Content-Type': 'application/json'
}
}
APIGatewayの設定
REST APIを作成後、GETメソッドを作成し、上記のLambda関数を指定する。
- マッピングテンプレートで変数をマッピング
#set($inputRoot = $input.path('$'))
{
"evt_start_date":"$input.params('evt_start_date')",
"evt_end_date":"$input.params('evt_end_date')",
"evt_content":"$input.params('evt_content')"
}
Streamlitのコード
import streamlit as st
import requests
from datetime import datetime
import json
import boto3
# タイトル
st.title("自動障害対応君")
# 入力値
start_date = st.date_input("開始日")
end_date = st.date_input("終了日")
start_time = st.time_input("開始時間")
end_time = st.time_input("終了時間")
evt_content = st.text_input("イベント内容")
# ボタンがクリックされたときにAPIを実行
if st.button('ログ分析'):
#### CloudWatch用APIを実行 ####
evt_start_date = f"{start_date} {start_time}"
evt_end_date = f"{end_date} {end_time}"
# 終了日時が開始日時より前の場合のチェック
if datetime.combine(start_date, start_time) >= datetime.combine(end_date, end_time):
st.error("終了日時は開始日時より後に設定してください。")
else:
# URLエンコード(スペースを %20 に変換)
evt_start_date_encoded = requests.utils.quote(evt_start_date)
evt_end_date_encoded = requests.utils.quote(evt_end_date)
# APIのエンドポイント
api_url = f"https://xxxx.execute-api.ap-northeast-1.amazonaws.com/xxx/xxx?evt_start_date={evt_start_date_encoded}&evt_end_date={evt_end_date_encoded}"
api_response = requests.get(api_url)
api_data = api_response.json()
body_data = json.loads(api_data['body'])
message_contents = [event['message'] for event in body_data['logEvents']]
user_content = ""
for message in message_contents:
user_content += message + "\n"
# Bedrock API用設定
bedrock_runtime = boto3.client(service_name='bedrock-runtime', region_name='ap-northeast-1')
model_id = 'anthropic.claude-3-haiku-20240307-v1:0'
max_tokens = 1000
# system_promptを設定
system_prompt = (
"入力情報はシステムログです。"
f"evt_contentには障害内容が入力されます。現在の障害内容: '{evt_content}'。"
"システムログの情報を分析して障害内容の原因を調査してください。"
)
# Bedrock APIのリクエスト用メッセージ
user_message = {"role": "user", "content": user_content}
messages = [user_message]
body = json.dumps(
{
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": max_tokens,
"system": system_prompt,
"messages": messages
}
)
# Step 3: bedrock APIにデータを送信し、結果を取得
response = bedrock_runtime.invoke_model(body=body, modelId=model_id)
response_body = json.loads(response.get('body').read())
assistant_text = response_body['content'][0]['text']
st.write(assistant_text)