はじめに
Aurora MySQL の監査ログは CloudWatch Logs に保存する設定があります。
Amazon CloudWatch Logs への Amazon Aurora MySQL ログの発行 - Amazon Aurora
この設定をしておくことで、 CloudWatch Logs に監査ログなどが発行され、CloudWatch Logs Insights でログを解析できます。
しかし、CloudWatch Logs Insights の独特なクエリで解析する必要があったり、一度に10,000件ずつしか処理できなかったりと、多少制限があります。
ですが、少し工夫することで、 Athena から解析することも可能です。Athena では、使い慣れた SQL を使用して件数をほとんど気にせず解析ができます。便利ですね。
Aurora MySQL の監査ログも CloudWatch Logs から S3 にエクスポートすれば、 Athena で解析できます。
この記事ではその方法を紹介します!!
CloudWatch Logs のロググループを S3 にエクスポートする
CloudWatch Logs には、ログを S3 にエクスポートする機能があるので、それを使います。
Amazon S3 へのログデータのエクスポート - Amazon CloudWatch ログ
AWS の Console からエクスポートする方法と、AWS CLI からエクスポートする方法があり、それぞれドキュメントに記載されています。
- コンソールを使用してログデータを Amazon S3 にエクスポートする - Amazon CloudWatch ログ
- AWS CLI を使用してログデータを Amazon S3 にエクスポートする - Amazon CloudWatch ログ
基本、ドキュメントに従えばエクスポートできると思いますが、追加でいくつか工夫が必要です。
S3 のバケットにライフサイクルを設定
エクスポート機能でS3に保存されたオブジェクトのストレージクラスは Standard です。長期間保存する場合や解析の頻度が低い場合は Glacier 系にしておいた方がコストが削減できるので、設定しておきましょう。
例: Glacier Instant Retrieval に変更するライフサイクル
エクスポートするパスを Athena で解析しやすいようにする
最終目的は Athena で解析することなので、Athena で解析しやすいようにエクスポートする必要があります。
特に、大量のデータをエクスポートする場合、 Athena でパーティションが作成できるよう工夫しておくと、Athena の料金を抑えることができます。
例として、パーティションを1日ごとに作成したい場合は、 S3 bucket prefix (CLI の場合は --destination-prefix
オプション) を YYYY/mm/dd
形式にして、1日ずつエクスポートします。AWS CLI で 1日ごとにエクスポートタスクを作成するスクリプトを用意すると楽です。
1日ごとに、1ヶ月分のエクスポートタスクを作成するスクリプトの例 (Ruby)
複数のエクスポートタスクを作成しようとしたら失敗したので、1つのタスクが終わるのを待って、次のタスクを作成しています。
require 'time'
require 'json'
S3_BUCKET_NAME = 'XXXXX'
LOG_GROUP_NAME = '/aws/rds/cluster/xxxxxxx-cluster/audit'
if ARGV[0].nil?
puts "Usage: ruby export_cloud_watch_logs_1month.rb YYYY-mm-dd"
exit
end
start_date = Time.parse(ARGV[0] + ' 00:00:00 UTC')
def export_command(date)
start_time = date
end_time = start_time + 60 * 60 * 24 # 1日分
<<~COMMAND
aws logs create-export-task \\
--task-name "export-#{start_time.strftime('%Y-%m-%d')}" \\
--log-group-name "#{LOG_GROUP_NAME}" \\
--from #{start_time.to_i}000 \\
--to #{end_time.to_i}000 \\
--destination "#{S3_BUCKET_NAME}" \\
--destination-prefix "#{start_time.strftime('%Y/%m/%d')}"
COMMAND
end
def check_export_task_command(task_id)
<<~COMMAND
aws logs describe-export-tasks --task-id #{task_id}
COMMAND
end
date = start_date
while start_date.month == date.month
puts "create export task for #{date.strftime('%Y-%m-%d')}"
result = `#{export_command(date)}`
task_id = JSON.parse(result)['taskId']
puts "export task created: #{task_id}"
puts "check export task status for #{task_id}"
loop do
result = `#{check_export_task_command(task_id)}`
status = JSON.parse(result)['exportTasks'][0]['status']['code']
print "\rstatus: #{status}"
break if status == 'COMPLETED'
sleep 60
end
puts "\nexport task completed: #{task_id}"
date = date + 60 * 60 * 24
end
実行中は以下のように表示されます。
λ ruby export_cloud_watch_logs_1month.rb 2024-09-01
create export task for 2024-09-01
export task created: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
check export task status for xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
status: COMPLETED
export task completed: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
create export task for 2024-09-02
export task created: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
check export task status for xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
status: RUNNING
Athena でテーブルを作成する
CloudWatch から S3 にログをエクスポートしたら、あとは Athena で テーブルを作成するだけです。
エクスポートされたファイルをみてみると、以下の形式になっています。
2024-11-09T02:34:33.980Z 1731119673980283,xxxxx-cluster,...
2024-11-09T02:34:33.980Z 1731119673980283,xxxxx-cluster,...
元のCloudWatch Logs のロググループだと、 Timestamp カラムと Message カラムが存在し、それがスペース区切りで出力されていそうです。
- Timestamp カラム:
2024-11-09T02:34:33.980Z
- Message カラム:
1731119673980283,xxxxx-cluster,...
Athena でテーブルを作成する方法はいくつかありますが、今回のようなデータの場合、2024-11-09T02:34:33.980Z 1731119673980283
までを 1つ目のカラムとして扱い、CSVとして処理すると楽だと思います。OpenCSVSerde
で S3 のログを CSV として扱い、 Athena のテーブルを作成するクエリは以下です。
(テーブル名やS3バケット名は適宜変更してください。)
CREATE EXTERNAL TABLE `xxxxx_cluster_audit_log_from_cloudwatch_logs` (
`timestamp` string,
`serverhost` string,
`username` string,
`host` string,
`connectionid` string,
`queryid` string,
`operation` string,
`database` string,
`object` string,
`retcode` string
)
PARTITIONED BY (`date` string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
'escapeChar' = '\\',
'quoteChar' = '\'',
'separatorChar' = ','
)
LOCATION 's3://xxxxx-audit-log-import-from-cloudwatch-logs/'
TBLPROPERTIES (
'projection.enabled' = 'true',
'projection.date.type' = 'date',
'projection.date.format' = 'yyyy/MM/dd',
'storage.location.template' = 's3://xxxxx-audit-log-import-from-cloudwatch-logs/${date}/',
'projection.date.range' = '2021/12/01,NOW',
'projection.date.interval'='1',
'projection.date.interval.unit'='DAYS'
);
また、timestamp カラムに 2024-11-09T02:34:33.980Z 1731119673980283
が入っているので、SELECTの際に工夫が必要です。1731119673980283
の部分を bigint として扱いたい場合は SELECT のときに cast(element_at(split(timestamp, ' '), -1) as bigint)
のように書く必要があります。
もう1点、Athena で SELECT
する際に注意が必要です。
CloudWatch Logs から S3 にエクスポートすると、 エクスポートのたびに S3 に aws-logs-write-test
という名前のファイルが作成されます。中身は Permission Check Successful
という文字列が入っているので、これも除外する必要があります。
以上を考慮すると、 Athena でSELECT 時にログの timestamp を取得するためには、以下のようにクエリを書く必要があります。
SELECT from_unixtime(cast(element_at(split(timestamp, ' '), -1) as bigint) / 1000000)
FROM qiita_cluster_from_cloudwatch_logs
WHERE date = '2021/12/01'
AND timestamp != 'Permission Check Successful'
LIMIT 100
無事に timestamp が取得できていそうです。
(OpenCSVSerde
で無理やりCSVとして扱っているためこのような考慮が必要ですが、RegexSerDe
などで工夫してテーブルを作成すれば、不要になるかもしれないです。)
まとめ
- Aurora MySQL の監査ログ を CloudWatch Logs から S3 にエクスポートすれば Athena から解析できる
- Athena で解析できるようにするため、エクスポートやテーブル作成でいくつか工夫する必要がある
また、今回は CloudWatch Logs 経由でログを S3 に保存しましたが、Aurora のインスタンスから S3 に直接ログを転送することも可能です。詳細は下記の記事に記載してあります。そちらもぜひ参考にしてみてください。