この記事はデータ処理パイプライン「Logstash」の入門チュートリアルです。
Logstash で CSV 形式のログファイルを Elasticsearch へ取り込む方法を解説していきます。
このチュートリアルでは、Elasticsearch  を使用します。
まだ Elasticsearch を使ったことがない方は、別記事「はじめての Elasticsearch」を参照してください。
Logstash とは
Logstashは、Elastic 社が開発しているオープンソースのデータ処理パイプラインです。
あらゆるデータソースを格納庫(スタッシュ)へ取り込むことができます。
動作環境
- Mac OS X 10.14.5
- Java 11.0.2
- Elasticsearch 7.1.0
- Logstash 7.1.0
インストール
あらかじめ、別記事「はじめての Elasticsearch」を参照して、Elasticsearch をインストールおよび起動しておいてください。
続いて、Logstash をインストールします。
https://artifacts.elastic.co/downloads/logstash/logstash-7.1.0.tar.gz
tar -xzf logstash-7.1.0.tar.gz
cd logstash-7.1.0
テストデータを作成
「日時」「ログレベル」「エラーメッセージ」「ユーザーID」のカラムを持つ CSV 形式のログファイルをテストデータとして使用します。
以下の CSV ファイルを適当なフォルダに作成してください。
20190524_log.csv
Date,Level,ErrorMessage,UserId
2019-05-24 10:00:00,INFO,Success.,1
2019-05-24 11:00:00,ERROR,An error has occurred.Please wait a moment and try again.,1
20190525_log.csv
Date,Level,ErrorMessage,UserId
2019-05-25 10:00:00,INFO,Success.,1
2019-05-25 11:00:00,INFO,Success.,1
2019-05-25 12:00:00,ERROR,An error has occurred.Please wait a moment and try again.,2
2019-05-25 13:00:00,INFO,Success.,2
設定ファイルを作成
Logstash の設定ファイルを作成します。
config/logstash.conf
input {
    file {
        mode => "tail"
        path => ["/Users/myuser/work/elasticsearch/log/*_log.csv"]
        sincedb_path => "/Users/myuser/work/elasticsearch/log/sincedb"
        start_position => "beginning"
        codec => plain { 
            charset => "UTF-8"
        }
    }
}
filter {
    csv {
        columns => ["Date", "Level", "ErrorMessage","UserId"]
        convert => {
            "UserId" => "integer"
        }
        skip_header => true
    }
    date {
        match => ["Date", "yyyy-MM-dd HH:mm:ss"]
    }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "log"
    }
    stdout {
        codec => rubydebug
    }
}
input の設定
input ブロックでは、データ入力に関する設定を行います。
mode => "tail"
ファイル取り込みモードを指定します。
ファイル取り込みモードには、Tail モードと Read モードの 2 種類があります。
Tail モードでは、ファイルにデータが追加されると、それを検知して追加されたデータを取り込むことができます。
path => ["/Users/myuser/work/elasticsearch/log/*_log.csv"]
取り込むファイルのパスを指定します。
先ほど作成したテストデータのパスを指定してください。
sincedb_path => "/Users/myuser/work/elasticsearch/log/sincedb"
sincedb ファイルのパスを指定します。
sincedb ファイルは、ファイルをどこまで取り込んだのかを記録しておくためのファイルです。
もう一度、同じファイルを取り込みたい場合は、sincedb ファイルを削除してください。
sincedb ファイルを作成したくない場合は、「/dev/null」(Windows の場合は「nul」)を指定します。
start_position => "beginning"
ファイルの取り込み位置を指定します。
「beginning(ファイルの先頭)」または「end(ファイルの末尾)」を指定してください。
codec => plain { 
    charset => "UTF-8"
}
取り込むファイルの文字コードを指定します。
デフォルトでは「UTF-8」となりますので、Windows 環境の場合は「SJIS」に変更してください。
※詳細は File input plugin のリファレンスを参照してください。
filter の設定
filter ブロックでは、データ変換に関する設定を行います。
columns => ["Date", "Level", "ErrorMessage","UserId"]
カラム名を指定します。
convert => {
    "UserId" => "integer"
}
カラムのデータ型を指定します。
「UserId」カラムを数値型に変換するように指定します。
skip_header => true
ヘッダー行がある場合は、読み込みをスキップするように指定します。
date {
    match => ["Date", "yyyy-MM-dd HH:mm:ss"]
}
Elasticsearch で「timestamp」として使用するカラムの日時フォーマットを指定します。
※詳細は Csv filter plugin のリファレンスを参照してください。
output の設定
output ブロックでは、データ出力に関する設定を行います。
elasticsearch {
    hosts => ["localhost:9200"]
    index => "log"
}
出力先の Elasticsearch の URI とインデックス名を指定します。
※詳細は Elasticsearch output plugin のリファレンスを参照してください。
stdout {
    codec => rubydebug
}
Elasticsearch への出力と同時に、コマンドプロンプトにも出力するように指定します。
データ取り込みを実行
Logstash を実行して、CSV 形式のログファイルを Elasticsearch へ取り込みます。
sudo bin/logstash -f config/logstash.conf
成功すれば、取り込まれたデータがコマンドプロンプトに出力されます。
{
            "Date" => "2019-05-24 10:00:00",
        "@version" => "1",
            "path" => "/Users/myuser/work/elasticsearch/log/20190525_log.csv",
    "ErrorMessage" => "Success.",
            "host" => "local",
      "@timestamp" => 2019-05-25T04:00:00.000Z,
           "Level" => "INFO",
          "UserId" => 1,
         "message" => "2019-05-24 10:00:00,INFO,Success.,1"
}
・・・以下省略
また、Elasticsearch でインデックスを検索して確認することもできます。
コマンド
GET /log/_search
実行結果
{
  "took" : 45,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 12,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "log",
        "_type" : "_doc",
        "_id" : "_nCI8WoBy6JJU2RqTkdE",
        "_score" : 1.0,
        "_source" : {
          "Date" : "2019-05-24 10:00:00",
          "ErrorMessage" : "Success.",
          "Level" : "INFO",
          "@timestamp" : "2019-05-24T01:00:00.000Z",
          "@version" : "1",
          "path" : "/Users/myuser/work/elasticsearch/log/20190524_log.csv",
          "UserId" : 1,
          "message" : "2019-05-24 10:00:00,INFO,Success.,1",
          "host" : "local"
        }
      },
・・・以下省略

