1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

MCPの活用や応用への考察 - LLMエージェントのアクセスを制御せよ:MCPサーバーにおけるRBACと属性ベース認可(ABAC)の実装

Posted at

はじめに

AnthropicのModel Context Protocol (MCP)環境では、LLMエージェントが外部の機密コンテキストやToolにアクセスする際、MCPサーバーが重要な役割を果たします。このサーバーは、エージェントのアクセス権限を管理する 「セキュリティゲートウェイ」 として機能します。

エンタープライズ環境における課題

機密データを扱うエンタープライズ環境では、単なるID認証だけでは不十分です。以下のような高度なアクセス制御が求められます。

  • 誰が: どのLLMエージェントが
  • 何に: どのコンテキストやToolに
  • どのような条件で: どの時間帯、どの機密レベルで
  • 何ができるか: 読み取り、書き込み、実行など

本記事では、これらの要件を満たすための ロールベースアクセス制御(RBAC)属性ベース認可(ABAC) の実装方法を解説します。


1. アクセス制御の基礎:RBACによる役割の限定

1.1. RBACとは

RBAC (Role-Based Access Control)は、アクセス制御の基本的なアプローチです。ユーザーやシステムに 役割(ロール) を割り当て、その役割に基づいて権限を定義します。

RBACの基本原則

  1. ロールの定義: 業務上の役割や責任に応じたロールを定義
  2. 権限の割り当て: 各ロールに必要な権限を付与
  3. ユーザーへの割り当て: ユーザー(LLMエージェント)にロールを割り当て

1.2. LLMエージェントへのロールの割り当て

MCPサーバーは、アクセスを試みるLLMエージェントに対して、その目的や機能に応じたロールを割り当てます。

ロール名 アクセス権限の例 割り当てられるエージェントの例
Financial-Analyst ・財務データベース(四半期報告書)への参照
・予測Toolの実行
・集計データの生成
財務レポート作成エージェント
Customer-Support-L1 ・一般顧客のCRMデータ(PII除く)の参照
・FAQデータベースへの書き込み
・基本的な問い合わせ対応Tool
顧客対応チャットボット(初期対応)
Customer-Support-L2 ・顧客の完全なCRMデータ(PII含む)の参照
・返金処理Toolの実行
・エスカレーション対応
上級カスタマーサポートエージェント
Executive-Read-Only ・経営ダッシュボードデータへの参照のみ
・書き込みToolの実行禁止
・集計レポートの閲覧
経営層向け要約エージェント
Data-Engineer ・データパイプラインの管理
・バッチ処理Toolの実行
・データ品質チェック
データ処理自動化エージェント

1.3. RBACの実装例

# MCPサーバーでのRBAC実装例(疑似コード)
class RBACController:
    def __init__(self):
        self.roles = {
            "Financial-Analyst": {
                "resources": ["financial_db", "forecast_tool"],
                "actions": ["read", "execute"]
            },
            "Customer-Support-L1": {
                "resources": ["crm_public", "faq_db"],
                "actions": ["read", "write"]
            }
        }
    
    def check_access(self, agent_role, resource, action):
        if agent_role not in self.roles:
            return False
        
        role_permissions = self.roles[agent_role]
        return (resource in role_permissions["resources"] and 
                action in role_permissions["actions"])

1.4. RBACの限界

RBACはシンプルで理解しやすい反面、以下のような動的で粒度の細かい条件に対応できません。

  • 時間的制約: 「平日の9時〜17時のみアクセス可能」
  • データ属性に基づく制約: 「顧客のLTV(生涯価値)が100万円以上の場合のみアクセス可能」
  • 環境的制約: 「社内ネットワークからのアクセスのみ許可」
  • コンテキスト依存の制約: 「過去24時間以内に更新されたデータのみアクセス可能」

これらの要件を満たすために、ABACが必要となります。


2. 高度な制御:ABACによる動的な条件付け

2.1. ABACとは

ABAC (Attribute-Based Access Control)は、RBACの限界を克服する高度なアクセス制御モデルです。アクセス要求を行う 主体、リソース、環境、アクション属性(アトリビュート) に基づいて、アクセスをリアルタイムで許可/拒否します。

