はじめに
この記事では、FastAPIとPyPSRPライブラリを使用してWindows Active Directoryをリモートで管理するシステムの実装方法について詳しく解説します。特に、AD接続の管理方法とPowerShellを実行する仕組みの設計について説明します。
プロジェクト概要
このシステムは以下の主要コンポーネントで構成されています。
- ad_connection.py: WinRM接続とセッションプールの管理
- ad_handler.py: PowerShellコマンドの実行とAD操作の処理
- main.py: FastAPI アプリケーションのエントリーポイント
技術スタック
# requirements.txt
fastapi
uvicorn[standard]
pypsrp
pydantic
python-dotenv
1. AD接続管理クラス(ad_connection.py)
設計思想
Active Directory サーバーへの接続は、以下の要件を満たす必要があります:
- 接続プールの管理: 複数の同時リクエストに対応
- 自動再接続: 接続が失われた場合の自動回復
- 環境変数による設定: セキュリティを考慮した認証情報管理
実装詳細
class ADConnectionManager:
"""Active Directory接続とRunspacePoolを管理するクラス"""
def __init__(self):
load_dotenv()
self.ad_server = os.getenv("AD_SERVER")
self.ad_username = os.getenv("AD_USERNAME")
self.ad_password = os.getenv("AD_PASSWORD")
self.pool: Optional[RunspacePool] = None
def connect(self) -> RunspacePool:
"""ADサーバーへの接続を確立し、Runspaceプールを作成"""
try:
# HTTP(ポート5985)経由で接続 - HTTPSが利用できないサーバー用
wsman = WSMan(
self.ad_server,
username=self.ad_username,
password=self.ad_password,
auth="ntlm", # Windows認証
ssl=False, # HTTPを使用
port=5985, # WinRM HTTPポート
cert_validation=False, # 証明書検証無効
connection_timeout=30,
read_timeout=60
)
self.pool = RunspacePool(wsman, min_runspaces=1, max_runspaces=5)
self.pool.open()
return self.pool
except Exception as e:
self.pool = None
raise RuntimeError(f"ADサーバーへの接続に失敗: {e}")
接続プール管理の利点
- パフォーマンス向上: 接続の再利用により初期化コストを削減
- リソース効率: 最小・最大runspace数の制御
- エラーハンドリング: 接続失敗時の適切な例外処理
2. ADハンドラークラス(ad_handler.py)
PowerShellコマンド実行の抽象化
ADハンドラーは、PowerShellコマンドの実行を抽象化し、以下の機能を提供します:
class PowerShellCommandRequest(BaseModel):
"""PowerShellコマンドリクエストモデル"""
command: str
timeout: int = 60
load_ad_module: bool = True # ADモジュールの自動ロード
format_as_json: bool = True # JSON形式での出力
json_depth: int = 2 # JSON変換の深度
エラーハンドリングと診断
def test_connection(self) -> Dict[str, Any]:
"""PowerShell実行とADモジュールの可用性をテスト"""
try:
# 基本的なPowerShell実行テスト
ps = PowerShell(self.pool)
ps.add_script("Get-Date")
output = ps.invoke()
# ユーザー識別テスト
ps2 = PowerShell(self.pool)
ps2.add_script("whoami")
whoami_output = ps2.invoke()
# ADモジュール可用性テスト
ps3 = PowerShell(self.pool)
ps3.add_script("Get-Module -ListAvailable -Name ActiveDirectory")
ad_module_output = ps3.invoke()
return {
"status": "success",
"tests": {
"basic_powershell": {...},
"user_identity": {...},
"ad_module_available": {...}
}
}
安全なPowerShellコマンド実行
def execute_powershell_command(self, command_request: PowerShellCommandRequest) -> Dict[str, Any]:
"""リモートでPowerShellコマンドを安全に実行"""
# コマンドの検証とサニタイズ
command = command_request.command.strip()
if not command:
raise Exception("コマンドが空です")
# 問題のある文字をチェック
problematic_chars = ['"', "'", "`", "\n", "\r"]
if any(char in command for char in problematic_chars):
print(f"警告: コマンドに問題のある文字が含まれています: {command}")
try:
# ADモジュールのロード(必要に応じて)
if command_request.load_ad_module:
ps_load = PowerShell(self.pool)
ps_load.add_script("Import-Module ActiveDirectory -ErrorAction SilentlyContinue")
ps_load.invoke()
# コマンドの実行
ps = PowerShell(self.pool)
ps.add_script(command)
output = ps.invoke()
# すべてのストリーム出力を収集
result_data = {
"command": command,
"success": not ps.had_errors,
"output": [str(item) for item in output] if output else [],
"errors": [str(error) for error in ps.streams.error] if ps.had_errors else [],
"warnings": [str(warning) for warning in ps.streams.warning] if hasattr(ps.streams, 'warning') and ps.streams.warning else [],
"verbose": [str(verbose) for verbose in ps.streams.verbose] if hasattr(ps.streams, 'verbose') and ps.streams.verbose else [],
"debug": [str(debug) for debug in ps.streams.debug] if hasattr(ps.streams, 'debug') and ps.streams.debug else []
}
return {
"status": "success" if not ps.had_errors else "completed_with_errors",
"data": result_data
}
3. FastAPI統合(main.py)
アプリケーションライフサイクル管理
@app.on_event("startup")
def startup_event():
"""起動時にAD接続を初期化"""
try:
ad_connection_manager.connect()
print("FastAPI AD Manager started successfully!")
except Exception as e:
raise RuntimeError(f"Failed to start AD Manager: {e}")
@app.on_event("shutdown")
def shutdown_event():
"""シャットダウン時にAD接続をクリーンアップ"""
ad_connection_manager.disconnect()
print("FastAPI AD Manager shutdown complete!")
API エンドポイント設計
@app.post("/powershell/execute")
def execute_powershell_command(command_request: PowerShellCommandRequest):
"""PowerShellコマンドをリモートで実行"""
try:
handler = _get_ad_handler()
result = handler.execute_powershell_command(command_request)
return result
except HTTPException:
raise
except Exception as e:
if "WinRMTransportError" in str(type(e)) or "Bad HTTP response" in str(e):
raise HTTPException(status_code=500, detail=str(e))
raise HTTPException(status_code=500, detail=f"PowerShell execution failed: {str(e)}")
実装のポイント
1. セキュリティ考慮事項
-
環境変数: 認証情報を
.env
ファイルで管理 - NTLM認証: Windows統合認証の使用
- コマンド検証: 危険な文字の検出とログ出力
2. エラーハンドリング戦略
- 階層化されたエラー処理: 接続レベル、実行レベル、アプリケーションレベル
- 詳細な診断情報: すべてのPowerShellストリームの収集
- 適切なHTTPステータスコード: FastAPIの例外処理との連携
3. パフォーマンス最適化
- 接続プール: runspaceの再利用
- 非同期対応: FastAPIの非同期処理との統合可能性
- タイムアウト制御: 長時間実行コマンドへの対応
セットアップと使用方法
1. 環境設定
# .env ファイル
AD_SERVER=your-ad-server.domain.com
AD_USERNAME=your-username
AD_PASSWORD=your-password
2. インストールと起動
pip install -r requirements.txt
uvicorn main:app --host 0.0.0.0 --port 8002
3. 使用例
# 接続テスト
curl http://localhost:8002/connection/info
# PowerShellコマンド実行
curl -X POST http://localhost:8002/powershell/execute \
-H "Content-Type: application/json" \
-d '{"command": "Get-ADUser -Filter * | Select-Object Name, Surname, FirstName | ConvertTo-Json"}'
まとめ
この実装パターンは、以下の利点を提供します:
- モジュラー設計: 接続管理とコマンド実行の分離
- エラー回復力: 自動再接続とエラーハンドリング
- 拡張性: 新しいAD操作の容易な追加
- 保守性: 明確な責任分離と構造化されたコード
このアーキテクチャを使用することで、安全で効率的なActive Directory管理APIを構築できます。