Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
0
Help us understand the problem. What is going on with this article?
@hmatsu47

TABLE 形式の Aurora(MySQL 互換)スロークエリログを Elasticsearch Service (6.8) に取り込み S3 に保存するメモ(Lambda Python 3.8 版)

こちらは、

の Elasticsearch Service 6.8 / Lambda Python 3.8 対応化メモです。

1 ヶ月ほど前に、

を掲載しましたが、1 つだけ忘れたままになっていましたので追加で掲載します。

※いまは Aurora MySQL 互換版から CloudWatch Logs へほぼリアルタイムにスロークエリログを書き出すことができるので、この記事の方法よりもそちらを使うことをお勧めします。

手順

1. Amazon Elasticsearch Service の起動

ALB の元記事のとおりです(省略)。

2. Ingest Pipeline の設定

元記事と同じですが再掲します(curlコマンドで設定)。

curlでPipeline設定
$ curl -H "Content-Type: application/json" -XPUT 'https://XXXXX-XXXXXXXXXXXXXXXXXXXXXXXXXX.ap-northeast-1.es.amazonaws.com/_ingest/pipeline/slowlog' -d '{
"processors": [{
    "grok": {
      "field": "message",
      "patterns":[ "%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:user_host} %{INT:query_time:int} %{INT:lock_time:int} %{INT:rows_sent:int} %{INT:rows_examined:int} %{NOTSPACE:db} %{GREEDYDATA:sql_test}" ],
      "ignore_missing": true
    }
  },{
    "remove":{
      "field": "message"
    }
  }]
}'

3. AWS Lambda ファンクションの作成

IAM Role の作成

ここまでは元記事のとおりです(省略)。

Lambda にアップロードする .zip ファイルの作成(Amazon Linux 2 上で)

Lambda用アップロードファイル.zip化
$ python3 -m venv dev
$ . dev/bin/activate
(dev) $ mkdir aurora_log_to_es_s3
(dev) $ cd aurora_log_to_es_s3/
(dev) $ pip install requests pymysql requests_aws4auth -t ./
(省略)
(dev) $ rm -rf *.dist-info
(dev) $ vi auroralog.py
(ここで「auroralog.py」のコードを入力)
(dev) $ zip -r ../auroralog.zip *
auroralog.py
import boto3
import datetime
import os
import pymysql
import re
import sys
import requests
from requests_aws4auth import AWS4Auth

aurora_host = os.environ["AURORA_HOST"]
aurora_user = os.environ["AURORA_USER"]
aurora_pass = os.environ["AURORA_PASS"]

try:
    conn = pymysql.connect(aurora_host, user=aurora_user, passwd=aurora_pass, db="mysql", connect_timeout=10)
except:
    print("ERROR: Could not connect to Aurora instance : [%s]." % aurora_host)
    sys.exit()

def lambda_handler(event, context):

    print("Started")
    lasthour = datetime.datetime.today() - datetime.timedelta(hours = (1 - 9))
    exportdate1 = lasthour.strftime('%Y%m%d')
    exportdate2 = lasthour.strftime('%Y-%m-%d')
    exporttime = lasthour.strftime('%H')
    es_host = os.environ["ES_HOST"]
    es_index = os.environ["ES_INDEX_PREFIX"] + "-" + exportdate1
    s3_bucket = os.environ["S3_BUCKET"]
    s3_key = os.environ["S3_PREFIX"] + "/" + exportdate2 + "_" + exporttime
    region = os.environ["AWS_REGION"]
    service = 'es'
    credentials = boto3.Session().get_credentials()
    awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

    print("Export: " + exportdate1 + "_" + exporttime)

    with conn.cursor() as cur:
        cur.execute("SELECT CONCAT(DATE(CONVERT_TZ(start_time, '+09:00', 'UTC')), 'T', TIME(CONVERT_TZ(start_time, '+09:00', 'UTC')), 'Z ', REPLACE(user_host, ' ', ''), ' ', TIME_TO_SEC(query_time), ' ', TIME_TO_SEC(lock_time), ' ', rows_sent, ' ', rows_examined, ' ', (CASE WHEN db='' THEN '-' ELSE db END), ' ', sql_text) FROM mysql.slow_log WHERE start_time BETWEEN '%s %s:00:00' AND '%s %s:59:59.999'" % (exportdate2, exporttime, exportdate2, exporttime))
        file_data = ""
        file_count = 1
        es_data = ""

        for line in cur:
            line_data = re.sub(r"(\\t| )+", " ", re.sub(r"\?+", "?", "".join(str(line)).strip()[2:-3].replace('"', '\\"')))
            file_data += "%s\n" % line_data
            es_data += '{"index":{"_index":"%s","_type":"log"}}\n' % es_index
            es_data += '{"message":"%s"}\n' % line_data
            if len(file_data) > 3000000:
                s3_client = boto3.client("s3")
                s3_client.put_object(
                  Bucket=s3_bucket,
                  Key=s3_key + "-" + str(file_count),
                  Body=file_data
                )
                #print("--- file: %s" % file_count)
                #print(file_data)
                file_data = ""
                file_count += 1
            if len(es_data) > 3000000:
                _bulk(es_host, es_data, awsauth)
                #print("--- es")
                #print(es_data)
                es_data = ""

        if file_data != "":
            s3_client = boto3.client("s3")
            s3_client.put_object(
              Bucket=s3_bucket,
              Key=s3_key + "-" + str(file_count),
              Body=file_data
            )
            #print("--- file: %s" % file_count)
            #print(file_data)

        if es_data != "":
            _bulk(es_host, es_data, awsauth)
            #print("--- es")
            #print(es_data)

    return "Completed"

def _bulk(host, doc, awsauth):
    pipeline = os.environ["PIPELINE_NAME"]

    url = "https://%s/_bulk?pipeline=%s" % (host, pipeline)
    headers = {"Content-Type": "application/json"}
    response = request(url, awsauth, headers=headers, data=doc)

    if response.status_code != requests.codes.ok:
        print(response.text)

def request(url, awsauth, headers=None, data=None):
    return requests.post(url, auth=awsauth, headers=headers, data=data)

Lambda ファンクションの作成

ここから先は元記事と同じです(省略)。

注意点

注意点も基本的には元記事のとおりです。

ただし、Python 3 系で実行することになったため、ログに含まれる全角文字はそのまま記録されます。

機密情報・個人情報保護の関係でそのまま記録したくない場合は、Percona Toolkit に含まれる pt-fingerprint などを使って SQL をノーマライズすると良いでしょう。


0
Help us understand the problem. What is going on with this article?
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
hmatsu47
名古屋で士業向けWebサービスのインフラ構築管理、たまにアプリケーション開発をやっています。 業務利用しているもの、個人研究など、気長にのんびり投稿していきます。ニッチ狙いが多めです。 IPA RISS(001158)・NW・DB/日商・大商2級コレクター?(簿記・ビジネス法務・ビジネス会計)。
infra-workshop
インフラ技術を勉強したい人たちのためのオンライン勉強会です

Comments

No comments
Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account Login
0
Help us understand the problem. What is going on with this article?