背景
弊社ではRailsを使ったWebアプリケーションサービスを展開しており、複数のEC2インスタンスに対してRailsを導入しております。
複数のEC2インスタンス上で動かしているRailsのログは個々のサーバ上に生成される仕組みになっており、ログを参照する場合は該当のサーバへログインして直接ログを見るという方法をとっていました。
しかし、段々とサービスの規模が大きくなり、複数のサーバ上で生成されるログの比較も困難であることから、ログ検索の改善を目的としたログ集約のインフラ構築を行うことにしました。
使用ツール
- Fluentd https://www.fluentd.org/
- AWS Athena https://aws.amazon.com/jp/athena/features/
- AWS Kinesis https://aws.amazon.com/jp/kinesis/
各種ログを集約し、ログ検索に使用したツールは上記の通りです。
Fluentdはオープンソースソフトウェアのログ集約ツールです。公式ドキュメントの充実ぶりも素晴らしく、AWS専用のプラグインもあります。
AWS Athenaはログ分析ツールであり、GCPで言うところのBig Queryに位置付けられます。
AWS Kinesisはストリーミングデータをリアルタイム収集してくれるサービスで、Kinesisの中で今回はKinesis Data Firehoseを使用しました。
集約対象ログ
- Railsログ
- メールログ
- Apacheログ
- SlowQueryログ
メールログはメールサーバで使用しているpostfixログを指しております。
SlowQueryログに関しましては、AWS RDSを使用しておりAWS側の設定でCloudWatch Logsで収集できるようにしています。
ログ集約基盤イメージ図
ログ集約の基盤構成イメージは上記の通りとなっております。各種サーバにFluentdを導入してS3へ格納後、S3からAthenaでログ検索ができるようにGlueを使ってテーブル定義します。
SlowQueryログに関しましては既にCloudWatch Logsに収集されており、SlowQueryログがCloudWatch Logsに格納されたタイミングでKinesisで設定したLambdaが動き、S3へ配信するように設定しています。
それでは、具体的な実装手順について記載いたします。
基盤構築手順
①Fluentdの導入
各種サーバにFluentdを入れます。Fluetndのインストール手順などは公式リファレンス参照で割愛しますが、インストール後の初期設定手順は公式リファレンスにはない情報なので記しておきます。
インストールが完了しましたら、ログをS3に送るようにするのですが実行ユーザであるtd-agent
では権限エラーでログが送れないので、実行ユーザをroot
ユーザに変更します。(fluentdのバージョンによって設定先が異なります)
実行ユーザ変更
※V3
-- TD_AGENT_USER=td-agent #削除
-- TD_AGENT_GROUP=td-agent #削除
++ TD_AGENT_USER=root #追加
++ TD_AGENT_GROUP=root #追加
※V4
-- User=td-agent #削除
-- Group=td-agent #削除
++ User=root #追加
++ Group=root #追加
次にプラグインfluent-plugin-multi-format-parser
を導入します。
こちらのプラグインですが、1つのログファイルの中に複数のフォーマットが混在しているとき、便利なプラグインです。format
でmulti_formatと指定して配下にpattern
でフォーマット毎にパターンを記載すればログをきれいに整形してくれます。
プラグイン導入
/opt/td-agent/embedded/bin/fluent-gem install fluent-plugin-multi-format-parser
$ /opt/td-agent/embedded/bin/fluent-gem list
*** LOCAL GEMS ***
中略
fluent-plugin-multi-format-parser (1.0.0)
中略
②ログ集約設定
設定ファイル/etc/td-agent/td-agent.conf
内に対象のログを集約し、S3へ送るように設定を行います。
- Webサーバ
#Rails Log
<source>
@type tail
@id input_rails_tail
<parse>
@type multi_format
# fluent-plugin-multi-format-parserで複数のフォーマットを拾えるようにしている
<pattern>
format multiline
format_firstline /^.,/
format1 /^(?<log_initial>.*), \[(?<date>\d{1,4}-\d{1,2}-\d{1,2}T\d{1,2}:\d{1,2}:\d{1,2}.\d{1,6}) (?<process_id>[^ ]*)\] *(?<log_level>[^ ]*) -- : (?<message>[^\']*[^\]]*)/
keep_time_key true
</pattern>
<pattern>
format multiline
format_firstline /^.,/
format1 /^(?<log_initial>.*), \[(?<date>\d{1,4}-\d{1,2}-\d{1,2}T\d{1,2}:\d{1,2}:\d{1,2}.\d{1,6}) (?<process_id>[^ ]*)\] *(?<log_level>[^ ]*) -- : (?<message>.*)/
keep_time_key true
</pattern>
<pattern>
format none
</pattern>
</parse>
path Railsのログが配置しているパスを指定する
tag td.rails.production.log
pos_file /var/log/td-agent/rails_log.pos
</source>
# Apache access_logs
<source>
@type tail
@id input_tail
<parse>
@type multi_format
<pattern>
format regexp
expression /^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^ ]*) +\S*)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$/
time_format %d/%b/%Y:%H:%M:%S %z
keep_time_key true
</pattern>
<pattern>
format none
</pattern>
</parse>
path /var/log/httpd/access_log
tag td.apache.access
pos_file /var/log/td-agent/apache_access_log.pos
</source>
#Apache error_logs
<source>
@type tail
@id input_error_tail
<parse>
@type multi_format
<pattern>
format regexp
expression /^\[[^ ]* (?<time>[^\]]*)\] \[(?<level>[^\]]*)\] (?<message>.*)$/
keep_time_key true
</pattern>
<pattern>
format regexp
expression /^\[ (?<log_initial>.*) (?<date>\d{1,4}-\d{1,2}-\d{1,2} \d{1,2}:\d{1,2}:\d{1,2}.\d{1,4}) (?<status>\d{1,3}/T[a-z,A-Z,0-9]) (?<path>[^ ]*) ]: (?<message>.*)/
keep_time_key true
</pattern>
<pattern>
format none
</pattern>
</parse>
path /var/log/httpd/error_log
tag td.apache.error
pos_file /var/log/td-agent/apache_error_log.pos
</source>
#Get hostname record and add raw data
<filter td.apache.access**>
@type record_transformer
#ログ整形されていない生データのログも追記する
<record>
raw_data ${record["host"]} ${record["user"]} ${record["time"]} ${record["method"]} ${record["path"]} ${record["code"]} ${record["size"]} ${record["referer"]} ${record["agent"]}
hostname "#{Socket.gethostname}"
sourcelog ${tag_parts[2]}
</record>
</filter>
<filter td.apache.error**>
@type record_transformer
#ログ整形されていない生データのログも追記する
<record>
raw_data ${record["time"]} ${record["level"]} ${record["message"]}
hostname "#{Socket.gethostname}"
sourcelog ${tag_parts[2]}
</record>
</filter>
<filter td.rails.**>
@type record_transformer
#ログ整形されていない生データのログも追記する
<record>
raw_data ${record["log_initial"]} ${record["date"]} ${record["process_id"]} ${record["log_level"]} ${record["message"]}
hostname "#{Socket.gethostname}"
</record>
</filter>
#Output to S3
<match td.**>
@type s3
s3_bucket 集約先のS3バケット名
s3_region S3のあるリージョン名
s3_object_key_format "%{path}year=%Y/month=%m/day=%d/#{Socket.gethostname}_%{hms_slice}.%{file_extension}"
time_slice_format %Y%m%d
path logaggregator/${tag[1]}/tag=${tag[2]}/
check_object false
<format>
@type json
</format>
<inject>
time_key fluentd_time
time_type unixtime
</inject>
<buffer tag,time>
@type file
path /tmp/fluent/s3/error
time_slice_wait 10m
timekey 10m
timekey_wait 0
chunk_limit_size 512m
</buffer>
</match>
Webサーバで上記のようにRailsログとApacheログを集約し、S3へ送信するように設定しています。(弊社のサイトはSSL通信対応なので実際にはssl_access_log
も設定していますが、大体同じような設定のため省略しています)
ここでのポイントですが、fluentdにはApacheのログを集約してくれる専用のプラグイン、apache2
とapache_error
があるのですが、あえて使わずregexp
で自分の手で正規表現を設定して集約しているところです。
これには理由がありまして、途中のfilter
ディレクティブで集約したログに追加でサーバホストネームとログ整形されていない生の状態のログも一緒に参照できるようにしたいという要望をかなえるためにログの並び方をカスタマイズする必要があり、自由に調整できるregexp
を使いました。
メールサーバも同じように設定します。
- メールサーバ
#postfix_log
<source>
@type tail
@id input_postfix_tail
<parse>
@type multi_format
<pattern>
format regexp
expression /^(?<time>[^ ]* [^ ]* [^ ]*) (?<host>[^ ]+) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?[^\:]*\: (?<key>[^:]+): ?((to|from)=(<(?<address>[^>]+)>)?)?,( ?(orig_to=<(?<orig_to>[^>]+)>),)? ?(relay=(?<relay>[^ ]+)), ?(delay=(?<delay>[^ ]+)), ?(delays=(?<delays>[^ ]+)), ?(dsn=(?<dsn>[^ ]+)), ?(status=(?<status>[^,]+))/
time_format %b %d %H:%M:%S
keep_time_key true
</pattern>
<pattern>
format regexp
expression /^(?<time>[^ ]* [^ ]* [^ ]*) (?<host>[^ ]+) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?[^\:]*\: (?<key>[^:]+): ?((to|from)=(<(?<address>[^>]+)>)?)?,( ?(orig_to=<(?<orig_to>[^>]+)>),)? ?(relay=(?<relay>[^ ]+)), ?(delay=(?<delay>[^ ]+)), ?(delays=(?<delays>[^ ]+)), ?(dsn=(?<dsn>[^ ]+)), ?(status=(?<status>[^,]+))/
time_format %b %d %H:%M:%S
keep_time_key true
</pattern>
<pattern>
format regexp
expression /^(?<time>[^ ]* [^ ]* [^ ]*) (?<host>[^ ]*) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?[^\:]*\: *(?<message>.*)$/
time_format %b %d %H:%M:%S
keep_time_key true
</pattern>
<pattern>
format regexp
expression /^(?<time>[^ ]* [^ ]* [^ ]*) (?<host>[^ ]*) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?[^\:]*\: *(?<message>.*)$/
time_format %b %d %H:%M:%S
keep_time_key true
</pattern>
<pattern>
format none
</pattern>
</parse>
path /var/log/maillog
tag td.postfix
pos_file /var/log/td-agent/postfix.pos
</source>
#Get hostname record and add raw data
<filter td.postfix>
@type record_transformer
<record>
raw_data ${record["time"]} ${record["host"]} ${record["ident"]} ${record["pid"]} ${record["key"]} ${record["address"]} ${record["relay"]} ${record["delay"]} ${record["delays"]} ${record["dsn"]} ${record["status"]} ${record["message"]}
hostname "#{Socket.gethostname}"
</record>
</filter>
# Output to S3
<match td.**>
@type s3
s3_bucket 集約先のS3バケット名
s3_region S3のあるリージョン名
s3_object_key_format "%{path}year=%Y/month=%m/day=%d/#{Socket.gethostname}_%{hms_slice}.%{file_extension}"
time_slice_format %Y%m%d
path logaggregator/${tag[1]}/
check_object false
<format>
@type json
</format>
<inject>
time_key fluentd_time
time_type unixtime
</inject>
<buffer tag,time>
@type file
path /tmp/fluent/s3/error
time_slice_wait 10m
timekey 10m
timekey_wait 0
chunk_limit_size 256m
</buffer>
</match>
postfix.logのログもファイル内でいくつかのログフォーマットが混在しているためfluent-plugin-multi-format-parser
を使って複数の分岐で拾えるようにしています。
またメールサーバも同様に生データのログがほしいという要望がありましたので、filter
ディレクティブに生データのログを追加する設定を行っております。
これでFluentdを再起動すれば、ログがS3へ送られます。
③AWS Glueによるクローリング
S3に格納されたログをAWS Athenaで分析するようにするためにS3内のデータをテーブルとして定義する必要があります。Glueはクローラ機能を使ってデータのテーブル定義ができ、Athenaでのログ分析を可能にしてくれるETLサービス(抽出、変換、ロード)です。公式リファレンス
Glueのコンソール画面から新規にクローラの追加を選択します。
※AWSあるあるですが、クローラの新規設定の画面UIが最近変更されましたので混乱をきたさないために画面証跡は省略しました。デフォルトのままで問題ありませんが、自分の環境に合わせて適宜変更してください。
AWS Glueのクロールの課金体系ですが、1DPU(Data Processing Unit)あたり0.44$/時となっていますが、1回のクロールで2DPU動くので実質0.88$/時となっています。さらに10分未満の時間は10分間の使用とみなされるので、最低利用料金は約0.15$となります。(https://aws.amazon.com/jp/glue/pricing/)
クローラの追加が完了し、手動実行を行えばS3に格納されているログデータを分析しAthenaにテーブルが作成されます。
いくつかのカラムにパーティション化と書かれていますが、S3のバケット名を以下のようにカラム名=/
とすれば、パーティション化されたカラムを定義することが可能となります。(HIVE形式といわれているそうです)
パーティション化することのメリットですが、検索範囲を絞ることができ、検索時間と検索量が抑えられるというメリットがあります。
SlowQueryログの転送
次にSlowQueryログの集約構築です。
まずはCloudWatch LogsからS3へ配信されるストリームを作成します。
作成過程においてLambdaによるデータ変換の有無を問われますが、ここでCloudWatch Logsのデータ形式をjson形式にしてAthenaで読み込みやすいように変換します。
import base64
import gzip
import io
import json
def lambda_handler(event, context):
records = [process_record(r) for r in event['records']]
return {'records': records }
def process_record(record):
record_id = record['recordId']
data = base64.b64decode(record['data'])
iodata = io.BytesIO(data)
with gzip.GzipFile(fileobj=iodata, mode='r') as f:
data = json.loads(f.read())
processed_data = process_data(data) + '\n'
return {
'data': base64.b64encode(processed_data.encode('utf-8')).decode('utf-8'),
'result': 'Ok',
'recordId': record_id
}
def process_data(data):
return '\n'.join([format_log_event(json) for json in data['logEvents']])
def format_log_event(j):
return json.dumps({'timestamp': j['timestamp'], 'message': j['message']})
Lambda関数の設定を行ったら配信ストリームの作成に戻ります。余談ですが、格納先のS3のプレフィックス名をHIVE形式にしておけば同じようにカラムがパーティション化されます。
これでKinesis Data Firehoseの設定は完了です。あとは、CloudWatch LogsにSlowQueryログが収集されるとKinesisが動き、S3へログを格納してくれます。
S3に格納されたログを同様にGlueでクロールしてAthenaでログ検索すれば万々歳でした。
ところがここで一つ問題が出てきました。
SlowQueryの検索目的としてクエリ時間にどれだけ時間がかかったのかクエリ時間単位で検索したいのですが、Athenaでテーブル定義を行うと、timestamp
とmessage
が定義づけられ、クエリ時間単独でカラムを作ることができませんでした。
イメージとしては、上記のslowqueryテーブルに一個query_time
のようなカラムを作り、select query_time from slowquery;
としたかったのですが残念ながら上記のLambdaでどのように修正すればいいのかわかりませんでした。
その代替案としてawslogsを使ったCLIによるログ検索を導入しました。(https://github.com/jorgebastida/awslogs)
$ awslogs get <SlowQueryログ> --start='MM/DD/YYYY' --end=’MM/DD/YYYY’
<SlowQueryログ> <リージョン> # Time: 180606 15:00:02
# User@Host: rdsadmin[rdsadmin] @ localhost [] Id: 18
# Query_time: 0.507831 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0
SET timestamp=1528297202;
FLUSH SLOW LOGS ;
所感
これで一通りはログ集約基盤の構築についてご紹介いたしました。
理想としては、SlowQueryログもAthenaで検索でき、かつクエリ時間指定での検索ができるようになればいいのですが、私の考えでは改善策が浮かばないのでわかる方がいらっしゃいましたら、コメント欄にてご助言のほどよろしくお願いいたします。