背景・目的
異なるリージョンから、DDBとOpenSearchへの更新を試してみます。
実践
下記のような構成で試してみます。
① OpenSearch Serverlessの作成
コレクションの作成
-
AWSにサインインします
-
OpenSearchに移動します
-
ナビゲーションペインの「サーバレス>ダッシュボード」に移動します
-
「コレクションを作成」をクリックします
-
下記を入力し、「次へ」をクリックします
- コレクション名:任意
- コレクションタイプ:検索
- デプロイタイプ:オフ
- セキュリティ:標準作成
- 暗号化:AWS 所有キーを使用する
- ネットワークアクセスの設定
- 次からコレクションにアクセス:パブリック
- リソースタイプ
- OpenSearchエンドポイントへのアクセスを有効にする
- OpenSearch Dashboardsへのアクセスを有効にする
-
データアクセスの設定で、「スキップして後で設定する」をクリックします
-
確認画面で、「送信」をクリックします
データアクセス管理
- OpenSearchに移動します
- ナビゲーションペインで「セキュリティ>データアクセスポリシー」をクリックします
- 「アクセスポリシーを作成」をクリックします
- 下記を入力し、「作成」をクリックします
初期データの作成
-
OpenSearchダッシュボードに移動します
-
ナビゲーションペインで「Dev Tools」に移動します
-
インデックスを登録します
PUT test-index3 === { "acknowledged": true, "shards_acknowledged": true, "index": "test-index3" }
-
ドキュメントを登録します
PUT test-index3/_doc/1 { "id":1, "data":"text1" } === { "_index": "test-index3", "_id": "1", "_version": 1, "result": "created", "_shards": { "total": 0, "successful": 0, "failed": 0 }, "_seq_no": 0, "_primary_term": 0 }
-
内容を確認します。登録されていました
GET test-index3/_search { "query": { "match_all": {} } } ==== { "took": 18, "timed_out": false, "_shards": { "total": 0, "successful": 0, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 1, "relation": "eq" }, "max_score": 1, "hits": [ { "_index": "test-index3", "_id": "1", "_score": 1, "_source": { "id": 1, "data": "text1" } } ] } }
② DynamoDBの作成
- DynamoDBに移動します
- ダッシュボードで「テーブルの作成」をクリックします
- 下記を入力し、「テーブルの作成」をクリックします
初期データの作成
- 下記の内容を登録します
% aws dynamodb put-item \ --table-name example_table \ --item '{"id": {"S": "initial_id"}, "data": {"S": "initial_data"}}' \ --region ap-northeast-1 \ %
- 内容を確認します。作成されていました
% aws dynamodb get-item \ --table-name example_table \ --key '{"id": {"S": "initial_id"}}' \ --region ap-northeast-1 { "Item": { "id": { "S": "initial_id" }, "data": { "S": "initial_data" } } } %
③ 更新用のLambdaを作成
関数の作成
- Lambdaに移動します
- リージョンはus-east-2に移動します
- 「関数を作成」ボタンをクリックします
- 下記を入力し、「関数の作成」ボタンをクリックします
- 関数名:任意
- ランタイム:Python 3.12
- アーキテクチャ:arm64
- 下記のコードを貼り付けて、「Deploy」ボタンをクリックします
from opensearchpy import OpenSearch, RequestsHttpConnection from requests_aws4auth import AWS4Auth import boto3 def update_opensearch_index(): client = boto3.client('opensearchserverless') service = 'aoss' region = 'ap-northeast-1' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = 'XXXXX.ap-northeast-1.aoss.amazonaws.com' client = OpenSearch( hosts=[{'host': host, 'port': 443}], http_auth=awsauth, use_ssl=True, verify_certs=True, connection_class=RequestsHttpConnection, timeout=60 ) index_name ='test-index3' response = client.index( index=index_name, body={ 'data': 'test data 2' }, id='2', ) print('\nDocument added:') print(response) def put_ddb_item(): dynamodb = boto3.resource('dynamodb',region_name='ap-northeast-1') table = dynamodb.Table('example_table') response = table.put_item( Item={ 'id': 'id:2', 'data': 'test data 2' } ) print('\nItem added:') print(response) def lambda_handler(event, context): update_opensearch_index() put_ddb_item()
レイヤーの作成
LambdaのIAMロールの作成
OpenSearch Serverless
- 許可ポリシーで「許可を追加」>「インラインポリシーを作成」をクリックします
- 下記を指定して、「次へ」をクリックします
- ポリシー名を入力し、「ポリシーの作成」をクリックします
DynamodDB
- 許可ポリシーで「許可を追加」>「ポリシーを追加」をクリックします
- 「AmazonDynamoDBFullAccess」を追加します
OpenSearch Serverlessのデータアクセスコントロール
- OpenSearch Serverlessに移動します
- 該当するコレクションのアクセスポリシーを選択します
- 上記で作成したIAMロールをプリンシパルに追加します
④ 参照用のLambdaを作成
関数の作成
- Lambdaに移動します
- リージョンはap-northeast-1に移動します
- 「関数を作成」ボタンをクリックします
- 下記を入力し、「関数の作成」ボタンをクリックします
- 関数名:任意
- ランタイム:Python 3.12
- アーキテクチャ:arm64
- 下記のコードを貼り付けて、「Deploy」ボタンをクリックします
from opensearchpy import OpenSearch, RequestsHttpConnection from requests_aws4auth import AWS4Auth import boto3 import json def get_ddb_item(): dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-1') table = dynamodb.Table('example_table') response = table.get_item(Key={'id': 'id:2'}) dynamo_data = response.get('Item', {}) print(f"DynamoDB Data: {dynamo_data}") return dynamo_data def get_opensearch_index(): client = boto3.client('opensearchserverless') service = 'aoss' region = 'ap-northeast-1' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = 'XXXXX.ap-northeast-1.aoss.amazonaws.com' client = OpenSearch( hosts=[{'host': host, 'port': 443}], http_auth=awsauth, use_ssl=True, verify_certs=True, connection_class=RequestsHttpConnection, timeout=60 ) index_name = "test-index3" response = client.get( index=index_name, id='2' ) print('\nDocument added:') print(response) return response def lambda_handler(event, context): ddb_response = get_ddb_item() opensearch_response = get_opensearch_index() return { "statusCode": 200, "body": json.dumps({ "message":"Update successful", "dynamodb":ddb_response, "opensearch-serverless":opensearch_response }) }
レイヤーの作成
- 「レイヤーの追加」をクリックします
- 下記を入力し、「追加」をクリックします(opensearch-pyが含まれています)
- レイヤーソース:AWSレイヤー
- AWSレイヤー:AWSSDKPandas-Python312-Arm64
- バージョン:15
LambdaのIAMロールの指定
すでに作成したIAMロールを指定します。
- Lambdaの「設定」タブの「アクセス権限」をクリックします
- 「編集」をクリックします
- 実行ロールを「既存のロールを使用する」を選択後、上記で作成済みのものを指定します
⑤ API Gatewayの作成
-
API Gatewayに移動します
-
REST APIで構築をクリックします
-
下記を入力し、「APIを作成」をクリックします
- 新しいAPIを選択
- API名:任意
- API エンドポイントタイプ:リージョン
-
「メソッドを作成」をクリックします
-
下記を入力して、「メソッドを作成」をクリックします
-
「APIをデプロイ」をクリックします
-
下記を入力し、「デプロイ」をクリックします
- ステージ:新しいステージ
- ステージ名:prd
⑥ 登録
- ③で作成したLambda関数を開きます
- 「テスト」タブをクリックし、テストをクリックします
- 成功しました
START RequestId: XXXXX Version: $LATEST Document added: {'_index': 'test-index3', '_id': '2', '_version': 14, 'result': 'updated', '_shards': {'total': 0, 'successful': 0, 'failed': 0}, '_seq_no': 0, '_primary_term': 0} Item added: {'ResponseMetadata': {'RequestId': 'XXXXXX', 'HTTPStatusCode': 200, 'HTTPHeaders': {'server': 'Server', 'date': 'Sat, 28 Dec 2024 14:20:00 GMT', 'content-type': 'application/x-amz-json-1.0', 'content-length': '2', 'connection': 'keep-alive', 'x-amzn-requestid': 'XXXXXXX', 'x-amz-crc32': '2745614147'}, 'RetryAttempts': 0}} END RequestId: XXXXX REPORT RequestId: XXXXX Duration: 4362.21 ms Billed Duration: 4363 ms Memory Size: 128 MB Max Memory Used: 102 MB Init Duration: 1226.51 ms
⑦ 確認
- ブラウザからAPI Gatewayのエンドポイントにアクセスします
- 成功しました
{ "statusCode": 200, "body": "{\"message\": \"Update successful\", \"dynamodb\": {\"id\": \"id:2\", \"data\": \"test data 2\"}, \"opensearch-serverless\": {\"_index\": \"test-index3\", \"_id\": \"2\", \"_version\": 14, \"_seq_no\": 13, \"_primary_term\": 1, \"found\": true, \"_source\": {\"data\": \"test data 2\"}}}" }
考察
今回、AWS Lambdaを利用したクロスリージョンアクセスを試してみました。
出来ることは理解しておりましたが、実際に試したことはなかったので良い経験になりました。
次回は、LambdaをVPC内に配置して、Transit Gatewayでリージョン間を接続した構成で試してみようと思います。
参考