3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

OpenSearchドメインの設定変更ステータスの遷移を通知する

Posted at

はじめに

業務で「Amazon OpenSearch Service」を利用している。
保存されているデータ量の増加を受けて、定期的にデータノードを追加、あるいはボリュームサイズを増加させている。

データノードの増加であれば、Blue/Greenデプロイが実行されないので、比較的短時間で設定変更が完了する。

しかし、ボリュームサイズの増加となると、Blue/Greenデプロイが実行される。
既存のデータノードに対して直接ボリュームサイズを増加させることができないので、増加後のボリュームサイズを確保したデータノードを別環境として作成し、既存のデータノードから新しいデータノードにデータを移行する必要がある。
そのため、保存されているデータが膨大な場合は、ボリュームサイズの変更が完了し、既存のデータノードから新しいデータノードにデータの移行が完了するまで数時間、あるいは1日弱を要することがある。

設定変更が完了したかどうか、長時間にわたって定期的にコンソールを確認しに行くのは手間がかかる。
そこで、設定変更が完了したら通知する仕組みを構築できないかと考えた。

要件

OpenSearchドメインの「設定変更ステータス」の遷移を通知する。

システム構成

今回作成したシステムの構成は以下のとおり。

image.png

当初、EventBridgeルールのイベントパターンとして、OpenSearchドメインの設定変更ステータスを指定し、SNSトピックにメッセージを送信できないかと考えた。
しかし、指定できるイベントパターンとして対応していなかった。

そのため、別の手段として、Lambda関数がOpenSearchドメインの設定変更ステータスを定期的に取得し、ステータスが遷移した場合に、SNSトピックにメッセージを送信するという仕組みを採用した。

ただ、Lambda関数には、実行時間が最大15分という制約がある。
設定変更ステータスが「完了」になるまで15分以上かかる場合は、タイムアウトとなり、通知が送信されない。

そのため、1回のLambda関数の実行において、下記の処理を実行することにした。

① OpenSearchドメインの設定変更ステータスを取得
② ステータスをファイルに出力し、S3バケットに保存
③ 前回保存したファイルに記載されているステータスと異なる場合に、SNSトピックにメッセージを送信

上記の処理にすることで、Lambda関数の最大実行時間による影響を回避できる。

各サービスの設定

SNSトピックの作成

・メッセージを受信するSNSトピックを作成する。
・サブスクリプションに、通知メールを受信したいメールアドレスなどを設定する。

S3バケットの作成

・設定変更ステータスが記載されたファイルを保存するS3バケットを作成する。

Lambda関数の作成

・Lambda関数の「コード」として、以下のPythonスクリプトを設定する。
・「環境変数」には、ステータスを取得したいOpenSearchドメイン、先の手順で作成したSNSトピック、S3バケットを設定する。プレフィックスおよびファイル名は任意で良いが、作業ごとにその都度、日付や作業名などを含めると、作業ごとにプレフィックスが分かれるので後で見やすくなる。

sample.py
#boto3インポート
import boto3
#difflibインポート
import difflib
#環境変数の取得
import os

domainName = os.environ['domain_name']
bucket = os.environ['s3_bucket']
prefix = os.environ['s3_prefix']
fileName = os.environ['file_name']
topic = os.environ['sns_topic']

def lambda_handler(event, context):

  print('message: DomainName: ' + domainName)

  #「DescribeDomainChangeProgress」APIリクエスト
  client_openSearch = boto3.client('opensearch')
  response_openSearch = client_openSearch.describe_domain_change_progress(
    DomainName = domainName
  )

  #受け取ったAPIレスポンスから「設定変更ステータス」のみを取得
  changeProgressStatus = response_openSearch['ChangeProgressStatus']
  configChangeStatus = changeProgressStatus.get('ConfigChangeStatus')

  print('message: ConfigChangeStatus: ' + configChangeStatus)

  #「設定変更ステータス」をJSONファイルとしてS3バケットに格納
  client_s3 = boto3.client('s3')
  response_s3_put = client_s3.put_object(
    Bucket = bucket,
    Key = prefix + '/' + fileName,
    Body = configChangeStatus
  )

  print('message: JSON file has been put in S3 bucket: ' + bucket + '/' + prefix + '/' + fileName)

  #S3バケット内にあるJSONファイルのバージョン情報を取得
  response_s3_list_obj_ver = client_s3.list_object_versions(
    Bucket = bucket,
    Prefix = prefix + '/' + fileName
  )

  print('message: Getting versions of ' + fileName + ' has been completed')

  #前のバージョンが存在しない場合は処理終了
  if 'Versions' not in response_s3_list_obj_ver or len(response_s3_list_obj_ver['Versions']) < 2:
    print('message: No previous version available for comparison')
    return

  #バージョンの新しい順に並び替えて、最新2バージョンのバージョン情報を取得
  latest_versions = sorted(response_s3_list_obj_ver['Versions'], key=lambda v: v['LastModified'], reverse=True)[:2]

  #最新バージョンのバージョンIDを取得
  latest_version_id = latest_versions[0]['VersionId']
  #2番目に新しいバージョンのバージョンIDを取得
  previous_version_id = latest_versions[1]['VersionId']

  print('message: Latest version id: ' + latest_version_id)
  print('message: Previous version id: ' + previous_version_id)

  #最新バージョンのJSONファイルの中身を取得
  latest_object = client_s3.get_object(
    Bucket = bucket, Key = prefix + '/' + fileName, VersionId = latest_version_id
  )['Body'].read().decode('utf-8')
  
  #2番目に新しいバージョンのJSONファイルの中身を取得
  previous_object = client_s3.get_object(
    Bucket = bucket, Key = prefix + '/' + fileName, VersionId = previous_version_id
  )['Body'].read().decode('utf-8')

  #最新バージョンと前回バージョンで値が同じ場合、処理終了
  if previous_object == latest_object:
    print('message: DomainConfigChangeStatus has not changed yet')
    return

  #値が異なる場合、差分情報を取得
  print('message: DomainConfigChangeStatus has changed')
  response_diff = difflib.ndiff(previous_object.split(), latest_object.split())

  diff_result = '\n'.join(response_diff)

  #SNSトピックに送信するメッセージの本文
  message = f"""

  Notification:
  OpenSearch DomainConfigChangeStatus has changed.

  Domain:
  {domainName}

  Status transition:
  {diff_result}

  """

  #SNSトピックにメッセージを送信
  client_sns = boto3.client('sns')

  response_sns = client_sns.publish(
    TopicArn = topic,
    Message = message,
    Subject = 'OpenSearch DomainConfigChangeStatus has changed'
  )

  print('message: Message has been sent to SNS topic: ' + topic)

  return

EventBridgeルールの作成

・Lambda関数を定期的に実行するたのEventBridgeルールを作成する。
・スケジュールを「10分ごと」など任意の時間に設定し、ターゲット先ほど作成したLambda関数を指定する。

おわりに

今回は、OpenSearchドメインの設定変更ステータスが遷移した時に通知するために、LambdaとS3を利用してステータスの差分を検出するという仕組みにした。

ただ、もう少し簡潔な仕組みできないだろうかと考えている。
他の手段としてStep Functionsを利用できるかもしれないということを耳にしたので、実現可能かどうか試してみたい。

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?