8
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AWS運用×生成AI!Amazon Bedrock を使ってインシデント要約Botを作ってみた(応用編)

Posted at

※本記事は、「2025 Japan AWS Jr. Champions Qiitaリレー夏」57日目の記事です。
過去の投稿はこちらからご覧ください!
https://qiita.com/natsumi_a/items/80539843482fed4cd648

はじめに

皆さん、AWS運用に 生成AI を活用していますか?

「生成AIを導入する」と聞くと、

  • 「セキュリティは大丈夫なの?」
  • 「導入に時間や教育コストがかかるのでは?」

といった不安を感じる方もいるかもしれません。

確かに仕組みによってはその懸念もありますが、今回はできるだけシンプルかつセキュアに導入できる、AWS運用に役立つ インシデント要約Bot を作っていこうと思います!

なお、本記事は 応用編 です。
基礎編にあたる記事も公開しているので、未読の方はぜひそちらもご覧ください。
https://qiita.com/Na_Ki_1869/items/3ce4e8b0c20b89f85b37


概要

今回作成する「インシデント要約Bot」では、以下のAWSサービスを組み合わせて利用します。

  • Amazon Bedrock : 生成AIによる要約や診断を担当
  • Amazon CloudWatch : インシデント検知(今回はCPUアラーム)
  • AWS Systems Manager (SSM) : コマンド実行や追加調査を自動化

仕組みとしては、CloudWatch Alarm が発火した際に、初動調査をほぼ自動化してしまおうというものです。

汎用性の高いBotを目指しましたが、まずはシンプルに CPU関連インシデント専用 として構成しました。
(ただしメモリやディスクについても同じ仕組みで対応可能なので、ぜひ試してみてください!)


今回やること

対象は EC2インスタンスのCPU監視 です。
CloudWatch でCPU使用率が高騰したときに自動的に以下の流れで調査が進み、最後にレポートがTeamsに届きます。

  1. CloudWatchアラームをトリガーにBotを起動
    • (今回は「CPU使用率高騰」を想定)
  2. 対象EC2の状況をBedrockが初期診断
  3. 初期診断を踏まえ、追加で必要なコマンドをBedrockが生成
  4. そのコマンドをSSM Run Commandで実行
  5. 初期診断+コマンド+結果 を取りまとめてレポート生成
  6. Teamsに最終レポートを通知

アーキテクチャ図

image.png


実装手順

では、いよいよ実装に入っていきます 🚀


1. SSM Run Command の準備

まずは Bot(Lambda)からEC2にコマンドを実行できるようにする 下準備です。

必要なのは以下の2点:

  • SSM Agent のインストール
  • IAMロールの付与

Amazon Linux 2 / 2023 であれば SSM Agent はプリインストール済み なので、特別な作業は不要です。
今回は Amazon Linux 2023 を監視対象にするので、IAMロールだけ設定しておきます。

IAMロールには AWS 管理ポリシー AmazonSSMManagedInstanceCore をアタッチして作成し、EC2 に割り当てましょう。

(参考スクショ)
image.png


2. Amazon Bedrock の準備

次に 生成AIの頭脳となる Bedrock を有効化します。

AWSコンソールの Bedrock ページから、
[モデルアクセス] → [モデルアクセスを変更] に進み、利用したいモデルをリクエストするだけです。

今回は Claude 3.5 Sonnet を使用しました。
リクエスト後、「アクセスが付与されました」と表示されていれば準備完了です ✅

(参考スクショ)
image.png
image.png


3. AWS Lambda(インシデントBot)の実装

ここからが今回の主役! Lambda関数 を作っていきます。

IAMロール作成

Lambda から CloudWatch・SSM・Bedrock を操作するための権限を付与します。
以下のようなカスタマー管理ポリシーを作成し、さらに AWSLambdaBasicExecutionRole を合わせて付与しましょう。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "ssm:SendCommand",
        "ssm:GetCommandInvocation",
        "cloudwatch:GetMetricData",
        "ec2:DescribeInstances",
        "bedrock:InvokeModel"
      ],
      "Resource": "*"
    }
  ]
}

目的別にさらに絞るなら、"Resource" を特定のインスタンスやロググループに限定してもOKです。

(参考スクショ)
image.png