2.2. ABACの4つの主要属性

MCPサーバーは、以下の4つの属性カテゴリを組み合わせて、きめ細かなアクセス制御ポリシーを定義・実行します。

1. 主体属性 (Subject Attributes)

アクセスを要求するLLMエージェント自身の属性です。

属性名 説明
agent_id エージェントの一意識別子 agent_finance_001
role 割り当てられたロール Financial-Analyst
clearance_level セキュリティクリアランスレベル confidential, secret, top-secret
department 所属部門 finance, legal, hr
tool_capabilities エージェントが持つTool機能 ["forecast", "analyze", "report"]

アクセス制御例: 「Customer-Support-L1ロールで、clearance_levelpublicのエージェントのみアクセス許可」

2. リソース属性 (Resource Attributes)

アクセス対象となるコンテキストやToolの属性です。

属性名 説明
resource_id リソースの一意識別子 context_financial_q4_2024
classification 機密性レベル public, internal, confidential, secret
data_owner データの所有者/管理部門 finance_dept
last_updated 最終更新日時 2025-10-10T14:30:00Z
data_type データの種類 pii, financial, operational
retention_policy データ保持ポリシー 7_years, permanent

アクセス制御例: 「classificationconfidential以上のコンテキストには、clearance_levelconfidential以上のエージェントのみアクセス可能」

3. 環境属性 (Environment Attributes)

アクセス要求が行われる環境の属性です。

属性名 説明
current_time アクセス要求の日時 2025-10-10T15:30:00Z
day_of_week 曜日 monday, tuesday, ...
business_hours 営業時間内かどうか true, false
source_ip アクセス元IPアドレス 192.168.1.100
network_segment ネットワークセグメント internal, dmz, external
geographic_location 地理的位置 jp-tokyo, us-west

アクセス制御例: 「 営業時間外(夜間) には、すべてのエージェントのWriteアクションを禁止」

4. アクション属性 (Action Attributes)

エージェントが実行しようとしている操作の属性です。

属性名 説明
action_type 操作の種類 read, write, delete, execute
tool_name 実行するTool名 forecast_tool, payment_tool
impact_level 操作の影響度 low, medium, high, critical
reversible 操作が可逆的かどうか true, false

アクセス制御例: 「Financial-Analystロールは、execute:forecast_toolは許可するが、execute:payment_toolは禁止」

2.3. ABACポリシーの実装例

ABACポリシーは、これらの属性を組み合わせた論理ルールとして定義されます。

ポリシー例1: カスタマーサポートの基本アクセス

{
  "policy_id": "cs_l1_basic_access",
  "description": "Customer Support L1の基本アクセス権限",
  "effect": "allow",
  "conditions": {
    "AND": [
      {"subject.role": {"equals": "Customer-Support-L1"}},
      {"resource.classification": {"not_equals": "secret"}},
      {"environment.business_hours": {"equals": true}},
      {"action.type": {"in": ["read"]}}
    ]
  }
}

解説: Customer-Support-L1ロールのエージェントは、営業時間内に限り、secretレベル以外のリソースを読み取ることができます。

ポリシー例2: 財務データへの高度なアクセス

{
  "policy_id": "finance_analyst_advanced",
  "description": "財務アナリストの高度なアクセス権限",
  "effect": "allow",
  "conditions": {
    "AND": [
      {"subject.role": {"equals": "Financial-Analyst"}},
      {"subject.clearance_level": {"in": ["confidential", "secret"]}},
      {"resource.data_owner": {"equals": "finance_dept"}},
      {"resource.last_updated": {"within_days": 90}},
      {
        "OR": [
          {"action.type": {"equals": "read"}},
          {
            "AND": [
              {"action.type": {"equals": "execute"}},
              {"action.tool_name": {"in": ["forecast_tool", "analyze_tool"]}}
            ]
          }
        ]
      }
    ]
  }
}

解説: Financial-Analystロールで適切なクリアランスを持つエージェントは、財務部門が所有する90日以内に更新されたデータを読み取り、特定の分析Toolを実行できます。

