0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ONTAPファイルサーバのアクセス権をBedrock Agentで設定してみた(中編)

Posted at

こんにちは。NetAppでSales Specialistをしている小寺です。

ONTAPファイルサーバのアクセス権をBedrock Agentで設定してみた(中編)と題して3本立てのシリーズ2本目をお届けします。

シリーズ前編はこちら

中編:FSx for ONTAP × Lambda/SSM × ACLログ — 収集と前処理の実装

1. システム要件・前提

今日お伝えする範囲としては、主に2つに分かれます。

(1)Bedrock Agentから「Taroのアクセス権は?」と質問した際に、アクショングループのLambda経由で、EC2上に保管したACLの情報を取得するスクリプトをセッションマネージャー経由で実行し、S3にログとして保管する

(2) (1)でS3に保管したログファイルをアクショングループのLambda経由で解析して、Bedrock Agentから応答を返す

image.png

・EC2
PythonスクリプトでACL収集、ログをS3へアップロード。

・FSx for ONTAP
SMB共有を行い、ファイルサーバとして利用。

・Active Directory
FSx for ONTAPのストレージ仮想マシン(SVM)をADドメインに結合させ、ファイルアクセス制御を行う。

・SSM
EC2に保管したPythonスクリプトをLambdaから呼び出し実行。

・S3
Pythonスクリプトを実行して、取得したFSx for ONTAPのACL情報のログを保管。

2. ACL取得スクリプト(getacl.py)詳細

目的

ファイル/フォルダで誰がアクセスできるのか、有効な権限を取得します。

実行形態

Bedrock Agentのアクショングループ(Lambda) → SSM → EC2

内容

pythonスクリプトではユーザを指定したACL情報取得ではなく、全ての情報を取得する処理にしていました。
Windows APIでSIDをユーザー名に変換し、解決できない場合(エラーコード1332)はAD検索へフォールバックします。

また、指定パスのNTFS ACLを取得する際には、
ACEごとに以下の情報を抽出しています。

・ユーザー/グループ名(SID解決済み)
・権限(READ / WRITE / EXECUTE / FULL CONTROL)
・パス

またACL取得のために都度、net use コマンドを実行し、共有フォルダをマウントし、ディレクトリを再帰的に走査させます。

ファイル数が増えれば、当然処理時間も長くなるので、ThreadPoolExecutor(最大10スレッド)で並列処理できるようにしています。
収集後は、ユーザー単位で権限リストを集約し、S3バケットへアップロード処理を行います。

3. Lambda + SSM 連携

一つ目のLambdaでは、Bedrockエージェントからのリクエストを受け、SSM経由でEC2上のPythonスクリプトを実行し、ACL情報を取得します。

SSMとの連携には、「AWS-RunPowerShellScript」を利用しました。

また、Lambdaのアクセス権限として、SSM実行用とS3にログ保管するためのロールを追加しています。

ACL確認 Lambda(展開してください)

import boto3
import json
import os
import time
from typing import Dict, Any, Optional, List
from botocore.exceptions import ClientError, BotoCoreError
PYTHON_EXE    = os.environ.get("PYTHON_EXE",  r"C:\Users\Administrator\AppData\Local\Programs\Python\Python313\python.exe")
GETACL_SCRIPT = os.environ.get("GETACL_SCRIPT", r"C:\batch\getacl.py")
INSTANCE_ID   = os.environ.get("INSTANCE_ID", "i-xxxxxxxxx")

BASE_SHARE    = os.environ.get("BASE_SHARE",  r"\\XXXXX-T6ECO6R\volad\AXIES大学")

TIMEOUT_SSM   = int(os.environ.get("COMMAND_TIMEOUT", "1800"))
POLL_INTERVAL = float(os.environ.get("POLL_INTERVAL", "2.0"))
WAIT_FOR_COMPLETION = os.environ.get("WAIT_FOR_COMPLETION", "true").lower() == "true"