Lambda の作成

  • ランタイム: Python 3.13
  • タイムアウト: デフォルト5秒 → 10分 に延長(SSM 実行待ちのため余裕を持たせる)

コードは長いので折りたたみ形式で掲載します。
必要に応じてコピーして利用してください。

Lambdaコード(Python)
# ここにフルの Lambda コードを貼り付けてください
# (この記事の上部で提示した完成版をそのまま入れればOKです)
import boto3
import json
import os
import datetime
import time
import urllib.request
import re
from typing import List, Dict, Tuple

# AWS クライアント初期化
ssm        = boto3.client('ssm')
ec2        = boto3.client('ec2')
cloudwatch = boto3.client('cloudwatch')
bedrock    = boto3.client('bedrock-runtime')

# 環境変数
TEAMS_WEBHOOK_URL = os.environ['TEAMS_WEBHOOK_URL']
BEDROCK_INFERENCE_PROFILE_ARN = os.environ.get('BEDROCK_INFERENCE_PROFILE_ARN')
FALLBACK_MODEL_ID = os.environ.get('BEDROCK_FALLBACK_MODEL_ID', 'anthropic.claude-3-5-sonnet-20240620-v1:0')
ALLOW_DESTRUCTIVE = os.environ.get('ALLOW_DESTRUCTIVE', 'false').lower() == 'true'

# ===== ユーティリティ =====
def _format_cpu_list(cpu_points: List[Dict]) -> str:
    if not cpu_points:
        return "(なし)"
    return ", ".join(f"{round(p['value'], 1)}%" for p in cpu_points)

def _truncate_lines(s: str, max_lines: int = 25, max_chars: int = 4000) -> Tuple[str, bool]:
    """長出力を安全に省略。戻り値: (本文, 省略したか)"""
    s = (s or "").replace("\r\n", "\n").replace("\r", "\n")
    lines = s.split("\n")
    truncated = False
    if len(lines) > max_lines:
        lines = lines[:max_lines]
        truncated = True
    body = "\n".join(lines)
    if len(body) > max_chars:
        body = body[:max_chars]
        truncated = True
    return body, truncated

def _soft_wrap_long_tokens(text: str, token_threshold: int = 80, insert_every: int = 20) -> str:
    def _ins(m):
        t = m.group(0)
        return '\u200B'.join(t[i:i+insert_every] for i in range(0, len(t), insert_every))
    return re.sub(rf"\S{{{token_threshold+1},}}", _ins, text or "")

def _safe_cmd_normalize(cmd: str) -> str:
    """軽微な改善(sudo除去、grep自己ヒット回避など)"""
    cmd = cmd.strip()
    if cmd.startswith("sudo "):
        cmd = cmd[5:]
    # grep 自己ヒット回避
    cmd = cmd.replace("grep stress", "grep [s]tress")
    return cmd

# 破壊的コマンドの除外(deny-list)+ 調査系の許可(allow-prefix)
_DESTRUCTIVE_PATTERNS = [
    r'\bkill(all)?\b', r'\bpkill\b', r'\bsystemctl\s+(stop|restart|disable)\b',
    r'\breboot\b', r'\bshutdown\b', r'\bhalt\b', r'\bpoweroff\b',
    r'\brm\s', r'\bmv\s', r'\bchmod\b', r'\bchown\b', r'\bchattr\b',
    r'\bdd\s', r'\bmkfs\b', r'\bmount\b', r'\bumount\b',
    r'\byum\s', r'\bdnf\s', r'\bapt(-get)?\s', r'\bzypper\s', r'\bpip\s',
    r'\bdocker\s+(rm|stop|kill|rmi|system prune)\b',
    r'\bkubectl\s+(delete|scale|cordon|drain|uncordon|apply|patch)\b',
    r'>\s*(/|[A-Za-z0-9_./-]+)',           # リダイレクトでの上書き/ファイル生成
    r'\|?\s*(bash|sh)\s*-c\s*["\'].*["\']' # パイプでbash実行
]

_ALLOW_PREFIXES = [
    "ps", "top", "cat", "grep", "egrep", "fgrep", "journalctl", "ls", "lsof",
    "netstat", "ss", "df", "du", "uptime", "free", "who", "w", "uname",
    "rpm", "dpkg", "systemctl status", "find", "stat", "readlink",
    "sha256sum", "head", "tail", "awk", "sed -n", "dmesg"
]

