3
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

ALB のアクセスログを Elasticsearch Service 6.8 に取り込むメモ(Lambda Python 3.8 版)

Last updated at Posted at 2020-01-05

こちらは、

を、

  • Lambda Python 3.8
  • Elasticsearch Service 6.8

対応にするための手順(差分のみ)です。

先に GitHub リポジトリにコードのみ掲載しておきましたが、こちらに補足説明を記載します。

※タイトルから「CLB」を外しましたが、おそらく動くのではないかと思います(未確認)。

手順

1. Amazon Elasticsearch Service の起動

元記事のとおりです(省略)。

2. Ingest Pipeline の設定

こちらも元記事のとおりです(省略)。

3. AWS Lambda ファンクションの作成

IAM Role の作成

ここまでは元記事のとおりです(省略)。

Lambda ファンクションの作成

.zip ファイル化までが元記事と異なります。

こちらの記事を参考に、Amazon Linux 2 上で Python 3 のコードを .zip 化します。

Lambda用アップロードファイル.zip化
$ python3 -m venv dev
$ . dev/bin/activate
(dev) $ mkdir alb_log_to_es_s3
(dev) $ cd alb_log_to_es_s3/
(dev) $ pip install requests requests_aws4auth -t ./
(省略)
(dev) $ rm -rf *.dist-info
(dev) $ vi lambda_function.py
(ここで「lambda_function.py」のコードを入力)
(dev) $ zip -r ../alblog.zip *
lambda_function.py
import boto3
import os
import gzip
import requests
from datetime import datetime
from requests_aws4auth import AWS4Auth

def lambda_handler(event, context):
    print('Started')
    es_index = os.environ['ES_INDEX_PREFIX'] + "-" + datetime.strftime(datetime.now(), "%Y%m%d")
    region = os.environ["AWS_REGION"]
    service = 'es'
    credentials = boto3.Session().get_credentials()
    awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = event["Records"][0]["s3"]["object"]["key"]

    s3 = boto3.resource('s3')
    s3.Bucket(bucket).download_file(key, '/tmp/log.gz')

    with gzip.open('/tmp/log.gz', mode='rt') as f:
        data = ""

        for line in f:
            data += '{"index":{"_index":"%s","_type":"log"}}\n' % es_index
            data += '{"message":"%s"}\n' % line.strip().replace('"', '\\"')
    
            if len(data) > 3000000:
                _bulk(data, awsauth)
                data = ""

        if data != "":
            _bulk(data, awsauth)

    return 'Completed'

def _bulk(data, awsauth):    
    es_host = os.environ['ES_HOST']
    pipeline = os.environ['PIPELINE_NAME']
    url = 'https://%s/_bulk?pipeline=%s' % (es_host, pipeline)

    headers = {'Content-Type': 'application/json'}
    response = request(url, awsauth, headers=headers, data=data)
    
    if response.status_code != requests.codes.ok:
        print(response.text)

def request(url, awsauth, headers=None, data=None):
    return requests.post(url, auth=awsauth, headers=headers, data=data)

今回のバージョンではContent-Type: application/x-ndjsonがエラーになります。そのため、Content-Type: application/jsonを指定しています。

  • 環境変数
  • アクセスログが保存されている S3 バケットのトリガー
  • Lambda の実行用ロール・タイムアウト時間・メモリサイズ・ネットワーク設定

などは元記事のとおりです(省略)。

3
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?