こんにちは。NetAppでSales Specialistをしている小寺です。
ONTAPファイルサーバのアクセス権をBedrock Agentで設定してみた(中編)と題して3本立てのシリーズ2本目をお届けします。
シリーズ前編はこちら
中編:FSx for ONTAP × Lambda/SSM × ACLログ — 収集と前処理の実装
1. システム要件・前提
今日お伝えする範囲としては、主に2つに分かれます。
(1)Bedrock Agentから「Taroのアクセス権は?」と質問した際に、アクショングループのLambda経由で、EC2上に保管したACLの情報を取得するスクリプトをセッションマネージャー経由で実行し、S3にログとして保管する
(2) (1)でS3に保管したログファイルをアクショングループのLambda経由で解析して、Bedrock Agentから応答を返す
・EC2
PythonスクリプトでACL収集、ログをS3へアップロード。
・FSx for ONTAP
SMB共有を行い、ファイルサーバとして利用。
・Active Directory
FSx for ONTAPのストレージ仮想マシン(SVM)をADドメインに結合させ、ファイルアクセス制御を行う。
・SSM
EC2に保管したPythonスクリプトをLambdaから呼び出し実行。
・S3
Pythonスクリプトを実行して、取得したFSx for ONTAPのACL情報のログを保管。
2. ACL取得スクリプト(getacl.py)詳細
目的
ファイル/フォルダで誰がアクセスできるのか、有効な権限を取得します。
実行形態
Bedrock Agentのアクショングループ(Lambda) → SSM → EC2
内容
pythonスクリプトではユーザを指定したACL情報取得ではなく、全ての情報を取得する処理にしていました。
Windows APIでSIDをユーザー名に変換し、解決できない場合(エラーコード1332)はAD検索へフォールバックします。
また、指定パスのNTFS ACLを取得する際には、
ACEごとに以下の情報を抽出しています。
・ユーザー/グループ名(SID解決済み)
・権限(READ / WRITE / EXECUTE / FULL CONTROL)
・パス
またACL取得のために都度、net use コマンドを実行し、共有フォルダをマウントし、ディレクトリを再帰的に走査させます。
ファイル数が増えれば、当然処理時間も長くなるので、ThreadPoolExecutor(最大10スレッド)で並列処理できるようにしています。
収集後は、ユーザー単位で権限リストを集約し、S3バケットへアップロード処理を行います。
3. Lambda + SSM 連携
一つ目のLambdaでは、Bedrockエージェントからのリクエストを受け、SSM経由でEC2上のPythonスクリプトを実行し、ACL情報を取得します。
SSMとの連携には、「AWS-RunPowerShellScript」を利用しました。
また、Lambdaのアクセス権限として、SSM実行用とS3にログ保管するためのロールを追加しています。
ACL確認 Lambda(展開してください)
import boto3
import json
import os
import time
from typing import Dict, Any, Optional, List
from botocore.exceptions import ClientError, BotoCoreError
PYTHON_EXE = os.environ.get("PYTHON_EXE", r"C:\Users\Administrator\AppData\Local\Programs\Python\Python313\python.exe")
GETACL_SCRIPT = os.environ.get("GETACL_SCRIPT", r"C:\batch\getacl.py")
INSTANCE_ID = os.environ.get("INSTANCE_ID", "i-xxxxxxxxx")
BASE_SHARE = os.environ.get("BASE_SHARE", r"\\XXXXX-T6ECO6R\volad\AXIES大学")
TIMEOUT_SSM = int(os.environ.get("COMMAND_TIMEOUT", "1800"))
POLL_INTERVAL = float(os.environ.get("POLL_INTERVAL", "2.0"))
WAIT_FOR_COMPLETION = os.environ.get("WAIT_FOR_COMPLETION", "true").lower() == "true"
ssm = boto3.client("ssm")
# ===== Bedrock 応答(Function details 方式)=====
def _build_response(event: Dict[str, Any], body_text: str, state: Optional[str] = None) -> Dict[str, Any]:
"""
- TEXT.body は必ず str(JSON を返す場合は json.dumps で文字列化して渡す)
- state: None / "FAILURE" / "REPROMPT"
"""
fr = {
"responseBody": {
"TEXT": {"body": body_text}
}
}
if state in ("FAILURE", "REPROMPT"):
fr["responseState"] = state
return {
"messageVersion": "1.0",
"response": {
# actionGroup/function は event をエコー
"actionGroup": event["actionGroup"],
"function": event["function"],
"functionResponse": fr
},
"sessionAttributes": event.get("sessionAttributes", {}),
"promptSessionAttributes": event.get("promptSessionAttributes", {})
}
# ===== Bedrock parameters(list) パース =====
def _extract_params(event: Dict[str, Any]) -> Dict[str, Any]:
out = {}
for p in (event.get("parameters") or []):
if not isinstance(p, dict):
continue
name = p.get("name")
val = p.get("value", p.get("stringValue", p.get("booleanValue", p.get("numberValue"))))
if name:
out[name] = val
return out
# ===== SSM 呼び出し =====
def _send_ssm_powershell(commands: List[str], comment="getacl via bedrock agent") -> str:
resp = ssm.send_command(
InstanceIds=[INSTANCE_ID],
DocumentName="AWS-RunPowerShellScript",
Parameters={"commands": commands},
TimeoutSeconds=TIMEOUT_SSM,
Comment=comment
)
return resp["Command"]["CommandId"]
def _wait_ssm(command_id: str) -> Dict[str, Any]:
while True:
time.sleep(POLL_INTERVAL)
inv = ssm.list_command_invocations(
CommandId=command_id, InstanceId=INSTANCE_ID, Details=True
)
if not inv.get("CommandInvocations"):
continue
ci = inv["CommandInvocations"][0]
status = ci.get("Status")
if status in ("Pending", "InProgress", "Delayed"):
continue
plugin = (ci.get("CommandPlugins") or [{}])[0]
return {
"status": status,
"responseCode": plugin.get("ResponseCode", -1),
"stdout": plugin.get("Output", "") or "",
"stderr": plugin.get("StandardErrorContent", "") or ""
}
# ===== PowerShell コマンド生成(UTF-8 強制)=====
def _build_commands() -> List[str]:
lines = [
'$ErrorActionPreference = "Stop"',
# --- UTF-8 強制 ---
'[Console]::OutputEncoding = [System.Text.Encoding]::UTF8',
'$OutputEncoding = [System.Text.Encoding]::UTF8',
'chcp 65001 > $null',
'$env:PYTHONIOENCODING = "utf-8"',
'$env:PYTHONUTF8 = "1"',
# --- Transcript ---
'try { Stop-Transcript -ErrorAction SilentlyContinue } catch {}',
'Start-Transcript -Path C:\\batch\\ssm_log_getacl.txt -Force',
]
py = f'"{PYTHON_EXE}"'
script = f'"{GETACL_SCRIPT}"'
base = f'"{BASE_SHARE}"'
# ルートのみ渡す。Python は -X utf8
lines.append(f'& {py} -X utf8 {script} {base}')
lines.append('$code = $LASTEXITCODE; if ($code -ne 0) { throw ("getacl exited {0}" -f $code) }')
lines.append('Stop-Transcript')
return lines
# ===== Lambda handler =====
def lambda_handler(event, context):
try:
print("=== RAW EVENT ===")
print(json.dumps(event, ensure_ascii=False))
except Exception:
pass
try:
_ = _extract_params(event) # 将来の拡張用(今は未使用)
commands = _build_commands()
cmd_id = _send_ssm_powershell(commands)
if WAIT_FOR_COMPLETION:
result = _wait_ssm(cmd_id)
# 成否でメッセージを作成(短文。詳細はログ/トランスクリプトに委譲)
if result.get("status") == "Success" and result.get("responseCode") == 0:
msg = (
"ACLログの更新が完了しました。"
"続けて AnalyzeACLLog を呼び出し、user パラメータを指定して最新のアクセス権を確認してください。"
)
return _build_response(event, msg, state="REPROMPT")
else:
err = (
f"ACLログ更新が失敗しました。"
f"status={result.get('status')}, code={result.get('responseCode')}\n"
f"stderr_tail={result.get('stderr')[-1000:]}"
)
return _build_response(event, err, state="FAILURE")
else:
# 非同期に更新を走らせて即返す(チェイン用に REPROMPT)
msg = (
f"ACLログの更新コマンドを送信しました(CommandId={cmd_id})。"
"完了後に AnalyzeACLLog を呼び出して権限を確認してください。"
)
return _build_response(event, msg, state="REPROMPT")
except (ClientError, BotoCoreError) as e:
return _build_response(event, f"SSM 実行エラー: {e}", state="FAILURE")
except Exception as e:
return _build_response(event, f"Unhandled Exception: {e}", state="FAILURE")
import time
from typing import List, Dict, Any, Optional
from botocore.exceptions import ClientError, BotoCoreError
# ===== 固定/環境設定 =====
二つ目のLambdaでは、S3に保管したログを解析してアクセス権を回答しています。
ACL分析&結果応答(展開してください)
# -*- coding: utf-8 -*-
import json
import re
from typing import Dict, Any, Optional, List
import boto3
from botocore.exceptions import BotoCoreError, ClientError
# ===== Settings =====
BUCKET_NAME = "XXXXXX"
PREFIX = "logs"
s3 = boto3.client("s3")
def _build_response(event: Dict[str, Any], body_text: str, state: Optional[str] = None) -> Dict[str, Any]:
"""
Return payload for Bedrock Agents (Function details).
- body_text must be str
- state: None / "FAILURE" / "REPROMPT"
"""
fr = {"responseBody": {"TEXT": {"body": body_text}}}
if state in ("FAILURE", "REPROMPT"):
fr["responseState"] = state
return {
"messageVersion": "1.0",
"response": {
"actionGroup": event.get("actionGroup", "AnalyzeACLLog"),
"function": event.get("function", "AnalyzeACLLog"),
"functionResponse": fr,
},
"sessionAttributes": event.get("sessionAttributes", {}),
"promptSessionAttributes": event.get("promptSessionAttributes", {}),
}
def _get_param(event: Dict[str, Any], name: str) -> str:
for p in (event.get("parameters") or []):
if isinstance(p, dict) and p.get("name") == name:
v = p.get("value", p.get("stringValue"))
return (v or "").strip() if v is not None else ""
return ""
def _extract_acl_for_candidates(log_text: str, user_candidates: List[str]) -> (List[Dict[str, Any]], Optional[str]):
"""
Try multiple user candidates: e.g., ["fsxontap\\Hanako", "Hanako"].
Returns (accessible_paths, matched_user).
"""
accessible_paths: List[Dict[str, Any]] = []
matched_user: Optional[str] = None
for candidate in user_candidates:
u = re.escape(candidate)
# Domain part optional: (fsx-ontap | fsx_ontap | fsxontap) + backslash
pattern = r"ACLs\s+for\s+(?:fsx[-_]?ontap\\\s*)?{}\s*:\s*(\[(?:.|\n)*?\])".format(u)
m = re.search(pattern, log_text, flags=re.IGNORECASE)
if not m:
continue
acl_text = m.group(1).replace("'", '"')
try:
acl_list = json.loads(acl_text)
for acl in acl_list:
path = acl.get("Path")
rights = acl.get("Rights")
if path is not None and rights is not None:
accessible_paths.append({"Path": path, "Rights": rights})
matched_user = candidate
break
except json.JSONDecodeError:
continue
return accessible_paths, matched_user
def lambda_handler(event, context):
# Optional debug
try:
print("=== RAW EVENT ===")
print(json.dumps(event, ensure_ascii=False))
except Exception:
pass
try:
# 1) Read user parameter
user_name = _get_param(event, "user")
if not user_name:
# REPROMPT: ask for user
return _build_response(
event,
"ユーザー名(user)を指定してください。例: 「Jiroのアクセス権を教えて」",
state="REPROMPT",
)
# 2) Find latest log object
res = s3.list_objects_v2(Bucket=BUCKET_NAME, Prefix=PREFIX)
contents = res.get("Contents") or []
if not contents:
return _build_response(
event,
f"ログファイルが見つかりません(s3://{BUCKET_NAME}/{PREFIX})。",
)
latest = max(contents, key=lambda x: x["LastModified"])
key = latest["Key"]
# 3) Read log content
obj = s3.get_object(Bucket=BUCKET_NAME, Key=key)
log_data = obj["Body"].read().decode("utf-8", errors="replace")
# 4) Prepare candidates: "fsxontap\\Hanako" and "Hanako"
candidates: List[str] = [user_name]
if "\\" in user_name:
right = user_name.split("\\", 1)[1]
if right:
candidates.append(right)
# 5) Extract ACLs
accessible_paths, matched_user = _extract_acl_for_candidates(log_data, candidates)
display_user = matched_user or user_name
# 6) Build summary (TEXT.body must be str)
if accessible_paths:
lines = [f"- {it['Path']}: {it['Rights']}" for it in accessible_paths]
summary = (
f"ユーザー {display_user} がアクセス可能なフォルダは {len(accessible_paths)} 件です。\n"
+ "\n".join(lines)
+ f"\n(解析ログ: s3://{BUCKET_NAME}/{key})"
)
else:
summary = (
f"ユーザー {display_user} のアクセス可能なフォルダは見つかりませんでした。"
f"\n(解析ログ: s3://{BUCKET_NAME}/{key})"
)
return _build_response(event, summary)
except (ClientError, BotoCoreError) as e:
return _build_response(event, f"S3 アクセスでエラーが発生しました: {e}", state="FAILURE")
except Exception as e:
return _build_response(event, f"ACL 解析中に予期しないエラーが発生しました: {e}", state="FAILURE")
4. Bedrock Agentから自然言語でアクセス権を回答する
自然言語で「Taroのアクセス権は?」と質問した際に、ACL情報を取得するアクショングループと取得してS3に保管されたログを解析して結果を返すアクショングループの2つを作っています。
アクショングループ:アクセス権取得用
パラメータとして、「folder」と「account」の2つを指定しています。

アクショングループ:アクセス権解析用
アクセス権取得用の処理として、「ACLログの更新が完了しました。続けて AnalyzeACLLog を呼び出してください。」と正常終了した場合に実行される設定です。
以上、お読みいただきありがとうございます。
次の後編で最終回です。Bedrock Agentの設定や今後の改善ポイントについてお伝えできればと思います。

