はじめに
指定された名前の複数インデクスをFilebeatが更新する際にそのインデクスをElasticsearchのIndex lifecycle management (ILM)を使って自動ロールオーバーするやり方について説明する機会があったのでその備忘録。せっかく(?)なのでKibanaをなるべく使って設定してみました。
※少し前のバージョンのElasticsearchを使います。データストリームを使うといろいろと捗りそうですがその場合にはこちらをご参照ください。
利用するソフトウェア
- OS: Ubuntu 18.04
- Elasticsearch (7.8.1)
- Kibana (7.8.1)
- Filebeat(7.8.1)
やりたいこと
ホストA/Bのイベントログ
ホストAおよびBは何らかのイベントログをFilebeatが読み込むログファイルへ転送しているが、Elasticsearchのインデクスに格納する際にはホスト毎に別々のインデクスへ格納したい。またインデクス名「eventlog-host-a」(エイリアス)とか「eventlog-host-a-*」で参照する。
Filebeatからの更新はエイリアス経由でWrite indexへ行う。一方でILMがインデクスを自動的にロールオーバーする(「eventlog-host-a-000002」とか)。
※Filebeatのデフォルトテンプレートの自動アップロードも有効化しておきます(イベントログが独自フォーマットの場合にはElasticsearch側にマッピングテンプレート定義をするかもしれませんが、FilebeatがElasticsearchへ転送する形式にはECSで定義されているフィールドがいくつか追加されるため)。
イベントログ例
説明のために以下のような適当なイベントログを生成しておきます。
{
"event_id": "89b20ab8-09d1-4f82-873c-b1ddc9856718",
"host_name": "host-a",
"data": {
"data_id": "4f2c4521-fedb-4c6d-86a2-1be84722a6c5",
...
}
}
- event_id: イベントログのユニークID
- host_name: ホスト名。host_aまたはhost_bが入ります
- data: 適当なデータ
eventlog-*.jsonには以下のようにインデントなし形式で書き込まれます。この例ではホストAとホストBのログが同一ファイルへ混在しています。
...
{"event_id": "e3ce617d-9220-4e20-b63c-68640089090e", "host_name": "host-a", "data": {"data_id": "c1a83642-e2a0-406a-96ac-29808d4666c4"}}
{"event_id": "71a90178-aa21-40bd-9009-edaf4c0e817d", "host_name": "host-a", "data": {"data_id": "c41fd91a-6509-447c-a38e-df9156491de5"}}
{"event_id": "305675ca-bf06-4366-81a7-859093fa4f7d", "host_name": "host-a", "data": {"data_id": "389e6038-c15d-4419-9309-cb3f7450174c"}}
{"event_id": "6572d7e7-67df-4e5f-8e8b-aebe7c42abc2", "host_name": "host-b", "data": {"data_id": "a8825f3a-8259-4f22-970e-1bb2700ad879"}}
{"event_id": "b3ff56b5-4fc3-4192-95d7-e0f20317c27b", "host_name": "host-b", "data": {"data_id": "d83e944e-06f3-4ca4-ba46-4a93348b1da4"}}
{"event_id": "69871416-a0ae-4b41-8d55-437855090564", "host_name": "host-b", "data": {"data_id": "8938e8b0-a5e3-4931-b427-513f435b634f"}}
...
FilebeatによってElasticsearchのインデクスへ書き込まれるドキュメント形式は以下です。
{
"_index": "eventlog-host-a-000001",
"_type": "_doc",
"_id": "89b20ab8-09d1-4f82-873c-b1ddc9856718",
"_version": 1,
"_score": null,
"_source": {
"@timestamp": "2020-12-05T09:03:15.468Z",
"log": {
"file": {
"path": "/.../eventlog/eventlog-1.json"
},
"offset": 0
},
"input": {
"type": "log"
},
"ecs": {
"version": "1.5.0"
},
"host": {
"os": {
"version": "18.04.3 LTS (Bionic Beaver)",
"family": "debian",
"name": "Ubuntu",
"kernel": "4.15.0-126-generic",
"codename": "bionic",
"platform": "ubuntu"
},
"id": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
"containerized": false,
"ip": [
"192.168.0.100"
],
"name": "beats-test",
"mac": [
"00:00:00:00:00:00"
],
"hostname": "beats-test",
"architecture": "x86_64"
},
"agent": {
"ephemeral_id": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
"id": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
"name": "beats-test",
"type": "filebeat",
"version": "7.8.1",
"hostname": "beats-test"
},
"json": {
"host_name": "host-a",
"data": {
"data_id": "4f2c4521-fedb-4c6d-86a2-1be84722a6c5"
}
}
},
"fields": {
"@timestamp": [
"2020-12-05T09:03:15.468Z"
],
"suricata.eve.timestamp": [
"2020-12-05T09:03:15.468Z"
]
},
"sort": [
1607396595468
]
}
元データはjsonフィールド配下へ入ります(filebeat.yml:json.keys_under_root: false)。またドキュメントIDには元データのevent_idの値を使っています(filebeat.yml:json.document_id: event_id)。
インデクステンプレートの設定
Filebeatが書き込むイベントログのマッピングを定義し、インデクステンプレートをkibanaで作成します。ILMはこのテンプレートにマッチするインデクスに対して適用しますのでホスト毎に用意します。
※ECS部分についてはFilebeatが自動作成してくれるものを使うので上図のjsonフィールド配下だけ定義します。
ホストA用の定義例
"mappings": {
"properties": {
"json": {
"properties": {
"data": {
"properties": {
"data_id": {
"type": "keyword",
"ignore_above": 1024
}
}
},
"host_name": {
"type": "keyword",
"ignore_above": 1024
},
"event_id": {
"type": "keyword",
"ignore_above": 1024
}
}
}
}
}
ホストBも同様に定義しておきます。
Index lifecycle management (ILM)の設定
対象インデクス毎(ホスト毎)にILMポリシーを作成します。#両ホストで共通のILMポリシーでもOKです。
ホストA用の定義例
ILMポリシーを作成
ここでは説明のため5ドキュメントをロールオーバーの閾値に設定しています。
ILMポリシーをホストA用のインデクステンプレートへ設定
各ILMポリシー行のActionsからインデクステンプレートへ紐づけます。
ホスト毎にILMの対象インデクスをエイリアス名(eventlog-host-a/eventlog-host-b)でラップします。
ブートストラップのインデクスを作成
ILMが監視を開始するホスト毎の元インデクス(空でOK)をつくります。ポイントは
* Write Index指定をすること("is_write_index": true):
* インデクス名に番号(-0000001)をつけること(例:eventlog-host-a-000001)
です。
詳細はRollover index APIを参照ください。
ここではKibanaのDev Toolsコンソールから作成しています。
この辺りがわかりづらい・・・
Filebeatの設定
/etc/filebeat/filebeat.ymlを設定します。
...
# ============================== Filebeat inputs ===============================
filebeat.inputs:
# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.
- type: log
# Change to true to enable this input configuration.
enabled: true
# Paths that should be crawled and fetched. Glob based paths.
paths:
#イベントログファイルがおかれるパス(e.g. eventlog-1.json)
- /.../eventlog/*.json
# Exclude lines. A list of regular expressions to match. It drops the lines that are
# matching any regular expression from the list.
#exclude_lines: ['^DBG']
# Include lines. A list of regular expressions to match. It exports the lines that are
# matching any regular expression from the list.
#include_lines: ['^ERR', '^WARN']
# Exclude files. A list of regular expressions to match. Filebeat drops the files that
# are matching any regular expression from the list. By default, no files are dropped.
#exclude_files: ['.gz$']
# Optional additional fields. These fields can be freely picked
# to add additional information to the crawled log files for filtering
#fields:
# level: debug
# review: 1
#jsonフィールドの設定
json.keys_under_root: false
#ドキュメントIDとしてユニーク想定のイベントIDを使います(インデクス再実行時の重複回避)
json.document_id: event_id
# ======================= Elasticsearch template setting =======================
setup.template.settings:
index.number_of_shards: 1
index.number_of_replicas: 0 #シングルノードなのでレプリカ無効化
#index.codec: best_compression
#_source.enabled: false
#Filebeatデフォルトのインデクステンプレートは自動設定にしておきます(ECSのため)
setup.template.enabled: true
#Filebeatのマッピングテンプレートを適用します
setup.template.pattern: "eventlog-*"
#Filebeat側ではILMを無効化
setup.ilm.enabled: false
...
# ================================== Outputs ===================================
# Configure what output to use when sending the data collected by the beat.
# ---------------------------- Elasticsearch Output ----------------------------
output.elasticsearch:
# Array of hosts to connect to.
hosts: ["localhost:9200"]
# Protocol - either `http` (default) or `https`.
#protocol: "https"
# Authentication credentials - either API key or username/password.
#api_key: "id:api_key"
#username: "elastic"
#password: "changeme"
#ホスト毎にインデクスを振り分けます
#(元イベントデータのhost_nameフィールド値を使います)
indices:
#filebeatがエイリアス名に対して更新をかけるように設定
- index: eventlog-host-a
when.equals:
json.host_name: host-a
- index: eventlog-host-b
when.equals:
json.host_name: host-b
...
# ================================= Processors =================================
# Configure processors to enhance or manipulate events generated by the beat.
processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~
#元イベントデータをjsonパーシングしてからElasticsearchへ転送します
- decode_json_fields:
fields: [message]
process_array: true
max_depth: 8
overwrite_keys: true
add_error_key: true
# ================================== Logging ===================================
# Sets log level. The default log level is info.
# Available log levels are: error, warning, info, debug
#logging.level: debug
# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publish", "service".
#logging.selectors: ["*"]
...
イベントログを投入!
ホスト毎にイベント(ドキュメント)を投入してみます。ILMの閾値が5ドキュメントのためロールオーバー対象になります。しばらくすると・・・
新しいインデクスeventlog-host-a-000002/eventlog-host-b-000002が自動作成されています。ILMがホスト毎インデクスとしてロールオーバー処理してくれてますね。
Tips
- ElasticsearchのILMのポーリング間隔(デフォルト:10分)をデバッグ時には短めにしておいてもいいかもです。
おまけ
適当なサンプルイベントデータを生成するスクリプトです。
import os
import sys
import uuid
import json
import copy
template = {
"event_id": None,
"host_name": None,
"data": {
"data_id": None
}
}
for i in range(10):
event = copy.deepcopy(template)
event["event_id"] = "{}".format(uuid.uuid4())
event["host_name"] = "host-a"
event["data"]["data_id"] = "{}".format(uuid.uuid4())
print(json.dumps(event))
for i in range(10):
event = copy.deepcopy(template)
event["event_id"] = "{}".format(uuid.uuid4())
event["host_name"] = "host-b"
event["data"]["data_id"] = "{}".format(uuid.uuid4())
print(json.dumps(event))