Help us understand the problem. What is going on with this article?

TABLE 形式の Aurora(MySQL 互換)スロークエリログを Elasticsearch Service (6.8) に取り込み S3 に保存するメモ(Lambda Python 3.8 版)

こちらは、

の Elasticsearch Service 6.8 / Lambda Python 3.8 対応化メモです。

1 ヶ月ほど前に、

を掲載しましたが、1 つだけ忘れたままになっていましたので追加で掲載します。

※いまは Aurora MySQL 互換版から CloudWatch Logs へほぼリアルタイムにスロークエリログを書き出すことができるので、この記事の方法よりもそちらを使うことをお勧めします。

手順

1. Amazon Elasticsearch Service の起動

ALB の元記事のとおりです(省略)。

2. Ingest Pipeline の設定

元記事と同じですが再掲します(curlコマンドで設定)。

curlでPipeline設定
$ curl -H "Content-Type: application/json" -XPUT 'https://XXXXX-XXXXXXXXXXXXXXXXXXXXXXXXXX.ap-northeast-1.es.amazonaws.com/_ingest/pipeline/slowlog' -d '{
"processors": [{
    "grok": {
      "field": "message",
      "patterns":[ "%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:user_host} %{INT:query_time:int} %{INT:lock_time:int} %{INT:rows_sent:int} %{INT:rows_examined:int} %{NOTSPACE:db} %{GREEDYDATA:sql_test}" ],
      "ignore_missing": true
    }
  },{
    "remove":{
      "field": "message"
    }
  }]
}'

3. AWS Lambda ファンクションの作成

IAM Role の作成

ここまでは元記事のとおりです(省略)。

Lambda にアップロードする .zip ファイルの作成(Amazon Linux 2 上で)

Lambda用アップロードファイル.zip化
$ python3 -m venv dev
$ . dev/bin/activate
(dev) $ mkdir aurora_log_to_es_s3
(dev) $ cd aurora_log_to_es_s3/
(dev) $ pip install requests pymysql requests_aws4auth -t ./
(省略)
(dev) $ rm -rf *.dist-info
(dev) $ vi auroralog.py
(ここで「auroralog.py」のコードを入力)
(dev) $ zip -r ../auroralog.zip
auroralog.py
import boto3
import datetime
import os
import pymysql
import re
import sys
import requests
from requests_aws4auth import AWS4Auth

aurora_host = os.environ["AURORA_HOST"]
aurora_user = os.environ["AURORA_USER"]
aurora_pass = os.environ["AURORA_PASS"]

try:
    conn = pymysql.connect(aurora_host, user=aurora_user, passwd=aurora_pass, db="mysql", connect_timeout=10)
except:
    print("ERROR: Could not connect to Aurora instance : [%s]." % aurora_host)
    sys.exit()

def lambda_handler(event, context):

    print("Started")
    lasthour = datetime.datetime.today() - datetime.timedelta(hours = (1 - 9))
    exportdate1 = lasthour.strftime('%Y%m%d')
    exportdate2 = lasthour.strftime('%Y-%m-%d')
    exporttime = lasthour.strftime('%H')
    es_host = os.environ["ES_HOST"]
    es_index = os.environ["ES_INDEX_PREFIX"] + "-" + exportdate1
    s3_bucket = os.environ["S3_BUCKET"]
    s3_key = os.environ["S3_PREFIX"] + "/" + exportdate2 + "_" + exporttime
    region = os.environ["AWS_REGION"]
    service = 'es'
    credentials = boto3.Session().get_credentials()
    awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

    print("Export: " + exportdate1 + "_" + exporttime)

    with conn.cursor() as cur:
        cur.execute("SELECT CONCAT(DATE(CONVERT_TZ(start_time, '+09:00', 'UTC')), 'T', TIME(CONVERT_TZ(start_time, '+09:00', 'UTC')), 'Z ', REPLACE(user_host, ' ', ''), ' ', TIME_TO_SEC(query_time), ' ', TIME_TO_SEC(lock_time), ' ', rows_sent, ' ', rows_examined, ' ', (CASE WHEN db='' THEN '-' ELSE db END), ' ', sql_text) FROM mysql.slow_log WHERE start_time BETWEEN '%s %s:00:00' AND '%s %s:59:59.999'" % (exportdate2, exporttime, exportdate2, exporttime))
        file_data = ""
        file_count = 1
        es_data = ""

        for line in cur:
            line_data = re.sub(r"(\\t| )+", " ", re.sub(r"\?+", "?", "".join(str(line)).strip()[2:-3].replace('"', '\\"')))
            file_data += "%s\n" % line_data
            es_data += '{"index":{"_index":"%s","_type":"log"}}\n' % es_index
            es_data += '{"message":"%s"}\n' % line_data
            if len(file_data) > 3000000:
                s3_client = boto3.client("s3")
                s3_client.put_object(
                  Bucket=s3_bucket,
                  Key=s3_key + "-" + str(file_count),
                  Body=file_data
                )
                #print("--- file: %s" % file_count)
                #print(file_data)
                file_data = ""
                file_count += 1
            if len(es_data) > 3000000:
                _bulk(es_host, es_data, awsauth)
                #print("--- es")
                #print(es_data)
                es_data = ""

        if file_data != "":
            s3_client = boto3.client("s3")
            s3_client.put_object(
              Bucket=s3_bucket,
              Key=s3_key + "-" + str(file_count),
              Body=file_data
            )
            #print("--- file: %s" % file_count)
            #print(file_data)

        if es_data != "":
            _bulk(es_host, es_data, awsauth)
            #print("--- es")
            #print(es_data)

    return "Completed"

def _bulk(host, doc, awsauth):
    pipeline = os.environ["PIPELINE_NAME"]

    url = "https://%s/_bulk?pipeline=%s" % (host, pipeline)
    headers = {"Content-Type": "application/json"}
    response = request(url, awsauth, headers=headers, data=doc)

    if response.status_code != requests.codes.ok:
        print(response.text)

def request(url, awsauth, headers=None, data=None):
    return requests.post(url, auth=awsauth, headers=headers, data=data)

Lambda ファンクションの作成

ここから先は元記事と同じです(省略)。

注意点

注意点も基本的には元記事のとおりです。

ただし、Python 3 系で実行することになったため、ログに含まれる全角文字はそのまま記録されます。

機密情報・個人情報保護の関係でそのまま記録したくない場合は、Percona Toolkit に含まれる pt-fingerprint などを使って SQL をノーマライズすると良いでしょう。


Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした