はじめに
こんにちは!前回はCDKでのスタック分割とデッドロック回避について解説しました。今回は、いよいよ実際の通信部分の実装に入ります!
Public VPCからIsolated VPCへの通信を実現するために、プロキシサーバーとVPC Endpointをどう組み合わせるか、実際のコード例とともに詳しく解説していきます。
「理論はわかったけど、実際にどうコードを書けばいいの?」という疑問にお答えする、実践的な内容です😊
今回の内容
- Public VPC側のプロキシサーバー詳細設計
- FastAPIを使ったマイクロサービス一覧管理
- VPC Endpoint設定とセキュリティ考慮事項
- ALBのパスルーティング設定
- ヘルスチェックとエラーハンドリング
- 実際に動くコード例とデプロイ手順
アーキテクチャおさらい
Public VPC側: プロキシサーバーの実装
1. FastAPIベースのプロキシサーバー
まずは、マイクロサービス群への入り口となるプロキシサーバーを実装します。
🐍 app/main.py - メインアプリケーション
from fastapi import FastAPI, HTTPException, Request, Depends
from fastapi.responses import JSONResponse
import httpx
import asyncio
from typing import Dict, List, Optional
import logging
import os
from datetime import datetime, timedelta
import json
# ログ設定
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(
title="Microservice Proxy Gateway",
description="Private Link経由でマイクロサービスにアクセスするプロキシサーバー",
version="1.0.0"
)
# 設定
VPC_ENDPOINT_DNS = os.getenv("VPC_ENDPOINT_DNS", "vpce-xxxxxxxxx-xxxxxxxx.execute-api.ap-northeast-1.vpce.amazonaws.com")
HEALTH_CHECK_INTERVAL = int(os.getenv("HEALTH_CHECK_INTERVAL", "30"))
REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "30"))
# マイクロサービス設定
MICROSERVICES = {
"users": {
"name": "User Service",
"path": "/users",
"port": 3000,
"health_endpoint": "/health",
"version": "v1"
},
"orders": {
"name": "Order Service",
"path": "/orders",
"port": 3001,
"health_endpoint": "/health",
"version": "v1"
},
"payments": {
"name": "Payment Service",
"path": "/payments",
"port": 3002,
"health_endpoint": "/health",
"version": "v1"
}
}
# サービス健康状態管理
service_health_status = {}
last_health_check = {}
class ServiceRegistry:
"""マイクロサービス一覧管理クラス"""
def __init__(self):
self.services = MICROSERVICES.copy()
self.client = httpx.AsyncClient(timeout=REQUEST_TIMEOUT)
async def get_service_url(self, service_name: str) -> str:
"""サービスのURLを構築"""
if service_name not in self.services:
raise HTTPException(status_code=404, detail=f"Service {service_name} not found")
service = self.services[service_name]
# VPC Endpoint経由でIsolated VPCのALBにアクセス
return f"http://{VPC_ENDPOINT_DNS}:{service['port']}"
async def check_service_health(self, service_name: str) -> bool:
"""個別サービスのヘルスチェック"""
try:
service = self.services[service_name]
url = await self.get_service_url(service_name)
health_url = f"{url}{service['health_endpoint']}"
async with httpx.AsyncClient(timeout=5) as client:
response = await client.get(health_url)
is_healthy = response.status_code == 200
# ヘルス状態を更新
service_health_status[service_name] = {
"healthy": is_healthy,
"last_check": datetime.now(),
"response_code": response.status_code
}
logger.info(f"Health check for {service_name}: {'OK' if is_healthy else 'FAIL'}")
return is_healthy
except Exception as e:
logger.error(f"Health check failed for {service_name}: {str(e)}")
service_health_status[service_name] = {
"healthy": False,
"last_check": datetime.now(),
"error": str(e)
}
return False
async def get_healthy_services(self) -> List[str]:
"""健康なサービス一覧を取得"""
healthy_services = []
for service_name in self.services.keys():
if service_health_status.get(service_name, {}).get("healthy", False):
healthy_services.append(service_name)
return healthy_services
# サービスレジストリのインスタンス
registry = ServiceRegistry()
@app.on_event("startup")
async def startup_event():
"""アプリケーション起動時の初期化"""
logger.info("Starting Microservice Proxy Gateway...")
# 初回ヘルスチェック実行
await perform_health_checks()
# 定期ヘルスチェックをバックグラウンドで開始
asyncio.create_task(periodic_health_check())
async def perform_health_checks():
"""全サービスのヘルスチェックを実行"""
tasks = []
for service_name in MICROSERVICES.keys():
tasks.append(registry.check_service_health(service_name))
await asyncio.gather(*tasks, return_exceptions=True)
async def periodic_health_check():
"""定期的なヘルスチェック"""
while True:
try:
await asyncio.sleep(HEALTH_CHECK_INTERVAL)
await perform_health_checks()
except Exception as e:
logger.error(f"Periodic health check error: {str(e)}")
# API エンドポイント
@app.get("/")
async def root():
"""ルートエンドポイント - サービス一覧表示"""
healthy_services = await registry.get_healthy_services()
return {
"message": "Microservice Proxy Gateway",
"vpc_endpoint": VPC_ENDPOINT_DNS,
"available_services": len(healthy_services),
"services": {
name: {
**MICROSERVICES[name],
"status": "healthy" if name in healthy_services else "unhealthy",
"last_check": service_health_status.get(name, {}).get("last_check")
}
for name in MICROSERVICES.keys()
},
"endpoints": [f"/api/{name}" for name in MICROSERVICES.keys()]
}
@app.get("/health")
async def health_check():
"""プロキシサーバー自体のヘルスチェック"""
healthy_services = await registry.get_healthy_services()
total_services = len(MICROSERVICES)
is_healthy = len(healthy_services) > 0
return {
"status": "healthy" if is_healthy else "degraded",
"timestamp": datetime.now(),
"services": {
"total": total_services,
"healthy": len(healthy_services),
"unhealthy": total_services - len(healthy_services)
},
"details": service_health_status
}
@app.api_route("/api/{service_name}/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
async def proxy_request(service_name: str, path: str, request: Request):
"""マイクロサービスへのプロキシリクエスト"""
# サービスが存在するかチェック
if service_name not in MICROSERVICES:
raise HTTPException(status_code=404, detail=f"Service '{service_name}' not found")
# サービスが健康かチェック
if not service_health_status.get(service_name, {}).get("healthy", False):
raise HTTPException(
status_code=503,
detail=f"Service '{service_name}' is currently unhealthy"
)
try:
# リクエストデータを取得
body = await request.body()
headers = dict(request.headers)
# 不要なヘッダーを削除
headers.pop("host", None)
headers.pop("content-length", None)
# 転送先URLを構築
target_url = await registry.get_service_url(service_name)
full_url = f"{target_url}/{path}"
# クエリパラメータを含める
if request.url.query:
full_url += f"?{request.url.query}"
logger.info(f"Proxying {request.method} {full_url}")
# HTTPクライアントでリクエストを転送
async with httpx.AsyncClient(timeout=REQUEST_TIMEOUT) as client:
response = await client.request(
method=request.method,
url=full_url,
headers=headers,
content=body
)
# レスポンスを返す
return JSONResponse(
content=response.json() if response.headers.get("content-type", "").startswith("application/json") else response.text,
status_code=response.status_code,
headers=dict(response.headers)
)
except httpx.TimeoutException:
logger.error(f"Timeout accessing {service_name}")
raise HTTPException(status_code=504, detail=f"Service '{service_name}' timeout")
except Exception as e:
logger.error(f"Error proxying request to {service_name}: {str(e)}")
raise HTTPException(status_code=502, detail=f"Bad gateway: {str(e)}")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
2. Docker設定
🐳 Dockerfile
FROM python:3.11-slim
WORKDIR /app
# 依存関係ファイルをコピー
COPY requirements.txt .
# 依存関係をインストール
RUN pip install --no-cache-dir -r requirements.txt
# アプリケーションコードをコピー
COPY app/ ./app/
# ポート8000を公開
EXPOSE 8000
# ヘルスチェック設定
HEALTHCHECK --interval=30s --timeout=10s --start-period=30s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# アプリケーション実行
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
📦 requirements.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0
httpx==0.25.2
pydantic==2.5.1
python-multipart==0.0.6
VPC Endpoint設定の詳細
1. CDKでのVPC Endpoint構成
🏗️ VPC Endpoint用のCDK Stack
// lib/vpc-endpoint-stack.ts
import * as cdk from 'aws-cdk-lib';
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as elbv2 from 'aws-cdk-lib/aws-elasticloadbalancingv2';
import * as ssm from 'aws-cdk-lib/aws-ssm';
import { Construct } from 'constructs';
export interface VpcEndpointStackProps extends cdk.StackProps {
environment: 'public' | 'isolated';
vpcId: string;
subnetIds: string[];
}
export class VpcEndpointStack extends cdk.Stack {
constructor(scope: Construct, id: string, props: VpcEndpointStackProps) {
super(scope, id, props);
// VPCを取得
const vpc = ec2.Vpc.fromVpcAttributes(this, 'Vpc', {
vpcId: props.vpcId,
availabilityZones: ['ap-northeast-1a', 'ap-northeast-1c'],
});
// セキュリティグループを作成
const vpcEndpointSg = new ec2.SecurityGroup(this, 'VpcEndpointSg', {
vpc,
description: `VPC Endpoint security group for ${props.environment}`,
allowAllOutbound: true,
});
// Public環境の場合、プロキシサーバーからのアクセスを許可
if (props.environment === 'public') {
vpcEndpointSg.addIngressRule(
ec2.Peer.ipv4('10.0.0.0/16'), // Public VPC CIDR
ec2.Port.tcp(443),
'Allow HTTPS from Public VPC'
);
// プロキシサーバー用の追加ポート
const microservicePorts = [3000, 3001, 3002];
microservicePorts.forEach(port => {
vpcEndpointSg.addIngressRule(
ec2.Peer.ipv4('10.0.0.0/16'),
ec2.Port.tcp(port),
`Allow microservice port ${port}`
);
});
}
// Isolated環境の場合、ALBからのアクセスを許可
if (props.environment === 'isolated') {
vpcEndpointSg.addIngressRule(
ec2.Peer.ipv4('10.1.0.0/16'), // Isolated VPC CIDR
ec2.Port.tcp(443),
'Allow HTTPS from Isolated VPC'
);
}
// 対向環境のVPC Endpoint Service IDを取得
const targetEnvironment = props.environment === 'public' ? 'isolated' : 'public';
const vpcEndpointServiceId = ssm.StringParameter.valueForStringParameter(
this,
`/microservice/${targetEnvironment}/vpc-endpoint-service-id`
);
// Interface VPC Endpointを作成
const vpcEndpoint = new ec2.InterfaceVpcEndpoint(this, 'VpcEndpoint', {
vpc,
service: new ec2.InterfaceVpcEndpointService(vpcEndpointServiceId, 443),
subnets: {
subnets: props.subnetIds.map(subnetId =>
ec2.Subnet.fromSubnetId(this, `Subnet${subnetId}`, subnetId)
)
},
securityGroups: [vpcEndpointSg],
privateDnsEnabled: false, // カスタムDNSを使用するため無効化
policyDocument: new cdk.aws_iam.PolicyDocument({
statements: [
new cdk.aws_iam.PolicyStatement({
effect: cdk.aws_iam.Effect.ALLOW,
principals: [new cdk.aws_iam.AnyPrincipal()],
actions: ['*'],
resources: ['*'],
}),
],
}),
});
// VPC Endpoint DNSをSSMパラメータに保存
new ssm.StringParameter(this, 'VpcEndpointDns', {
parameterName: `/microservice/${props.environment}/vpc-endpoint-dns`,
stringValue: vpcEndpoint.vpcEndpointDnsEntries[0].domainName,
description: `VPC Endpoint DNS for ${props.environment} environment`,
});
// VPC Endpoint IDをSSMパラメータに保存
new ssm.StringParameter(this, 'VpcEndpointId', {
parameterName: `/microservice/${props.environment}/vpc-endpoint-id`,
stringValue: vpcEndpoint.vpcEndpointId,
description: `VPC Endpoint ID for ${props.environment} environment`,
});
// 出力
new cdk.CfnOutput(this, 'VpcEndpointDnsOutput', {
value: vpcEndpoint.vpcEndpointDnsEntries[0].domainName,
description: 'VPC Endpoint DNS name',
});
new cdk.CfnOutput(this, 'VpcEndpointIdOutput', {
value: vpcEndpoint.vpcEndpointId,
description: 'VPC Endpoint ID',
});
}
}
2. セキュリティ設定のベストプラクティス
// セキュリティグループの詳細設定
const createSecurityGroups = (vpc: ec2.Vpc) => {
// プロキシサーバー用セキュリティグループ
const proxyServerSg = new ec2.SecurityGroup(stack, 'ProxyServerSg', {
vpc,
description: 'Security group for proxy server',
});
// 外部からのHTTPSアクセスのみ許可
proxyServerSg.addIngressRule(
ec2.Peer.anyIpv4(),
ec2.Port.tcp(443),
'HTTPS from internet'
);
// VPC Endpoint用セキュリティグループ
const vpcEndpointSg = new ec2.SecurityGroup(stack, 'VpcEndpointSg', {
vpc,
description: 'Security group for VPC endpoint',
});
// プロキシサーバーからのアクセスのみ許可
vpcEndpointSg.addIngressRule(
proxyServerSg,
ec2.Port.tcp(443),
'HTTPS from proxy server'
);
// マイクロサービス用ポートも許可
[3000, 3001, 3002].forEach(port => {
vpcEndpointSg.addIngressRule(
proxyServerSg,
ec2.Port.tcp(port),
`Microservice port ${port} from proxy`
);
});
return { proxyServerSg, vpcEndpointSg };
};
Isolated VPC側: ALBのパスルーティング設定
1. ALBのパスベースルーティング
⚖️ ALBルーティング設定
// lib/alb-routing-stack.ts
import * as cdk from 'aws-cdk-lib';
import * as elbv2 from 'aws-cdk-lib/aws-elasticloadbalancingv2';
import * as ecs from 'aws-cdk-lib/aws-ecs';
import * as ec2 from 'aws-cdk-lib/aws-ec2';
export class AlbRoutingStack extends cdk.Stack {
constructor(scope: Construct, id: string, props: cdk.StackProps) {
super(scope, id, props);
// ALBを作成
const alb = new elbv2.ApplicationLoadBalancer(this, 'MicroserviceALB', {
vpc,
internetFacing: false,
scheme: elbv2.ApplicationLoadBalancerScheme.INTERNAL,
});
// HTTPSリスナーを作成
const listener = alb.addListener('HttpsListener', {
port: 443,
protocol: elbv2.ApplicationProtocol.HTTPS,
certificates: [certificate], // ACM証明書
defaultAction: elbv2.ListenerAction.fixedResponse(404, {
contentType: 'application/json',
messageBody: JSON.stringify({
error: 'Service not found',
message: 'The requested service path does not exist'
})
}),
});
// マイクロサービス別のターゲットグループとルーティング設定
const microservices = [
{ name: 'users', port: 3000, path: '/users*', priority: 100 },
{ name: 'orders', port: 3001, path: '/orders*', priority: 200 },
{ name: 'payments', port: 3002, path: '/payments*', priority: 300 },
];
microservices.forEach(service => {
// ターゲットグループを作成
const targetGroup = new elbv2.ApplicationTargetGroup(this, `${service.name}TargetGroup`, {
port: service.port,
protocol: elbv2.ApplicationProtocol.HTTP,
vpc,
targetType: elbv2.TargetType.IP,
healthCheck: {
enabled: true,
path: '/health',
protocol: elbv2.Protocol.HTTP,
port: service.port.toString(),
healthyHttpCodes: '200',
interval: cdk.Duration.seconds(30),
timeout: cdk.Duration.seconds(5),
healthyThresholdCount: 2,
unhealthyThresholdCount: 3,
},
deregistrationDelay: cdk.Duration.seconds(30),
});
// パスベースルーティングルールを追加
listener.addTargetGroups(`${service.name}Rule`, {
targetGroups: [targetGroup],
priority: service.priority,
conditions: [
// パスパターンマッチング
elbv2.ListenerCondition.pathPatterns([service.path]),
// 任意:ヘッダーベースルーティング
elbv2.ListenerCondition.httpHeader('X-Service-Name', [service.name]),
],
});
// ターゲットグループARNをSSMに保存(ECSデプロイ時に使用)
new ssm.StringParameter(this, `${service.name}TargetGroupArn`, {
parameterName: `/microservice/isolated/target-group-arn/${service.name}`,
stringValue: targetGroup.targetGroupArn,
});
});
// ヘルスチェック用のエンドポイント
const healthTargetGroup = new elbv2.ApplicationTargetGroup(this, 'HealthTargetGroup', {
port: 8080,
protocol: elbv2.ApplicationProtocol.HTTP,
vpc,
targetType: elbv2.TargetType.IP,
healthCheck: {
enabled: true,
path: '/health',
healthyHttpCodes: '200',
},
});
listener.addTargetGroups('HealthRule', {
targetGroups: [healthTargetGroup],
priority: 50, // 最高優先度
conditions: [
elbv2.ListenerCondition.pathPatterns(['/health', '/health/*']),
],
});
}
}
2. ECSサービスとターゲットグループの統合
🐳 ECSサービス設定例
// lib/ecs-service-stack.ts
export class EcsServiceStack extends cdk.Stack {
constructor(scope: Construct, id: string, props: EcsServiceProps) {
super(scope, id, props);
// タスク定義
const taskDefinition = new ecs.FargateTaskDefinition(this, 'TaskDefinition', {
memoryLimitMiB: 512,
cpu: 256,
});
// コンテナ追加
const container = taskDefinition.addContainer('ServiceContainer', {
image: ecs.ContainerImage.fromRegistry(props.imageUri),
environment: {
SERVICE_NAME: props.serviceName,
PORT: props.port.toString(),
LOG_LEVEL: 'INFO',
},
logging: ecs.LogDrivers.awsLogs({
streamPrefix: `${props.serviceName}-service`,
}),
});
container.addPortMappings({
containerPort: props.port,
protocol: ecs.Protocol.TCP,
});
// ECSサービス作成
const service = new ecs.FargateService(this, 'Service', {
cluster: props.cluster,
taskDefinition,
desiredCount: 2,
enableExecuteCommand: true, // デバッグ用
healthCheckGracePeriod: cdk.Duration.seconds(60),
});
// ターゲットグループARNを取得
const targetGroupArn = ssm.StringParameter.valueForStringParameter(
this,
`/microservice/isolated/target-group-arn/${props.serviceName}`
);
// ALBターゲットグループにサービスを登録
const targetGroup = elbv2.ApplicationTargetGroup.fromTargetGroupAttributes(
this, 'TargetGroup',
{
targetGroupArn: targetGroupArn,
loadBalancerArns: [props.albArn],
}
);
service.attachToApplicationTargetGroup(targetGroup);
// オートスケーリング設定
const scaling = service.autoScaleTaskCount({
minCapacity: 1,
maxCapacity: 10,
});
// CPU使用率ベースのスケーリング
scaling.scaleOnCpuUtilization('CpuScaling', {
targetUtilizationPercent: 70,
scaleInCooldown: cdk.Duration.seconds(300),
scaleOutCooldown: cdk.Duration.seconds(300),
});
// メモリ使用率ベースのスケーリング
scaling.scaleOnMemoryUtilization('MemoryScaling', {
targetUtilizationPercent: 80,
});
}
}
実際のマイクロサービス実装例
1. User Service (FastAPI)
👥 User Service実装例
# user-service/app/main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
import logging
from datetime import datetime
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(
title="User Service",
description="ユーザー管理マイクロサービス",
version="1.0.0"
)
# データモデル
class User(BaseModel):
id: int
name: str
email: str
created_at: datetime
class CreateUserRequest(BaseModel):
name: str
email: str
# インメモリデータストア(実際の環境ではRDSやDynamoDBを使用)
users_db = {
1: User(id=1, name="田中太郎", email="tanaka@example.com", created_at=datetime.now()),
2: User(id=2, name="山田花子", email="yamada@example.com", created_at=datetime.now()),
}
@app.get("/health")
async def health_check():
"""ヘルスチェックエンドポイント"""
return {
"status": "healthy",
"service": "user-service",
"timestamp": datetime.now(),
"version": "1.0.0"
}
@app.get("/users", response_model=List[User])
async def get_users():
"""全ユーザー取得"""
logger.info("Getting all users")
return list(users_db.values())
@app.get("/users/{user_id}", response_model=User)
async def get_user(user_id: int):
"""特定ユーザー取得"""
logger.info(f"Getting user {user_id}")
if user_id not in users_db:
raise HTTPException(status_code=404, detail="User not found")
return users_db[user_id]
@app.post("/users", response_model=User)
async def create_user(user_request: CreateUserRequest):
"""新規ユーザー作成"""
logger.info(f"Creating user: {user_request.name}")
# 新しいIDを生成
new_id = max(users_db.keys()) + 1 if users_db else 1
new_user = User(
id=new_id,
name=user_request.name,
email=user_request.email,
created_at=datetime.now()
)
users_db[new_id] = new_user
return new_user
@app.put("/users/{user_id}", response_model=User)
async def update_user(user_id: int, user_request: CreateUserRequest):
"""ユーザー更新"""
logger.info(f"Updating user {user_id}")
if user_id not in users_db:
raise HTTPException(status_code=404, detail="User not found")
users_db[user_id].name = user_request.name
users_db[user_id].email = user_request.email
return users_db[user_id]
@app.delete("/users/{user_id}")
async def delete_user(user_id: int):
"""ユーザー削除"""
logger.info(f"Deleting user {user_id}")
if user_id not in users_db:
raise HTTPException(status_code=404, detail="User not found")
del users_db[user_id]
return {"message": "User deleted successfully"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=3000)
デプロイとテスト手順
1. ローカル開発環境でのテスト
# 1. プロキシサーバーを起動
cd proxy-server
docker build -t microservice-proxy .
docker run -p 8000:8000 -e VPC_ENDPOINT_DNS="localhost" microservice-proxy
# 2. 各マイクロサービスを起動
cd user-service
docker build -t user-service .
docker run -p 3000:3000 user-service
# 3. 動作確認
curl http://localhost:8000/
curl http://localhost:8000/api/users
curl -X POST http://localhost:8000/api/users \
-H "Content-Type: application/json" \
-d '{"name": "新規ユーザー", "email": "new@example.com"}'
2. AWS環境でのデプロイ
# 1. ECRにプッシュ
aws ecr get-login-password --region ap-northeast-1 | docker login --username AWS --password-stdin 123456789012.dkr.ecr.ap-northeast-1.amazonaws.com
docker tag microservice-proxy:latest 123456789012.dkr.ecr.ap-northeast-1.amazonaws.com/microservice-proxy:latest
docker push 123456789012.dkr.ecr.ap-northeast-1.amazonaws.com/microservice-proxy:latest
# 2. CDKでスタックをデプロイ
cdk deploy Foundation-public Foundation-isolated --concurrency 2
cdk deploy VpcEndpoint-public VpcEndpoint-isolated --concurrency 2
cdk deploy AlbRouting-isolated
cdk deploy EcsService-users EcsService-orders EcsService-payments --concurrency 3
cdk deploy ProxyServer-public
監視とトラブルシューティング
1. CloudWatchメトリクス設定
📊 カスタムメトリクス設定
# monitoring/metrics.py
import boto3
from datetime import datetime
class CloudWatchMetrics:
def __init__(self):
self.cloudwatch = boto3.client('cloudwatch')
def put_custom_metric(self, metric_name: str, value: float, unit: str = 'Count'):
"""カスタムメトリクスをCloudWatchに送信"""
try:
self.cloudwatch.put_metric_data(
Namespace='MicroserviceProxy',
MetricData=[
{
'MetricName': metric_name,
'Value': value,
'Unit': unit,
'Timestamp': datetime.now()
}
]
)
except Exception as e:
logger.error(f"Failed to put metric {metric_name}: {str(e)}")
def put_service_health_metric(self, service_name: str, is_healthy: bool):
"""サービス健康状態メトリクス"""
self.put_custom_metric(
f'ServiceHealth/{service_name}',
1.0 if is_healthy else 0.0,
'None'
)
def put_request_metric(self, service_name: str, status_code: int, response_time: float):
"""リクエストメトリクス"""
self.put_custom_metric(f'RequestCount/{service_name}', 1.0)
self.put_custom_metric(f'ResponseTime/{service_name}', response_time, 'Milliseconds')
if status_code >= 400:
self.put_custom_metric(f'ErrorCount/{service_name}', 1.0)
# プロキシサーバーのメイン処理にメトリクス送信を追加
metrics = CloudWatchMetrics()
@app.api_route("/api/{service_name}/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
async def proxy_request(service_name: str, path: str, request: Request):
start_time = datetime.now()
try:
# ... 既存の処理 ...
# 成功メトリクス送信
response_time = (datetime.now() - start_time).total_seconds() * 1000
metrics.put_request_metric(service_name, response.status_code, response_time)
return response
except Exception as e:
# エラーメトリクス送信
response_time = (datetime.now() - start_time).total_seconds() * 1000
metrics.put_request_metric(service_name, 500, response_time)
raise
2. よくある問題と解決方法
問題 | 症状 | 解決方法 |
---|---|---|
VPC Endpoint接続エラー | 502 Bad Gateway |
セキュリティグループの設定を確認、VPC Endpoint Service IDが正しいか確認 |
マイクロサービス応答なし | 504 Gateway Timeout |
ECSサービスの状態確認、ターゲットグループのヘルスチェック確認 |
ALBルーティング失敗 | 間違ったサービスにルーティング | パスパターンの優先度確認、リスナールールの順序確認 |
ヘルスチェック失敗 | サービスがUnhealthy 状態 |
/health エンドポイントの実装確認、ポート設定確認 |
次回予告
次回は「Isolated VPC → Public VPC逆方向通信編」として、Isolated VPC内のマイクロサービスから外部APIへのアクセスを実現する方法を解説します。
egress通信の制御、NATゲートウェイの設定、セキュリティ考慮事項など、より高度なネットワーク設計についてお話しします!
まとめ
Public VPCからIsolated VPCへの通信実装について、実際のコード例とともに詳しく解説しました。
重要なポイント:
- FastAPIプロキシサーバーによる統一エントリーポイント
- VPC Endpointを通じたセキュアな通信
- ALBパスルーティングによる効率的なマイクロサービス振り分け
- 包括的なヘルスチェックとエラーハンドリング
- CloudWatch監視による運用可視化
この実装により、制約環境でも高可用性と高セキュリティを両立したマイクロサービス基盤が実現できます。
連載記事一覧
- 第1回: Private Link縛りの企業環境で双方向VPC連携アーキテクチャを設計してみた
- 第2回: CDKで作るPrivate Link双方向通信 - スタック分割とデッドロック回避編
- 👈 第3回: Public VPC → Isolated VPC通信編 - プロキシサーバーとVPC Endpoint構成
- 第4回: Isolated VPC → Public VPC逆方向通信編
- 第5回: ECSマイクロサービス動的追加編 - forループで運用を楽にする