0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【AWS】rootユーザでのアクティビティを検知してslackに通知する

Posted at

概要

AWSアカウントでrootユーザの操作を検知してすべてslack通知する設定です。

通知設定手順

1.slackwebhook URLの発行発行

以下手順てslack webhook URLを発行します。
https://docs.slack.dev/tools/java-slack-sdk/ja-jp/guides/incoming-webhooks/

出力されたwebhook URLを控えます。後ほど利用します。
skackの対象チャンネルにincoming-webhookが追加されます。

スクリーンショット 2025-12-03 11.12.05.png

2. CloudTrailの確認、設定

CloudTrailの確認

  1. サービス一覧から[CloudTrail]を選択
  2. [証跡]を選択し、証跡設定が存在するか確認する
    既存の証跡がある場合は、新規作成は不要です。

CloudTrailがない場合

  1. 「証跡の作成」を選択する。
    指定がなければ以下のような設定で作成する。

    • 証跡名: root-login-notification-trail
    • ストレージの場所:
      • 「新しいS3バケットの作成」を選択
      • バケット名: cloudtrail-logs-[アカウントID]-[リージョン](自動生成でOK)
    • ログファイルの SSE-KMS 暗号化: 無効
    • ログファイルの検証: 有効
    • SNS通知の配信: 無効
    • CloudWatch Logsの統合: 無効
  2. ログイベントの選択は下記で設定。

    • 管理イベント: 有効
      • 読み取り: 有効
      • 書き込み: 有効
    • データイベント: 無効
    • Insights イベント: 無効
    • ネットワークアクティビティイベント: 無効

スクリーンショット 2025-12-03 15.52.39.png

上記設定後、証跡を作成してください。

3. Lambdaの作成

  1. サービス一覧から[Lambda]を選択

  2. 「関数を作成」を選択
    下記パラメータで作成

    • 関数の作成方法: 「一から作成」を選択
    • 関数名: root-login-notification
    • ランタイム: Python 3.14
    • アーキテクチャ: x86_64
    • デフォルトの実行ロールの変更: 基本的な Lambda アクセス権限で新しいロールを作成
  3. [コード]タブから「lambda_function.py」を編集して下記コードに書き換えを行う。

