1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

FastAPIとPyPSRPを使ったActive Directory管理システムの構築

Last updated at Posted at 2025-08-18

はじめに

この記事では、FastAPIとPyPSRPライブラリを使用してWindows Active Directoryをリモートで管理するシステムの実装方法について詳しく解説します。特に、AD接続の管理方法とPowerShellを実行する仕組みの設計について説明します。

プロジェクト概要

このシステムは以下の主要コンポーネントで構成されています。

  • ad_connection.py: WinRM接続とセッションプールの管理
  • ad_handler.py: PowerShellコマンドの実行とAD操作の処理
  • main.py: FastAPI アプリケーションのエントリーポイント

技術スタック

# requirements.txt
fastapi
uvicorn[standard]
pypsrp
pydantic
python-dotenv

1. AD接続管理クラス(ad_connection.py)

設計思想

Active Directory サーバーへの接続は、以下の要件を満たす必要があります:

  • 接続プールの管理: 複数の同時リクエストに対応
  • 自動再接続: 接続が失われた場合の自動回復
  • 環境変数による設定: セキュリティを考慮した認証情報管理

実装詳細

class ADConnectionManager:
    """Active Directory接続とRunspacePoolを管理するクラス"""
    
    def __init__(self):
        load_dotenv()
        self.ad_server = os.getenv("AD_SERVER")
        self.ad_username = os.getenv("AD_USERNAME") 
        self.ad_password = os.getenv("AD_PASSWORD")
        self.pool: Optional[RunspacePool] = None

    def connect(self) -> RunspacePool:
        """ADサーバーへの接続を確立し、Runspaceプールを作成"""
        try:
            # HTTP(ポート5985)経由で接続 - HTTPSが利用できないサーバー用
            wsman = WSMan(
                self.ad_server,
                username=self.ad_username,
                password=self.ad_password,
                auth="ntlm",           # Windows認証
                ssl=False,             # HTTPを使用
                port=5985,            # WinRM HTTPポート
                cert_validation=False, # 証明書検証無効
                connection_timeout=30,
                read_timeout=60
            )
            
            self.pool = RunspacePool(wsman, min_runspaces=1, max_runspaces=5)
            self.pool.open()
            return self.pool
            
        except Exception as e:
            self.pool = None
            raise RuntimeError(f"ADサーバーへの接続に失敗: {e}")

接続プール管理の利点

  1. パフォーマンス向上: 接続の再利用により初期化コストを削減
  2. リソース効率: 最小・最大runspace数の制御
  3. エラーハンドリング: 接続失敗時の適切な例外処理

2. ADハンドラークラス(ad_handler.py)

PowerShellコマンド実行の抽象化

ADハンドラーは、PowerShellコマンドの実行を抽象化し、以下の機能を提供します:

class PowerShellCommandRequest(BaseModel):
    """PowerShellコマンドリクエストモデル"""
    command: str
    timeout: int = 60
    load_ad_module: bool = True      # ADモジュールの自動ロード
    format_as_json: bool = True      # JSON形式での出力
    json_depth: int = 2              # JSON変換の深度

エラーハンドリングと診断

def test_connection(self) -> Dict[str, Any]:
    """PowerShell実行とADモジュールの可用性をテスト"""
    try:
        # 基本的なPowerShell実行テスト
        ps = PowerShell(self.pool)
        ps.add_script("Get-Date")
        output = ps.invoke()
        
        # ユーザー識別テスト
        ps2 = PowerShell(self.pool)
        ps2.add_script("whoami")
        whoami_output = ps2.invoke()
        
        # ADモジュール可用性テスト
        ps3 = PowerShell(self.pool)
        ps3.add_script("Get-Module -ListAvailable -Name ActiveDirectory")
        ad_module_output = ps3.invoke()
        
        return {
            "status": "success",
            "tests": {
                "basic_powershell": {...},
                "user_identity": {...},
                "ad_module_available": {...}
            }
        }

安全なPowerShellコマンド実行