ポリシー例3: 時間帯による書き込み制限

{
  "policy_id": "night_write_restriction",
  "description": "夜間の書き込み操作を制限",
  "effect": "deny",
  "conditions": {
    "AND": [
      {"environment.business_hours": {"equals": false}},
      {"action.type": {"in": ["write", "delete"]}},
      {"action.impact_level": {"in": ["high", "critical"]}}
    ]
  }
}

解説: 営業時間外は、影響度が高い書き込みや削除操作を禁止します(読み取りは許可)。

2.4. ポリシー評価の優先順位

複数のポリシーが競合する場合、一般的に以下の優先順位で評価されます。

  1. 明示的な拒否(Explicit Deny): 最優先
  2. 明示的な許可(Explicit Allow): 次に優先
  3. デフォルト拒否(Default Deny): 該当するポリシーがない場合

3. MCPサーバーにおけるRBAC/ABACの実装指針

3.1. システムアーキテクチャ

┌─────────────────────────────────────────────────────────┐
│                   LLM Agent                             │
│              アクセス要求の発行                            │
└─────────────────────────────────────────────────────────┘
                            ↓
                  アクセス要求(ID, リソース, アクション)
                            ↓
┌─────────────────────────────────────────────────────────┐
│                  MCP Server                             │
│  ┌───────────────────────────────────────────────┐      │ 
│  │   1. 認証レイヤー (Authentication)              │      │
│  │      - エージェントIDの検証                      │      │
│  │      - DID/VCによる身元確認                     │      │
│  └───────────────────────────────────────────────┘      │
│                            ↓                            │
│  ┌───────────────────────────────────────────────┐      │
│  │   2. 属性収集レイヤー (Attribute Collection)     │      │
│  │      - 主体属性の取得                            │     │
│  │      - リソース属性の取得                         │     │
│  │      - 環境属性の取得                            │     │
│  │      - アクション属性の取得                       │     │
│  └───────────────────────────────────────────────┘     │
│                            ↓                           │
│  ┌───────────────────────────────────────────────┐     │
│  │   3. 認可ポリシーエンジン                         │    │
│  │      (Authorization Policy Engine)            │    │
│  │      例: Open Policy Agent (OPA)               │    │
│  │                                               │    │
│  │      - ポリシーの評価                           │    │
│  │      - 許可/拒否の決定                          │    │
│  └───────────────────────────────────────────────┘    │
│                            ↓                          │
│            許可 (Allow) / 拒否 (Deny)                   │
│                            ↓                          │
│  ┌───────────────────────────────────────────────┐    │
│  │   4. ロギング・監査レイヤー                       │    │
│  │      - 全アクセス要求の記録                       │    │
│  │      - 認可決定の記録                            │    │
│  │      - 属性値の記録                             │    │
│  └───────────────────────────────────────────────┘    │
└───────────────────────────────────────────────────────┘
                            ↓
              許可された場合のみ実行
                            ↓
┌───────────────────────────────────────────────────────┐
│            External Context Source / Tool             │
│                  実際のリソース                          │
└───────────────────────────────────────────────────────┘

3.2. 認可ポリシーエンジンの導入

MCPサーバーのコアに、RBACとABACポリシーを解釈・評価する専用の認可ポリシーエンジンを組み込みます。

推奨ツール: Open Policy Agent (OPA)

Open Policy Agent (OPA)は、汎用的なポリシーエンジンで、以下の特徴があります。

  • 宣言的ポリシー言語: Regoという高水準言語でポリシーを記述
  • 高速評価: マイクロ秒単位でのポリシー評価
  • 統合が容易: RESTful APIでの統合が可能
  • テスト機能: ポリシーのユニットテストをサポート

OPAを使ったABACポリシーの例

package mcp.authorization

import future.keywords.if
import future.keywords.in

# デフォルトは拒否
default allow = false

# Customer Support L1の基本アクセス
allow if {
    input.subject.role == "Customer-Support-L1"
    input.resource.classification != "secret"
    input.environment.business_hours == true
    input.action.type == "read"
}