def _is_destructive(cmd: str) -> bool:
    for pat in _DESTRUCTIVE_PATTERNS:
        if re.search(pat, cmd):
            return True
    return False

def _is_allowed_prefix(cmd: str) -> bool:
    c = cmd.strip()
    for p in _ALLOW_PREFIXES:
        if c.startswith(p):
            return True
    return False

def _filter_investigation_commands(commands: List[str]) -> Tuple[List[str], List[str]]:
    """非破壊&調査系のみ通す。除外されたコマンドも返す。"""
    safe, dropped = [], []
    for raw in commands or []:
        c = _safe_cmd_normalize(raw)
        if ALLOW_DESTRUCTIVE:
            safe.append(c)
            continue
        if _is_destructive(c) or not _is_allowed_prefix(c):
            dropped.append(raw)
        else:
            safe.append(c)
    return safe, dropped

def _wrap_for_ssm_with_markers(commands: List[str]) -> List[str]:
    """
    各コマンドを
      __CMD_START__ <idx> :: <cmd>
      <出力 (stdout+stderr)>
      __CMD_END__   <idx> :: exit_code=<n>
    の形式で出力するようラップする。
    """
    wrapped = []
    for idx, cmd in enumerate(commands, 1):
        # シェル安全に単引用符で包む(' → '"'"')
        quoted = cmd.replace("'", "'\"'\"'")
        wrapped.append(
            f"echo '__CMD_START__ {idx} :: {cmd}'; "
            f"timeout 30 bash -lc '{quoted}' 2>&1; "
            f"ec=$?; echo '__CMD_END__ {idx} :: exit_code='${{ec}}"
        )
    return wrapped

def _parse_marked_output(stdout: str) -> List[Dict]:
    """
    _wrap_for_ssm_with_markersで吐いたstdoutを解析し、
    [{'index':1, 'cmd':'...', 'exit_code':0, 'output':'...'}, ...] に整形
    """
    lines = (stdout or "").splitlines()
    results = []
    cur = None
    for ln in lines:
        if ln.startswith("__CMD_START__"):
            # 例: "__CMD_START__ 1 :: ps aux"
            m = re.match(r"__CMD_START__\s+(\d+)\s+::\s+(.*)$", ln)
            if m:
                if cur:  # 取りこぼし防止
                    results.append(cur)
                cur = {"index": int(m.group(1)), "cmd": m.group(2), "output": []}
        elif ln.startswith("__CMD_END__"):
            # 例: "__CMD_END__ 1 :: exit_code=0"
            m = re.match(r"__CMD_END__\s+(\d+)\s+::\s+exit_code=(\d+)", ln)
            if m and cur and cur.get("index") == int(m.group(1)):
                cur["exit_code"] = int(m.group(2))
                cur["output"] = "\n".join(cur["output"]).strip()
                results.append(cur)
                cur = None
        else:
            if cur is not None:
                cur["output"].append(ln)
    # 最終閉じ忘れ対策
    if cur:
        cur["exit_code"] = None
        cur["output"] = "\n".join(cur["output"]).strip()
        results.append(cur)
    # index順で並べ替え
    results.sort(key=lambda x: x.get("index", 0))
    return results

# ===== Lambdaハンドラ =====
def lambda_handler(event, context):
    # 受信イベントの形を自動判別(EventBridge or 直接アラーム)
    alarm_ctx = _normalize_alarm_event(event)
    if not alarm_ctx['ok']:
        _send_teams_error("(unknown)", f"イベント解析に失敗: {alarm_ctx['error']}\n{safe_preview(event)}")
        return {'statusCode': 400}

    alarm_name   = alarm_ctx['alarm_name']
    alarm_reason = alarm_ctx['alarm_reason']
    instance_id  = alarm_ctx['instance_id']

    if not instance_id:
        _send_teams_error(alarm_name, "インスタンスIDが取得できませんでした。(アラームのディメンションに InstanceId を含めてください)")
        return {'statusCode': 400}

    # 診断に必要な素材
    inst     = _get_instance_info(instance_id)
    cpu_data = _get_cpu_metrics(instance_id)
    top_out  = _get_top_output(instance_id)

    # ① 初期診断+コマンド候補(最大5件、非破壊)
    res = _bedrock_initial_and_commands(alarm_reason, inst, cpu_data, top_out, max_cmds=5)
    initial  = res['initial']
    commands = res['commands']

    # ② 「調査のみ」ポリシーでフィルタ
    safe_cmds, dropped_cmds = _filter_investigation_commands(commands)

    # ③ SSM 実行(コマンド毎にマーカー付与→結果をペア化)
    cmd_results = _execute_commands_structured(instance_id, safe_cmds)

    # ④ 最終分析(“提案のみ”。自動停止などは書かないようプロンプトで明示)
    final = _bedrock_final(alarm_reason, inst, cpu_data, top_out, initial, safe_cmds, cmd_results)

    # ⑤ Teams 通知(コマンド→結果の対応を明示/長文は省略表示)
    _send_teams_report(
        alarm_name, inst, cpu_data, top_out,
        initial, safe_cmds, dropped_cmds, cmd_results, final
    )

    return {'statusCode': 200}