def execute_powershell_command(self, command_request: PowerShellCommandRequest) -> Dict[str, Any]:
    """リモートでPowerShellコマンドを安全に実行"""
    
    # コマンドの検証とサニタイズ
    command = command_request.command.strip()
    if not command:
        raise Exception("コマンドが空です")
    
    # 問題のある文字をチェック
    problematic_chars = ['"', "'", "`", "\n", "\r"]
    if any(char in command for char in problematic_chars):
        print(f"警告: コマンドに問題のある文字が含まれています: {command}")
    
    try:
        # ADモジュールのロード(必要に応じて)
        if command_request.load_ad_module:
            ps_load = PowerShell(self.pool)
            ps_load.add_script("Import-Module ActiveDirectory -ErrorAction SilentlyContinue")
            ps_load.invoke()
        
        # コマンドの実行
        ps = PowerShell(self.pool)
        ps.add_script(command)
        output = ps.invoke()
        
        # すべてのストリーム出力を収集
        result_data = {
            "command": command,
            "success": not ps.had_errors,
            "output": [str(item) for item in output] if output else [],
            "errors": [str(error) for error in ps.streams.error] if ps.had_errors else [],
            "warnings": [str(warning) for warning in ps.streams.warning] if hasattr(ps.streams, 'warning') and ps.streams.warning else [],
            "verbose": [str(verbose) for verbose in ps.streams.verbose] if hasattr(ps.streams, 'verbose') and ps.streams.verbose else [],
            "debug": [str(debug) for debug in ps.streams.debug] if hasattr(ps.streams, 'debug') and ps.streams.debug else []
        }
        
        return {
            "status": "success" if not ps.had_errors else "completed_with_errors",
            "data": result_data
        }

3. FastAPI統合(main.py)

アプリケーションライフサイクル管理

@app.on_event("startup")
def startup_event():
    """起動時にAD接続を初期化"""
    try:
        ad_connection_manager.connect()
        print("FastAPI AD Manager started successfully!")
    except Exception as e:
        raise RuntimeError(f"Failed to start AD Manager: {e}")

@app.on_event("shutdown")  
def shutdown_event():
    """シャットダウン時にAD接続をクリーンアップ"""
    ad_connection_manager.disconnect()
    print("FastAPI AD Manager shutdown complete!")

API エンドポイント設計

@app.post("/powershell/execute")
def execute_powershell_command(command_request: PowerShellCommandRequest):
    """PowerShellコマンドをリモートで実行"""
    try:
        handler = _get_ad_handler()
        result = handler.execute_powershell_command(command_request)
        return result
    except HTTPException:
        raise
    except Exception as e:
        if "WinRMTransportError" in str(type(e)) or "Bad HTTP response" in str(e):
            raise HTTPException(status_code=500, detail=str(e))
        raise HTTPException(status_code=500, detail=f"PowerShell execution failed: {str(e)}")

実装のポイント

1. セキュリティ考慮事項

  • 環境変数: 認証情報を.envファイルで管理
  • NTLM認証: Windows統合認証の使用
  • コマンド検証: 危険な文字の検出とログ出力

2. エラーハンドリング戦略

  • 階層化されたエラー処理: 接続レベル、実行レベル、アプリケーションレベル
  • 詳細な診断情報: すべてのPowerShellストリームの収集
  • 適切なHTTPステータスコード: FastAPIの例外処理との連携

3. パフォーマンス最適化

  • 接続プール: runspaceの再利用
  • 非同期対応: FastAPIの非同期処理との統合可能性
  • タイムアウト制御: 長時間実行コマンドへの対応

セットアップと使用方法

1. 環境設定

# .env ファイル
AD_SERVER=your-ad-server.domain.com
AD_USERNAME=your-username
AD_PASSWORD=your-password

2. インストールと起動

pip install -r requirements.txt
uvicorn main:app --host 0.0.0.0 --port 8002

3. 使用例

# 接続テスト
curl http://localhost:8002/connection/info

# PowerShellコマンド実行
curl -X POST http://localhost:8002/powershell/execute \
  -H "Content-Type: application/json" \
  -d '{"command": "Get-ADUser -Filter * | Select-Object Name, Surname, FirstName | ConvertTo-Json"}'

image.png

まとめ

この実装パターンは、以下の利点を提供します:

  1. モジュラー設計: 接続管理とコマンド実行の分離
  2. エラー回復力: 自動再接続とエラーハンドリング
  3. 拡張性: 新しいAD操作の容易な追加
  4. 保守性: 明確な責任分離と構造化されたコード

このアーキテクチャを使用することで、安全で効率的なActive Directory管理APIを構築できます。

参考資料

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?