ssm = boto3.client("ssm")

# ===== Bedrock 応答(Function details 方式)=====
def _build_response(event: Dict[str, Any], body_text: str, state: Optional[str] = None) -> Dict[str, Any]:
    """
    - TEXT.body は必ず str(JSON を返す場合は json.dumps で文字列化して渡す)
    - state: None / "FAILURE" / "REPROMPT"
    """
    fr = {
        "responseBody": {
            "TEXT": {"body": body_text}
        }
    }
    if state in ("FAILURE", "REPROMPT"):
        fr["responseState"] = state

    return {
        "messageVersion": "1.0",
        "response": {
            # actionGroup/function は event をエコー
            "actionGroup": event["actionGroup"],
            "function": event["function"],
            "functionResponse": fr
        },
        "sessionAttributes": event.get("sessionAttributes", {}),
        "promptSessionAttributes": event.get("promptSessionAttributes", {})
    }

# ===== Bedrock parameters(list) パース =====
def _extract_params(event: Dict[str, Any]) -> Dict[str, Any]:
    out = {}
    for p in (event.get("parameters") or []):
        if not isinstance(p, dict):
            continue
        name = p.get("name")
        val = p.get("value", p.get("stringValue", p.get("booleanValue", p.get("numberValue"))))
        if name:
            out[name] = val
    return out

# ===== SSM 呼び出し =====
def _send_ssm_powershell(commands: List[str], comment="getacl via bedrock agent") -> str:
    resp = ssm.send_command(
        InstanceIds=[INSTANCE_ID],
        DocumentName="AWS-RunPowerShellScript",
        Parameters={"commands": commands},
        TimeoutSeconds=TIMEOUT_SSM,
        Comment=comment
    )
    return resp["Command"]["CommandId"]

def _wait_ssm(command_id: str) -> Dict[str, Any]:
    while True:
        time.sleep(POLL_INTERVAL)
        inv = ssm.list_command_invocations(
            CommandId=command_id, InstanceId=INSTANCE_ID, Details=True
        )
        if not inv.get("CommandInvocations"):
            continue
        ci = inv["CommandInvocations"][0]
        status = ci.get("Status")
        if status in ("Pending", "InProgress", "Delayed"):
            continue
        plugin = (ci.get("CommandPlugins") or [{}])[0]
        return {
            "status": status,
            "responseCode": plugin.get("ResponseCode", -1),
            "stdout": plugin.get("Output", "") or "",
            "stderr": plugin.get("StandardErrorContent", "") or ""
        }

# ===== PowerShell コマンド生成(UTF-8 強制)=====
def _build_commands() -> List[str]:
    lines = [
        '$ErrorActionPreference = "Stop"',
        # --- UTF-8 強制 ---
        '[Console]::OutputEncoding = [System.Text.Encoding]::UTF8',
        '$OutputEncoding           = [System.Text.Encoding]::UTF8',
        'chcp 65001 > $null',
        '$env:PYTHONIOENCODING = "utf-8"',
        '$env:PYTHONUTF8       = "1"',
        # --- Transcript ---
        'try { Stop-Transcript -ErrorAction SilentlyContinue } catch {}',
        'Start-Transcript -Path C:\\batch\\ssm_log_getacl.txt -Force',
    ]

    py     = f'"{PYTHON_EXE}"'
    script = f'"{GETACL_SCRIPT}"'
    base   = f'"{BASE_SHARE}"'

    # ルートのみ渡す。Python は -X utf8
    lines.append(f'& {py} -X utf8 {script} {base}')
    lines.append('$code = $LASTEXITCODE; if ($code -ne 0) { throw ("getacl exited {0}" -f $code) }')
    lines.append('Stop-Transcript')
    return lines

