はじめに
業務で「Amazon OpenSearch Service」を利用している。
保存されているデータ量の増加を受けて、定期的にデータノードを追加、あるいはボリュームサイズを増加させている。
データノードの増加であれば、Blue/Greenデプロイが実行されないので、比較的短時間で設定変更が完了する。
しかし、ボリュームサイズの増加となると、Blue/Greenデプロイが実行される。
既存のデータノードに対して直接ボリュームサイズを増加させることができないので、増加後のボリュームサイズを確保したデータノードを別環境として作成し、既存のデータノードから新しいデータノードにデータを移行する必要がある。
そのため、保存されているデータが膨大な場合は、ボリュームサイズの変更が完了し、既存のデータノードから新しいデータノードにデータの移行が完了するまで数時間、あるいは1日弱を要することがある。
設定変更が完了したかどうか、長時間にわたって定期的にコンソールを確認しに行くのは手間がかかる。
そこで、設定変更が完了したら通知する仕組みを構築できないかと考えた。
要件
OpenSearchドメインの「設定変更ステータス」の遷移を通知する。
システム構成
今回作成したシステムの構成は以下のとおり。
当初、EventBridgeルールのイベントパターンとして、OpenSearchドメインの設定変更ステータスを指定し、SNSトピックにメッセージを送信できないかと考えた。
しかし、指定できるイベントパターンとして対応していなかった。
そのため、別の手段として、Lambda関数がOpenSearchドメインの設定変更ステータスを定期的に取得し、ステータスが遷移した場合に、SNSトピックにメッセージを送信するという仕組みを採用した。
ただ、Lambda関数には、実行時間が最大15分という制約がある。
設定変更ステータスが「完了」になるまで15分以上かかる場合は、タイムアウトとなり、通知が送信されない。
そのため、1回のLambda関数の実行において、下記の処理を実行することにした。
① OpenSearchドメインの設定変更ステータスを取得
② ステータスをファイルに出力し、S3バケットに保存
③ 前回保存したファイルに記載されているステータスと異なる場合に、SNSトピックにメッセージを送信
上記の処理にすることで、Lambda関数の最大実行時間による影響を回避できる。
各サービスの設定
SNSトピックの作成
・メッセージを受信するSNSトピックを作成する。
・サブスクリプションに、通知メールを受信したいメールアドレスなどを設定する。
S3バケットの作成
・設定変更ステータスが記載されたファイルを保存するS3バケットを作成する。
Lambda関数の作成
・Lambda関数の「コード」として、以下のPythonスクリプトを設定する。
・「環境変数」には、ステータスを取得したいOpenSearchドメイン、先の手順で作成したSNSトピック、S3バケットを設定する。プレフィックスおよびファイル名は任意で良いが、作業ごとにその都度、日付や作業名などを含めると、作業ごとにプレフィックスが分かれるので後で見やすくなる。
#boto3インポート
import boto3
#difflibインポート
import difflib
#環境変数の取得
import os
domainName = os.environ['domain_name']
bucket = os.environ['s3_bucket']
prefix = os.environ['s3_prefix']
fileName = os.environ['file_name']
topic = os.environ['sns_topic']
def lambda_handler(event, context):
print('message: DomainName: ' + domainName)
#「DescribeDomainChangeProgress」APIリクエスト
client_openSearch = boto3.client('opensearch')
response_openSearch = client_openSearch.describe_domain_change_progress(
DomainName = domainName
)
#受け取ったAPIレスポンスから「設定変更ステータス」のみを取得
changeProgressStatus = response_openSearch['ChangeProgressStatus']
configChangeStatus = changeProgressStatus.get('ConfigChangeStatus')
print('message: ConfigChangeStatus: ' + configChangeStatus)
#「設定変更ステータス」をJSONファイルとしてS3バケットに格納
client_s3 = boto3.client('s3')
response_s3_put = client_s3.put_object(
Bucket = bucket,
Key = prefix + '/' + fileName,
Body = configChangeStatus
)
print('message: JSON file has been put in S3 bucket: ' + bucket + '/' + prefix + '/' + fileName)
#S3バケット内にあるJSONファイルのバージョン情報を取得
response_s3_list_obj_ver = client_s3.list_object_versions(
Bucket = bucket,
Prefix = prefix + '/' + fileName
)
print('message: Getting versions of ' + fileName + ' has been completed')
#前のバージョンが存在しない場合は処理終了
if 'Versions' not in response_s3_list_obj_ver or len(response_s3_list_obj_ver['Versions']) < 2:
print('message: No previous version available for comparison')
return
#バージョンの新しい順に並び替えて、最新2バージョンのバージョン情報を取得
latest_versions = sorted(response_s3_list_obj_ver['Versions'], key=lambda v: v['LastModified'], reverse=True)[:2]
#最新バージョンのバージョンIDを取得
latest_version_id = latest_versions[0]['VersionId']
#2番目に新しいバージョンのバージョンIDを取得
previous_version_id = latest_versions[1]['VersionId']
print('message: Latest version id: ' + latest_version_id)
print('message: Previous version id: ' + previous_version_id)
#最新バージョンのJSONファイルの中身を取得
latest_object = client_s3.get_object(
Bucket = bucket, Key = prefix + '/' + fileName, VersionId = latest_version_id
)['Body'].read().decode('utf-8')
#2番目に新しいバージョンのJSONファイルの中身を取得
previous_object = client_s3.get_object(
Bucket = bucket, Key = prefix + '/' + fileName, VersionId = previous_version_id
)['Body'].read().decode('utf-8')
#最新バージョンと前回バージョンで値が同じ場合、処理終了
if previous_object == latest_object:
print('message: DomainConfigChangeStatus has not changed yet')
return
#値が異なる場合、差分情報を取得
print('message: DomainConfigChangeStatus has changed')
response_diff = difflib.ndiff(previous_object.split(), latest_object.split())
diff_result = '\n'.join(response_diff)
#SNSトピックに送信するメッセージの本文
message = f"""
Notification:
OpenSearch DomainConfigChangeStatus has changed.
Domain:
{domainName}
Status transition:
{diff_result}
"""
#SNSトピックにメッセージを送信
client_sns = boto3.client('sns')
response_sns = client_sns.publish(
TopicArn = topic,
Message = message,
Subject = 'OpenSearch DomainConfigChangeStatus has changed'
)
print('message: Message has been sent to SNS topic: ' + topic)
return
EventBridgeルールの作成
・Lambda関数を定期的に実行するたのEventBridgeルールを作成する。
・スケジュールを「10分ごと」など任意の時間に設定し、ターゲット先ほど作成したLambda関数を指定する。
おわりに
今回は、OpenSearchドメインの設定変更ステータスが遷移した時に通知するために、LambdaとS3を利用してステータスの差分を検出するという仕組みにした。
ただ、もう少し簡潔な仕組みできないだろうかと考えている。
他の手段としてStep Functionsを利用できるかもしれないということを耳にしたので、実現可能かどうか試してみたい。