# 財務アナリストの高度なアクセス
allow if {
    input.subject.role == "Financial-Analyst"
    input.subject.clearance_level in ["confidential", "secret"]
    input.resource.data_owner == "finance_dept"
    resource_is_recent
    allowed_action
}

# 90日以内に更新されたリソースか判定
resource_is_recent if {
    resource_age_days := time.diff_days(
        time.now_ns(),
        time.parse_rfc3339_ns(input.resource.last_updated)
    )
    resource_age_days <= 90
}

# 許可されたアクションか判定
allowed_action if {
    input.action.type == "read"
}

allowed_action if {
    input.action.type == "execute"
    input.action.tool_name in ["forecast_tool", "analyze_tool"]
}

# 夜間の高リスク操作を拒否
deny_night_operations if {
    input.environment.business_hours == false
    input.action.type in ["write", "delete"]
    input.action.impact_level in ["high", "critical"]
}

# 拒否ルールが優先
allow = false if deny_night_operations

3.3. アクセス制御のフロー

  1. アクセス要求の発生: LLMエージェントが、MCPサーバーを通じて外部コンテキストへの参照(Read)やToolの実行を要求

  2. 認証: エージェントのIDを検証(DID/VCによる認証)

  3. 属性の収集: MCPサーバーは、以下の属性を収集

    • 主体属性: エージェントのロール、クリアランスレベル、所属部門など
    • リソース属性: コンテキストの機密性タグ、データオーナー、最終更新日など
    • 環境属性: 現在の時刻、曜日、営業時間、アクセス元IPなど
    • アクション属性: 実行するアクションの種類、Tool名、影響度など
  4. ポリシー評価: 認可ポリシーエンジン(OPA)は、収集された属性と定義されたABACポリシーを照合し、「許可(Allow)」または「拒否(Deny)」の決定を下す

  5. アクセスの実行: 「許可」の場合のみ、MCPサーバーは外部コンテキストソースへのアクセスやToolの実行を仲介

  6. ロギング: すべての決定と属性値を記録

3.4. ロギングと監査の徹底

すべてのアクセス要求と認可決定は、不変の監査証跡として記録する必要があります。

ログに記録すべき情報

{
  "timestamp": "2025-10-10T15:30:45Z",
  "request_id": "req_abc123",
  "decision": "allow",
  "subject": {
    "agent_id": "agent_finance_001",
    "role": "Financial-Analyst",
    "clearance_level": "confidential",
    "department": "finance"
  },
  "resource": {
    "resource_id": "context_financial_q4_2024",
    "classification": "confidential",
    "data_owner": "finance_dept",
    "last_updated": "2025-10-01T10:00:00Z"
  },
  "environment": {
    "current_time": "2025-10-10T15:30:45Z",
    "business_hours": true,
    "source_ip": "192.168.1.100",
    "network_segment": "internal"
  },
  "action": {
    "type": "read",
    "tool_name": null,
    "impact_level": "low"
  },
  "policy_evaluation": {
    "policies_evaluated": ["finance_analyst_advanced", "default_deny"],
    "matched_policy": "finance_analyst_advanced",
    "evaluation_time_ms": 2.3
  }
}

監査の目的

これにより、セキュリティ監査時に以下を明確に追跡できます。

  • アクセスの正当性: なぜこのエージェントがこのデータにアクセスできたのか
  • ポリシーの有効性: どのポリシーが適用されたか
  • 属性の状態: 決定時の各属性の値
  • 異常検知: 通常と異なるアクセスパターンの発見

3.5. 実装のベストプラクティス

1. ポリシーのバージョン管理

ポリシーはコードと同様にバージョン管理システム(Git)で管理します。

mcp-policies/
├── rbac/
│   ├── roles.yaml
│   └── permissions.yaml
├── abac/
│   ├── customer_support.rego
│   ├── financial_analyst.rego
│   └── executive.rego
├── tests/
│   ├── test_customer_support.rego
│   └── test_financial_analyst.rego
└── README.md

2. ポリシーのテスト

すべてのポリシーに対してユニットテストを作成します。