# ---- イベント正規化(EventBridge/直接アラーム 両対応) ----
def _normalize_alarm_event(event: dict) -> dict:
    try:
        # (A) EventBridge(detail-type=CloudWatch Alarm State Change)
        if 'detail' in event and isinstance(event['detail'], dict):
            d = event['detail']
            alarm_name   = d.get('alarmName') or "(unknown)"
            alarm_reason = (d.get('state') or {}).get('reason') or "(no reason)"
            instance_id  = _extract_instance_id_from_detail(d)
            return {'ok': True, 'alarm_name': alarm_name, 'alarm_reason': alarm_reason, 'instance_id': instance_id, 'error': None}

        # (B) CloudWatch アラーム → Lambda 直接
        if 'alarmData' in event and isinstance(event['alarmData'], dict):
            d = event['alarmData']
            alarm_name   = d.get('alarmName') or "(unknown)"
            alarm_reason = (d.get('state') or {}).get('reason') or "(no reason)"
            instance_id  = _extract_instance_id_from_detail(d)
            return {'ok': True, 'alarm_name': alarm_name, 'alarm_reason': alarm_reason, 'instance_id': instance_id, 'error': None}

        # (C) 旧来の直接形式
        if 'AlarmName' in event or 'NewStateReason' in event or 'Trigger' in event:
            alarm_name   = event.get('AlarmName') or "(unknown)"
            alarm_reason = event.get('NewStateReason') or event.get('Reason') or "(no reason)"
            instance_id  = _extract_instance_id_from_trigger(event.get('Trigger'))
            return {'ok': True, 'alarm_name': alarm_name, 'alarm_reason': alarm_reason, 'instance_id': instance_id, 'error': None}

        return {'ok': False, 'alarm_name': None, 'alarm_reason': None, 'instance_id': None, 'error': "未対応のイベント形式"}
    except Exception as e:
        return {'ok': False, 'alarm_name': None, 'alarm_reason': None, 'instance_id': None, 'error': str(e)}

def _extract_instance_id_from_detail(detail: dict):
    cfg = detail.get('configuration', {})
    metrics = cfg.get('metrics') or []
    for m in metrics:
        ms = m.get('metricStat') or m.get('metric_stat') or {}
        metric = ms.get('metric') or {}
        dims = metric.get('dimensions')
        if isinstance(dims, dict):
            v = dims.get('InstanceId')
            if v: return v
        if isinstance(dims, list):
            for d in dims:
                n = d.get('name') or d.get('Name')
                if n == 'InstanceId':
                    return d.get('value') or d.get('Value')
    return None

def _extract_instance_id_from_trigger(trigger: dict):
    if not trigger or not isinstance(trigger, dict):
        return None
    dims = trigger.get('Dimensions') or trigger.get('dimensions')
    if isinstance(dims, list):
        for d in dims:
            n = d.get('name') or d.get('Name')
            if n == 'InstanceId':
                return d.get('value') or d.get('Value')
    if isinstance(dims, dict):
        return dims.get('InstanceId')
    return None

def safe_preview(event: dict) -> str:
    try:
        js = json.dumps(event)[:2000]
        return js
    except Exception:
        return "(event preview unavailable)"