import json
import os
import urllib.request
import urllib.error
from datetime import datetime, timedelta
from typing import Dict, Any


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """
    EventBridgeからrootユーザーのすべてのアクティビティを受け取り、
    Slackに通知を送信するLambda関数
    """

    # 環境変数からSlack Webhook URLを取得
    webhook_url = os.environ.get('SLACK_WEBHOOK_URL')
    if not webhook_url:
        raise ValueError('SLACK_WEBHOOK_URL environment variable is not set')

    # イベントの詳細を抽出
    detail = event.get('detail', {})

    # イベント情報を抽出
    event_time = detail.get('eventTime', '')
    event_name = detail.get('eventName', 'N/A')
    source_ip = detail.get('sourceIPAddress', 'N/A')
    user_agent = detail.get('userAgent', 'N/A')
    aws_region = detail.get('awsRegion', 'N/A')

    # アカウント情報
    account_id = event.get('account', 'N/A')

    # ユーザー情報
    user_identity = detail.get('userIdentity', {})
    user_type = user_identity.get('type', 'N/A')

    # MFA使用状況(ConsoleLoginイベントの場合のみ)
    additional_event_data = detail.get('additionalEventData', {})
    mfa_used = additional_event_data.get('MFAUsed', None)

    # レスポンス要素(ログイン成功/失敗 - ConsoleLoginイベントの場合のみ)
    response_elements = detail.get('responseElements')
    console_login = None
    if response_elements and isinstance(response_elements, dict):
        console_login = response_elements.get('ConsoleLogin', None)

    # 時刻をJSTに変換
    if event_time:
        try:
            utc_time = datetime.fromisoformat(event_time.replace('Z', '+00:00'))
            # UTCからJST(+9時間)に変換
            jst_time = utc_time.replace(tzinfo=None)
            jst_time = jst_time + timedelta(hours=9)
            formatted_time = jst_time.strftime('%Y-%m-%d %H:%M:%S JST')
        except Exception as e:
            formatted_time = event_time
    else:
        formatted_time = 'N/A'

    # イベントタイプに応じた処理
    if event_name == 'ConsoleLogin':
        # コンソールログインの場合
        if console_login == 'Success':
            status_emoji = ':red_circle:'
            title = 'AWSルートユーザーログイン検知'
            color = 'danger'  # 赤色
        elif console_login == 'Failure':
            status_emoji = ':warning:'
            title = 'AWSルートユーザーログイン失敗'
            color = 'warning'  # オレンジ色
        else:
            status_emoji = ':question:'
            title = 'AWSルートユーザーログイン'
            color = '#808080'  # グレー
    else:
        # その他のイベントの場合
        status_emoji = ':red_circle:'
        title = 'AWSルートユーザーアクティビティ検知'
        color = 'danger'  # 赤色

    # MFAの表示(ConsoleLoginの場合のみ)
    mfa_emoji = ':white_check_mark:' if mfa_used == 'Yes' else ':x:'
    mfa_text = '使用' if mfa_used == 'Yes' else '未使用'

    # Slackメッセージのフィールドを構築
    fields = [
        {
            'title': 'イベント名',
            'value': event_name,
            'short': True
        },
        {
            'title': 'AWSアカウントID',
            'value': account_id,
            'short': True
        },
        {
            'title': '発生時刻',
            'value': formatted_time,
            'short': True
        },
        {
            'title': 'ソースIP',
            'value': source_ip,
            'short': True
        },
        {
            'title': 'リージョン',
            'value': aws_region,
            'short': True
        }
    ]

    # ConsoleLoginイベントの場合のみMFA情報を追加
    if event_name == 'ConsoleLogin' and mfa_used:
        fields.append({
            'title': f'MFA {mfa_emoji}',
            'value': mfa_text,
            'short': True
        })

    # ConsoleLoginイベントの場合のみログイン結果を追加
    if event_name == 'ConsoleLogin' and console_login:
        login_result = '成功' if console_login == 'Success' else ('失敗' if console_login == 'Failure' else console_login)
        fields.insert(0, {
            'title': 'ログイン結果',
            'value': login_result,
            'short': True
        })

    # ユーザーエージェントを追加
    fields.append({
        'title': 'ユーザーエージェント',
        'value': user_agent,
        'short': False
    })

    # Slackメッセージを構築
    slack_message = {
        'text': f'{status_emoji} *{title}*',
        'attachments': [
            {
                'color': color,
                'fields': fields,
                'footer': 'AWS Root Activity Notification',
                'footer_icon': 'https://a0.awsstatic.com/libra-css/images/logos/aws_logo_smile_1200x630.png',
                'ts': int(datetime.now().timestamp())
            }
        ]
    }

    # Slackに通知を送信
    try:
        headers = {
            'Content-Type': 'application/json'
        }

        data = json.dumps(slack_message).encode('utf-8')
        request = urllib.request.Request(
            webhook_url,
            data=data,
            headers=headers,
            method='POST'
        )

        with urllib.request.urlopen(request) as response:
            response_body = response.read().decode('utf-8')
            print(f'Slack notification sent successfully: {response_body}')
            print(f'Event: {event_name}, Account: {account_id}, Time: {formatted_time}')

    except urllib.error.HTTPError as e:
        error_body = e.read().decode('utf-8')
        print(f'Failed to send Slack notification: {e.code} - {error_body}')
        raise
    except Exception as e:
        print(f'Error sending Slack notification: {str(e)}')
        raise

    return {
        'statusCode': 200,
        'body': json.dumps({
            'message': 'Notification sent successfully',
            'event_name': event_name,
            'event_time': formatted_time,
            'account_id': account_id
        })
    }

4. 「Deploy」ボタンを選択する。

スクリーンショット_2025-12-03_16_20_52.jpg

Lambda環境変数の設定

  1. [設定]タブを選択し、左ナビゲーションから[環境変数]を選択し、「編集」を選択します。
    スクリーンショット_2025-12-03_16_24_31.jpg

