LoginSignup
10

More than 5 years have passed since last update.

AWS LambdaからAmazon Elasticsearch Serviceにつないでみる

Last updated at Posted at 2017-12-01

この投稿は、AWS Lambda Advent Calendar 2017の初日の投稿になります。

初日なので簡単なのをば!

Elasticsearchは今までローカルやプライベートネットワーク上にインストールして使うことが多く、マネージドサービスを使ったことがなかったので、今回Amazon Elsaticsearch Serviceを使ってみました。

で、色々なサイトを参考させてもらい、設定〜インデックス作成まで実施したので、その備忘録を載せます。

準備

準備ですが、クラスタを作成するだけです。当然t2.smallで作成。Elasticsearchのバージョンは5.5を選択しました。他はデフォルトの設定です。

スクリーンショット 2017-11-19 18.52.14.png

Lambdaからアクセスしてインデックスを作成してみる

AWS Lambdaに適当な関数を作成して、とりあえずアクセスしてみます。こちらを参考にさせてもらいました。

Lambda から elasticsearch service に何かする [cloudpack OSAKA blog]

このコードでアクセスするためには、STSのAssumeRolests:AssumeRoleがIAM Roleに付与されている必要があります。ポイントとなるポリシーの設定はこんな感じです。(もちろんテスト的な設定なので、本来はきちんとアクセス権限を設計しましょう)

        {
            "Action": [
                "sts:AssumeRole"
            ],
            "Resource": [
                "*"
            ],
            "Effect": "Allow"
        }

ソースコードはこんな感じで動きました。ENDPOINT、REGION、ROLE_ARN、S3_BUCKET、S3_OBJECTは環境変数からの定義となります。

import os
import sys

import boto3

sys.path.append(os.path.join(
    os.path.abspath(os.path.dirname(__file__)), 'lib'))
from elasticsearch import Elasticsearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

ENDPOINT = os.environ['ES_ENDPOINT']
REGION = os.environ['REGION']
ROLE_ARN = os.environ['ROLE_ARN']
S3_BUCKET = os.environ['S3_BUCKET']
S3_OBJECT = os.environ['S3_OBJECT']


def run(event, context):
    es_client = connect_es(ENDPOINT)

    if event['method'] == "create-index":
        s3 = boto3.client('s3')
        index_doc = s3.get_object(Bucket=S3_BUCKET, Key=S3_OBJECT)['Body'].read()
        print(index_doc)
        create_index(es_client, event['index'], index_doc)
        return "Success"
    return es_client.info()

def connect_es(es_endpoint):

    print('Connecting to the ES Endpoint {0}'.format(es_endpoint))
    credentials = get_credential()
    awsauth = AWS4Auth(credentials['access_key'], credentials['secret_key'], REGION, 'es', session_token=credentials['token'])

    try:
        es_client = Elasticsearch(
            hosts=[{'host': es_endpoint, 'port': 443}],
            http_auth=awsauth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection)
        return es_client
    except Exception as E:
        print("Unable to connect to {0}".format(es_endpoint))
        print(E)
        exit(3)

def create_index(es_client, index_name, index_doc):
 try:
  res = es_client.indices.exists(index_name)
  print("Index Exists ... {}".format(res))
  if res is False:
   es_client.indices.create(index_name, body=index_doc)
 except Exception as E:
  print("Unable to Create Index {0}".format("metadata-store"))
  print(E)
  exit(4)

def get_credential():
    client = boto3.client('sts')
    assumedRoleObject = client.assume_role(
        RoleArn=ROLE_ARN,
        RoleSessionName="Access_to_ES_from_lambda"
    )
    credentials = assumedRoleObject['Credentials']
    return { 'access_key': credentials['AccessKeyId'],
             'secret_key': credentials['SecretAccessKey'],
             'token': credentials['SessionToken'] }

コードを見れば分かる通り、S3にアクセスするため、権限の設定が必要になります。

S3に配置するJSONデータの準備

今回はS3にインデックスの設定を保持します。こんな感じのdata.jsonを配置します。この辺はAWS Solution Architectのブログを参考にします。

【AWS Database Blog】AWS Lambda と Pythonを使ってメタデータをAmazon Elasticsearch Serviceにインデクシング

{
    "dataRecord": {
        "properties": {
            "createdDate": {
                "type": "date",
                "format": "dateOptionalTime"
            },
            "objectKey": {
                "type": "string",
                "format": "dateOptionalTime"
            },
            "content_type": {
                "type": "string"
            },
            "content_length": {
                "type": "long"
            },
            "metadata": {
                "type": "string"
            }
        }
    },
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
    }
}

Lambdaを実行

以下のJSONをパラメータとして実行します。JSONパラメータを送らなければ、Elasticsearch Clientの情報をReturnするようになってます。

{
  "method": "create-index",
  "index": "sample-index"
}

アクセスしてみる

アクセスしたところ、無事インデックスが作成されました。sample-indexが増えているのがわかります。

スクリーンショット 2017-12-01 18.29.10.png

まとめ

今回初めてAmazon Elasticsearch Serviceを利用しましたが、準備段階の手間が省けているのはありがたいですね。またAWS上のサービスなので、Lambdaなどの別サービスからのアクセスも簡単でした。

おまけ

ちなみに最初、受け取ったElasticsearch ClientをそのままReturnしていたのですが、その時のエラーメッセージがこんな感じ。一見するとJSONで保持できない=パラメータが異なった?と受け取れます。

{
  "errorMessage": "<Elasticsearch([{'host': 'HOSTNAME.us-east-1.es.amazonaws.com', 'port': 443}])> is not JSON serializable",
  "errorType": "TypeError",
  "stackTrace": [
    [
      "/var/lang/lib/python3.6/json/__init__.py",
      238,
      "dumps",
      "**kw).encode(obj)"
    ],
    [
      "/var/lang/lib/python3.6/json/encoder.py",
      199,
      "encode",
      "chunks = self.iterencode(o, _one_shot=True)"
    ],
    [
      "/var/lang/lib/python3.6/json/encoder.py",
      257,
      "iterencode",
      "return _iterencode(o, 0)"
    ],
    [
      "/var/runtime/awslambda/bootstrap.py",
      110,
      "decimal_serializer",
      "raise TypeError(repr(o) + \" is not JSON serializable\")"
    ]
  ]
}

これ、原因が

    return es_client

es_clientをそのまま返していたから。

    return es_client.info()

とすれば動くのですが、こんなの気づけんって。。。せめてピンポイントでエラー発生行番号が出力されれば話は別ですが、不親切すぎなエラーメッセージ(^^;どこかで改善されることを期待っす。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
10