# ---- EC2/CloudWatch/SSM ----
def _get_instance_info(instance_id):
    try:
        r = ec2.describe_instances(InstanceIds=[instance_id])
        i = r['Reservations'][0]['Instances'][0]
        tags = {t['Key']: t['Value'] for t in i.get('Tags', [])}
        return {
            'InstanceId': instance_id,
            'Name': tags.get('Name', 'Unknown'),
            'Type': i.get('InstanceType', ''),
            'State': i.get('State', {}).get('Name', ''),
            'LaunchTime': str(i.get('LaunchTime'))
        }
    except Exception:
        return {'InstanceId': instance_id, 'Name': 'Unknown'}

def _get_cpu_metrics(instance_id):
    now = datetime.datetime.utcnow()
    past = now - datetime.timedelta(minutes=30)
    resp = cloudwatch.get_metric_data(
        MetricDataQueries=[{
            'Id': 'cpu',
            'MetricStat': {
                'Metric': {
                    'Namespace': 'AWS/EC2',
                    'MetricName': 'CPUUtilization',
                    'Dimensions': [{'Name': 'InstanceId', 'Value': instance_id}]
                },
                'Period': 300,
                'Stat': 'Average'
            },
            'ReturnData': True
        }],
        StartTime=past, EndTime=now
    )
    r = resp['MetricDataResults'][0]
    data = sorted(zip(r['Timestamps'], r['Values']), reverse=True)[:5]
    return [{'timestamp': t.strftime('%Y-%m-%dT%H:%M:%SZ'), 'value': v} for t, v in data]

def _get_top_output(instance_id):
    try:
        resp = ssm.send_command(
            InstanceIds=[instance_id],
            DocumentName='AWS-RunShellScript',
            Parameters={'commands': ['top -b -n 1 | head -n 60']}
        )
        cid = resp['Command']['CommandId']
        for _ in range(12):
            time.sleep(5)
            inv = ssm.get_command_invocation(CommandId=cid, InstanceId=instance_id)
            if inv['Status'] in ['Success', 'Failed', 'Cancelled', 'TimedOut']:
                break
        return inv.get('StandardOutputContent', '').strip() or f"(SSM status: {inv['Status']})"
    except Exception as e:
        return f"top コマンド取得エラー: {e}"

# ---- Bedrock問い合わせ(非破壊コマンドのみを強く指示) ----
def _bedrock_initial_and_commands(reason, inst, cpu, top_out, max_cmds=5):
    prompt = f"""
あなたは AWS EC2 のトラブルシュート専門家です。
これから出力する「commands」は **非破壊(読み取り専用)の調査コマンド** のみとしてください。
禁止例: kill/pkill/killall, systemctl stop/restart, rm/mv/chmod/chown, パッケージインストール, 再起動, ファイル上書き, リダイレクト(>), curl|bash 等。
許可例: ps/top/cat/grep/journalctl/ls/lsof/df/du/uptime/free/uname/systemctl status/find/stat/readlink/sha256sum/head/tail/awk/sed -n/dmesg など。

出力フォーマットは以下の厳格なJSONのみ:
{{
  "initial": "(問題概要と仮説を簡潔に)",
  "commands": ["cmd1", "cmd2", "...(最大{max_cmds}件)"]
}}

-- アラーム理由: {reason}
-- インスタンス: {inst['Name']} ({inst['InstanceId']})
-- 過去 5 回の CPU 使用率: {_format_cpu_list(cpu)}
-- top コマンド結果(先頭のみ): {top_out[:1500]}
"""
    text = _invoke_bedrock(prompt, max_tokens=600)
    try:
        body = text[text.find('{'): text.rfind('}') + 1]
        data = json.loads(body)
        cmds = (data.get('commands') or [])[:max_cmds]
        return {'initial': data.get('initial', ''), 'commands': cmds}
    except Exception:
        return {'initial': '(解析エラー)', 'commands': []}

