こちらは、
を、
- Lambda Python 3.8
- Elasticsearch Service 6.8
対応にするための手順(差分のみ)です。
先に GitHub リポジトリにコードのみ掲載しておきましたが、こちらに補足説明を記載します。
※タイトルから「CLB」を外しましたが、おそらく動くのではないかと思います(未確認)。
手順
1. Amazon Elasticsearch Service の起動
元記事のとおりです(省略)。
2. Ingest Pipeline の設定
こちらも元記事のとおりです(省略)。
3. AWS Lambda ファンクションの作成
IAM Role の作成
ここまでは元記事のとおりです(省略)。
Lambda ファンクションの作成
.zip ファイル化までが元記事と異なります。
こちらの記事を参考に、Amazon Linux 2 上で Python 3 のコードを .zip 化します。
- 手順メモ:AWS Lambdaでzipファイルをアップロードするまでに必要なこと(swallowgreen さん)
Lambda用アップロードファイル.zip化
$ python3 -m venv dev
$ . dev/bin/activate
(dev) $ mkdir alb_log_to_es_s3
(dev) $ cd alb_log_to_es_s3/
(dev) $ pip install requests requests_aws4auth -t ./
(省略)
(dev) $ rm -rf *.dist-info
(dev) $ vi lambda_function.py
(ここで「lambda_function.py」のコードを入力)
(dev) $ zip -r ../alblog.zip *
lambda_function.py
import boto3
import os
import gzip
import requests
from datetime import datetime
from requests_aws4auth import AWS4Auth
def lambda_handler(event, context):
print('Started')
es_index = os.environ['ES_INDEX_PREFIX'] + "-" + datetime.strftime(datetime.now(), "%Y%m%d")
region = os.environ["AWS_REGION"]
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
bucket = event["Records"][0]["s3"]["bucket"]["name"]
key = event["Records"][0]["s3"]["object"]["key"]
s3 = boto3.resource('s3')
s3.Bucket(bucket).download_file(key, '/tmp/log.gz')
with gzip.open('/tmp/log.gz', mode='rt') as f:
data = ""
for line in f:
data += '{"index":{"_index":"%s","_type":"log"}}\n' % es_index
data += '{"message":"%s"}\n' % line.strip().replace('"', '\\"')
if len(data) > 3000000:
_bulk(data, awsauth)
data = ""
if data != "":
_bulk(data, awsauth)
return 'Completed'
def _bulk(data, awsauth):
es_host = os.environ['ES_HOST']
pipeline = os.environ['PIPELINE_NAME']
url = 'https://%s/_bulk?pipeline=%s' % (es_host, pipeline)
headers = {'Content-Type': 'application/json'}
response = request(url, awsauth, headers=headers, data=data)
if response.status_code != requests.codes.ok:
print(response.text)
def request(url, awsauth, headers=None, data=None):
return requests.post(url, auth=awsauth, headers=headers, data=data)
※今回のバージョンではContent-Type: application/x-ndjson
がエラーになります。そのため、Content-Type: application/json
を指定しています。
- 環境変数
- アクセスログが保存されている S3 バケットのトリガー
- Lambda の実行用ロール・タイムアウト時間・メモリサイズ・ネットワーク設定
などは元記事のとおりです(省略)。