この投稿は、AWS Lambda Advent Calendar 2017の初日の投稿になります。
初日なので簡単なのをば!
Elasticsearchは今までローカルやプライベートネットワーク上にインストールして使うことが多く、マネージドサービスを使ったことがなかったので、今回Amazon Elsaticsearch Serviceを使ってみました。
で、色々なサイトを参考させてもらい、設定〜インデックス作成まで実施したので、その備忘録を載せます。
#準備
準備ですが、クラスタを作成するだけです。当然t2.smallで作成。Elasticsearchのバージョンは5.5を選択しました。他はデフォルトの設定です。
#Lambdaからアクセスしてインデックスを作成してみる
AWS Lambdaに適当な関数を作成して、とりあえずアクセスしてみます。こちらを参考にさせてもらいました。
Lambda から elasticsearch service に何かする [cloudpack OSAKA blog]
このコードでアクセスするためには、STSのAssumeRolests:AssumeRole
がIAM Roleに付与されている必要があります。ポイントとなるポリシーの設定はこんな感じです。(もちろんテスト的な設定なので、本来はきちんとアクセス権限を設計しましょう)
{
"Action": [
"sts:AssumeRole"
],
"Resource": [
"*"
],
"Effect": "Allow"
}
ソースコードはこんな感じで動きました。ENDPOINT、REGION、ROLE_ARN、S3_BUCKET、S3_OBJECTは環境変数からの定義となります。
import os
import sys
import boto3
sys.path.append(os.path.join(
os.path.abspath(os.path.dirname(__file__)), 'lib'))
from elasticsearch import Elasticsearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
ENDPOINT = os.environ['ES_ENDPOINT']
REGION = os.environ['REGION']
ROLE_ARN = os.environ['ROLE_ARN']
S3_BUCKET = os.environ['S3_BUCKET']
S3_OBJECT = os.environ['S3_OBJECT']
def run(event, context):
es_client = connect_es(ENDPOINT)
if event['method'] == "create-index":
s3 = boto3.client('s3')
index_doc = s3.get_object(Bucket=S3_BUCKET, Key=S3_OBJECT)['Body'].read()
print(index_doc)
create_index(es_client, event['index'], index_doc)
return "Success"
return es_client.info()
def connect_es(es_endpoint):
print('Connecting to the ES Endpoint {0}'.format(es_endpoint))
credentials = get_credential()
awsauth = AWS4Auth(credentials['access_key'], credentials['secret_key'], REGION, 'es', session_token=credentials['token'])
try:
es_client = Elasticsearch(
hosts=[{'host': es_endpoint, 'port': 443}],
http_auth=awsauth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection)
return es_client
except Exception as E:
print("Unable to connect to {0}".format(es_endpoint))
print(E)
exit(3)
def create_index(es_client, index_name, index_doc):
try:
res = es_client.indices.exists(index_name)
print("Index Exists ... {}".format(res))
if res is False:
es_client.indices.create(index_name, body=index_doc)
except Exception as E:
print("Unable to Create Index {0}".format("metadata-store"))
print(E)
exit(4)
def get_credential():
client = boto3.client('sts')
assumedRoleObject = client.assume_role(
RoleArn=ROLE_ARN,
RoleSessionName="Access_to_ES_from_lambda"
)
credentials = assumedRoleObject['Credentials']
return { 'access_key': credentials['AccessKeyId'],
'secret_key': credentials['SecretAccessKey'],
'token': credentials['SessionToken'] }
コードを見れば分かる通り、S3にアクセスするため、権限の設定が必要になります。
#S3に配置するJSONデータの準備
今回はS3にインデックスの設定を保持します。こんな感じのdata.jsonを配置します。この辺はAWS Solution Architectのブログを参考にします。
【AWS Database Blog】AWS Lambda と Pythonを使ってメタデータをAmazon Elasticsearch Serviceにインデクシング
{
"dataRecord": {
"properties": {
"createdDate": {
"type": "date",
"format": "dateOptionalTime"
},
"objectKey": {
"type": "string",
"format": "dateOptionalTime"
},
"content_type": {
"type": "string"
},
"content_length": {
"type": "long"
},
"metadata": {
"type": "string"
}
}
},
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
}
}
#Lambdaを実行
以下のJSONをパラメータとして実行します。JSONパラメータを送らなければ、Elasticsearch Clientの情報をReturnするようになってます。
{
"method": "create-index",
"index": "sample-index"
}
#アクセスしてみる
アクセスしたところ、無事インデックスが作成されました。sample-indexが増えているのがわかります。
#まとめ
今回初めてAmazon Elasticsearch Serviceを利用しましたが、準備段階の手間が省けているのはありがたいですね。またAWS上のサービスなので、Lambdaなどの別サービスからのアクセスも簡単でした。
#おまけ
ちなみに最初、受け取ったElasticsearch ClientをそのままReturnしていたのですが、その時のエラーメッセージがこんな感じ。一見するとJSONで保持できない=パラメータが異なった?と受け取れます。
{
"errorMessage": "<Elasticsearch([{'host': 'HOSTNAME.us-east-1.es.amazonaws.com', 'port': 443}])> is not JSON serializable",
"errorType": "TypeError",
"stackTrace": [
[
"/var/lang/lib/python3.6/json/__init__.py",
238,
"dumps",
"**kw).encode(obj)"
],
[
"/var/lang/lib/python3.6/json/encoder.py",
199,
"encode",
"chunks = self.iterencode(o, _one_shot=True)"
],
[
"/var/lang/lib/python3.6/json/encoder.py",
257,
"iterencode",
"return _iterencode(o, 0)"
],
[
"/var/runtime/awslambda/bootstrap.py",
110,
"decimal_serializer",
"raise TypeError(repr(o) + \" is not JSON serializable\")"
]
]
}
これ、原因が
return es_client
es_clientをそのまま返していたから。
return es_client.info()
とすれば動くのですが、こんなの気づけんって。。。せめてピンポイントでエラー発生行番号が出力されれば話は別ですが、不親切すぎなエラーメッセージ(^^;どこかで改善されることを期待っす。