3
Help us understand the problem. What are the problem?

posted at

Organization

lambda から OpenSearch ( ElasticSearch )にqueryする

はじめに

定期的にOpenSearchからデータ(ドキュメントの件数)を取得し、cloudwatchのカスタムメトリクスとして設定したかったので、lambdaからOpenSearchのデータ取得することを検討しました。curlの実行事例は多くありますが、lambda(python)はあまりなかったのでメモしておきます。

lambdaの設定

lambdaがOpenSearchにアクセスできる必要があるため、OpenSearchと同じVPCに配置しておきます。また、セキュリティグループを設定します。

OpenSearchに対してQueryする

pythonで書きました。専用ライブラリなどはなく、普通にjsonでhttpリクエストします。
query条件に応じて、URLとBodyを設定します。curlでリクエストできていれば、同じ内容でリクエストすればOK。

import json
import boto3
from urllib import request

def lambda_handler(event, context):
    
    #URLとHeader準備
    url = 'https://{endpoint}:443/{indexName}/{typeName}/_count?pretty'
    headers = {
        'Content-Type': 'application/json',
    }
    
    #query用のbody準備
    data = {
        'query': { 
            'bool': {
                'must' : {
                    'range' : {
                        'xxx_date': {
                            'lte': "2022-05-28T23:59:59.999"
                        }
                    }
                }
            }
        }
    }

    #HTTP request
    res = http_req(url, data, headers)
    
    #countだけ取り出して表示
    xxx_count = res["count"]
    print(xxx_count)
    #Cloudwatch metricsへの出力は省略

    return {
        'statusCode': 200
    }

# 引数のURL、BODY、HeaderでHTTPリクエストし、レスポンスボディをjsonで返す
def http_req(url, data, headers):
    post_req = request.Request(url, json.dumps(data).encode(), headers, method='POST')
    with request.urlopen(post_req) as res:
        body = json.load(res)
        return body

おわりに

lambdaのトリガーは、EventBridgeで定期実行にしました。低頻度であれば、EC2でcronより低コストで済みます。

Register as a new user and use Qiita more conveniently

  1. You can follow users and tags
  2. you can stock useful information
  3. You can make editorial suggestions for articles
What you can do with signing up
3
Help us understand the problem. What are the problem?