# test_financial_analyst.rego
package mcp.authorization_test

import data.mcp.authorization

# 正常系のテスト
test_financial_analyst_read_allowed {
    authorization.allow with input as {
        "subject": {
            "role": "Financial-Analyst",
            "clearance_level": "confidential"
        },
        "resource": {
            "data_owner": "finance_dept",
            "last_updated": "2025-10-01T10:00:00Z"
        },
        "action": {"type": "read"}
    }
}

# 異常系のテスト
test_financial_analyst_secret_denied {
    not authorization.allow with input as {
        "subject": {
            "role": "Financial-Analyst",
            "clearance_level": "public"
        },
        "resource": {
            "classification": "secret",
            "data_owner": "finance_dept"
        },
        "action": {"type": "read"}
    }
}

3. 段階的なロールアウト

新しいポリシーは、以下の段階で導入します。

  1. 開発環境: ポリシーの動作確認
  2. 監査モード: 実際の決定には影響を与えず、ログのみ記録
  3. 一部ユーザー: 限定的な本番適用
  4. 全体展開: 全エージェントへの適用

4. パフォーマンスの最適化

  • 属性キャッシング: 頻繁に参照される属性をキャッシュ
  • ポリシーの最適化: 評価が重いルールを最後に配置
  • バッチ評価: 複数のアクセス要求をまとめて評価

4. 実装例:PythonでのMCPサーバー統合

4.1. 基本的な統合コード

from typing import Dict, Any
import requests
from datetime import datetime

class MCPAccessController:
    def __init__(self, opa_endpoint: str):
        self.opa_endpoint = opa_endpoint
    
    def check_access(
        self,
        agent_id: str,
        agent_role: str,
        resource_id: str,
        action_type: str,
        additional_context: Dict[str, Any] = None
    ) -> Dict[str, Any]:
        """
        OPAを使ってアクセス権限をチェック
        """
        # 属性の収集
        input_data = {
            "subject": {
                "agent_id": agent_id,
                "role": agent_role,
                "clearance_level": self._get_clearance_level(agent_id),
                "department": self._get_department(agent_id)
            },
            "resource": {
                "resource_id": resource_id,
                "classification": self._get_resource_classification(resource_id),
                "data_owner": self._get_data_owner(resource_id),
                "last_updated": self._get_last_updated(resource_id)
            },
            "environment": {
                "current_time": datetime.utcnow().isoformat(),
                "business_hours": self._is_business_hours(),
                "network_segment": "internal"
            },
            "action": {
                "type": action_type,
                "impact_level": self._get_impact_level(action_type)
            }
        }
        
        # additional_contextがあれば追加
        if additional_context:
            input_data.update(additional_context)
        
        # OPAにポリシー評価を依頼
        response = requests.post(
            f"{self.opa_endpoint}/v1/data/mcp/authorization/allow",
            json={"input": input_data}
        )
        
        result = response.json()
        decision = result.get("result", False)
        
        # ログ記録
        self._log_access_decision(input_data, decision)
        
        return {
            "allowed": decision,
            "input": input_data,
            "timestamp": datetime.utcnow().isoformat()
        }
    
    def _get_clearance_level(self, agent_id: str) -> str:
        # 実際の実装では、エージェントのクリアランスレベルをDBから取得
        return "confidential"
    
    def _get_department(self, agent_id: str) -> str:
        # 実際の実装では、エージェントの部門をDBから取得
        return "finance"
    
    def _get_resource_classification(self, resource_id: str) -> str:
        # 実際の実装では、リソースの機密性レベルをDBから取得
        return "confidential"
    
    def _get_data_owner(self, resource_id: str) -> str:
        # 実際の実装では、リソースのオーナーをDBから取得
        return "finance_dept"
    
    def _get_last_updated(self, resource_id: str) -> str:
        # 実際の実装では、リソースの最終更新日時をDBから取得
        return "2025-10-01T10:00:00Z"
    
    def _is_business_hours(self) -> bool:
        # 実際の実装では、現在時刻が営業時間内かをチェック
        now = datetime.utcnow()
        return 9 <= now.hour < 17 and now.weekday() < 5
    
    def _get_impact_level(self, action_type: str) -> str:
        # アクションの種類に応じた影響度を返す
        impact_map = {
            "read": "low",
            "write": "medium",
            "delete": "high",
            "execute": "medium"
        }
        return impact_map.get(action_type, "low")
    
    def _log_access_decision(self, input_data: Dict, decision: bool):
        # 実際の実装では、監査ログをDBやファイルに記録
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "decision": "allow" if decision else "deny",
            "input": input_data
        }
        print(f"[AUDIT] {log_entry}")


