業務でLogstashを用いてCSV形式のログデータを加工するツールを作りました。これが予想外に大苦戦。。。「Pandasだと一瞬で終わりそうなのに。。。」という考えが何度も頭をよぎりましたが、ここは諸事情でLogstashオンリーという制約下で格闘。。。 また同様の業務をやる時のために、備忘録を残しておこうと思います。
データの形式
今回は以下の様な入力データをLogstashで読み込み、各種filter処理を実行し、出力イメージとして示している様な形式のデータを出力します。
a,b,c,d,e: あいうえお: 日本語: カタカナ
{"test1":"a","test2":"b","test3":"c","test4":"d","tag1":"e", "tag2":"あいうえお","tag3":"日本語","tag4":"カタカナ"}
Logstash on Dockerの準備
今回はLogstashのフィルター機能などを研究しながら作業を進める必要が有ったため、環境構築ややり直しを容易に実行出来るDockerのコンテナを利用することにしました。LogstashのコンテナイメージをコンテナとしてDocker Composeで起動し、所望のログ編集処理を実行します。環境構築には以下の2つのファイルを用意しまし、以下のディレクトリ構成の様に格納しました。
logstashtest/
|-- docker
| |-- config
| | `-- logstash.conf
| `-- docker-compose.yml
`-- logfile
|-- input_logs.csv
`-- output_logs.csv
3 directories, 4 files
入力データと出力データを格納するディレクトリ(logfile
)と、Logstashの内部処理を定義するlogstash.conf
ファイルををvolumes
項でコンテナ外のファイルと紐づけています。
version: "2.4"
services:
logstash:
image: docker.elastic.co/logstash/logstash:7.13.2
volumes:
- ../logfile:/usr/share/logstash/logfiles
- ./config/logstash.conf:/usr/share/logstash/pipeline/logstash.conf
データの読み込み→加工→出力の順に処理を記載します。
input {
file {
path => ["/usr/share/logstash/logfiles/input_logs.csv"]
start_position => "beginning"
sincedb_path => "/usr/share/logstash/mysincedb"
codec => plain {
charset => "SJIS"
}
}
}
filter {
# Add column names
csv {
columns => [
"test1",
"test2",
"test3",
"test4",
"test5"
]
}
# Create some new tags
grok {
match => {
"test5" => "%{DATA:tag1}: %{DATA:tag2}: %{DATA:tag3}: %{GREEDYDATA:tag4}"
}
}
# Remove unnecessary field names
mutate {
remove_field => [
"@version",
"host",
"path",
"@timestamp",
"message",
"tags",
"test5"
]
}
}
output {
if ([test1]) {
file {
path => ["/usr/share/logstash/logfiles/output_logs.csv"]
}
}
}
読み込み部(input)
start_position => "beginning"
でファイルの先頭から読み込みます。sincedb_path
に任意のファイルパスを指定することで、どこまでファイルを読み込んだかをバイト単位で記録してくれるsicedbにすぐにアクセス出来ます。codec
項では文字コードを指定できます。
処理部(filter)
3種類のフィルターを用います。csvフィルターで読み込んだデータにカラム名を追加します。このカラム名を用いて、以降の行程を実行します。続けて、grokフィルターで"test5"項で正規パターンに一致する部分の分割方法を定義します。ここではDATA(任意の文字の繰り返し)形式の項目が:
区切りになっている部分を分割し、定義したカラム名tag〇
を追加します。最後にmutate
フィルターで出力データには不要なカラム名を削除します。
出力部(output)
if文の使い方の備忘録として条件分岐を記載していますが、この分岐は無くても動作します。この書き方でtest1項が存在する場合、パスで指定したファイルに加工されたデータが書き込まれます。
実行結果
実行結果は以下の様になりました。辞書形式なので並びはランダムになっていますが、イメージ通りの結果が得られました。
{"test4":"d","tag1":"e","tag2":"あいうえお","test3":"c","test1":"a","tag3":"日本語","tag4":"カタカナ","test2":"b"}
Reference
- Logstash File input plugin
- Logstash File output plugin
- Logstash Grok filter plugin
- Logstash CSV filter plugin
- Logstash on Docker config
- はじめての Logstash
- Logstash File input: sincedb_path
- Logstashの実践的な説明
- Kibana(Elasticsearch)+Logstashでオタクの出費を可視化してみる
- logstash-pattern @ GitHub
- logstash check if field exists
- logstash条件付き出力をelasticsearch(filebeatホスト名ごとのインデックス) - elasticsearch、logstash、logstash-grok、logstash-configuration
- i-FilterのアクセスログをLogstashで正規化してセキュリティログ分析に活用する方法
- [正規表現] .*?は最短マッチではない