Elasticsearch
Logstash
Filebeat

Elasticsearch にログを重複・欠損なく格納する案

More than 1 year has passed since last update.

はじめに

Elasticsearch を持っているなら、ログは余すことなく入れておきたくなるだろう。
パースは後でもできるので、とりあえず入れておくだけ。
ただし、情報量を落とすことなくログを入れたい。
ログは件数が意味をもつこともあるので、重複は避けたいし、欠損してほしくはない。

そのためには、個々のログに対して一意となる ID を指定すればよさそうである。

もしもログ送信元 (Logstash など) や送信先 (Elasticsearch) に異常が発生し再送の必要が生じても、
ID があれば重複なく格納できるようになるため、再送を十分に行えるなら欠損分を補えるだろう。

  • ちなみに、もし重複を許すなら、途中まで読んでいたログファイルをもう1度読み込みなおせばよい。
  • なお、もし再送を十分な回数できるなら、ID が(Elasticsearch の自動生成 ID など)ログに対して一意でなくとも 何度もインデックスを消して送りなおせば重複なく格納することはできなくはないが、 ログの量が多くなると時間がかかりすぎる恐れがある。

ここではログに ID を指定し、それを Elasticsearch に格納する方法について述べる。

※なお、Elasticsearch の自動生成 ID を使わないことは、
インデックス作成性能を低下させる (参考:Elasticsearch Reference) ようである。

ログに ID を指定して Elasticsearch に格納する方法

ID の設計方針については参考として後述するが、
たいていの場合は、ファイルパスとログの位置(≒行番号)から ID を作ればなんとかなると思われるので、
その方法について説明する。

Logstash だとログの位置は取れないため Filebeat でファイルを読み込むことになる。
Filebeat では source フィールドにファイルパス、offset フィールドにログの位置が記録される。
しかし Filebeat 単体ではそれら2つのフィールドから ID を生成するということまではできないので、
Logstash の Filter プラグインか、Elasticsearch の Ingest Node に頼ることになる。

Logstash の Filter プラグインで ID を生成する場合は、

  1. Filebeat の出力先に Logstash を指定して、
  2. Logstash Filter プラグイン等で ID を生成し、
  3. Logstash Elasticsearch Output プラグインで ID を指定する。

以下に設定例を示す。

filebeat.yml
filebeat.prospectors:
- input_type: log
  paths:
  - ${LOG_PATH}
output.logstash:
  hosts:
  - ${LOGSTASH_HOST}:${LOGSTASH_PORT}
logstash.conf
input {
  beats {
    port => 5044
  }
}
filter {
  mutate {
    # 元のsourceは残しておきたいため一時退避。
    # 退避先はメタデータにしておけば Elasticsearch には格納されないため、煩わしくない。
    copy => {'source' => '[@metadata][source]'}

    # 退避したメタデータフィールドを使って、ID 用に変換。
    # この例では、ファイルパスのスラッシュをアンダースコアに変えている。
    gsub => ['[@metadata][source]', '/', '_']
  }
}
output {
  elasticsearch {
    hosts => '${ELASTICSEARCH_HOST}:${ELASTICSEARCH_PORT}'

    # ID を生成。
    # この例では、ファイルパスにログ位置をアンダースコアで連結している。
    document_id => '%{[@metadata][source]}_%{offset}'
  }
}

Ingest Node で ID を生成する場合は、

  1. Elasticsearch に ID 生成用の Pipeline を作成しておき、
  2. Filebeat の出力先に Elasticsearch と Pipeline を指定する。

以下に設定例を示す。

_ingest/pipeline/generate-id
{
  "generate-id" : {
    "description" : "Generate ID with source and offset",
    "processors" : [
      {
        "set" : {
          "field" : "_id",
          "value" : "{{source}}_{{offset}}"
        }
      }
    ]
  }
}
filebeat.yml
filebeat.prospectors:
- input_type: log
  paths:
  - ${LOG_PATH}
output.elasticsearch:
  hosts:
  - ${ELASTICSEARCH_HOST}:${ELASTICSEARCH_PORT}
  pipeline: generate-id

(参考)ID の設計方針

まず、あるファイルパスが特定のログの実体を指し示す場合を考える。
すなわち、ファイルは後から別の内容で上書きされるということがなく追記されるのみであり、
たとえば、ログローテーションしないような場合である。

この場合は、ファイルパスとログの位置(≒行番号)から ID を作れる。

/var/log/foo/2017-11-10.log  ->  var_log_foo_2017-11-10_log_123456

もしも別のホストで同じファイルパスが現れる可能性があるなら、ホスト名や IP アドレスを付加する。

host01:/var/log/foo/2017-11-10.log  ->  host01_var_log_foo_2017-11-10_log_123456
host12:/var/log/foo/2017-11-10.log  ->  host12_var_log_foo_2017-11-10_log_123456

ホストが時間によって増減し、同ホスト名、同 IP アドレスが何度か現れる可能性があるとしたら、
ある時間帯では一意だろうから、増減に対して十分小さな時間(日付など)を付加する。

2017/11/10; host23:/var/log/foo/app.log  ->  2017-11-10-host23_var_log_foo_app_log_123456
2017/11/13; host23:/var/log/foo/app.log  ->  2017-11-13-host23_var_log_foo_app_log_123456

次に、あるファイルパスに異なるログの実体が配置される場合を考える。
前述と逆で、たとえばログローテーションする場合である。

この場合でも、ある時間帯では一意だろうから、
ログローテーション間隔に対して十分小さな時間帯を ID に付加すればよい。

2017/11/10; host23:/var/log/messages  ->  2017-11-10-host23_var_log_messages_123456
2017/11/13; host23:/var/log/messages  ->  2017-11-13-host23_var_log_messages_123456

なお ID の長さに制限がある (参考:Elasticsearch のフォーラム) ようなので、
一意性を損なわない程度にファイルパスやホスト名などを短くする必要が生じるかもしれない。

(参考)ハッシュ値による ID は衝突率に注意

ハッシュ値は、衝突する(異なる入力に対して同じ結果が出力される)可能性があるので、
ID として使用する場合にはログの欠損が生じる可能性があることを念頭に入れなければならない。
インデックスあたりのログの件数が少なければ無視できる確率とみなせるかもしれないが、
1度もハッシュが衝突しない確率は意外と大きいので、
ログの欠損を絶対に許さない場合はハッシュ値を使うべきではない。

逆に言うと ID をきちんと設計できる情報がそろっているのに、
それを ID にせずにハッシュ化して、衝突させてしまったりするともったいない。
ハッシュは多少ログが欠損してもよい場合で、ID の設計が面倒なときに使えばよい。