# 使用例
if __name__ == "__main__":
    controller = MCPAccessController(opa_endpoint="http://localhost:8181")
    
    # 財務アナリストエージェントのアクセスチェック
    result = controller.check_access(
        agent_id="agent_finance_001",
        agent_role="Financial-Analyst",
        resource_id="context_financial_q4_2024",
        action_type="read"
    )
    
    if result["allowed"]:
        print("アクセスが許可されました")
        # 実際のリソースへのアクセスを実行
    else:
        print("アクセスが拒否されました")

4.2. MCPサーバーへの統合

from mcp.server import Server
from mcp.server.models import InitializationOptions
import mcp.types as types

class SecureMCPServer:
    def __init__(self, access_controller: MCPAccessController):
        self.server = Server("secure-mcp-server")
        self.access_controller = access_controller
        self._setup_handlers()
    
    def _setup_handlers(self):
        @self.server.list_resources()
        async def handle_list_resources() -> list[types.Resource]:
            # リソース一覧を返す前にアクセスチェック
            agent_context = self._get_current_agent_context()
            
            result = self.access_controller.check_access(
                agent_id=agent_context["agent_id"],
                agent_role=agent_context["role"],
                resource_id="resource_list",
                action_type="read"
            )
            
            if not result["allowed"]:
                raise PermissionError("リソース一覧へのアクセスが拒否されました")
            
            # 許可された場合、リソース一覧を返す
            return [
                types.Resource(
                    uri="context://financial/q4_2024",
                    name="Financial Q4 2024",
                    description="2024年第4四半期の財務データ"
                )
            ]
        
        @self.server.read_resource()
        async def handle_read_resource(uri: str) -> str:
            # リソースを読む前にアクセスチェック
            agent_context = self._get_current_agent_context()
            
            result = self.access_controller.check_access(
                agent_id=agent_context["agent_id"],
                agent_role=agent_context["role"],
                resource_id=uri,
                action_type="read"
            )
            
            if not result["allowed"]:
                raise PermissionError(f"リソース {uri} へのアクセスが拒否されました")
            
            # 許可された場合、実際のリソースを返す
            return self._fetch_resource(uri)
        
        @self.server.call_tool()
        async def handle_call_tool(
            name: str, 
            arguments: dict
        ) -> list[types.TextContent]:
            # Toolを実行する前にアクセスチェック
            agent_context = self._get_current_agent_context()
            
            result = self.access_controller.check_access(
                agent_id=agent_context["agent_id"],
                agent_role=agent_context["role"],
                resource_id=f"tool:{name}",
                action_type="execute",
                additional_context={
                    "action": {
                        "tool_name": name,
                        "type": "execute"
                    }
                }
            )
            
            if not result["allowed"]:
                raise PermissionError(f"Tool {name} の実行が拒否されました")
            
            # 許可された場合、実際にToolを実行
            tool_result = self._execute_tool(name, arguments)
            
            return [
                types.TextContent(
                    type="text",
                    text=tool_result
                )
            ]
    
    def _get_current_agent_context(self) -> Dict[str, str]:
        # 実際の実装では、リクエストヘッダーやセッション情報から
        # エージェントのコンテキストを取得
        return {
            "agent_id": "agent_finance_001",
            "role": "Financial-Analyst"
        }
    
    def _fetch_resource(self, uri: str) -> str:
        # 実際の実装では、URIに基づいてリソースを取得
        return f"Content of {uri}"
    
    def _execute_tool(self, name: str, arguments: dict) -> str:
        # 実際の実装では、Toolを実行して結果を返す
        return f"Result of {name} with {arguments}"

