71
58

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

はじめての Logstash

Posted at

この記事はデータ処理パイプライン「Logstash」の入門チュートリアルです。

Logstash で CSV 形式のログファイルを Elasticsearch へ取り込む方法を解説していきます。

このチュートリアルでは、Elasticsearch を使用します。
まだ Elasticsearch を使ったことがない方は、別記事「はじめての Elasticsearch」を参照してください。

Logstash とは

Logstashは、Elastic 社が開発しているオープンソースのデータ処理パイプラインです。
あらゆるデータソースを格納庫(スタッシュ)へ取り込むことができます。

basic_logstash_pipeline.png

動作環境

  • Mac OS X 10.14.5
  • Java 11.0.2
  • Elasticsearch 7.1.0
  • Logstash 7.1.0

インストール

あらかじめ、別記事「はじめての Elasticsearch」を参照して、Elasticsearch をインストールおよび起動しておいてください。

続いて、Logstash をインストールします。

https://artifacts.elastic.co/downloads/logstash/logstash-7.1.0.tar.gz
tar -xzf logstash-7.1.0.tar.gz
cd logstash-7.1.0

テストデータを作成

「日時」「ログレベル」「エラーメッセージ」「ユーザーID」のカラムを持つ CSV 形式のログファイルをテストデータとして使用します。

以下の CSV ファイルを適当なフォルダに作成してください。

20190524_log.csv

Date,Level,ErrorMessage,UserId
2019-05-24 10:00:00,INFO,Success.,1
2019-05-24 11:00:00,ERROR,An error has occurred.Please wait a moment and try again.,1

20190525_log.csv

Date,Level,ErrorMessage,UserId
2019-05-25 10:00:00,INFO,Success.,1
2019-05-25 11:00:00,INFO,Success.,1
2019-05-25 12:00:00,ERROR,An error has occurred.Please wait a moment and try again.,2
2019-05-25 13:00:00,INFO,Success.,2

設定ファイルを作成

Logstash の設定ファイルを作成します。

config/logstash.conf

input {
    file {
        mode => "tail"
        path => ["/Users/myuser/work/elasticsearch/log/*_log.csv"]
        sincedb_path => "/Users/myuser/work/elasticsearch/log/sincedb"
        start_position => "beginning"
        codec => plain { 
            charset => "UTF-8"
        }
    }
}

filter {
    csv {
        columns => ["Date", "Level", "ErrorMessage","UserId"]
        convert => {
            "UserId" => "integer"
        }
        skip_header => true
    }
    date {
        match => ["Date", "yyyy-MM-dd HH:mm:ss"]
    }
}

output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "log"
    }
    stdout {
        codec => rubydebug
    }
}

input の設定

input ブロックでは、データ入力に関する設定を行います。

mode => "tail"

ファイル取り込みモードを指定します。

ファイル取り込みモードには、Tail モードRead モードの 2 種類があります。
Tail モードでは、ファイルにデータが追加されると、それを検知して追加されたデータを取り込むことができます。

path => ["/Users/myuser/work/elasticsearch/log/*_log.csv"]

取り込むファイルのパスを指定します。

先ほど作成したテストデータのパスを指定してください。

sincedb_path => "/Users/myuser/work/elasticsearch/log/sincedb"

sincedb ファイルのパスを指定します。

sincedb ファイルは、ファイルをどこまで取り込んだのかを記録しておくためのファイルです。
もう一度、同じファイルを取り込みたい場合は、sincedb ファイルを削除してください。

sincedb ファイルを作成したくない場合は、「/dev/null」(Windows の場合は「nul」)を指定します。

start_position => "beginning"

ファイルの取り込み位置を指定します。

「beginning(ファイルの先頭)」または「end(ファイルの末尾)」を指定してください。

codec => plain { 
    charset => "UTF-8"
}

取り込むファイルの文字コードを指定します。

デフォルトでは「UTF-8」となりますので、Windows 環境の場合は「SJIS」に変更してください。

※詳細は File input plugin のリファレンスを参照してください。

filter の設定

filter ブロックでは、データ変換に関する設定を行います。

columns => ["Date", "Level", "ErrorMessage","UserId"]

カラム名を指定します。

convert => {
    "UserId" => "integer"
}

カラムのデータ型を指定します。

「UserId」カラムを数値型に変換するように指定します。

skip_header => true

ヘッダー行がある場合は、読み込みをスキップするように指定します。

date {
    match => ["Date", "yyyy-MM-dd HH:mm:ss"]
}

Elasticsearch で「timestamp」として使用するカラムの日時フォーマットを指定します。

※詳細は Csv filter plugin のリファレンスを参照してください。

output の設定

output ブロックでは、データ出力に関する設定を行います。

elasticsearch {
    hosts => ["localhost:9200"]
    index => "log"
}

出力先の Elasticsearch の URI とインデックス名を指定します。

※詳細は Elasticsearch output plugin のリファレンスを参照してください。

stdout {
    codec => rubydebug
}

Elasticsearch への出力と同時に、コマンドプロンプトにも出力するように指定します。

データ取り込みを実行

Logstash を実行して、CSV 形式のログファイルを Elasticsearch へ取り込みます。

sudo bin/logstash -f config/logstash.conf

成功すれば、取り込まれたデータがコマンドプロンプトに出力されます。

{
            "Date" => "2019-05-24 10:00:00",
        "@version" => "1",
            "path" => "/Users/myuser/work/elasticsearch/log/20190525_log.csv",
    "ErrorMessage" => "Success.",
            "host" => "local",
      "@timestamp" => 2019-05-25T04:00:00.000Z,
           "Level" => "INFO",
          "UserId" => 1,
         "message" => "2019-05-24 10:00:00,INFO,Success.,1"
}
・・・以下省略

また、Elasticsearch でインデックスを検索して確認することもできます。

コマンド

GET /log/_search

実行結果

{
  "took" : 45,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 12,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "log",
        "_type" : "_doc",
        "_id" : "_nCI8WoBy6JJU2RqTkdE",
        "_score" : 1.0,
        "_source" : {
          "Date" : "2019-05-24 10:00:00",
          "ErrorMessage" : "Success.",
          "Level" : "INFO",
          "@timestamp" : "2019-05-24T01:00:00.000Z",
          "@version" : "1",
          "path" : "/Users/myuser/work/elasticsearch/log/20190524_log.csv",
          "UserId" : 1,
          "message" : "2019-05-24 10:00:00,INFO,Success.,1",
          "host" : "local"
        }
      },
・・・以下省略
71
58
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
71
58

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?