背景
「監査ログのみをS3に抽出する仕組み」を実装するに至った背景を時系列で記載します。
- RDSはデフォルト機能でCloudwatchLogsへの監査ログエクスポートができるため、最初はCloudwatchLogs内に保存された監査ログをS3に転送する方式を検討
- AWSは「CloudwatchLogsに対してログを転送する量」に対して課金が発生するため、監査ログをCloudwatchLogsに転送すると料金が高騰する
- であれば、CloudwatchLogsに監査ログを保存せず、直接S3に転送し保管する仕組みを実装したい
このような経緯があり、RDSから直接監査ログのみを抽出する仕組みを作成するに至りました。
やりたかったこと
主な要件は以下の3つです。
- Pythonを用いてRDSから直接、前日分の監査ログのみを取得したい
- 取得対象のRDSインスタンスは複数で、それぞれプレフィックスを切ってS3に保存したい
- RDSインスタンスの名称変更にも柔軟に対応したい
3つ目に関しては、「describe_db_instances()」を利用してRDSのDB識別子を取得するスクリプトを書くことで対応します。
若干脱線
AWSのドキュメントを見ると、RDSからログを一発取得できるコマンドがあります。
! aws rds download-db-log-file-portion --db-instance-identifier [RDS DB識別子] --starting-token 0 --output text --log-file-name [対象ログファイル] > [保存時の名称]
こちらを実行すると、RDSインスタンスから取得したいログファイルをダウンロードすることができるのですが、インスタンス名やログ名を手打ちで書かなければならず、また複数インスタンス分処理を書かなければならなそうだったので、今回はboto3(AWS SDK for Python)を利用します。
テスト実行
テストは、Amazon SageMakerのノートブックインスタンス上で行いました。
import boto3
import gzip
import traceback
from datetime import datetime, timedelta
RDS = boto3.client('rds', region_name={{ AWSのリージョン }}) #RDSインスタンス
S3_RESOURCE = boto3.resource('s3') # S3リソース
DIST_BUCKET = {{ 保存先バケット名 }} # ログ保存先S3バケット名
def get_db_instance_identifier(tag):
target_instance = []
for instance in RDS.describe_db_instances()['DBInstances']:
if tag in instance['DBInstanceIdentifier']:
target_instance.append(instance['DBInstanceIdentifier'])
return target_instance
def get_db_log_files(log_file_name):
log_data = ""
paginator = RDS.get_paginator("download_db_log_file_portion")
page_iterator = paginator.paginate(
DBInstanceIdentifier=db_instance_identifier,
LogFileName=log_file_name
)
for page in page_iterator:
if page["LogFileData"] is not None:
log_data += page["LogFileData"]
return log_data
if __name__ == '__main__':
try:
yesterday = datetime.now() - timedelta(1) #前日分のログ取得
target_instances = get_db_instance_identifier({{ 環境識別子 }})
print("取得するRDSインスタンス : ", target_instances)
for db_instance_identifier in target_instances:
print("target_rds : ", db_instance_identifier)
paginator = RDS.get_paginator("describe_db_log_files")
page_iterator = paginator.paginate(DBInstanceIdentifier=db_instance_identifier)
for page in page_iterator:
for log in page["DescribeDBLogFiles"]:
try:
if {{ 前日の監査ログのみを取得できる条件 }}: # 前日の監査ログのみ取得
log_file_name = log['LogFileName'].split("/")[1]+".txt.gz"
log_data = get_db_log_files(log["LogFileName"])
path = {{ バケット以降の保存先プレフィックス }} + db_instance_identifier
with gzip.open(log_file_name, mode='wt') as fp:
fp.write(log_data)
fp.close()
s3_source_bucket = S3_RESOURCE.Bucket(DIST_BUCKET)
s3_source_bucket.upload_file(log_file_name, path+'/'+log_file_name)
else:
pass
except:
print(log, "will not upload")
pass
print("Done ", db_instance_identifier)
except:
print(traceback.format_exc())
raise NameError('Program error')
処理の説明
get_db_instance_identifier(tag)
複数あるRDS/データベース/ の中から、取得したいDB識別子のリストを返す関数です。
引数のtagには、stringで部分一致をかけたい文字列を与えます。
get_db_log_files(log_file_name)
ログファイルの中身を抽出して、一つにまとめて返す関数です。
boto3の仕様上、S3へは追記をすることができないので、ログの中身をログ名ごとにまとめてからアップロードする必要があります。
引数には、ログの名前を与えます。
if name == 'main'
今回のGlue Jobが呼び出された時に実行される部分です。
target_instancesには、取得したいRDSのDB識別子を検出する文字列を与えます。
スクリプト内では、RDSごとに保存先のプレフィックスを切って保存していく形式をとっています。また保存形式ですが、txt形式で書き込んだファイルをgzip圧縮してS3に貯めるようにしています。
定期実行に関して
このPythonスクリプトを定期実行させたい場合は、GlueまたはLambdaで実行可能です。
特に外部ライブラリを使用しているわけではないので、Glueを利用する場合は、スクリプトをそのまま貼り付ければ動くと思います。
前日分のログを取得する処理を組んだので、トリガーを設定する際は、
毎日の定期実行で、日付が変わって少し経ってからのタイミングを設定すれば良いと思います!
その際は適切な権限を入れてあげてください。
補足
"error"のような識別子を、名称の部分一致で、下記の
if {{ 前日の監査ログのみを取得できる条件 }}: # 前日の監査ログのみ取得
の箇所に与えれば、監査ログだけでなくエラーログ等も分けて取得することができます。
最後に
今回の記事が少しでもみなさんのお役に立てていれば幸いです。