5. 高度な活用シナリオ

5.1. 動的なロール昇格

特定の条件下で、一時的にエージェントのロールを昇格させることができます。

class DynamicRoleElevation:
    def elevate_role_if_needed(
        self,
        agent_id: str,
        current_role: str,
        requested_resource: str,
        justification: str
    ) -> str:
        """
        必要に応じてロールを一時的に昇格
        """
        # 上位管理者の承認が必要な場合
        if self._requires_approval(requested_resource):
            approval = self._request_approval(
                agent_id=agent_id,
                resource=requested_resource,
                justification=justification
            )
            
            if approval["approved"]:
                # 一時的に昇格したロールを返す
                elevated_role = approval["elevated_role"]
                self._log_role_elevation(agent_id, current_role, elevated_role)
                return elevated_role
        
        return current_role

5.2. リスクベースの適応的アクセス制御

アクセスパターンに基づいてリスクスコアを計算し、動的に制御を調整します。

class RiskBasedAccessControl:
    def calculate_risk_score(
        self,
        agent_id: str,
        resource_id: str,
        action_type: str
    ) -> float:
        """
        アクセスのリスクスコアを計算(0.0-1.0)
        """
        risk_score = 0.0
        
        # 1. 異常な時間帯のアクセス
        if not self._is_typical_access_time(agent_id):
            risk_score += 0.2
        
        # 2. 通常と異なるリソースへのアクセス
        if not self._is_typical_resource(agent_id, resource_id):
            risk_score += 0.3
        
        # 3. 高頻度アクセス
        if self._is_high_frequency_access(agent_id):
            risk_score += 0.2
        
        # 4. 機密度の高いリソース
        if self._is_sensitive_resource(resource_id):
            risk_score += 0.3
        
        return min(risk_score, 1.0)
    
    def apply_risk_based_controls(
        self,
        risk_score: float,
        access_request: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        リスクスコアに基づいて追加の制御を適用
        """
        controls = {
            "allowed": True,
            "additional_requirements": []
        }
        
        if risk_score > 0.7:
            # 高リスク: MFA必須
            controls["additional_requirements"].append("mfa_required")
            controls["additional_requirements"].append("manager_approval")
        elif risk_score > 0.4:
            # 中リスク: 追加の認証
            controls["additional_requirements"].append("additional_auth")
        
        return controls

5.3. コンテキスト依存のデータマスキング

アクセス権限に応じて、返されるデータを動的にマスキングします。

class ContextualDataMasking:
    def apply_masking(
        self,
        data: Dict[str, Any],
        agent_clearance: str,
        resource_classification: str
    ) -> Dict[str, Any]:
        """
        クリアランスレベルに応じてデータをマスキング
        """
        masked_data = data.copy()
        
        # PIIフィールドの定義
        pii_fields = ["email", "phone", "ssn", "address"]
        
        # クリアランスが不十分な場合はマスキング
        if not self._has_sufficient_clearance(
            agent_clearance, 
            resource_classification
        ):
            for field in pii_fields:
                if field in masked_data:
                    masked_data[field] = self._mask_value(
                        masked_data[field], 
                        field
                    )
        
        return masked_data
    
    def _mask_value(self, value: str, field_type: str) -> str:
        """
        値をマスキング
        """
        if field_type == "email":
            parts = value.split("@")
            return f"{parts[0][:2]}***@{parts[1]}"
        elif field_type == "phone":
            return f"***-***-{value[-4:]}"
        elif field_type == "ssn":
            return f"***-**-{value[-4:]}"
        else:
            return "***MASKED***"

6. 運用とモニタリング

6.1. アクセス制御のメトリクス

監視すべき重要なメトリクス:

メトリクス名 説明 アラート条件
拒否率 全アクセス要求に対する拒否の割合 30%以上
ポリシー評価時間 1回のポリシー評価にかかる平均時間 10ms以上
異常なアクセスパターン 通常と異なるアクセスパターンの検出数 1日10件以上
ロール昇格の頻度 一時的なロール昇格の回数 1日5回以上
夜間アクセス数 営業時間外のアクセス試行回数 1日20回以上

6.2. アラートと対応

class AccessControlMonitoring:
    def check_anomalies(self, access_logs: list) -> list:
        """
        アクセスログから異常を検出
        """
        anomalies = []
        
        # 1. 短時間での大量アクセス
        if self._detect_high_frequency_access(access_logs):
            anomalies.append({
                "type": "high_frequency_access",
                "severity": "high",
                "action": "rate_limit"
            })
        
        # 2. 異常な時間帯のアクセス
        if self._detect_unusual_time_access(access_logs):
            anomalies.append({
                "type": "unusual_time_access",
                "severity": "medium",
                "action": "additional_verification"
            })
        
        # 3. 通常と異なるリソースへのアクセス
        if self._detect_unusual_resource_access(access_logs):
            anomalies.append({
                "type": "unusual_resource_access",
                "severity": "medium",
                "action": "alert_security_team"
            })
        
        return anomalies

6.3. ポリシーの定期的な見直し

  • 月次レビュー: アクセスログを分析し、ポリシーの有効性を評価
  • 四半期監査: 外部監査人によるポリシーの妥当性チェック
  • 年次更新: ビジネス要件の変化に応じたポリシーの全体的な見直し

7. まとめ

7.1. RBACとABACの比較まとめ

特徴 RBAC ABAC
複雑さ シンプル 複雑
柔軟性 低い 高い
管理負荷 ロールの数が増えると増大 ポリシーの記述が必要
動的制御 不可 可能
適用シーン 基本的なアクセス制御 高度で動的な制御が必要な場合
実装コスト 低い 高い

7.2. 実装の推奨アプローチ

エンタープライズMCP環境では、RBACとABACのハイブリッドアプローチを推奨します。

  1. Phase 1: RBAC導入 (1-2ヶ月)

    • 基本的なロール定義
    • 静的な権限管理
    • シンプルな運用から開始
  2. Phase 2: ABAC追加 (3-6ヶ月)

    • 動的な条件の追加
    • リスクベースの制御
    • 細粒度のアクセス管理
  3. Phase 3: 高度な機能 (6ヶ月以降)

    • 機械学習による異常検知
    • 適応的アクセス制御
    • ゼロトラストアーキテクチャへの統合

7.3. 得られる効果

MCPサーバーにRBACとABACを実装することで、企業は以下を実現できます。

効果 説明
セキュリティ強化 機密データへの不正アクセスを防止
コンプライアンス対応 GDPR、SOC2等の規制要件を満たす監査証跡
柔軟な権限管理 ビジネス要件の変化に迅速に対応
リスク低減 動的な制御により、リアルタイムで脅威に対応
運用効率化 ポリシーの自動評価により、手動承認を削減
透明性の向上 すべてのアクセスが記録され、監査可能

7.4. 次のステップ

  1. 現状分析: 既存のアクセス制御の課題を特定
  2. 要件定義: 必要なロールと属性を洗い出し
  3. PoC実施: 小規模な環境でRBAC/ABACを試験導入
  4. 段階的展開: 部門ごとに順次展開
  5. 継続的改善: アクセスログを分析し、ポリシーを最適化

参考資料

推奨ツールとフレームワーク

  • Open Policy Agent (OPA): 汎用ポリシーエンジン
  • Casbin: マルチ言語対応のアクセス制御ライブラリ
  • AWS IAM: クラウド環境でのアクセス制御の参考実装
  • XACML: ABACの標準仕様

関連標準

  • NIST SP 800-162: Attribute Based Access Control (ABAC) Definition and Considerations
  • NIST SP 800-207: Zero Trust Architecture
  • ISO/IEC 27001: 情報セキュリティマネジメントシステム

MCPサーバーにRBACとABACを実装することで、LLMエージェントの能力を最大限に活用しつつ、エンタープライズグレードのセキュリティと、粒度が高く動的で監査可能なアクセス制御を実現できます。


注意: MCPはAnthropicが開発した比較的新しいプロトコルです。最新の情報については、公式ドキュメントを参照してください。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?