Help us understand the problem. What is going on with this article?

ALB のアクセスログを Elasticsearch Service 6.8 に取り込むメモ(Lambda Python 3.8 版)

こちらは、

を、

  • Lambda Python 3.8
  • Elasticsearch Service 6.8

対応にするための手順(差分のみ)です。

先に GitHub リポジトリにコードのみ掲載しておきましたが、こちらに補足説明を記載します。

※タイトルから「CLB」を外しましたが、おそらく動くのではないかと思います(未確認)。

手順

1. Amazon Elasticsearch Service の起動

元記事のとおりです(省略)。

2. Ingest Pipeline の設定

こちらも元記事のとおりです(省略)。

3. AWS Lambda ファンクションの作成

IAM Role の作成

ここまでは元記事のとおりです(省略)。

Lambda ファンクションの作成

.zip ファイル化までが元記事と異なります。

こちらの記事を参考に、Amazon Linux 2 上で Python 3 のコードを .zip 化します。

Lambda用アップロードファイル.zip化
$ python3 -m venv dev
$ . dev/bin/activate
(dev) $ mkdir alb_log_to_es_s3
(dev) $ cd alb_log_to_es_s3/
(dev) $ pip install requests requests_aws4auth -t ./
(省略)
(dev) $ rm -rf *.dist-info
(dev) $ vi lambda_function.py
(ここで「lambda_function.py」のコードを入力)
(dev) $ zip -r ../alblog.zip
lambda_function.py
import boto3
import os
import gzip
import requests
from datetime import datetime
from requests_aws4auth import AWS4Auth

def lambda_handler(event, context):
    print('Started')
    es_index = os.environ['ES_INDEX_PREFIX'] + "-" + datetime.strftime(datetime.now(), "%Y%m%d")
    region = os.environ["AWS_REGION"]
    service = 'es'
    credentials = boto3.Session().get_credentials()
    awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = event["Records"][0]["s3"]["object"]["key"]

    s3 = boto3.resource('s3')
    s3.Bucket(bucket).download_file(key, '/tmp/log.gz')

    with gzip.open('/tmp/log.gz', mode='rt') as f:
        data = ""

        for line in f:
            data += '{"index":{"_index":"%s","_type":"log"}}\n' % es_index
            data += '{"message":"%s"}\n' % line.strip().replace('"', '\\"')

            if len(data) > 3000000:
                _bulk(data, awsauth)
                data = ""

        if data != "":
            _bulk(data, awsauth)

    return 'Completed'

def _bulk(data, awsauth):    
    es_host = os.environ['ES_HOST']
    pipeline = os.environ['PIPELINE_NAME']
    url = 'https://%s/_bulk?pipeline=%s' % (es_host, pipeline)

    headers = {'Content-Type': 'application/json'}
    response = request(url, awsauth, headers=headers, data=data)

    if response.status_code != requests.codes.ok:
        print(response.text)

def request(url, awsauth, headers=None, data=None):
    return requests.post(url, auth=awsauth, headers=headers, data=data)

今回のバージョンではContent-Type: application/x-ndjsonがエラーになります。そのため、Content-Type: application/jsonを指定しています。

  • 環境変数
  • アクセスログが保存されている S3 バケットのトリガー
  • Lambda の実行用ロール・タイムアウト時間・メモリサイズ・ネットワーク設定

などは元記事のとおりです(省略)。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away