集約データを転送、分析、可視化するためのシステムを作るために使われるソフトウェアについて、アーキテクチャーを調べたり触ってみたりした上で分かったことのメモ書き。
Fluentd
一言でいえば、ログ集約用のソフトウェア、ログのフォーマットもJSONやCSV等幅広く選択可能だったり、ログ転送先もS3やその他対象が広く、例えば各種ログを集約してデータレイクにするなどの使い方が可能なのでは?と思う。ちなみにFluentd自体はrubyのgemだが、それをLinuxサーバで使いやすくするためにrpm化したものがtd-agentである。簡単に、構築の仕方を記載しておく(我流)。
###①td-agentのマスターとなるconf。基本的なログレベルや、ログ種別ごとのconfの在りかを指定しておく。`
<system>
log_level error
</system>
#外部ファイル読み込み
@include config.d/*.conf
###②3パターン程度各ログ種別の特性にあったconfの書き方をガイド
#Nginx
#Accesslog Source
<source>
@type tail
path /var/log/nginx/access.log
pos_file /var/log/td-agent/pos/nginx.accesslog.pos
tag nginx.accesslog
<parse>
@type json
keep_time_key true
time_format %d/%b/%Y:%H:%M:%S %z
</parse>
</source>
#Errorlog Source
<source>
@type tail
path /var/log/nginx/error.log
pos_file /var/log/td-agent/pos/nginx.errorlog.pos
tag nginx.errorlog
<parse>
@type regexp
expression /^(?<time>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \[(?<log_level>\w+)\] (?<pid>\d+).(?<tid>\d+): (?<message>.*)$/
keep_time_key true
time_format %Y/%m/%d %H:%M:%S
</parse>
</source>
#Add Key:Value
<filter nginx.*>
@type record_transformer
<record>
hostname "#{Socket.gethostname}"
</record>
</filter>
#Accesslog Destination
<match nginx.accesslog>
@type s3
s3_bucket test-log-env
s3_region ap-northeast-1
s3_object_key_format "%{path}/year=%Y/month=%m/day=%d/#{Socket.gethostname}/${tag}.#{Socket.gethostname}.%{time_slice}.%{index}.%{file_extension}"
path fluentd/${tag[0]}/${tag[1]}
store_as gzip
time_slice_format %Y%m%d%H%M%S
output_tag false
output_time false
<buffer tag,time>
@type file
path /var/log/td-agent/buffer/nginx.accesslog/s3
timekey 60
timekey_wait 60
</buffer>
</match>
#Errorlog Destination
<match nginx.errorlog>
@type s3
s3_bucket test-log-env
s3_region ap-northeast-1
s3_object_key_format "%{path}/year=%Y/month=%m/day=%d/#{Socket.gethostname}/${tag}.#{Socket.gethostname}.%{time_slice}.%{index}.%{file_extension}"
path fluentd/${tag[0]}/${tag[1]}
store_as gzip
time_slice_format %Y%m%d%H%M%S
output_tag false
output_time false
<buffer tag,time>
@type file
path /var/log/td-agent/buffer/nginx.errorlog/s3
timekey 60
timekey_wait 60
</buffer>
</match>
#Java
#TEST Application Log
<source>
@type tail
path /opt/java/logs/application*.log
pos_file /var/log/td-agent/pos/java-test.applicationlog.pos
tag java-test.applicationlog
<parse>
@type multiline
format_firstline /^\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}.\d{3}/
format1 /^(?<time>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}.\d{3})\s+(?<thread>.+)\s+(?<level>.)\s+(?<logger>.+)#(?<method>\S+)\s+:(?<line>\d{1}*)\s+(?<msg>.+)$/
keep_time_key true
time_format %Y/%m/%d %H:%M:%S.%L
</parse>
</source>
#Add Key:Value
<filter java-test.*>
@type record_transformer
<record>
hostname "#{Socket.gethostname}"
</record>
</filter>
#TEST Application Log
<match java-test.applicationlog>
@type s3
s3_bucket test-log-env
s3_region ap-northeast-1
s3_object_key_format "%{path}/year=%Y/month=%m/day=%d/#{Socket.gethostname}/${tag}.#{Socket.gethostname}.%{time_slice}.%{index}.%{file_extension}"
path fluentd/${tag[0]}/${tag[1]}
store_as gzip
time_slice_format %Y%m%d%H%M%S
output_tag false
output_time false
<buffer tag,time>
@type file
path /var/log/td-agent/buffer/java-test.applicationlog/s3
timekey 60
timekey_wait 60
</buffer>
</match>
#Aurora(PostgreSQL)
#PostgreSQL errorログ
#########RDS Master
<source>
@type rds_pgsql_log
region ap-northeast-1
db_instance_identifier test-env-rds01
tag aurora.postgresqllog.test-env-rds01
pos_file /var/log/td-agent/pos/aurora.postgresqllog.rds01.pos
</source>
<filter aurora.postgresqllog.test-env-rds01>
@type record_transformer
<record>
db_identifer test-env-rds01
</record>
</filter>
<match aurora.postgresqllog.test-env-rds01>
@type copy
<store>
@type exec
command cat $1 | logger -p local5.debug
format json
<buffer>
@type file
path /var/log/td-agent/buffer/aurora.postgresqllog.rds01/file
flush_interval 60s
</buffer>
</store>
<store>
@type s3
s3_bucket test-log-env
s3_region ap-northeast-1
s3_object_key_format %{path}/year=%Y/month=%m/day=%d/${tag[2]}/${tag}.%{time_slice}.%{index}.%{file_extension}
path fluentd/${tag[0]}/${tag[1]}
store_as gzip
time_slice_format %Y%m%d%H%M%S
output_tag false
output_time false
<buffer tag,time>
@type file
path /var/log/td-agent/buffer/aurora.postgresqllog.rds01/s3
timekey 60
timekey_wait 60
</buffer>
</store>
</match>
###パラメータに関して詳細
`sourceディレクティブ``
ログの入力方法を決める。
デフォルトで使えるものは、標準入力、ファイル、ポート指定のHTTP通信等あり。
in_tailプラグイン(@tail)
in_tailはログを末尾から読み取るプラグイン。syslogやapache access.logのようにファイルに吐き出させるログをfluentdに読み込ませる時に使用するプラグイン。
tag
ログを指定するID。matchディレクティブで指定する「debug.test」とかがタグの一例。
path
fluentdに送り込みたいログファイル名
pos_file
どこまで読み込んだかを記録するファイル
<parse>~</parse>
パースの方式を指定する。syslogやnginx等で事前定義されているものもあるので便利
matchディレクティブ
ここでマッチしたログに、指定した処理を適用する。
デフォルトで使えるものには、標準出力、ファイル、他のfluentサーバーへの転送、
これまでに挙げた方法を組み合わせる、等がある。
@type s3
転送先を定義。この例ではAWSのS3。S3転送できる権限を付けておくことが必要
s3_bucket
AWS S3バケット名を指定。
s3_region
AWS S3の存在するリージョンを指定。
time_slice_format
出力されるファイル名の一部になる。下記は出力時に実際の時刻に変換される
%Y: 年(4桁の数字)
%m: 月 (01..12)
%d: 日付 (01..31)
%H: 時間(00..23)
%M: 分(00..59)
%S: 秒(00..60)
timekey_zone
タイムゾーン指定
format
S3にアップロードするファイルの形式指定。デフォルトはout_fileで、他にjson, ltsv, single_valueがある
<buffer>~</buffer>
buffer_type
fluentdが受け取ったデータをどう保存するか、メモリかファイルが選べる
path
buffer_timeがfileの場合のファイル名
time_key
time_key_wait
includeディレクティブ
複数の設定ファイルを設定することができる。/etc/log/httpd/conf.d/*.confのようなイメージ。
tag(タグ)
ログを指定するID。matchディレクティブで指定する「debug.test」とかがタグの一例。
AWS Athenaでログ分析する。
#nginxログ用
CREATE EXTERNAL TABLE IF NOT EXISTS fluentd.nginx (
`remote` string,
`host` string,
`user` string,
`time` string,
`method` string,
`path` string,
`code` string,
`size` string,
`referer` string,
`agent` string,
`forwarder` string,
`hostname` string
) PARTITIONED BY (
year int,
month int,
day int
)
ROW FORMAT serde 'org.apache.hive.hcatalog.data.JsonSerDe'
WITH serdeproperties ( 'paths'='remote,host,user,time,method,path,code,size,referer, agent,forwarder,hostname')
LOCATION 's3://test12349999/fluentd/nginx/accesslog/';
#Auroraログ用
CREATE EXTERNAL TABLE IF NOT EXISTS fluentd.aurora (
`time` string,
`remote_host` string,
`user_name` string,
`database_name` string,
`process_id` string,
`severity` string,
`message` string,
`hostname` string
) PARTITIONED BY (
year int,
month int,
day int
)
ROW FORMAT serde 'org.apache.hive.hcatalog.data.JsonSerDe'
WITH serdeproperties ( 'paths'='time,remote_host,user_name,database_name,process_id,severity,message,hostname')
LOCATION 's3://test12349999/fluentd/aurora/postgresqllog/';
MSCK REPAIR TABLE access_database.access_table;
ALTER TABLE nginx ADD PARTITION (year=2019,month=09,day=01);
ALTER TABLE nginx DROP PARTITION (year=2019,month=09,day=01);