概要
業務で特定S3バケットのアクセス監視を強化する必要があり、
実装前に運用に耐える仕様を固めたうえで、実装はAIにサポートさせ構築しました。
結果として、
- セキュリティリスクを抑えられる
- 障害時も追跡できる
といった、現場で“本当に使える”監視ができています。
設計方針(仕様)
監視アラートは「精度」と「情報の適切さ」が重要です。
そのため、以下の仕様を定義しました。
-
ECS経由アクセスの識別
→ バッチやアプリケーションの自動アクセスと人間のアクセスの識別 -
IPアドレスのマスキング
→ 社内SlackでもフルIPは扱わない(セキュリティポリシー遵守) -
AccessKeyIdのマスキング
→ 内部漏えい時の影響を最小化 -
CloudWatch Logsでのバックアップ記録
→ Slack通知が失敗しても履歴を追跡可能にする -
通知内容の強化(バケット名・ファイル名・IP・UAなど)
→ 通知だけで初動判断できる情報量を確保
アーキテクチャ
- CloudTrail データイベント:S3 GetObjectを捕捉
- EventBridge:イベントルーティング
- Lambda:通知処理(マスク機能内蔵)
- Slack Incoming Webhook:アラート配信
- CloudWatch Logs:障害時のバックアップログ
前提
- CloudTrail が有効で、対象バケットのデータイベントを記録している
- Slack の Incoming Webhook URL を取得済み
- AWS CLI が利用可能で、対象リージョン・権限が設定されている
実装
設計方針を満たすコードはAIを使って生成し、
マスク処理や除外条件など仕様に沿った調整を行いました。
1. Lambda 関数コード(NotifySlackOnGetObject/app.py)
import json, os, urllib.parse, urllib.request
from datetime import datetime
SLACK_WEBHOOK_URL = os.environ['SLACK_WEBHOOK_URL']
MASK_IP = os.environ.get('MASK_IP', 'true').lower() == 'true'
MASK_ACCESS_KEY = os.environ.get('MASK_ACCESS_KEY', 'true').lower() == 'true'
def mask_ip(ip: str) -> str:
if not ip:
return "-"
if MASK_IP:
if ip.count(".") == 3:
a, b, c, _ = ip.split(".")
return f"{a}.{b}.{c}.0/24"
if ":" in ip:
parts = ip.split(":")
return ":".join(parts[:4] + ["0000"]*4) + "/64"
return ip
def mask_access_key(key: str) -> str:
if MASK_ACCESS_KEY and key:
return "***" + key[-4:]
return key or "-"
def mask_arn(arn: str) -> str:
if not arn or ":" not in arn:
return arn or "-"
parts = arn.split(":")
if len(parts) >= 5:
parts[4] = "************"
return ":".join(parts)
def post_to_slack(text: str):
data = json.dumps({"text": text}).encode("utf-8")
req = urllib.request.Request(SLACK_WEBHOOK_URL, data=data, headers={"Content-Type": "application/json"})
with urllib.request.urlopen(req, timeout=5): pass
def is_likely_automated(ui, ua):
arn = ui.get("arn", "")
role = arn.split("/")[-2] if "assumed-role/" in arn else ""
is_ecs = role == "ecsTaskExecutionRole"
is_sdk = "aws-sdk-go" in (ua or "").lower()
return ui.get("type") == "AssumedRole" and is_ecs and is_sdk
def handler(event, context):
print("EVENT:", json.dumps(event))
d = event.get("detail") or {}
ev = d.get("eventName")
src = d.get("eventSource")
acc = d.get("recipientAccountId")
et = d.get("eventTime") or datetime.utcnow().isoformat() + "Z"
ip = mask_ip(d.get("sourceIPAddress"))
ua = d.get("userAgent") or "-"
req = d.get("requestParameters") or {}
bucket = req.get("bucketName")
key = req.get("key") or (req.get("object") or {}).get("key")
if key:
key = urllib.parse.unquote(key)
ui = d.get("userIdentity") or {}
utype = ui.get("type")
uarn = mask_arn(ui.get("arn") or "-")
aks = mask_access_key(ui.get("accessKeyId"))
is_robot = is_likely_automated(ui, ua)
prefix = ":robot_face: *S3 GetObject from ECS Task*" if is_robot else ":inbox_tray: *S3 GetObject detected*"
lines = [
prefix,
f"- *Time:* {et}",
f"- *IP:* `{ip}`",
f"- *UserAgent:* `{ua}`",
f"- *UserIdentity.type:* `{utype}`",
f"- *UserIdentity.arn:* `{uarn}`",
f"- *AccessKeyId:* `{aks}`",
f"- *Event:* `{ev}` via `{src}` (Account `{acc}`)"
]
if bucket:
lines.insert(2, f"- *Bucket:* `{bucket}`")
if key:
lines.insert(3, f"- *Key:* `{key}`")
post_to_slack("\n".join(lines))
return {"ok": True}
2. ZIP 作成
zip -j NotifySlackOnGetObject.zip NotifySlackOnGetObject/app.py
3. Lambda 更新
REGION=ap-northeast-1
LAMBDA_NAME=NotifySlackOnGetObject
aws lambda update-function-code \
--region "$REGION" \
--function-name "$LAMBDA_NAME" \
--zip-file fileb://NotifySlackOnGetObject.zip
4. EventBridge ルール作成
RULE_NAME=NotifySlackOnGetObject
BUCKET_NAME=your-bucket-name
aws events put-rule \
--region "$REGION" \
--name "$RULE_NAME" \
--event-pattern "{
\"source\": [\"aws.s3\"],
\"detail-type\": [\"AWS API Call via CloudTrail\"],
\"detail\": {
\"eventSource\": [\"s3.amazonaws.com\"],
\"eventName\": [\"GetObject\"],
\"requestParameters\": {\"bucketName\": [\"$BUCKET_NAME\"]}
}
}"
5. Lambda 実行権限 & ターゲット設定
RULE_ARN=$(aws events describe-rule --region "$REGION" --name "$RULE_NAME" --query 'Arn' --output text)
LAMBDA_ARN=$(aws lambda get-function --region "$REGION" --function-name "$LAMBDA_NAME" --query 'Configuration.FunctionArn' --output text)
aws lambda add-permission \
--region "$REGION" \
--function-name "$LAMBDA_NAME" \
--statement-id allow-events-invoke-from-eb \
--action lambda:InvokeFunction \
--principal events.amazonaws.com \
--source-arn "$RULE_ARN"
aws events put-targets \
--region "$REGION" \
--rule "$RULE_NAME" \
--targets "Id"="t1","Arn"="$LAMBDA_ARN"
6. 動作確認
aws s3 cp s3://your-bucket-name/your-test-file.txt ./_tmp_test
Slack に通知が飛び、マスク済みの内容が表示されれば成功です。
人間

自動アクセス

Slack上で :robot_face: をミュート対象にするorメッセージごとに通知設定を分けることで、「うるさくしないけど見逃さない」 を実現できる。
運用してみて
- 人間とバッチ処理などの自動アクセスの識別がアイコンでわかりやすい
- マスクにより情報漏えいリスクを抑えつつ、調査に必要な情報は確保
- CloudWatchで通知失敗時の追跡が可能
まとめ
- AIは実装の高速化、人間は仕様設計に集中
- 最初に仕様を固めると、実装はほぼ一発で完成
- 運用を見据えた監視は「通知精度」と「セキュリティ意識」が鍵