# ===== Lambda handler =====
def lambda_handler(event, context):
    try:
        print("=== RAW EVENT ===")
        print(json.dumps(event, ensure_ascii=False))
    except Exception:
        pass

    try:
        _ = _extract_params(event)  # 将来の拡張用(今は未使用)

        commands = _build_commands()
        cmd_id = _send_ssm_powershell(commands)

        if WAIT_FOR_COMPLETION:
            result = _wait_ssm(cmd_id)
            # 成否でメッセージを作成(短文。詳細はログ/トランスクリプトに委譲)
            if result.get("status") == "Success" and result.get("responseCode") == 0:
                msg = (
                    "ACLログの更新が完了しました。"
                    "続けて AnalyzeACLLog を呼び出し、user パラメータを指定して最新のアクセス権を確認してください。"
                )
                return _build_response(event, msg, state="REPROMPT")
            else:
                err = (
                    f"ACLログ更新が失敗しました。"
                    f"status={result.get('status')}, code={result.get('responseCode')}\n"
                    f"stderr_tail={result.get('stderr')[-1000:]}"
                )
                return _build_response(event, err, state="FAILURE")
        else:
            # 非同期に更新を走らせて即返す(チェイン用に REPROMPT)
            msg = (
                f"ACLログの更新コマンドを送信しました(CommandId={cmd_id})。"
                "完了後に AnalyzeACLLog を呼び出して権限を確認してください。"
            )
            return _build_response(event, msg, state="REPROMPT")

    except (ClientError, BotoCoreError) as e:
        return _build_response(event, f"SSM 実行エラー: {e}", state="FAILURE")
    except Exception as e:
        return _build_response(event, f"Unhandled Exception: {e}", state="FAILURE")
import time
from typing import List, Dict, Any, Optional
from botocore.exceptions import ClientError, BotoCoreError

# ===== 固定/環境設定 =====

二つ目のLambdaでは、S3に保管したログを解析してアクセス権を回答しています。

ACL分析&結果応答(展開してください)
# -*- coding: utf-8 -*-
import json
import re
from typing import Dict, Any, Optional, List

import boto3
from botocore.exceptions import BotoCoreError, ClientError

# ===== Settings =====
BUCKET_NAME = "XXXXXX"
PREFIX = "logs"

s3 = boto3.client("s3")


def _build_response(event: Dict[str, Any], body_text: str, state: Optional[str] = None) -> Dict[str, Any]:
    """
    Return payload for Bedrock Agents (Function details).
    - body_text must be str
    - state: None / "FAILURE" / "REPROMPT"
    """
    fr = {"responseBody": {"TEXT": {"body": body_text}}}
    if state in ("FAILURE", "REPROMPT"):
        fr["responseState"] = state
    return {
        "messageVersion": "1.0",
        "response": {
            "actionGroup": event.get("actionGroup", "AnalyzeACLLog"),
            "function": event.get("function", "AnalyzeACLLog"),
            "functionResponse": fr,
        },
        "sessionAttributes": event.get("sessionAttributes", {}),
        "promptSessionAttributes": event.get("promptSessionAttributes", {}),
    }


def _get_param(event: Dict[str, Any], name: str) -> str:
    for p in (event.get("parameters") or []):
        if isinstance(p, dict) and p.get("name") == name:
            v = p.get("value", p.get("stringValue"))
            return (v or "").strip() if v is not None else ""
    return ""


