※本記事は、「2025 Japan AWS Jr. Champions Qiitaリレー夏」57日目の記事です。
過去の投稿はこちらからご覧ください!
https://qiita.com/natsumi_a/items/80539843482fed4cd648
はじめに
皆さん、AWS運用に 生成AI を活用していますか?
「生成AIを導入する」と聞くと、
- 「セキュリティは大丈夫なの?」
- 「導入に時間や教育コストがかかるのでは?」
といった不安を感じる方もいるかもしれません。
確かに仕組みによってはその懸念もありますが、今回はできるだけシンプルかつセキュアに導入できる、AWS運用に役立つ インシデント要約Bot を作っていこうと思います!
なお、本記事は 応用編 です。
基礎編にあたる記事も公開しているので、未読の方はぜひそちらもご覧ください。
https://qiita.com/Na_Ki_1869/items/3ce4e8b0c20b89f85b37
概要
今回作成する「インシデント要約Bot」では、以下のAWSサービスを組み合わせて利用します。
- Amazon Bedrock : 生成AIによる要約や診断を担当
- Amazon CloudWatch : インシデント検知(今回はCPUアラーム)
- AWS Systems Manager (SSM) : コマンド実行や追加調査を自動化
仕組みとしては、CloudWatch Alarm が発火した際に、初動調査をほぼ自動化してしまおうというものです。
汎用性の高いBotを目指しましたが、まずはシンプルに CPU関連インシデント専用 として構成しました。
(ただしメモリやディスクについても同じ仕組みで対応可能なので、ぜひ試してみてください!)
今回やること
対象は EC2インスタンスのCPU監視 です。
CloudWatch でCPU使用率が高騰したときに自動的に以下の流れで調査が進み、最後にレポートがTeamsに届きます。
- CloudWatchアラームをトリガーにBotを起動
- (今回は「CPU使用率高騰」を想定)
- 対象EC2の状況をBedrockが初期診断
- 初期診断を踏まえ、追加で必要なコマンドをBedrockが生成
- そのコマンドをSSM Run Commandで実行
- 初期診断+コマンド+結果 を取りまとめてレポート生成
- Teamsに最終レポートを通知
アーキテクチャ図
実装手順
では、いよいよ実装に入っていきます 🚀
1. SSM Run Command の準備
まずは Bot(Lambda)からEC2にコマンドを実行できるようにする 下準備です。
必要なのは以下の2点:
- SSM Agent のインストール
- IAMロールの付与
Amazon Linux 2 / 2023 であれば SSM Agent はプリインストール済み なので、特別な作業は不要です。
今回は Amazon Linux 2023 を監視対象にするので、IAMロールだけ設定しておきます。
IAMロールには AWS 管理ポリシー AmazonSSMManagedInstanceCore をアタッチして作成し、EC2 に割り当てましょう。
2. Amazon Bedrock の準備
次に 生成AIの頭脳となる Bedrock を有効化します。
AWSコンソールの Bedrock ページから、
[モデルアクセス] → [モデルアクセスを変更] に進み、利用したいモデルをリクエストするだけです。
今回は Claude 3.5 Sonnet を使用しました。
リクエスト後、「アクセスが付与されました」と表示されていれば準備完了です ✅
3. AWS Lambda(インシデントBot)の実装
ここからが今回の主役! Lambda関数 を作っていきます。
IAMロール作成
Lambda から CloudWatch・SSM・Bedrock を操作するための権限を付与します。
以下のようなカスタマー管理ポリシーを作成し、さらに AWSLambdaBasicExecutionRole
を合わせて付与しましょう。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"ssm:SendCommand",
"ssm:GetCommandInvocation",
"cloudwatch:GetMetricData",
"ec2:DescribeInstances",
"bedrock:InvokeModel"
],
"Resource": "*"
}
]
}
目的別にさらに絞るなら、
"Resource"
を特定のインスタンスやロググループに限定してもOKです。
Lambda の作成
- ランタイム: Python 3.13
- タイムアウト: デフォルト5秒 → 10分 に延長(SSM 実行待ちのため余裕を持たせる)
コードは長いので折りたたみ形式で掲載します。
必要に応じてコピーして利用してください。
Lambdaコード(Python)
# ここにフルの Lambda コードを貼り付けてください
# (この記事の上部で提示した完成版をそのまま入れればOKです)
import boto3
import json
import os
import datetime
import time
import urllib.request
import re
from typing import List, Dict, Tuple
# AWS クライアント初期化
ssm = boto3.client('ssm')
ec2 = boto3.client('ec2')
cloudwatch = boto3.client('cloudwatch')
bedrock = boto3.client('bedrock-runtime')
# 環境変数
TEAMS_WEBHOOK_URL = os.environ['TEAMS_WEBHOOK_URL']
BEDROCK_INFERENCE_PROFILE_ARN = os.environ.get('BEDROCK_INFERENCE_PROFILE_ARN')
FALLBACK_MODEL_ID = os.environ.get('BEDROCK_FALLBACK_MODEL_ID', 'anthropic.claude-3-5-sonnet-20240620-v1:0')
ALLOW_DESTRUCTIVE = os.environ.get('ALLOW_DESTRUCTIVE', 'false').lower() == 'true'
# ===== ユーティリティ =====
def _format_cpu_list(cpu_points: List[Dict]) -> str:
if not cpu_points:
return "(なし)"
return ", ".join(f"{round(p['value'], 1)}%" for p in cpu_points)
def _truncate_lines(s: str, max_lines: int = 25, max_chars: int = 4000) -> Tuple[str, bool]:
"""長出力を安全に省略。戻り値: (本文, 省略したか)"""
s = (s or "").replace("\r\n", "\n").replace("\r", "\n")
lines = s.split("\n")
truncated = False
if len(lines) > max_lines:
lines = lines[:max_lines]
truncated = True
body = "\n".join(lines)
if len(body) > max_chars:
body = body[:max_chars]
truncated = True
return body, truncated
def _soft_wrap_long_tokens(text: str, token_threshold: int = 80, insert_every: int = 20) -> str:
def _ins(m):
t = m.group(0)
return '\u200B'.join(t[i:i+insert_every] for i in range(0, len(t), insert_every))
return re.sub(rf"\S{{{token_threshold+1},}}", _ins, text or "")
def _safe_cmd_normalize(cmd: str) -> str:
"""軽微な改善(sudo除去、grep自己ヒット回避など)"""
cmd = cmd.strip()
if cmd.startswith("sudo "):
cmd = cmd[5:]
# grep 自己ヒット回避
cmd = cmd.replace("grep stress", "grep [s]tress")
return cmd
# 破壊的コマンドの除外(deny-list)+ 調査系の許可(allow-prefix)
_DESTRUCTIVE_PATTERNS = [
r'\bkill(all)?\b', r'\bpkill\b', r'\bsystemctl\s+(stop|restart|disable)\b',
r'\breboot\b', r'\bshutdown\b', r'\bhalt\b', r'\bpoweroff\b',
r'\brm\s', r'\bmv\s', r'\bchmod\b', r'\bchown\b', r'\bchattr\b',
r'\bdd\s', r'\bmkfs\b', r'\bmount\b', r'\bumount\b',
r'\byum\s', r'\bdnf\s', r'\bapt(-get)?\s', r'\bzypper\s', r'\bpip\s',
r'\bdocker\s+(rm|stop|kill|rmi|system prune)\b',
r'\bkubectl\s+(delete|scale|cordon|drain|uncordon|apply|patch)\b',
r'>\s*(/|[A-Za-z0-9_./-]+)', # リダイレクトでの上書き/ファイル生成
r'\|?\s*(bash|sh)\s*-c\s*["\'].*["\']' # パイプでbash実行
]
_ALLOW_PREFIXES = [
"ps", "top", "cat", "grep", "egrep", "fgrep", "journalctl", "ls", "lsof",
"netstat", "ss", "df", "du", "uptime", "free", "who", "w", "uname",
"rpm", "dpkg", "systemctl status", "find", "stat", "readlink",
"sha256sum", "head", "tail", "awk", "sed -n", "dmesg"
]
def _is_destructive(cmd: str) -> bool:
for pat in _DESTRUCTIVE_PATTERNS:
if re.search(pat, cmd):
return True
return False
def _is_allowed_prefix(cmd: str) -> bool:
c = cmd.strip()
for p in _ALLOW_PREFIXES:
if c.startswith(p):
return True
return False
def _filter_investigation_commands(commands: List[str]) -> Tuple[List[str], List[str]]:
"""非破壊&調査系のみ通す。除外されたコマンドも返す。"""
safe, dropped = [], []
for raw in commands or []:
c = _safe_cmd_normalize(raw)
if ALLOW_DESTRUCTIVE:
safe.append(c)
continue
if _is_destructive(c) or not _is_allowed_prefix(c):
dropped.append(raw)
else:
safe.append(c)
return safe, dropped
def _wrap_for_ssm_with_markers(commands: List[str]) -> List[str]:
"""
各コマンドを
__CMD_START__ <idx> :: <cmd>
<出力 (stdout+stderr)>
__CMD_END__ <idx> :: exit_code=<n>
の形式で出力するようラップする。
"""
wrapped = []
for idx, cmd in enumerate(commands, 1):
# シェル安全に単引用符で包む(' → '"'"')
quoted = cmd.replace("'", "'\"'\"'")
wrapped.append(
f"echo '__CMD_START__ {idx} :: {cmd}'; "
f"timeout 30 bash -lc '{quoted}' 2>&1; "
f"ec=$?; echo '__CMD_END__ {idx} :: exit_code='${{ec}}"
)
return wrapped
def _parse_marked_output(stdout: str) -> List[Dict]:
"""
_wrap_for_ssm_with_markersで吐いたstdoutを解析し、
[{'index':1, 'cmd':'...', 'exit_code':0, 'output':'...'}, ...] に整形
"""
lines = (stdout or "").splitlines()
results = []
cur = None
for ln in lines:
if ln.startswith("__CMD_START__"):
# 例: "__CMD_START__ 1 :: ps aux"
m = re.match(r"__CMD_START__\s+(\d+)\s+::\s+(.*)$", ln)
if m:
if cur: # 取りこぼし防止
results.append(cur)
cur = {"index": int(m.group(1)), "cmd": m.group(2), "output": []}
elif ln.startswith("__CMD_END__"):
# 例: "__CMD_END__ 1 :: exit_code=0"
m = re.match(r"__CMD_END__\s+(\d+)\s+::\s+exit_code=(\d+)", ln)
if m and cur and cur.get("index") == int(m.group(1)):
cur["exit_code"] = int(m.group(2))
cur["output"] = "\n".join(cur["output"]).strip()
results.append(cur)
cur = None
else:
if cur is not None:
cur["output"].append(ln)
# 最終閉じ忘れ対策
if cur:
cur["exit_code"] = None
cur["output"] = "\n".join(cur["output"]).strip()
results.append(cur)
# index順で並べ替え
results.sort(key=lambda x: x.get("index", 0))
return results
# ===== Lambdaハンドラ =====
def lambda_handler(event, context):
# 受信イベントの形を自動判別(EventBridge or 直接アラーム)
alarm_ctx = _normalize_alarm_event(event)
if not alarm_ctx['ok']:
_send_teams_error("(unknown)", f"イベント解析に失敗: {alarm_ctx['error']}\n{safe_preview(event)}")
return {'statusCode': 400}
alarm_name = alarm_ctx['alarm_name']
alarm_reason = alarm_ctx['alarm_reason']
instance_id = alarm_ctx['instance_id']
if not instance_id:
_send_teams_error(alarm_name, "インスタンスIDが取得できませんでした。(アラームのディメンションに InstanceId を含めてください)")
return {'statusCode': 400}
# 診断に必要な素材
inst = _get_instance_info(instance_id)
cpu_data = _get_cpu_metrics(instance_id)
top_out = _get_top_output(instance_id)
# ① 初期診断+コマンド候補(最大5件、非破壊)
res = _bedrock_initial_and_commands(alarm_reason, inst, cpu_data, top_out, max_cmds=5)
initial = res['initial']
commands = res['commands']
# ② 「調査のみ」ポリシーでフィルタ
safe_cmds, dropped_cmds = _filter_investigation_commands(commands)
# ③ SSM 実行(コマンド毎にマーカー付与→結果をペア化)
cmd_results = _execute_commands_structured(instance_id, safe_cmds)
# ④ 最終分析(“提案のみ”。自動停止などは書かないようプロンプトで明示)
final = _bedrock_final(alarm_reason, inst, cpu_data, top_out, initial, safe_cmds, cmd_results)
# ⑤ Teams 通知(コマンド→結果の対応を明示/長文は省略表示)
_send_teams_report(
alarm_name, inst, cpu_data, top_out,
initial, safe_cmds, dropped_cmds, cmd_results, final
)
return {'statusCode': 200}
# ---- イベント正規化(EventBridge/直接アラーム 両対応) ----
def _normalize_alarm_event(event: dict) -> dict:
try:
# (A) EventBridge(detail-type=CloudWatch Alarm State Change)
if 'detail' in event and isinstance(event['detail'], dict):
d = event['detail']
alarm_name = d.get('alarmName') or "(unknown)"
alarm_reason = (d.get('state') or {}).get('reason') or "(no reason)"
instance_id = _extract_instance_id_from_detail(d)
return {'ok': True, 'alarm_name': alarm_name, 'alarm_reason': alarm_reason, 'instance_id': instance_id, 'error': None}
# (B) CloudWatch アラーム → Lambda 直接
if 'alarmData' in event and isinstance(event['alarmData'], dict):
d = event['alarmData']
alarm_name = d.get('alarmName') or "(unknown)"
alarm_reason = (d.get('state') or {}).get('reason') or "(no reason)"
instance_id = _extract_instance_id_from_detail(d)
return {'ok': True, 'alarm_name': alarm_name, 'alarm_reason': alarm_reason, 'instance_id': instance_id, 'error': None}
# (C) 旧来の直接形式
if 'AlarmName' in event or 'NewStateReason' in event or 'Trigger' in event:
alarm_name = event.get('AlarmName') or "(unknown)"
alarm_reason = event.get('NewStateReason') or event.get('Reason') or "(no reason)"
instance_id = _extract_instance_id_from_trigger(event.get('Trigger'))
return {'ok': True, 'alarm_name': alarm_name, 'alarm_reason': alarm_reason, 'instance_id': instance_id, 'error': None}
return {'ok': False, 'alarm_name': None, 'alarm_reason': None, 'instance_id': None, 'error': "未対応のイベント形式"}
except Exception as e:
return {'ok': False, 'alarm_name': None, 'alarm_reason': None, 'instance_id': None, 'error': str(e)}
def _extract_instance_id_from_detail(detail: dict):
cfg = detail.get('configuration', {})
metrics = cfg.get('metrics') or []
for m in metrics:
ms = m.get('metricStat') or m.get('metric_stat') or {}
metric = ms.get('metric') or {}
dims = metric.get('dimensions')
if isinstance(dims, dict):
v = dims.get('InstanceId')
if v: return v
if isinstance(dims, list):
for d in dims:
n = d.get('name') or d.get('Name')
if n == 'InstanceId':
return d.get('value') or d.get('Value')
return None
def _extract_instance_id_from_trigger(trigger: dict):
if not trigger or not isinstance(trigger, dict):
return None
dims = trigger.get('Dimensions') or trigger.get('dimensions')
if isinstance(dims, list):
for d in dims:
n = d.get('name') or d.get('Name')
if n == 'InstanceId':
return d.get('value') or d.get('Value')
if isinstance(dims, dict):
return dims.get('InstanceId')
return None
def safe_preview(event: dict) -> str:
try:
js = json.dumps(event)[:2000]
return js
except Exception:
return "(event preview unavailable)"
# ---- EC2/CloudWatch/SSM ----
def _get_instance_info(instance_id):
try:
r = ec2.describe_instances(InstanceIds=[instance_id])
i = r['Reservations'][0]['Instances'][0]
tags = {t['Key']: t['Value'] for t in i.get('Tags', [])}
return {
'InstanceId': instance_id,
'Name': tags.get('Name', 'Unknown'),
'Type': i.get('InstanceType', ''),
'State': i.get('State', {}).get('Name', ''),
'LaunchTime': str(i.get('LaunchTime'))
}
except Exception:
return {'InstanceId': instance_id, 'Name': 'Unknown'}
def _get_cpu_metrics(instance_id):
now = datetime.datetime.utcnow()
past = now - datetime.timedelta(minutes=30)
resp = cloudwatch.get_metric_data(
MetricDataQueries=[{
'Id': 'cpu',
'MetricStat': {
'Metric': {
'Namespace': 'AWS/EC2',
'MetricName': 'CPUUtilization',
'Dimensions': [{'Name': 'InstanceId', 'Value': instance_id}]
},
'Period': 300,
'Stat': 'Average'
},
'ReturnData': True
}],
StartTime=past, EndTime=now
)
r = resp['MetricDataResults'][0]
data = sorted(zip(r['Timestamps'], r['Values']), reverse=True)[:5]
return [{'timestamp': t.strftime('%Y-%m-%dT%H:%M:%SZ'), 'value': v} for t, v in data]
def _get_top_output(instance_id):
try:
resp = ssm.send_command(
InstanceIds=[instance_id],
DocumentName='AWS-RunShellScript',
Parameters={'commands': ['top -b -n 1 | head -n 60']}
)
cid = resp['Command']['CommandId']
for _ in range(12):
time.sleep(5)
inv = ssm.get_command_invocation(CommandId=cid, InstanceId=instance_id)
if inv['Status'] in ['Success', 'Failed', 'Cancelled', 'TimedOut']:
break
return inv.get('StandardOutputContent', '').strip() or f"(SSM status: {inv['Status']})"
except Exception as e:
return f"top コマンド取得エラー: {e}"
# ---- Bedrock問い合わせ(非破壊コマンドのみを強く指示) ----
def _bedrock_initial_and_commands(reason, inst, cpu, top_out, max_cmds=5):
prompt = f"""
あなたは AWS EC2 のトラブルシュート専門家です。
これから出力する「commands」は **非破壊(読み取り専用)の調査コマンド** のみとしてください。
禁止例: kill/pkill/killall, systemctl stop/restart, rm/mv/chmod/chown, パッケージインストール, 再起動, ファイル上書き, リダイレクト(>), curl|bash 等。
許可例: ps/top/cat/grep/journalctl/ls/lsof/df/du/uptime/free/uname/systemctl status/find/stat/readlink/sha256sum/head/tail/awk/sed -n/dmesg など。
出力フォーマットは以下の厳格なJSONのみ:
{{
"initial": "(問題概要と仮説を簡潔に)",
"commands": ["cmd1", "cmd2", "...(最大{max_cmds}件)"]
}}
-- アラーム理由: {reason}
-- インスタンス: {inst['Name']} ({inst['InstanceId']})
-- 過去 5 回の CPU 使用率: {_format_cpu_list(cpu)}
-- top コマンド結果(先頭のみ): {top_out[:1500]}
"""
text = _invoke_bedrock(prompt, max_tokens=600)
try:
body = text[text.find('{'): text.rfind('}') + 1]
data = json.loads(body)
cmds = (data.get('commands') or [])[:max_cmds]
return {'initial': data.get('initial', ''), 'commands': cmds}
except Exception:
return {'initial': '(解析エラー)', 'commands': []}
def _bedrock_final(reason, inst, cpu, top_out, initial, cmds, cmd_results):
# 最終報告も「提案のみ。自動実施しない」ことを明示
joined_cmds = ", ".join(cmds or [])
summarized_results = []
for r in cmd_results or []:
preview, truncated = _truncate_lines(r.get("output",""), max_lines=10, max_chars=1200)
summarized_results.append(
f"[cmd#{r.get('index')} exit={r.get('exit_code')}]: {r.get('cmd')}\n{preview}\n{'(…省略…) ' if truncated else ''}".strip()
)
results_text = "\n\n".join(summarized_results) if summarized_results else "(結果なし)"
prompt = f"""
あなたはトラブルシュートの専門家です。以下をもとに、**提案のみ(自動実行なし)**で
1) 問題概要 2) 根本原因の仮説 3) 追加調査/恒久対策の提案(実行はしない) を簡潔にまとめてください。
-- アラーム理由: {reason}
-- インスタンス: {inst['Name']} ({inst['InstanceId']})
-- 過去 CPU: {_format_cpu_list(cpu)}
-- top 抜粋: {top_out[:1500]}
-- 初期診断: {initial}
-- 実行した調査コマンド: {joined_cmds or '(なし)'}
-- コマンド結果サマリ:
{results_text}
"""
return _invoke_bedrock(prompt, max_tokens=700)
def _invoke_bedrock(prompt, max_tokens=300):
body = {
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": max_tokens,
"messages": [{"role": "user", "content": prompt}]
}
kwargs = {
"contentType": "application/json",
"accept": "application/json",
"body": json.dumps(body),
}
if BEDROCK_INFERENCE_PROFILE_ARN:
kwargs["modelId"] = BEDROCK_INFERENCE_PROFILE_ARN
else:
kwargs["modelId"] = FALLBACK_MODEL_ID
resp = bedrock.invoke_model(**kwargs)
data = json.loads(resp['body'].read())
return data['content'][0]['text']
# ---- SSM 実行(構造化結果で返す) ----
def _execute_commands_structured(instance_id: str, commands: List[str]) -> List[Dict]:
if not commands:
return []
try:
wrapped = _wrap_for_ssm_with_markers(commands)
resp = ssm.send_command(
InstanceIds=[instance_id],
DocumentName='AWS-RunShellScript',
Parameters={'commands': wrapped}
)
cid = resp['Command']['CommandId']
inv = None
for _ in range(18): # 最大約90秒
time.sleep(5)
inv = ssm.get_command_invocation(CommandId=cid, InstanceId=instance_id)
if inv['Status'] in ['Success', 'Failed', 'Cancelled', 'TimedOut']:
break
stdout = inv.get('StandardOutputContent', '') or ''
# stderrは2>&1でstdoutに混ぜているので参考程度に追記
stderr = inv.get('StandardErrorContent', '') or ''
if stderr.strip():
stdout = f"{stdout}\n[stderr]\n{stderr}"
parsed = _parse_marked_output(stdout)
# 長文をカード向けに軽く整形
for item in parsed:
out = item.get("output", "")
out = _soft_wrap_long_tokens(out)
preview, truncated = _truncate_lines(out, max_lines=40, max_chars=6000)
item["output_preview"] = preview + ("\n(…省略…)" if truncated else "")
return parsed
except Exception as e:
return [{"index": 1, "cmd": "(実行エラー)", "exit_code": None, "output_preview": f"SSM 実行エラー: {e}"}]
# ---- Teams 送信 ----
def _send_teams_error(alarm, msg):
payload = {
"type": "message",
"attachments": [
{"contentType": "application/vnd.microsoft.card.adaptive",
"content": {
"$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
"type": "AdaptiveCard", "version": "1.2",
"body": [
{"type": "TextBlock", "text": f"[Error] {alarm}", "weight": "Bolder"},
{"type": "TextBlock", "text": msg, "wrap": True}
]
}}
]
}
_post_teams(payload)
def _send_teams_report(alarm, inst, cpu, top_out, initial, cmds, dropped_cmds, cmd_results, final):
def _tb(text, bold=False, size=None, monospace=False):
tb = {"type": "TextBlock", "text": text, "wrap": True}
if bold: tb["weight"] = "Bolder"
if size: tb["size"] = size
if monospace: tb["fontType"] = "Monospace"
return tb
facts = [
{"title": "InstanceId", "value": inst.get("InstanceId", "")},
{"title": "モード", "value": "調査のみ(非破壊)" if not ALLOW_DESTRUCTIVE else "調査+対処(許可済)"},
{"title": "CPU(最新5)", "value": _format_cpu_list(cpu)}
]
body_elements = [
_tb(f"EC2 {inst.get('Name','')} の CPU アラート", bold=True, size="Large"),
{"type": "FactSet", "facts": facts},
_tb("初期診断", bold=True),
_tb(initial or "(なし)"),
_tb("実行コマンド(調査のみ)", bold=True),
_tb("\n".join(cmds) if cmds else "(なし)", monospace=True),
]
if dropped_cmds:
body_elements += [
_tb("※除外されたコマンド(安全ポリシーにより未実行)", bold=True),
_tb("\n".join(dropped_cmds), monospace=True)
]
# コマンド結果(コマンドごと)
body_elements.append(_tb("コマンド結果(コマンド→結果)", bold=True))
if cmd_results:
for r in cmd_results:
body_elements += [
_tb(f"cmd#{r.get('index')} (exit={r.get('exit_code')})", bold=True),
_tb(r.get("cmd", ""), monospace=True),
_tb(r.get("output_preview", "(出力なし)"), monospace=True)
]
else:
body_elements.append(_tb("(結果なし)"))
# top 抜粋(参考)
if top_out:
top_preview, top_trunc = _truncate_lines(top_out, max_lines=20, max_chars=2000)
body_elements += [
_tb("top 抜粋(参考)", bold=True),
_tb(top_preview + ("\n(…省略…)" if top_trunc else ""), monospace=True)
]
# 最終分析(提案のみ)
body_elements += [
_tb("最終分析(提案のみ・自動実行なし)", bold=True),
_tb(final or "(なし)")
]
body = {
"type": "message",
"attachments": [
{
"contentType": "application/vnd.microsoft.card.adaptive",
"content": {
"$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
"type": "AdaptiveCard",
"version": "1.2",
"msteams": { "width": "full" },
"body": body_elements
}
}
]
}
_post_teams(body)
def _post_teams(payload):
req = urllib.request.Request(
TEAMS_WEBHOOK_URL,
data=json.dumps(payload).encode('utf-8'),
headers={'Content-Type': 'application/json'}
)
with urllib.request.urlopen(req):
pass
環境変数の設定
Lambda に以下の環境変数を設定します。
名前 | 推奨値 / 例 | 説明 |
---|---|---|
ALLOW_DESTRUCTIVE | false |
false にすると破壊的コマンドをフィルタ(本番推奨)。true は検証時のみ。 |
BEDROCK_INFERENCE_PROFILE_ARN | arn:aws:bedrock:... |
Cross-region inference を使う場合の ARN。未使用なら空でOK。 |
TEAMS_WEBHOOK_URL | https://... |
Teams 通知用 Webhook URL |
4. CloudWatch Alarm の設定
最後に CloudWatch Alarm を作成し、トリガー先に先ほど作成した Lambda を設定します。
今回は「CPU使用率が 80% を超えたら アラーム発報」というシンプルな条件にしました。
これで一通りの準備が整いました 🎉
実際に動かしてみた
では実際に動かしてみましょう。
対象EC2で以下のコマンドを実行し、CPUを強制的に100%にしてみます。
stress --cpu 2 --timeout 900
すると、CloudWatch Alarm → Lambda → SSM → Bedrock という流れで処理が進み…
最終的に Teams にレポートが届きました!
初期診断 → コマンド生成 → 結果収集 → 最終分析 まで自動でまとめてくれるので、まさに「Botが一次調査を代行してくれる」感覚です。
まとめ
今回ご紹介したのは、
「AWS運用 × 生成AI × サーバレス構成」で実現するインシデント要約Bot です。
- CloudWatch がアラートを検知
- Lambda が初動調査を自動化
- Bedrock が調査内容を要約
- Teams にわかりやすく通知
これまで人手で行っていた「サーバに入って状態を確認 → サマリを作成 → 報告」という流れをほとんど自動化できます。
今回は CPU に限定しましたが、メモリ/ディスク 監視にも拡張可能。
通知先を Slack にする、ServiceNow と連携する、などの発展もできるかなと思います!
ぜひ「AWS運用における初動調査の自動化」から、生成AI活用を始めてみてください ✨