def _bedrock_final(reason, inst, cpu, top_out, initial, cmds, cmd_results):
    # 最終報告も「提案のみ。自動実施しない」ことを明示
    joined_cmds = ", ".join(cmds or [])
    summarized_results = []
    for r in cmd_results or []:
        preview, truncated = _truncate_lines(r.get("output",""), max_lines=10, max_chars=1200)
        summarized_results.append(
            f"[cmd#{r.get('index')} exit={r.get('exit_code')}]: {r.get('cmd')}\n{preview}\n{'(…省略…) ' if truncated else ''}".strip()
        )
    results_text = "\n\n".join(summarized_results) if summarized_results else "(結果なし)"
    prompt = f"""
あなたはトラブルシュートの専門家です。以下をもとに、**提案のみ(自動実行なし)**で
1) 問題概要 2) 根本原因の仮説 3) 追加調査/恒久対策の提案(実行はしない) を簡潔にまとめてください。

-- アラーム理由: {reason}
-- インスタンス: {inst['Name']} ({inst['InstanceId']})
-- 過去 CPU: {_format_cpu_list(cpu)}
-- top 抜粋: {top_out[:1500]}
-- 初期診断: {initial}
-- 実行した調査コマンド: {joined_cmds or '(なし)'}
-- コマンド結果サマリ:
{results_text}
"""
    return _invoke_bedrock(prompt, max_tokens=700)

def _invoke_bedrock(prompt, max_tokens=300):
    body = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": max_tokens,
        "messages": [{"role": "user", "content": prompt}]
    }
    kwargs = {
        "contentType": "application/json",
        "accept": "application/json",
        "body": json.dumps(body),
    }
    if BEDROCK_INFERENCE_PROFILE_ARN:
        kwargs["modelId"] = BEDROCK_INFERENCE_PROFILE_ARN
    else:
        kwargs["modelId"] = FALLBACK_MODEL_ID

    resp = bedrock.invoke_model(**kwargs)
    data = json.loads(resp['body'].read())
    return data['content'][0]['text']

# ---- SSM 実行(構造化結果で返す) ----
def _execute_commands_structured(instance_id: str, commands: List[str]) -> List[Dict]:
    if not commands:
        return []

    try:
        wrapped = _wrap_for_ssm_with_markers(commands)
        resp = ssm.send_command(
            InstanceIds=[instance_id],
            DocumentName='AWS-RunShellScript',
            Parameters={'commands': wrapped}
        )
        cid = resp['Command']['CommandId']

        inv = None
        for _ in range(18):  # 最大約90秒
            time.sleep(5)
            inv = ssm.get_command_invocation(CommandId=cid, InstanceId=instance_id)
            if inv['Status'] in ['Success', 'Failed', 'Cancelled', 'TimedOut']:
                break

        stdout = inv.get('StandardOutputContent', '') or ''
        # stderrは2>&1でstdoutに混ぜているので参考程度に追記
        stderr = inv.get('StandardErrorContent', '') or ''
        if stderr.strip():
            stdout = f"{stdout}\n[stderr]\n{stderr}"

        parsed = _parse_marked_output(stdout)

        # 長文をカード向けに軽く整形
        for item in parsed:
            out = item.get("output", "")
            out = _soft_wrap_long_tokens(out)
            preview, truncated = _truncate_lines(out, max_lines=40, max_chars=6000)
            item["output_preview"] = preview + ("\n(…省略…)" if truncated else "")
        return parsed

    except Exception as e:
        return [{"index": 1, "cmd": "(実行エラー)", "exit_code": None, "output_preview": f"SSM 実行エラー: {e}"}]

# ---- Teams 送信 ----
def _send_teams_error(alarm, msg):
    payload = {
        "type": "message",
        "attachments": [
            {"contentType": "application/vnd.microsoft.card.adaptive",
             "content": {
                 "$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
                 "type": "AdaptiveCard", "version": "1.2",
                 "body": [
                     {"type": "TextBlock", "text": f"[Error] {alarm}", "weight": "Bolder"},
                     {"type": "TextBlock", "text": msg, "wrap": True}
                 ]
             }}
        ]
    }
    _post_teams(payload)