def _extract_acl_for_candidates(log_text: str, user_candidates: List[str]) -> (List[Dict[str, Any]], Optional[str]):
    """
    Try multiple user candidates: e.g., ["fsxontap\\Hanako", "Hanako"].
    Returns (accessible_paths, matched_user).
    """
    accessible_paths: List[Dict[str, Any]] = []
    matched_user: Optional[str] = None

    for candidate in user_candidates:
        u = re.escape(candidate)
        # Domain part optional: (fsx-ontap | fsx_ontap | fsxontap) + backslash
        pattern = r"ACLs\s+for\s+(?:fsx[-_]?ontap\\\s*)?{}\s*:\s*(\[(?:.|\n)*?\])".format(u)
        m = re.search(pattern, log_text, flags=re.IGNORECASE)
        if not m:
            continue

        acl_text = m.group(1).replace("'", '"')
        try:
            acl_list = json.loads(acl_text)
            for acl in acl_list:
                path = acl.get("Path")
                rights = acl.get("Rights")
                if path is not None and rights is not None:
                    accessible_paths.append({"Path": path, "Rights": rights})
            matched_user = candidate
            break
        except json.JSONDecodeError:
            continue

    return accessible_paths, matched_user


def lambda_handler(event, context):
    # Optional debug
    try:
        print("=== RAW EVENT ===")
        print(json.dumps(event, ensure_ascii=False))
    except Exception:
        pass

    try:
        # 1) Read user parameter
        user_name = _get_param(event, "user")
        if not user_name:
            # REPROMPT: ask for user
            return _build_response(
                event,
                "ユーザー名(user)を指定してください。例: 「Jiroのアクセス権を教えて」",
                state="REPROMPT",
            )

        # 2) Find latest log object
        res = s3.list_objects_v2(Bucket=BUCKET_NAME, Prefix=PREFIX)
        contents = res.get("Contents") or []
        if not contents:
            return _build_response(
                event,
                f"ログファイルが見つかりません(s3://{BUCKET_NAME}/{PREFIX})。",
            )

        latest = max(contents, key=lambda x: x["LastModified"])
        key = latest["Key"]

        # 3) Read log content
        obj = s3.get_object(Bucket=BUCKET_NAME, Key=key)
        log_data = obj["Body"].read().decode("utf-8", errors="replace")

        # 4) Prepare candidates: "fsxontap\\Hanako" and "Hanako"
        candidates: List[str] = [user_name]
        if "\\" in user_name:
            right = user_name.split("\\", 1)[1]
            if right:
                candidates.append(right)

        # 5) Extract ACLs
        accessible_paths, matched_user = _extract_acl_for_candidates(log_data, candidates)
        display_user = matched_user or user_name

        # 6) Build summary (TEXT.body must be str)
        if accessible_paths:
            lines = [f"- {it['Path']}: {it['Rights']}" for it in accessible_paths]
            summary = (
                f"ユーザー {display_user} がアクセス可能なフォルダは {len(accessible_paths)} 件です。\n"
                + "\n".join(lines)
                + f"\n(解析ログ: s3://{BUCKET_NAME}/{key})"
            )
        else:
            summary = (
                f"ユーザー {display_user} のアクセス可能なフォルダは見つかりませんでした。"
                f"\n(解析ログ: s3://{BUCKET_NAME}/{key})"
            )

        return _build_response(event, summary)

    except (ClientError, BotoCoreError) as e:
        return _build_response(event, f"S3 アクセスでエラーが発生しました: {e}", state="FAILURE")
    except Exception as e:
        return _build_response(event, f"ACL 解析中に予期しないエラーが発生しました: {e}", state="FAILURE")

4. Bedrock Agentから自然言語でアクセス権を回答する

自然言語で「Taroのアクセス権は?」と質問した際に、ACL情報を取得するアクショングループと取得してS3に保管されたログを解析して結果を返すアクショングループの2つを作っています。

アクショングループ:アクセス権取得用

パラメータとして、「folder」と「account」の2つを指定しています。
image.png

アクショングループ:アクセス権解析用

アクセス権取得用の処理として、「ACLログの更新が完了しました。続けて AnalyzeACLLog を呼び出してください。」と正常終了した場合に実行される設定です。

{EA77315D-94E5-4499-85B1-2DE8AAAE46E2}.png

以上、お読みいただきありがとうございます。
次の後編で最終回です。Bedrock Agentの設定や今後の改善ポイントについてお伝えできればと思います。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?