この記事は MicroAd (マイクロアド) Advent Calendar 2019 の9日目の記事です。
はじめに
みんな大好きElasticsearchですが、
「Elasticsearch上のデータを常時監視して、特定のパターンが発生したらアラートを飛ばしたい」というときに便利なのが、ElastAlertというツールです。
ElastAlertにはRule
と呼ばれる監視パターンがデフォルトで何種類も用意されているのですが、それだけでは自分の要望を満たせないときもあります。
そんなときは、カスタムルールと呼ばれる新しい監視パターンを自分で作成して解決しましょう。
作り方
カスタムルールを作る際に参考にしたのはこの2つ。とくに後者は実用的なコードが載っていて必見です。
今回作成するカスタムルール
この記事では、
- 複数のフィールドの値をそれぞれ集計し、集計結果同士を四則演算して、その結果を閾値と比較する
ことができるルールを作成します。
例えば、Elasticsearchのあるindexにfield_a
とfield_b
があるとして、
-0.001 < sum(field_a) - sum(field_b) < 0.001
という状態でなくなったらアラートする、みたいなことがやりたいわけです。
筆者の具体的な用途の例としては、「本来は一致しているはずだけれど、互いに異なる値になっていたらまずい」2つのデータに対してこのルールを設定し、違う値になっていたらすぐに通知を飛ばす、という感じです。
デフォルトのルールの中ではMetricAggregationRule
が近い機能を持っていますが、このルールは単一のフィールドを集計した結果しか利用できないため、「集計結果同士の演算結果を監視する」ことはできません。
カスタムルールを作成
MetricAggregationRule
クラスを参考にしつつも、集計したデータを扱うルールのベースであるBaseAggregationRuleを継承して、BinaryOperationOnAggregatedMetricRule
というクラスを作成していきます。
import operator as op
from elastalert.util import EAException
from elastalert.ruletypes import BaseAggregationRule
class BinaryOperationOnAggregatedMetricRule(BaseAggregationRule):
required_options = frozenset([
'metric_agg_key_first', 'metric_agg_key_second', 'metric_agg_type_first', 'metric_agg_type_second',
'binary_operator'
])
allowed_aggregations = frozenset(['min', 'max', 'avg', 'sum', 'cardinality', 'value_count'])
allowed_binary_operators = {'add': {'func': op.add, 'sign': '+'},
'subtract': {'func': op.sub, 'sign': '-'},
'multiply': {'func': op.mul, 'sign': '*'},
'divide': {'func': op.truediv, 'sign': '/'}}
def __init__(self, *args):
super(BinaryOperationOnAggregatedMetricRule, self).__init__(*args)
self.ts_field = self.rules.get('timestamp_field', '@timestamp')
self.metric_key_first = 'metric_' + self.rules['metric_agg_key_first'] + '_' + self.rules['metric_agg_type_first']
self.metric_key_second = 'metric_' + self.rules['metric_agg_key_second'] + '_' + self.rules['metric_agg_type_second']
self.binary_operator = self.allowed_binary_operators[self.rules['binary_operator']]
self.rules['aggregation_query_element'] = self.generate_aggregation_query()
if not self.rules['metric_agg_type_first'] in self.allowed_aggregations \
or not self.rules['metric_agg_type_second'] in self.allowed_aggregations:
raise EAException("metric_agg_type must be one of %s" % (str(self.allowed_aggregations)))
if not self.rules['binary_operator'] in self.allowed_binary_operators.keys():
raise EAException("binary_operator must be one of %s" % (str(self.allowed_binary_operators.keys())))
if 'max_threshold' not in self.rules and 'min_threshold' not in self.rules:
raise EAException("BinaryOperationOnAggregatedMetricRule must have at least one of either max_threshold or min_threshold")
def get_match_str(self, match):
message = 'Threshold violation, %s_%s %s %s_%s = %s (min: %s max : %s)\n\n' % (
self.rules['metric_agg_key_first'],
self.rules['metric_agg_type_first'],
self.binary_operator['sign'],
self.rules['metric_agg_key_second'],
self.rules['metric_agg_type_second'],
str(self.binary_operator['func'](*[match[self.metric_key_first], match[self.metric_key_second]])),
self.rules.get('min_threshold'),
self.rules.get('max_threshold')
)
if self.rules.get('delete_ruletype_text'):
message = ''
top_events = [[key[11:], counts] for key, counts in match.items() if key.startswith('top_events_')]
def events_to_message(items):
message = ''
items = sorted(items, key=lambda x: x[1], reverse=True)
for term, count in items:
message += '%s : %s\n' % (term, count)
return message
for key, counts in top_events:
message += '%s:\n' % (key)
message += '%s\n' % (events_to_message(counts.items()))
return message
def generate_aggregation_query(self):
"""
custom_top_count_keys: A list of fields.
ElastAlert will perform a terms query for the top X most common values for each of the fields,
where X is 5 by default, or custom_top_count_number if it exists.
custom_top_count_number: The number of terms to list if custom_top_count_keys is set. (Optional, integer, default 5)
"""
query = {
"all_matching_docs": {
"filters": {
"filters": {
"all": {
"match_all": {}
}
}
},
'aggs': {
'topx_match_aggs': {
"filter": {
"bool": {
"must": []
}
},
'aggregations': {
}
},
self.metric_key_first: {
self.rules['metric_agg_type_first']: {
'field': self.rules['metric_agg_key_first']
}
},
self.metric_key_second: {
self.rules['metric_agg_type_second']: {
'field': self.rules['metric_agg_key_second']
}
},
'binary_operation': {
'bucket_script': {
'buckets_path': {
'first': self.metric_key_first,
'second': self.metric_key_second
},
'script': 'params.first' + self.binary_operator['sign'] + 'params.second'
}
}
}
}
}
if self.rules.get('custom_top_count_keys'):
number = self.top_count_number = self.rules.get('custom_top_count_number', 5)
keys = self.top_count_keys = self.rules.get('custom_top_count_keys')
for key in keys:
child_query = {
'terms': {
'field': key,
'order': {'_count': 'desc'},
'size': number
},
'aggs': {
'metric_aggregation_first': {
self.rules['metric_agg_type_first']: {'field': self.rules['metric_agg_key_first']}
},
'metric_aggregation_second': {
self.rules['metric_agg_type_second']: {'field': self.rules['metric_agg_key_second']}
},
'metric_aggregation': {
'bucket_script': {
'buckets_path': {
'first': 'metric_aggregation_first',
'second': 'metric_aggregation_second'
},
'script': 'params.first' + self.binary_operator['sign'] + 'params.second'
}
}
}
}
query['all_matching_docs']['aggs']['topx_match_aggs']['aggregations'][key] = child_query
return query
def check_matches(self, timestamp, query_key, aggregation_data):
if "compound_query_key" in self.rules:
self.check_matches_recursive(timestamp, query_key, aggregation_data, self.rules['compound_query_key'], dict())
else:
metric_val_first = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_first]['value']
metric_val_second = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_second]['value']
binary_operation = aggregation_data['all_matching_docs']['buckets']['all']['binary_operation']['value']
if self.fulfill_condition(metric_val_first, metric_val_second, self.binary_operator['func']):
match = {
self.rules['timestamp_field']: timestamp,
self.metric_key_first: metric_val_first,
self.metric_key_second: metric_val_second,
'binary_operation': binary_operation
}
if query_key is not None:
match[self.rules['query_key']] = query_key
# Set TopX counts
if self.rules.get('custom_top_count_keys'):
counts = self.get_top_counts(aggregation_data)
match.update(counts)
self.add_match(match)
def check_matches_recursive(self, timestamp, query_key, aggregation_data, compound_keys, match_data):
if len(compound_keys) < 1:
# shouldn't get to this point, but checking for safety
return
match_data[compound_keys[0]] = aggregation_data['key']
if 'bucket_aggs' in aggregation_data:
for result in aggregation_data['bucket_aggs']['buckets']:
self.check_matches_recursive(timestamp,
query_key,
result,
compound_keys[1:],
match_data)
else:
metric_val_first = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_first]['value']
metric_val_second = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_second]['value']
binary_operation = aggregation_data['all_matching_docs']['buckets']['all']['binary_operation']['value']
if self.fulfill_condition(metric_val_first, metric_val_second, self.binary_operator['func']):
match_data[self.rules['timestamp_field']] = timestamp
match_data[self.metric_key_first] = metric_val_first
match_data[self.metric_key_second] = metric_val_second
match_data['binary_operation'] = binary_operation
# add compound key to payload to allow alerts to trigger for every unique occurrence
compound_value = [match_data[key] for key in self.rules['compound_query_key']]
match_data[self.rules['query_key']] = ",".join([str(value) for value in compound_value])
# Set TopX counts
if self.rules.get('custom_top_count_keys'):
counts = self.get_top_counts(aggregation_data)
match_data.update(counts)
self.add_match(match_data)
def get_top_counts(self, aggregation_data):
"""
Counts the number of events for each unique value for each key field.
Returns a dictionary with top_events_<key> mapped to the top 5 counts for each key.
"""
all_counts = {}
number = self.top_count_number
keys = self.top_count_keys
for key in keys:
hits_terms = aggregation_data['all_matching_docs']['buckets']['all'].get('topx_match_aggs').get(key, None)
if hits_terms is None:
top_events_count = {}
else:
buckets = hits_terms.get('buckets')
terms = {}
for bucket in buckets:
terms[bucket['key']] = bucket['metric_aggregation']['value']
counts = terms.items()
counts = sorted(counts, key=lambda x: x[1], reverse=True)
top_events_count = dict(counts[:number])
# Save a dict with the top 5 events by key
all_counts['top_events_%s' % (key)] = top_events_count
return all_counts
def fulfill_condition(self, metric_val_first, metric_val_second, binary_operator):
if metric_val_first is None or metric_val_second is None:
return False
if metric_val_second == 0 and binary_operator == op.truediv:
return False
if 'max_threshold' in self.rules and binary_operator(*[metric_val_first, metric_val_second]) > self.rules['max_threshold']:
return True
if 'min_threshold' in self.rules and binary_operator(*[metric_val_first, metric_val_second]) < self.rules['min_threshold']:
return True
return False
コードはpythonですが、ポイントの大半は、内部で発行されるElasticsearchのクエリの構造
を理解する所にあります。
今回のクエリではbucket_script
とbuckets_path
の内容を理解しておくことが必要でした。
設定ファイルの作成
このルールを使う設定ファイルで指定する項目は、次のようになります。
- 各フィールドの集計関数:
'min', 'max', 'avg', 'sum', 'cardinality', 'value_count'
- 集計結果の演算:
+, -, ×, ÷
の4種類のうち1つ
がそれぞれ使えます。
es_host: <host_name>
es_port: <port_number>
name: your rule name
type: "elastalert_modules.custom_rules.BinaryOperationOnAggregatedMetricRule"
index: <index_name>
timestamp_field: <timestamp_field_name>
doc_type: <doc_type_name>
# metric_agg_type must be one of ['min', 'max', 'avg', 'sum', 'cardinality', 'value_count']
# binary_operator must be one of ['add', 'subtract', 'multiply', 'divide']
metric_agg_key_first: fielde_a
metric_agg_type_first: sum
metric_agg_key_second: fielde_b
metric_agg_type_second: sum
binary_operator: subtract
min_threshold: -0.0001
max_threshold: 0.00001
query_key:
- xxxxxx_id
custom_top_count_keys:
- zzzzzz_id
アラートのサンプル
設定とプログラムがうまく機能すれば、次のようなアラートが飛んでくるはずです。
your rule name
Threshold violation, fielde_a_sum - fielde_b_sum = 0.25 (min: -0.0001 max : 0.00001)
zzzzzz_id:
19 : 0.25
binary_operation: 0.25
xxxxxx_id: 19
time: 2019-12-08T00:00:00.00000000Z
metric_fielde_a_sum: 1.0
metric_fielde_b_sum: 0.75
num_hits: 5000
num_matches: 1
おわりに
意図通り動くクエリの発行さえできれば、大抵の監視パターンは実現可能です。
ただし、時間を指定する部分をカスタマイズするときだけ、少し困るケースが出てきます。
というのも、時間を指定する部分に関しては、カスタムルールとは別の箇所でクエリが組み立てられているため、そのままでは時間指定までカスタマイズする事が難しいからです。
でも大丈夫。なんとかなります。なんとかなる様子を次の機会に紹介できればと思います。