0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

# AWS S3 GetObject を Slack に通知する(IP/AccessKeyはマスク)

Last updated at Posted at 2025-08-11

概要

業務で特定S3バケットのアクセス監視を強化する必要があり、
実装前に運用に耐える仕様を固めたうえで、実装はAIにサポートさせ構築しました。

結果として、

  • セキュリティリスクを抑えられる
  • 障害時も追跡できる
    といった、現場で“本当に使える”監視ができています。

設計方針(仕様)

監視アラートは「精度」と「情報の適切さ」が重要です。
そのため、以下の仕様を定義しました。

  1. ECS経由アクセスの識別
    → バッチやアプリケーションの自動アクセスと人間のアクセスの識別
  2. IPアドレスのマスキング
    → 社内SlackでもフルIPは扱わない(セキュリティポリシー遵守)
  3. AccessKeyIdのマスキング
    → 内部漏えい時の影響を最小化
  4. CloudWatch Logsでのバックアップ記録
    → Slack通知が失敗しても履歴を追跡可能にする
  5. 通知内容の強化(バケット名・ファイル名・IP・UAなど)
    → 通知だけで初動判断できる情報量を確保

アーキテクチャ

  • CloudTrail データイベント:S3 GetObjectを捕捉
  • EventBridge:イベントルーティング
  • Lambda:通知処理(マスク機能内蔵)
  • Slack Incoming Webhook:アラート配信
  • CloudWatch Logs:障害時のバックアップログ

前提

  • CloudTrail が有効で、対象バケットのデータイベントを記録している
  • Slack の Incoming Webhook URL を取得済み
  • AWS CLI が利用可能で、対象リージョン・権限が設定されている

実装

設計方針を満たすコードはAIを使って生成し、
マスク処理や除外条件など仕様に沿った調整を行いました。