下記キーと値を入力し、「保存」を選択します。

  • キー: SLACK_WEBHOOK_URL
  • : 先ほどコピーしたSlack Webhook URL
  • 「保存」をクリック
    スクリーンショット_2025-12-03_16_27_21.jpg

Lambda一般設定の変更

  1. [設定]タブを選択し、左ナビゲーションから[一般設定]を選択し、「編集」を選択します。
    1.タイムアウト値を10秒に変更します。
    スクリーンショット_2025-12-03_16_29_48.jpg

4. EventBridgeの作成

rootログインイベントを検知してLambdaを起動するEventBridgeルールを作成します。

  1. サービス一覧から[EventBridge]を選択

  2. ルールの作成

    • 左メニューの「ルール」を選択
    • 「ルールを作成」ボタンを選択

3. [設定]タブを選択し、下記情報を入力します。

  • Rule name: root-login-notification-rule
  • 説明: Detect root user console login and trigger Lambda notification
  • イベントバス: default
  • 選択したイベントバスでルールを有効にする: 有効

スクリーンショット 2025-12-03 16.45.28.png

4. [イベントパターンを構築]ページでは下記情報を入力します。
  - 「AWS のイベントまたは EventBridge パートナーイベント」を選択

スクリーンショット 2025-12-03 16.46.12.png

5. イベントパターンは「カスタムパターン(JSONエディタ)」を選択し、下記jsonを貼り付けます。

{
  "detail": {
    "userIdentity": {
      "type": ["Root"]
    }
  }
}

上記パターンは、rootユーザーが行うすべてのAWS操作を検知します。
コンソールログインだけでなく、API呼び出し、リソース操作なども含まれます。

6. ターゲットを選択画面では下記を入力します。

ターゲットタイプ

  • 「AWS のサービス」を選択

ターゲットを選択

  • ターゲット: Lambda 関数
  • 関数: root-login-notification を選択
  • その他はデフォルト

スクリーンショット 2025-12-03 16.51.11.png

5. Lambda関数への権限追加

この権限設定により、EventBridgeがLambda関数を呼び出せるようになります。
この設定がないと、EventBridgeからLambda関数が起動されません。

  1. 作成したLambda関数(root-login-notification)を開きます。
  2. [設定]タブを開き[アクセス権限]を選択します。
  3. リソースベースのポリシーステートメントの「アクセス権限を追加」を選択し、以下の項目を追加します。
  • ステートメントID: AllowEventBridgeInvoke
  • プリンシパル: events.amazonaws.com
  • アクション: lambda:InvokeFunction

スクリーンショット 2025-12-03 17.08.10.png

4. 作成されたポリシーステートメントを選択し、「編集」ボタンを選択します。
ソース ARNにEventBridgeのARNを入力し、「保存」を選択します。

スクリーンショット_2025-12-03_17_10_37.jpg

EventBridge ARNは下記手順で表示できます。
EventBridge → ルール → root-login-notification-rule
ページ上部にARNが表示されている
例: arn:aws:events:ap-northeast-1:{AWSアカウントID}:rule/root-login-notification-rule

5. 動作確認

rootログインでログインし、S3バケット作成などCloudTrailにログ出力される操作をしてslack通知が行われるか確認を行います。
設定が問題なく行われている場合、下記のような通知がslackに届きます。
スクリーンショット_2025-12-03_20_07_57.jpg

まとめ

本記事の設定で以下のようなセキュリティ向上が図れます。

  • rootユーザのすべてのAWS操作を即座に検知可能
  • 不正アクセスや意図しない操作を早期に発見できる
  • 操作履歴の可視化

ただし全てのrootユーザのAPI操作を検知してしまうため、ログインのみ検知したい場合はEventBridgeのフィルタパターンを下記に書き換えてください。

{
  "source": ["aws.signin"],
  "detail-type": ["AWS Console Sign In via CloudTrail"],
  "detail": {
    "userIdentity": {
      "type": ["Root"]
    },
    "eventName": ["ConsoleLogin"]
  }
}

基本的にrootユーザは利用せず、IAMユーザまたはスイッチロールでのアクセスが推奨となります。
また、rootユーザを凍結する場合でもMFAは必ず設定してください。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?