LoginSignup
0
0

More than 3 years have passed since last update.

TABLE 形式の Aurora(MySQL 互換)スロークエリログを Elasticsearch Service (6.8) に取り込み S3 に保存するメモ(Lambda Python 3.8 版)

Last updated at Posted at 2020-02-09

こちらは、

の Elasticsearch Service 6.8 / Lambda Python 3.8 対応化メモです。

1 ヶ月ほど前に、

を掲載しましたが、1 つだけ忘れたままになっていましたので追加で掲載します。

※いまは Aurora MySQL 互換版から CloudWatch Logs へほぼリアルタイムにスロークエリログを書き出すことができるので、この記事の方法よりもそちらを使うことをお勧めします。

手順

1. Amazon Elasticsearch Service の起動

ALB の元記事のとおりです(省略)。

2. Ingest Pipeline の設定

元記事と同じですが再掲します(curlコマンドで設定)。

curlでPipeline設定
$ curl -H "Content-Type: application/json" -XPUT 'https://XXXXX-XXXXXXXXXXXXXXXXXXXXXXXXXX.ap-northeast-1.es.amazonaws.com/_ingest/pipeline/slowlog' -d '{
"processors": [{
    "grok": {
      "field": "message",
      "patterns":[ "%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:user_host} %{INT:query_time:int} %{INT:lock_time:int} %{INT:rows_sent:int} %{INT:rows_examined:int} %{NOTSPACE:db} %{GREEDYDATA:sql_test}" ],
      "ignore_missing": true
    }
  },{
    "remove":{
      "field": "message"
    }
  }]
}'

3. AWS Lambda ファンクションの作成

IAM Role の作成

ここまでは元記事のとおりです(省略)。

Lambda にアップロードする .zip ファイルの作成(Amazon Linux 2 上で)

Lambda用アップロードファイル.zip化
$ python3 -m venv dev
$ . dev/bin/activate
(dev) $ mkdir aurora_log_to_es_s3
(dev) $ cd aurora_log_to_es_s3/
(dev) $ pip install requests pymysql requests_aws4auth -t ./
(省略)
(dev) $ rm -rf *.dist-info
(dev) $ vi auroralog.py
(ここで「auroralog.py」のコードを入力)
(dev) $ zip -r ../auroralog.zip *
auroralog.py
import boto3
import datetime
import os
import pymysql
import re
import sys
import requests
from requests_aws4auth import AWS4Auth

aurora_host = os.environ["AURORA_HOST"]
aurora_user = os.environ["AURORA_USER"]
aurora_pass = os.environ["AURORA_PASS"]

try:
    conn = pymysql.connect(aurora_host, user=aurora_user, passwd=aurora_pass, db="mysql", connect_timeout=10)
except:
    print("ERROR: Could not connect to Aurora instance : [%s]." % aurora_host)
    sys.exit()

def lambda_handler(event, context):

    print("Started")
    lasthour = datetime.datetime.today() - datetime.timedelta(hours = (1 - 9))
    exportdate1 = lasthour.strftime('%Y%m%d')
    exportdate2 = lasthour.strftime('%Y-%m-%d')
    exporttime = lasthour.strftime('%H')
    es_host = os.environ["ES_HOST"]
    es_index = os.environ["ES_INDEX_PREFIX"] + "-" + exportdate1
    s3_bucket = os.environ["S3_BUCKET"]
    s3_key = os.environ["S3_PREFIX"] + "/" + exportdate2 + "_" + exporttime
    region = os.environ["AWS_REGION"]
    service = 'es'
    credentials = boto3.Session().get_credentials()
    awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

    print("Export: " + exportdate1 + "_" + exporttime)

    with conn.cursor() as cur:
        cur.execute("SELECT CONCAT(DATE(CONVERT_TZ(start_time, '+09:00', 'UTC')), 'T', TIME(CONVERT_TZ(start_time, '+09:00', 'UTC')), 'Z ', REPLACE(user_host, ' ', ''), ' ', TIME_TO_SEC(query_time), ' ', TIME_TO_SEC(lock_time), ' ', rows_sent, ' ', rows_examined, ' ', (CASE WHEN db='' THEN '-' ELSE db END), ' ', sql_text) FROM mysql.slow_log WHERE start_time BETWEEN '%s %s:00:00' AND '%s %s:59:59.999'" % (exportdate2, exporttime, exportdate2, exporttime))
        file_data = ""
        file_count = 1
        es_data = ""

        for line in cur:
            line_data = re.sub(r"(\\t| )+", " ", re.sub(r"\?+", "?", "".join(str(line)).strip()[2:-3].replace('"', '\\"')))
            file_data += "%s\n" % line_data
            es_data += '{"index":{"_index":"%s","_type":"log"}}\n' % es_index
            es_data += '{"message":"%s"}\n' % line_data
            if len(file_data) > 3000000:
                s3_client = boto3.client("s3")
                s3_client.put_object(
                  Bucket=s3_bucket,
                  Key=s3_key + "-" + str(file_count),
                  Body=file_data
                )
                #print("--- file: %s" % file_count)
                #print(file_data)
                file_data = ""
                file_count += 1
            if len(es_data) > 3000000:
                _bulk(es_host, es_data, awsauth)
                #print("--- es")
                #print(es_data)
                es_data = ""

        if file_data != "":
            s3_client = boto3.client("s3")
            s3_client.put_object(
              Bucket=s3_bucket,
              Key=s3_key + "-" + str(file_count),
              Body=file_data
            )
            #print("--- file: %s" % file_count)
            #print(file_data)

        if es_data != "":
            _bulk(es_host, es_data, awsauth)
            #print("--- es")
            #print(es_data)

    return "Completed"

def _bulk(host, doc, awsauth):
    pipeline = os.environ["PIPELINE_NAME"]

    url = "https://%s/_bulk?pipeline=%s" % (host, pipeline)
    headers = {"Content-Type": "application/json"}
    response = request(url, awsauth, headers=headers, data=doc)

    if response.status_code != requests.codes.ok:
        print(response.text)

def request(url, awsauth, headers=None, data=None):
    return requests.post(url, auth=awsauth, headers=headers, data=data)

Lambda ファンクションの作成

ここから先は元記事と同じです(省略)。

注意点

注意点も基本的には元記事のとおりです。

ただし、Python 3 系で実行することになったため、ログに含まれる全角文字はそのまま記録されます。

機密情報・個人情報保護の関係でそのまま記録したくない場合は、Percona Toolkit に含まれる pt-fingerprint などを使って SQL をノーマライズすると良いでしょう。


0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0