def _send_teams_report(alarm, inst, cpu, top_out, initial, cmds, dropped_cmds, cmd_results, final):
    def _tb(text, bold=False, size=None, monospace=False):
        tb = {"type": "TextBlock", "text": text, "wrap": True}
        if bold: tb["weight"] = "Bolder"
        if size: tb["size"] = size
        if monospace: tb["fontType"] = "Monospace"
        return tb

    facts = [
        {"title": "InstanceId", "value": inst.get("InstanceId", "")},
        {"title": "モード", "value": "調査のみ(非破壊)" if not ALLOW_DESTRUCTIVE else "調査+対処(許可済)"},
        {"title": "CPU(最新5)", "value": _format_cpu_list(cpu)}
    ]

    body_elements = [
        _tb(f"EC2 {inst.get('Name','')} の CPU アラート", bold=True, size="Large"),
        {"type": "FactSet", "facts": facts},
        _tb("初期診断", bold=True),
        _tb(initial or "(なし)"),
        _tb("実行コマンド(調査のみ)", bold=True),
        _tb("\n".join(cmds) if cmds else "(なし)", monospace=True),
    ]

    if dropped_cmds:
        body_elements += [
            _tb("※除外されたコマンド(安全ポリシーにより未実行)", bold=True),
            _tb("\n".join(dropped_cmds), monospace=True)
        ]

    # コマンド結果(コマンドごと)
    body_elements.append(_tb("コマンド結果(コマンド→結果)", bold=True))
    if cmd_results:
        for r in cmd_results:
            body_elements += [
                _tb(f"cmd#{r.get('index')} (exit={r.get('exit_code')})", bold=True),
                _tb(r.get("cmd", ""), monospace=True),
                _tb(r.get("output_preview", "(出力なし)"), monospace=True)
            ]
    else:
        body_elements.append(_tb("(結果なし)"))

    # top 抜粋(参考)
    if top_out:
        top_preview, top_trunc = _truncate_lines(top_out, max_lines=20, max_chars=2000)
        body_elements += [
            _tb("top 抜粋(参考)", bold=True),
            _tb(top_preview + ("\n(…省略…)" if top_trunc else ""), monospace=True)
        ]

    # 最終分析(提案のみ)
    body_elements += [
        _tb("最終分析(提案のみ・自動実行なし)", bold=True),
        _tb(final or "(なし)")
    ]

    body = {
        "type": "message",
        "attachments": [
            {
                "contentType": "application/vnd.microsoft.card.adaptive",
                "content": {
                    "$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
                    "type": "AdaptiveCard",
                    "version": "1.2",
                    "msteams": { "width": "full" },
                    "body": body_elements
                }
            }
        ]
    }
    _post_teams(body)

def _post_teams(payload):
    req = urllib.request.Request(
        TEAMS_WEBHOOK_URL,
        data=json.dumps(payload).encode('utf-8'),
        headers={'Content-Type': 'application/json'}
    )
    with urllib.request.urlopen(req):
        pass


環境変数の設定

Lambda に以下の環境変数を設定します。

名前 推奨値 / 例 説明
ALLOW_DESTRUCTIVE false false にすると破壊的コマンドをフィルタ(本番推奨)。true は検証時のみ。
BEDROCK_INFERENCE_PROFILE_ARN arn:aws:bedrock:... Cross-region inference を使う場合の ARN。未使用なら空でOK。
TEAMS_WEBHOOK_URL https://... Teams 通知用 Webhook URL

(参考スクショ)
image.png


4. CloudWatch Alarm の設定

最後に CloudWatch Alarm を作成し、トリガー先に先ほど作成した Lambda を設定します。

今回は「CPU使用率が 80% を超えたら アラーム発報」というシンプルな条件にしました。

(参考スクショ)
image.png

これで一通りの準備が整いました 🎉


実際に動かしてみた

では実際に動かしてみましょう。

対象EC2で以下のコマンドを実行し、CPUを強制的に100%にしてみます。

stress --cpu 2 --timeout 900

すると、CloudWatch Alarm → Lambda → SSM → Bedrock という流れで処理が進み…

最終的に Teams にレポートが届きました!
初期診断 → コマンド生成 → 結果収集 → 最終分析 まで自動でまとめてくれるので、まさに「Botが一次調査を代行してくれる」感覚です。

(参考スクショ)
image.png


まとめ

今回ご紹介したのは、
「AWS運用 × 生成AI × サーバレス構成」で実現するインシデント要約Bot です。

  • CloudWatch がアラートを検知
  • Lambda が初動調査を自動化
  • Bedrock が調査内容を要約
  • Teams にわかりやすく通知

これまで人手で行っていた「サーバに入って状態を確認 → サマリを作成 → 報告」という流れをほとんど自動化できます。

今回は CPU に限定しましたが、メモリ/ディスク 監視にも拡張可能。
通知先を Slack にする、ServiceNow と連携する、などの発展もできるかなと思います!

ぜひ「AWS運用における初動調査の自動化」から、生成AI活用を始めてみてください ✨

8
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?