1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Logstash on DockerでCSVデータ前処理ツールを作る

Last updated at Posted at 2021-06-22

業務でLogstashを用いてCSV形式のログデータを加工するツールを作りました。これが予想外に大苦戦。。。「Pandasだと一瞬で終わりそうなのに。。。」という考えが何度も頭をよぎりましたが、ここは諸事情でLogstashオンリーという制約下で格闘。。。 :japanese_ogre::japanese_ogre::japanese_ogre: また同様の業務をやる時のために、備忘録を残しておこうと思います。:dancer::dancer_tone1::dancer_tone2::dancer_tone3::dancer_tone4::dancer_tone5:

データの形式

今回は以下の様な入力データをLogstashで読み込み、各種filter処理を実行し、出力イメージとして示している様な形式のデータを出力します。

入力データ
a,b,c,d,e: あいうえお: 日本語: カタカナ
出力イメージ
{"test1":"a","test2":"b","test3":"c","test4":"d","tag1":"e", "tag2":"あいうえお","tag3":"日本語","tag4":"カタカナ"}

Logstash on Dockerの準備

今回はLogstashのフィルター機能などを研究しながら作業を進める必要が有ったため、環境構築ややり直しを容易に実行出来るDockerのコンテナを利用することにしました。LogstashのコンテナイメージをコンテナとしてDocker Composeで起動し、所望のログ編集処理を実行します。環境構築には以下の2つのファイルを用意しまし、以下のディレクトリ構成の様に格納しました。

ディレクトリ構成
logstashtest/
|-- docker
|   |-- config
|   |   `-- logstash.conf
|   `-- docker-compose.yml
`-- logfile
    |-- input_logs.csv
    `-- output_logs.csv

3 directories, 4 files

入力データと出力データを格納するディレクトリ(logfile)と、Logstashの内部処理を定義するlogstash.confファイルををvolumes項でコンテナ外のファイルと紐づけています。

docker-compose.yml
version: "2.4"
services:
    logstash:
        image: docker.elastic.co/logstash/logstash:7.13.2
        volumes:
            - ../logfile:/usr/share/logstash/logfiles
            - ./config/logstash.conf:/usr/share/logstash/pipeline/logstash.conf

データの読み込み→加工→出力の順に処理を記載します。

logstash.conf
input {
    file {
        path => ["/usr/share/logstash/logfiles/input_logs.csv"]
        start_position => "beginning"
        sincedb_path => "/usr/share/logstash/mysincedb"
        codec => plain {
            charset => "SJIS"
        }
    }
}
filter {
  # Add column names
  csv {
      columns => [
          "test1",
          "test2",
          "test3",
          "test4",
          "test5"
      ]
  }
  # Create some new tags
  grok {
      match => {
          "test5" => "%{DATA:tag1}: %{DATA:tag2}: %{DATA:tag3}: %{GREEDYDATA:tag4}"
      }
  }
  # Remove unnecessary field names
  mutate {
      remove_field => [
          "@version",
          "host",
          "path",
          "@timestamp",
          "message",
          "tags",
          "test5"
      ]
  }
}
output {
    if ([test1])  {
        file {
            path => ["/usr/share/logstash/logfiles/output_logs.csv"]
        }
    }
}

読み込み部(input)

start_position => "beginning"でファイルの先頭から読み込みます。sincedb_pathに任意のファイルパスを指定することで、どこまでファイルを読み込んだかをバイト単位で記録してくれるsicedbにすぐにアクセス出来ます。codec項では文字コードを指定できます。

処理部(filter)

3種類のフィルターを用います。csvフィルターで読み込んだデータにカラム名を追加します。このカラム名を用いて、以降の行程を実行します。続けて、grokフィルターで"test5"項で正規パターンに一致する部分の分割方法を定義します。ここではDATA(任意の文字の繰り返し)形式の項目が: 区切りになっている部分を分割し、定義したカラム名tag〇を追加します。最後にmutateフィルターで出力データには不要なカラム名を削除します。

出力部(output)

if文の使い方の備忘録として条件分岐を記載していますが、この分岐は無くても動作します。この書き方でtest1項が存在する場合、パスで指定したファイルに加工されたデータが書き込まれます。

実行結果

実行結果は以下の様になりました。辞書形式なので並びはランダムになっていますが、イメージ通りの結果が得られました。

{"test4":"d","tag1":"e","tag2":"あいうえお","test3":"c","test1":"a","tag3":"日本語","tag4":"カタカナ","test2":"b"}

Reference

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?