導入
背景
データベース無停止での移行プロジェクト(Oracle → Aurora PostgreSQL)にDMSサービスのCDCデータ同期タスクにおける予期しない問題が発見しました。
移行期間に上記エラーの運用負荷を減らすため、自動化運用可能の回避対策を検討する必要になります。
本記事の目的
この記事では、CDCデータ同期タスクの異常停止を自動検知し、冪等性を保証しながら安全に再起動する自動化仕組みを解説します。
特に以下に焦点を中心とします。
- DMSタスク停止の検知方法
- 冪等性を担保した再起動ロジック
- 無限ループやデータ破損を防ぐ安全装置
- 実運用で遭遇した問題と対策
対象読者
- 異種類データベース移行の運用知識が持つ方が対象と想定しています。
- DMSサービスでデータ移行が検討している方が対象と想定しています。
アーキテクチャ概要・技術スタック
データベース移行構成およびエラー発生時の自動復旧ソリューションについて、下記の技術要素が利用されています。
移行構成:
- ソース:Oracle Database 11g
- ターゲット:Aurora PostgreSQL
- 移行方式:Full Load → CDC(継続的差分同期)
自動復旧システム:
- 検知:CloudWatch Events(DMS タスク状態変更)
- 判定:Lambda(停止原因の分析)
- 記録:DynamoDB(再起動履歴・エラーカウント)
- 実行:Lambda(冪等性を保証した再起動)
- 通知:SNS(管理者アラート)
試行:自動復旧システムの構築
基本的に移行タスクのにてCDCエラーが発生した場合、DMSタスクの自動復旧フローが下記の流れで対策で実現されました。
データフロー:
DMS Task 状態変化
↓
CloudWatch Events(状態: stopped, failed 検知)
↓
Lambda 関数(停止原因分析)
├─ CDC 遅延起因? → 自動再起動
└─ その他? → 管理者通知
↓
DynamoDB(再起動履歴記録)
├─ 再起動回数カウント
└─ 無限ループ防止
↓
DMS Task 再起動(resume-processing)
コアアイディア
上記の対応フローにて下記の四つポイントが実現する必要になります。
- 即時エラー検知:CloudWatch Events でCDC移行状態変化をリアルタイム監視しましょう
- エラー原因判別:移行タスク停止理由を自動分析し、再起動可否を判断します。
- 安全に再起動:データ同期の冪等性を担保し、無限ループを防止しましょう。
- 回復履歴記録:再起動回数を DynamoDB で管理します。
再起動判断ロジック:
DMS Task 停止検知
↓
停止理由を CloudWatch Logs から取得
↓
┌─ CDCエラー → 自動再起動(条件付き)
├─ ターゲット DB エラー → 管理者通知
├─ DMS 設定エラー → 管理者通知
└─ 手動キャンセル → 何もしない
アーキテクチャ
システム構成図
自動復旧対策について、下記のアーキテクチャ構成で実現されています。
┌─────────────────────────────────────────────┐
│ DMS Replication Task │
│ Oracle → CDC Engine → Aurora PostgreSQL │
└────────────┬────────────────────────────────┘
│ 状態変化イベント
↓
┌─────────────────────────────────────────────┐
│ CloudWatch Events │
│ Rule: DMS Task State Change │
│ - Source: aws.dms │
│ - Detail Type: DMS Replication Task │
│ State Change │
│ - State: stopped, failed │
└────────────┬────────────────────────────────┘
│ イベント発火
↓
┌─────────────────────────────────────────────┐
│ Lambda: DMS Auto Recovery │
│ │
│ 1. 停止原因を分析 │
│ └─ CloudWatch Logs Insights クエリ │
│ │
│ 2. 再起動可否を判定 │
│ ├─ CDC 遅延 → OK │
│ ├─ VPN 切断 → OK │
│ └─ その他 → NG(管理者通知) │
│ │
│ 3. DynamoDB で再起動履歴確認 │
│ └─ 過去1時間に3回以上? → 無限ループ防止│
│ │
│ 4. 冪等性を保証して再起動 │
│ └─ resume-processing で実行 │
└────────────┬────────────────────────────────┘
│
┌────┴─────┐
↓ ↓
┌──────────────┐ ┌──────────────┐
│ DynamoDB │ │ SNS Topic │
│ │ │ │
│ - TaskArn │ │ 管理者通知: │
│ - Timestamp │ │ - Email │
│ - Reason │ │ - Slack │
│ - Action │ │ │
│ - Count │ │ │
└──────────────┘ └──────────────┘
実装詳細
1. CloudWatch Events ルール
下記のフィルターでCloudWatchログからタスクのエラーを検知し、自動復旧イベントルールを発火します。
{
"source": ["aws.dms"],
"detail-type": ["DMS Replication Task State Change"],
"detail": {
"eventType": ["state change"],
"state": ["stopped", "failed"]
}
}
2. Lambda 関数(自動復旧ロジック)
イベントルールより呼び出すLambda関数にて下記の自動復旧ロジックを行います。
自動復旧処理が連続失敗した場合、もしくはインフラ関連なエラーが発生した場合、管理者に通知し、手動運用に切り替えします。
"""
DMS タスク自動復旧 Lambda
機能:
1. タスク停止原因を分析
2. 再起動可否を判定
3. 冪等性を保証して再起動
4. 履歴を DynamoDB に記録
"""
import boto3
import json
import re
from datetime import datetime, timedelta
from decimal import Decimal
dms = boto3.client('dms')
logs = boto3.client('logs')
dynamodb = boto3.resource('dynamodb')
sns = boto3.client('sns')
HISTORY_TABLE = dynamodb.Table('dms-task-recovery-history')
ADMIN_SNS_TOPIC = 'arn:aws:sns:ap-northeast-1:123456789012:dms-admin-alerts'
# 再起動可能な停止理由(正規表現パターン)
RESTARTABLE_PATTERNS = [
r'CDC load has exceeded',
r'Replication task ran out of memory',
r'Network error',
r'Connection reset',
r'VPN.*disconnect',
]
def lambda_handler(event, context):
"""
CloudWatch Events からのイベントを処理
"""
print(f"受信イベント: {json.dumps(event)}")
# イベントから情報抽出
detail = event['detail']
task_arn = detail['replicationTaskArn']
current_state = detail['state']
previous_state = detail.get('previousState', 'unknown')
timestamp = event['time']
print(f"Task: {task_arn}")
print(f"状態変化: {previous_state} → {current_state}")
# タスク情報を取得
task_info = get_task_info(task_arn)
if not task_info:
notify_admin(f"タスク情報の取得に失敗: {task_arn}")
return
# 停止理由を分析
stop_reason = analyze_stop_reason(task_arn, task_info)
print(f"停止理由: {stop_reason}")
# 再起動可否を判定
if not is_restartable(stop_reason):
print("このエラーは自動再起動の対象外")
notify_admin(
f"DMS タスクが停止しました(手動対応が必要)\n\n"
f"Task ARN: {task_arn}\n"
f"停止理由: {stop_reason}\n"
f"時刻: {timestamp}"
)
record_history(task_arn, timestamp, stop_reason, 'MANUAL_INTERVENTION_REQUIRED')
return
# 無限ループチェック
recent_restarts = count_recent_restarts(task_arn, hours=1)
print(f"過去1時間の再起動回数: {recent_restarts}")
if recent_restarts >= 3:
print("再起動回数が上限に達しました(無限ループ防止)")
notify_admin(
f"DMS タスクの自動再起動が制限されました\n\n"
f"Task ARN: {task_arn}\n"
f"理由: 過去1時間に3回再起動済み\n"
f"停止理由: {stop_reason}\n\n"
f"手動で原因を調査してください。"
)
record_history(task_arn, timestamp, stop_reason, 'MAX_RETRY_EXCEEDED')
return
# 冪等性を保証して再起動
restart_success = restart_task_idempotent(task_arn)
# 履歴記録
action = 'AUTO_RESTART_SUCCESS' if restart_success else 'AUTO_RESTART_FAILED'
record_history(task_arn, timestamp, stop_reason, action, recent_restarts + 1)
if restart_success:
print("タスクを正常に再起動しました")
else:
notify_admin(
f"DMS タスクの自動再起動に失敗しました\n\n"
f"Task ARN: {task_arn}\n"
f"停止理由: {stop_reason}\n"
f"時刻: {timestamp}"
)
def get_task_info(task_arn):
"""DMS タスクの詳細情報を取得"""
try:
response = dms.describe_replication_tasks(
Filters=[
{'Name': 'replication-task-arn', 'Values': [task_arn]}
]
)
if response['ReplicationTasks']:
return response['ReplicationTasks'][0]
return None
except Exception as e:
print(f"タスク情報取得エラー: {e}")
return None
def analyze_stop_reason(task_arn, task_info):
"""
CloudWatch Logs から停止理由を分析
"""
# DMS のログストリーム名を構築
# 形式: dms-tasks-{task-id}
task_id = task_arn.split(':')[-1]
log_group = f"dms-tasks-{task_id}"
try:
# 過去10分間のエラーログを検索
end_time = int(datetime.now().timestamp() * 1000)
start_time = int((datetime.now() - timedelta(minutes=10)).timestamp() * 1000)
query = """
fields @timestamp, @message
| filter @message like /ERROR/ or @message like /FATAL/
| sort @timestamp desc
| limit 20
"""
query_id = logs.start_query(
logGroupName=log_group,
startTime=start_time,
endTime=end_time,
queryString=query
)['queryId']
# クエリ結果を待機
import time
for _ in range(10):
time.sleep(1)
result = logs.get_query_results(queryId=query_id)
if result['status'] == 'Complete':
break
# エラーメッセージを抽出
if result['results']:
error_messages = [
field['value']
for row in result['results']
for field in row
if field['field'] == '@message'
]
return '\n'.join(error_messages[:3]) # 最新3件
return "UNKNOWN_REASON"
except Exception as e:
print(f"ログ分析エラー: {e}")
# フォールバック: タスク情報から停止理由を取得
return task_info.get('StopReason', 'UNKNOWN_REASON')
def is_restartable(stop_reason):
"""
停止理由から自動再起動可能かを判定
"""
for pattern in RESTARTABLE_PATTERNS:
if re.search(pattern, stop_reason, re.IGNORECASE):
return True
return False
def count_recent_restarts(task_arn, hours=1):
"""
指定時間内の再起動回数をカウント
"""
cutoff_time = (datetime.utcnow() - timedelta(hours=hours)).isoformat()
try:
response = HISTORY_TABLE.query(
KeyConditionExpression='TaskArn = :arn AND #ts > :cutoff',
ExpressionAttributeNames={'#ts': 'Timestamp'},
ExpressionAttributeValues={
':arn': task_arn,
':cutoff': cutoff_time
}
)
# AUTO_RESTART アクションのみカウント
restart_actions = [
item for item in response['Items']
if item.get('Action', '').startswith('AUTO_RESTART')
]
return len(restart_actions)
except Exception as e:
print(f"履歴カウントエラー: {e}")
return 0
def restart_task_idempotent(task_arn):
"""
冪等性を保証してタスクを再起動
"""
try:
# 現在の状態を確認
task_info = get_task_info(task_arn)
if not task_info:
print("タスク情報が取得できません")
return False
current_state = task_info['Status']
print(f"現在の状態: {current_state}")
# 冪等性チェック: すでに稼働中の場合は何もしない
if current_state in ['running', 'starting']:
print("タスクはすでに稼働中です")
return True
# 停止中または起動中の場合は待機
if current_state in ['stopping', 'starting']:
print(f"タスクは {current_state} 状態です。待機します。")
return False
# 再起動実行
print("タスクを再起動します")
dms.start_replication_task(
ReplicationTaskArn=task_arn,
StartReplicationTaskType='resume-processing' # CDC を継続
)
return True
except dms.exceptions.InvalidResourceStateFault as e:
print(f"タスクは遷移中です: {e}")
return False
except Exception as e:
print(f"再起動エラー: {e}")
return False
def record_history(task_arn, timestamp, reason, action, retry_count=0):
"""
再起動履歴を DynamoDB に記録
"""
try:
# 30日後に自動削除されるよう TTL を設定
expiration_time = int(
(datetime.utcnow() + timedelta(days=30)).timestamp()
)
HISTORY_TABLE.put_item(
Item={
'TaskArn': task_arn,
'Timestamp': timestamp,
'StopReason': reason[:1000], # 長すぎる場合は切り詰め
'Action': action,
'RetryCount': retry_count,
'ExpirationTime': expiration_time
}
)
print("履歴を記録しました")
except Exception as e:
print(f"履歴記録エラー: {e}")
def notify_admin(message):
"""
管理者に SNS 通知
"""
try:
sns.publish(
TopicArn=ADMIN_SNS_TOPIC,
Subject='【DMS】自動復旧アラート',
Message=message
)
print("管理者に通知しました")
except Exception as e:
print(f"通知エラー: {e}")
3. 停止理由の判別ロジック
DMS移行タスク停止のエラー分類処理を定義し、自動再起動可能なエラーパターンと管理者よりエラー処置のエラーパターンを明確しましょう。
RESTARTABLE_PATTERNS = [
# CDC 遅延関連
r'CDC load has exceeded',
r'Replication task ran out of memory',
r'Unable to allocate memory',
# ネットワーク関連
r'Network error',
r'Connection reset',
r'Connection timed out',
r'could not connect to server',
# 一時的なターゲット DB エラー
r'deadlock detected',
r'Lock wait timeout exceeded',
]
NON_RESTARTABLE_PATTERNS = [
# 設定エラー
r'Invalid configuration',
r'Authentication failed',
r'Access denied',
# データ整合性エラー
r'Duplicate key',
r'Foreign key constraint',
r'Data type mismatch',
# リソース不足(恒久的)
r'Disk full',
r'Out of disk space',
# 手動停止
r'User initiated',
r'Manual stop',
]
実装中に遭遇した問題と解決策
自動復旧ソリューションのロジック実装中、いくつ苦労した箇所を軽くて解説します。
問題:自動復旧処理の無限ループ
該当問題の事象が自動復旧処理にて管理者より対策必要のエラーが発生した場合、再起動回数が記録されてないため、復旧処理が繰り返し起動してしまいました。
解決策としては一定時間に再起動回数の制限を追加し、もし回数上限が超えた場合、自動復旧を停止し、管理者に通知します。
1. DMS Task が停止(CDC 遅延)
2. Lambda が自動再起動
3. すぐにまた停止(根本原因が未解決)
4. Lambda が自動再起動
5. すぐにまた停止
6. ... 無限ループ
def count_recent_restarts(task_arn, hours=1):
"""
過去N時間の再起動回数をカウント
"""
cutoff_time = (datetime.utcnow() - timedelta(hours=hours)).isoformat()
response = HISTORY_TABLE.query(
KeyConditionExpression='TaskArn = :arn AND #ts > :cutoff',
ExpressionAttributeNames={'#ts': 'Timestamp'},
ExpressionAttributeValues={
':arn': task_arn,
':cutoff': cutoff_time
}
)
# AUTO_RESTART アクションのみカウント
restart_count = sum(
1 for item in response['Items']
if item.get('Action', '').startswith('AUTO_RESTART')
)
return restart_count
# メインロジックで使用
recent_restarts = count_recent_restarts(task_arn, hours=1)
if recent_restarts >= 3:
print("再起動回数が上限(過去1時間に3回)を超えました")
notify_admin(
f"DMS タスクが繰り返し停止しています\n\n"
f"Task ARN: {task_arn}\n"
f"過去1時間の再起動回数: {recent_restarts}\n\n"
f"根本原因の調査が必要です。\n"
f"可能性:\n"
f"- ターゲット DB のリソース不足\n"
f"- DMS インスタンスのスペック不足\n"
f"- ネットワークの恒久的な問題"
)
record_history(task_arn, timestamp, stop_reason, 'MAX_RETRY_EXCEEDED')
return
問題:手動停止との競合
該当問題の事象は管理者がエラー対処処理中に移行タスクを手動停止した場合、エラー停止か手動停止か区別できませんでした。
解決策としては管理者より手動停止の場合、手動停止のタグを追加し、ログに出力すれば回避できます。
1. 管理者がメンテナンスのため DMS Task を手動停止
2. CloudWatch Events が停止を検知
3. Lambda が「自動再起動可能」と判定
4. Lambda が勝手にタスクを再起動
5. 管理者が困惑(メンテナンスできない)
# 手動停止前にタグを設定
$ aws dms add-tags-to-resource \
--resource-arn arn:aws:dms:...:task:ABCDEFG \
--tags Key=AutoRestart,Value=disabled
# Lambda 関数で確認
def is_auto_restart_enabled(task_arn):
"""
タスクに AutoRestart タグがあるか確認
"""
try:
response = dms.list_tags_for_resource(ResourceArn=task_arn)
tags = {tag['Key']: tag['Value'] for tag in response.get('TagList', [])}
auto_restart = tags.get('AutoRestart', 'enabled')
return auto_restart.lower() != 'disabled'
except Exception as e:
print(f"タグ取得エラー: {e}")
return True # デフォルトは有効
# メインロジック
if not is_auto_restart_enabled(task_arn):
print("AutoRestart が無効化されています(手動停止)")
record_history(task_arn, timestamp, 'MANUAL_STOP', 'SKIPPED')
return
まとめ
一般的には異種類のデータベース移行中の環境が複雑だし、エラー発生時の原因調査および対応方法検討が難しかったです。
DMSサービスを活用し、かつ、移行中のCDCタスクエラー問題が一部自動的に復旧できて、運用負荷が大きく削減されました。
本記事が少しでも参考になりましたら幸いです。最後までご覧いただきありがとうございました!
関連リソース:
- AWS DMS 公式: https://docs.aws.amazon.com/dms/