1. Lambda 関数コード(NotifySlackOnGetObject/app.py

import json, os, urllib.parse, urllib.request
from datetime import datetime

SLACK_WEBHOOK_URL = os.environ['SLACK_WEBHOOK_URL']
MASK_IP = os.environ.get('MASK_IP', 'true').lower() == 'true'
MASK_ACCESS_KEY = os.environ.get('MASK_ACCESS_KEY', 'true').lower() == 'true'

def mask_ip(ip: str) -> str:
    if not ip:
        return "-"
    if MASK_IP:
        if ip.count(".") == 3:
            a, b, c, _ = ip.split(".")
            return f"{a}.{b}.{c}.0/24"
        if ":" in ip:
            parts = ip.split(":")
            return ":".join(parts[:4] + ["0000"]*4) + "/64"
    return ip

def mask_access_key(key: str) -> str:
    if MASK_ACCESS_KEY and key:
        return "***" + key[-4:]
    return key or "-"

def mask_arn(arn: str) -> str:
    if not arn or ":" not in arn:
        return arn or "-"
    parts = arn.split(":")
    if len(parts) >= 5:
        parts[4] = "************"
    return ":".join(parts)

def post_to_slack(text: str):
    data = json.dumps({"text": text}).encode("utf-8")
    req = urllib.request.Request(SLACK_WEBHOOK_URL, data=data, headers={"Content-Type": "application/json"})
    with urllib.request.urlopen(req, timeout=5): pass

def is_likely_automated(ui, ua):
    arn = ui.get("arn", "")
    role = arn.split("/")[-2] if "assumed-role/" in arn else ""
    is_ecs = role == "ecsTaskExecutionRole"
    is_sdk = "aws-sdk-go" in (ua or "").lower()
    return ui.get("type") == "AssumedRole" and is_ecs and is_sdk

def handler(event, context):
    print("EVENT:", json.dumps(event))
    d = event.get("detail") or {}
    ev = d.get("eventName")
    src = d.get("eventSource")
    acc = d.get("recipientAccountId")

    et = d.get("eventTime") or datetime.utcnow().isoformat() + "Z"
    ip = mask_ip(d.get("sourceIPAddress"))
    ua = d.get("userAgent") or "-"

    req = d.get("requestParameters") or {}
    bucket = req.get("bucketName")
    key = req.get("key") or (req.get("object") or {}).get("key")
    if key:
        key = urllib.parse.unquote(key)

    ui = d.get("userIdentity") or {}
    utype = ui.get("type")
    uarn  = mask_arn(ui.get("arn") or "-")
    aks   = mask_access_key(ui.get("accessKeyId"))

    is_robot = is_likely_automated(ui, ua)
    prefix = ":robot_face: *S3 GetObject from ECS Task*" if is_robot else ":inbox_tray: *S3 GetObject detected*"

    lines = [
        prefix,
        f"- *Time:* {et}",
        f"- *IP:* `{ip}`",
        f"- *UserAgent:* `{ua}`",
        f"- *UserIdentity.type:* `{utype}`",
        f"- *UserIdentity.arn:* `{uarn}`",
        f"- *AccessKeyId:* `{aks}`",
        f"- *Event:* `{ev}` via `{src}` (Account `{acc}`)"
    ]
    if bucket:
        lines.insert(2, f"- *Bucket:* `{bucket}`")
    if key:
        lines.insert(3, f"- *Key:* `{key}`")

    post_to_slack("\n".join(lines))
    return {"ok": True}

2. ZIP 作成

zip -j NotifySlackOnGetObject.zip NotifySlackOnGetObject/app.py

3. Lambda 更新

REGION=ap-northeast-1
LAMBDA_NAME=NotifySlackOnGetObject

aws lambda update-function-code \
  --region "$REGION" \
  --function-name "$LAMBDA_NAME" \
  --zip-file fileb://NotifySlackOnGetObject.zip

4. EventBridge ルール作成

RULE_NAME=NotifySlackOnGetObject
BUCKET_NAME=your-bucket-name

aws events put-rule \
  --region "$REGION" \
  --name "$RULE_NAME" \
  --event-pattern "{
    \"source\": [\"aws.s3\"],
    \"detail-type\": [\"AWS API Call via CloudTrail\"],
    \"detail\": {
      \"eventSource\": [\"s3.amazonaws.com\"],
      \"eventName\": [\"GetObject\"],
      \"requestParameters\": {\"bucketName\": [\"$BUCKET_NAME\"]}
    }
  }"

5. Lambda 実行権限 & ターゲット設定

RULE_ARN=$(aws events describe-rule --region "$REGION" --name "$RULE_NAME" --query 'Arn' --output text)
LAMBDA_ARN=$(aws lambda get-function --region "$REGION" --function-name "$LAMBDA_NAME" --query 'Configuration.FunctionArn' --output text)

aws lambda add-permission \
  --region "$REGION" \
  --function-name "$LAMBDA_NAME" \
  --statement-id allow-events-invoke-from-eb \
  --action lambda:InvokeFunction \
  --principal events.amazonaws.com \
  --source-arn "$RULE_ARN"

aws events put-targets \
  --region "$REGION" \
  --rule "$RULE_NAME" \
  --targets "Id"="t1","Arn"="$LAMBDA_ARN"

6. 動作確認

aws s3 cp s3://your-bucket-name/your-test-file.txt ./_tmp_test

Slack に通知が飛び、マスク済みの内容が表示されれば成功です。
人間
hito.jpg

自動アクセス
robo.jpg
Slack上で :robot_face: をミュート対象にするorメッセージごとに通知設定を分けることで、「うるさくしないけど見逃さない」 を実現できる。

運用してみて

  • 人間とバッチ処理などの自動アクセスの識別がアイコンでわかりやすい
  • マスクにより情報漏えいリスクを抑えつつ、調査に必要な情報は確保
  • CloudWatchで通知失敗時の追跡が可能

まとめ

  • AIは実装の高速化、人間は仕様設計に集中
  • 最初に仕様を固めると、実装はほぼ一発で完成
  • 運用を見据えた監視は「通知精度」と「セキュリティ意識」が鍵
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?