こちらは、
の Elasticsearch Service 6.8 / Lambda Python 3.8 対応化メモです。
1 ヶ月ほど前に、
- ALB のアクセスログを Elasticsearch Service 6.8 に取り込むメモ(Lambda Python 3.8 版)
- CloudFront のアクセスログを Elasticsearch Service 6.8 に取り込むメモ(Lambda Python 3.8 版)
- Elasticsearch Service 6.8 に取り込んだログ(INDEX)を Curator で削除するメモ(Lambda Python 3.8 版)
を掲載しましたが、1 つだけ忘れたままになっていましたので追加で掲載します。
※いまは Aurora MySQL 互換版から CloudWatch Logs へほぼリアルタイムにスロークエリログを書き出すことができるので、この記事の方法よりもそちらを使うことをお勧めします。
手順
1. Amazon Elasticsearch Service の起動
ALB の元記事のとおりです(省略)。
2. Ingest Pipeline の設定
元記事と同じですが再掲します(curl
コマンドで設定)。
curlでPipeline設定
$ curl -H "Content-Type: application/json" -XPUT 'https://XXXXX-XXXXXXXXXXXXXXXXXXXXXXXXXX.ap-northeast-1.es.amazonaws.com/_ingest/pipeline/slowlog' -d '{
"processors": [{
"grok": {
"field": "message",
"patterns":[ "%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:user_host} %{INT:query_time:int} %{INT:lock_time:int} %{INT:rows_sent:int} %{INT:rows_examined:int} %{NOTSPACE:db} %{GREEDYDATA:sql_test}" ],
"ignore_missing": true
}
},{
"remove":{
"field": "message"
}
}]
}'
3. AWS Lambda ファンクションの作成
IAM Role の作成
ここまでは元記事のとおりです(省略)。
Lambda にアップロードする .zip ファイルの作成(Amazon Linux 2 上で)
Lambda用アップロードファイル.zip化
$ python3 -m venv dev
$ . dev/bin/activate
(dev) $ mkdir aurora_log_to_es_s3
(dev) $ cd aurora_log_to_es_s3/
(dev) $ pip install requests pymysql requests_aws4auth -t ./
(省略)
(dev) $ rm -rf *.dist-info
(dev) $ vi auroralog.py
(ここで「auroralog.py」のコードを入力)
(dev) $ zip -r ../auroralog.zip *
auroralog.py
import boto3
import datetime
import os
import pymysql
import re
import sys
import requests
from requests_aws4auth import AWS4Auth
aurora_host = os.environ["AURORA_HOST"]
aurora_user = os.environ["AURORA_USER"]
aurora_pass = os.environ["AURORA_PASS"]
try:
conn = pymysql.connect(aurora_host, user=aurora_user, passwd=aurora_pass, db="mysql", connect_timeout=10)
except:
print("ERROR: Could not connect to Aurora instance : [%s]." % aurora_host)
sys.exit()
def lambda_handler(event, context):
print("Started")
lasthour = datetime.datetime.today() - datetime.timedelta(hours = (1 - 9))
exportdate1 = lasthour.strftime('%Y%m%d')
exportdate2 = lasthour.strftime('%Y-%m-%d')
exporttime = lasthour.strftime('%H')
es_host = os.environ["ES_HOST"]
es_index = os.environ["ES_INDEX_PREFIX"] + "-" + exportdate1
s3_bucket = os.environ["S3_BUCKET"]
s3_key = os.environ["S3_PREFIX"] + "/" + exportdate2 + "_" + exporttime
region = os.environ["AWS_REGION"]
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
print("Export: " + exportdate1 + "_" + exporttime)
with conn.cursor() as cur:
cur.execute("SELECT CONCAT(DATE(CONVERT_TZ(start_time, '+09:00', 'UTC')), 'T', TIME(CONVERT_TZ(start_time, '+09:00', 'UTC')), 'Z ', REPLACE(user_host, ' ', ''), ' ', TIME_TO_SEC(query_time), ' ', TIME_TO_SEC(lock_time), ' ', rows_sent, ' ', rows_examined, ' ', (CASE WHEN db='' THEN '-' ELSE db END), ' ', sql_text) FROM mysql.slow_log WHERE start_time BETWEEN '%s %s:00:00' AND '%s %s:59:59.999'" % (exportdate2, exporttime, exportdate2, exporttime))
file_data = ""
file_count = 1
es_data = ""
for line in cur:
line_data = re.sub(r"(\\t| )+", " ", re.sub(r"\?+", "?", "".join(str(line)).strip()[2:-3].replace('"', '\\"')))
file_data += "%s\n" % line_data
es_data += '{"index":{"_index":"%s","_type":"log"}}\n' % es_index
es_data += '{"message":"%s"}\n' % line_data
if len(file_data) > 3000000:
s3_client = boto3.client("s3")
s3_client.put_object(
Bucket=s3_bucket,
Key=s3_key + "-" + str(file_count),
Body=file_data
)
#print("--- file: %s" % file_count)
#print(file_data)
file_data = ""
file_count += 1
if len(es_data) > 3000000:
_bulk(es_host, es_data, awsauth)
#print("--- es")
#print(es_data)
es_data = ""
if file_data != "":
s3_client = boto3.client("s3")
s3_client.put_object(
Bucket=s3_bucket,
Key=s3_key + "-" + str(file_count),
Body=file_data
)
#print("--- file: %s" % file_count)
#print(file_data)
if es_data != "":
_bulk(es_host, es_data, awsauth)
#print("--- es")
#print(es_data)
return "Completed"
def _bulk(host, doc, awsauth):
pipeline = os.environ["PIPELINE_NAME"]
url = "https://%s/_bulk?pipeline=%s" % (host, pipeline)
headers = {"Content-Type": "application/json"}
response = request(url, awsauth, headers=headers, data=doc)
if response.status_code != requests.codes.ok:
print(response.text)
def request(url, awsauth, headers=None, data=None):
return requests.post(url, auth=awsauth, headers=headers, data=data)
Lambda ファンクションの作成
ここから先は元記事と同じです(省略)。
注意点
注意点も基本的には元記事のとおりです。
ただし、Python 3 系で実行することになったため、ログに含まれる全角文字はそのまま記録されます。
機密情報・個人情報保護の関係でそのまま記録したくない場合は、Percona Toolkit に含まれる pt-fingerprint などを使って SQL をノーマライズすると良いでしょう。