Help us understand the problem. What is going on with this article?

ElastAlertでカスタムルールを作成する

この記事は MicroAd (マイクロアド) Advent Calendar 2019 の9日目の記事です。

はじめに

みんな大好きElasticsearchですが、
「Elasticsearch上のデータを常時監視して、特定のパターンが発生したらアラートを飛ばしたい」というときに便利なのが、ElastAlertというツールです。

ElastAlertにはRuleと呼ばれる監視パターンがデフォルトで何種類も用意されているのですが、それだけでは自分の要望を満たせないときもあります。
そんなときは、カスタムルールと呼ばれる新しい監視パターンを自分で作成して解決しましょう。

作り方

カスタムルールを作る際に参考にしたのはこの2つ。とくに後者は実用的なコードが載っていて必見です。

今回作成するカスタムルール

この記事では、

  • 複数のフィールドの値をそれぞれ集計し、集計結果同士を四則演算して、その結果を閾値と比較する

ことができるルールを作成します。

例えば、Elasticsearchのあるindexにfield_afield_bがあるとして、

-0.001 < sum(field_a) - sum(field_b) < 0.001

という状態でなくなったらアラートする、みたいなことがやりたいわけです。

筆者の具体的な用途の例としては、「本来は一致しているはずだけれど、互いに異なる値になっていたらまずい」2つのデータに対してこのルールを設定し、違う値になっていたらすぐに通知を飛ばす、という感じです。

デフォルトのルールの中ではMetricAggregationRuleが近い機能を持っていますが、このルールは単一のフィールドを集計した結果しか利用できないため、「集計結果同士の演算結果を監視する」ことはできません。

カスタムルールを作成

MetricAggregationRuleクラスを参考にしつつも、集計したデータを扱うルールのベースであるBaseAggregationRuleを継承して、BinaryOperationOnAggregatedMetricRuleというクラスを作成していきます。

elastalert/elastalert_modules/custum_rules.py
import operator as op
from elastalert.util import EAException
from elastalert.ruletypes import BaseAggregationRule

class BinaryOperationOnAggregatedMetricRule(BaseAggregationRule):

    required_options = frozenset([
        'metric_agg_key_first', 'metric_agg_key_second', 'metric_agg_type_first', 'metric_agg_type_second',
        'binary_operator'
    ])
    allowed_aggregations = frozenset(['min', 'max', 'avg', 'sum', 'cardinality', 'value_count'])
    allowed_binary_operators = {'add': {'func': op.add, 'sign': '+'},
                                'subtract': {'func': op.sub, 'sign': '-'},
                                'multiply': {'func': op.mul, 'sign': '*'},
                                'divide': {'func': op.truediv, 'sign': '/'}}

    def __init__(self, *args):
        super(BinaryOperationOnAggregatedMetricRule, self).__init__(*args)
        self.ts_field = self.rules.get('timestamp_field', '@timestamp')
        self.metric_key_first = 'metric_' + self.rules['metric_agg_key_first'] + '_' + self.rules['metric_agg_type_first']
        self.metric_key_second = 'metric_' + self.rules['metric_agg_key_second'] + '_' + self.rules['metric_agg_type_second']
        self.binary_operator = self.allowed_binary_operators[self.rules['binary_operator']]
        self.rules['aggregation_query_element'] = self.generate_aggregation_query()

        if not self.rules['metric_agg_type_first'] in self.allowed_aggregations \
            or not self.rules['metric_agg_type_second'] in self.allowed_aggregations:
            raise EAException("metric_agg_type must be one of %s" % (str(self.allowed_aggregations)))

        if not self.rules['binary_operator'] in self.allowed_binary_operators.keys():
            raise EAException("binary_operator must be one of %s" % (str(self.allowed_binary_operators.keys())))

        if 'max_threshold' not in self.rules and 'min_threshold' not in self.rules:
            raise EAException("BinaryOperationOnAggregatedMetricRule must have at least one of either max_threshold or min_threshold")

    def get_match_str(self, match):
        message = 'Threshold violation, %s_%s %s %s_%s = %s (min: %s max : %s)\n\n' % (
            self.rules['metric_agg_key_first'],
            self.rules['metric_agg_type_first'],
            self.binary_operator['sign'],
            self.rules['metric_agg_key_second'],
            self.rules['metric_agg_type_second'],
            str(self.binary_operator['func'](*[match[self.metric_key_first], match[self.metric_key_second]])),
            self.rules.get('min_threshold'),
            self.rules.get('max_threshold')
        )
        if self.rules.get('delete_ruletype_text'):
            message = ''

        top_events = [[key[11:], counts] for key, counts in match.items() if key.startswith('top_events_')]

        def events_to_message(items):
            message = ''
            items = sorted(items, key=lambda x: x[1], reverse=True)
            for term, count in items:
                message += '%s : %s\n' % (term, count)
            return message

        for key, counts in top_events:
            message += '%s:\n' % (key)
            message += '%s\n' % (events_to_message(counts.items()))

        return message

    def generate_aggregation_query(self):
        """
        custom_top_count_keys: A list of fields.
            ElastAlert will perform a terms query for the top X most common values for each of the fields,
            where X is 5 by default, or custom_top_count_number if it exists.
        custom_top_count_number: The number of terms to list if custom_top_count_keys is set. (Optional, integer, default 5)
        """

        query = {
            "all_matching_docs": {
                "filters": {
                    "filters": {
                        "all": {
                            "match_all": {}
                        }
                    }
                },
                'aggs': {
                    'topx_match_aggs': {
                        "filter": {
                            "bool": {
                                "must": []
                            }
                        },
                        'aggregations': {
                        }
                    },
                    self.metric_key_first: {
                        self.rules['metric_agg_type_first']: {
                            'field': self.rules['metric_agg_key_first']
                        }
                    },
                    self.metric_key_second: {
                        self.rules['metric_agg_type_second']: {
                            'field': self.rules['metric_agg_key_second']
                        }
                    },
                    'binary_operation': {
                        'bucket_script': {
                            'buckets_path': {
                                'first': self.metric_key_first,
                                'second': self.metric_key_second
                            },
                            'script': 'params.first' + self.binary_operator['sign'] + 'params.second'
                        }
                    }
                }
            }
        }

        if self.rules.get('custom_top_count_keys'):
            number = self.top_count_number = self.rules.get('custom_top_count_number', 5)
            keys = self.top_count_keys = self.rules.get('custom_top_count_keys')
            for key in keys:
                child_query = {
                    'terms': {
                        'field': key,
                        'order': {'_count': 'desc'},
                        'size': number
                    },
                    'aggs': {
                        'metric_aggregation_first': {
                            self.rules['metric_agg_type_first']: {'field': self.rules['metric_agg_key_first']}
                        },
                        'metric_aggregation_second': {
                            self.rules['metric_agg_type_second']: {'field': self.rules['metric_agg_key_second']}
                        },
                        'metric_aggregation': {
                            'bucket_script': {
                                'buckets_path': {
                                    'first': 'metric_aggregation_first',
                                    'second': 'metric_aggregation_second'
                                },
                                'script': 'params.first' + self.binary_operator['sign'] + 'params.second'
                            }
                        }

                    }
                }
                query['all_matching_docs']['aggs']['topx_match_aggs']['aggregations'][key] = child_query
        return query

    def check_matches(self, timestamp, query_key, aggregation_data):
        if "compound_query_key" in self.rules:
            self.check_matches_recursive(timestamp, query_key, aggregation_data, self.rules['compound_query_key'], dict())
        else:
            metric_val_first = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_first]['value']
            metric_val_second = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_second]['value']
            binary_operation = aggregation_data['all_matching_docs']['buckets']['all']['binary_operation']['value']

            if self.fulfill_condition(metric_val_first, metric_val_second, self.binary_operator['func']):
                match = {
                    self.rules['timestamp_field']: timestamp,
                    self.metric_key_first: metric_val_first,
                    self.metric_key_second: metric_val_second,
                    'binary_operation': binary_operation
                }
                if query_key is not None:
                    match[self.rules['query_key']] = query_key

                # Set TopX counts
                if self.rules.get('custom_top_count_keys'):
                    counts = self.get_top_counts(aggregation_data)
                    match.update(counts)

                self.add_match(match)

    def check_matches_recursive(self, timestamp, query_key, aggregation_data, compound_keys, match_data):
        if len(compound_keys) < 1:
            # shouldn't get to this point, but checking for safety
            return

        match_data[compound_keys[0]] = aggregation_data['key']
        if 'bucket_aggs' in aggregation_data:
            for result in aggregation_data['bucket_aggs']['buckets']:
                self.check_matches_recursive(timestamp,
                                             query_key,
                                             result,
                                             compound_keys[1:],
                                             match_data)
        else:
            metric_val_first = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_first]['value']
            metric_val_second = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_second]['value']
            binary_operation = aggregation_data['all_matching_docs']['buckets']['all']['binary_operation']['value']

            if self.fulfill_condition(metric_val_first, metric_val_second, self.binary_operator['func']):
                match_data[self.rules['timestamp_field']] = timestamp
                match_data[self.metric_key_first] = metric_val_first
                match_data[self.metric_key_second] = metric_val_second
                match_data['binary_operation'] = binary_operation

                # add compound key to payload to allow alerts to trigger for every unique occurrence
                compound_value = [match_data[key] for key in self.rules['compound_query_key']]
                match_data[self.rules['query_key']] = ",".join([str(value) for value in compound_value])

                # Set TopX counts
                if self.rules.get('custom_top_count_keys'):
                    counts = self.get_top_counts(aggregation_data)
                    match_data.update(counts)

                self.add_match(match_data)

    def get_top_counts(self, aggregation_data):
        """
        Counts the number of events for each unique value for each key field.
        Returns a dictionary with top_events_<key> mapped to the top 5 counts for each key.
        """
        all_counts = {}
        number = self.top_count_number
        keys = self.top_count_keys
        for key in keys:

            hits_terms = aggregation_data['all_matching_docs']['buckets']['all'].get('topx_match_aggs').get(key, None)
            if hits_terms is None:
                top_events_count = {}
            else:
                buckets = hits_terms.get('buckets')

                terms = {}
                for bucket in buckets:
                    terms[bucket['key']] = bucket['metric_aggregation']['value']
                counts = terms.items()
                counts = sorted(counts, key=lambda x: x[1], reverse=True)
                top_events_count = dict(counts[:number])

            # Save a dict with the top 5 events by key
            all_counts['top_events_%s' % (key)] = top_events_count

        return all_counts

    def fulfill_condition(self, metric_val_first, metric_val_second, binary_operator):
        if metric_val_first is None or metric_val_second is None:
            return False
        if metric_val_second == 0 and binary_operator == op.truediv:
            return False
        if 'max_threshold' in self.rules and binary_operator(*[metric_val_first, metric_val_second]) > self.rules['max_threshold']:
            return True
        if 'min_threshold' in self.rules and binary_operator(*[metric_val_first, metric_val_second]) < self.rules['min_threshold']:
            return True
        return False

コードはpythonですが、ポイントの大半は、内部で発行されるElasticsearchのクエリの構造を理解する所にあります。

今回のクエリではbucket_scriptbuckets_pathの内容を理解しておくことが必要でした。

設定ファイルの作成

このルールを使う設定ファイルで指定する項目は、次のようになります。

  • 各フィールドの集計関数:'min', 'max', 'avg', 'sum', 'cardinality', 'value_count'
  • 集計結果の演算:+, -, ×, ÷の4種類のうち1つ

がそれぞれ使えます。

rule_setting.yaml
es_host: <host_name>
es_port: <port_number>

name: your rule name
type: "elastalert_modules.custom_rules.BinaryOperationOnAggregatedMetricRule"

index: <index_name>
timestamp_field: <timestamp_field_name>
doc_type: <doc_type_name>

# metric_agg_type must be one of ['min', 'max', 'avg', 'sum', 'cardinality', 'value_count']
# binary_operator must be one of ['add', 'subtract', 'multiply', 'divide']
metric_agg_key_first: fielde_a
metric_agg_type_first: sum
metric_agg_key_second: fielde_b
metric_agg_type_second: sum
binary_operator: subtract

min_threshold: -0.0001
max_threshold: 0.00001

query_key:
  - xxxxxx_id

custom_top_count_keys:
  - zzzzzz_id

アラートのサンプル

設定とプログラムがうまく機能すれば、次のようなアラートが飛んでくるはずです。

your rule name

Threshold violation, fielde_a_sum - fielde_b_sum = 0.25 (min: -0.0001 max : 0.00001)

zzzzzz_id:
19 : 0.25

binary_operation: 0.25
xxxxxx_id: 19
time: 2019-12-08T00:00:00.00000000Z
metric_fielde_a_sum: 1.0
metric_fielde_b_sum: 0.75
num_hits: 5000
num_matches: 1

おわりに

意図通り動くクエリの発行さえできれば、大抵の監視パターンは実現可能です。

ただし、時間を指定する部分をカスタマイズするときだけ、少し困るケースが出てきます。
というのも、時間を指定する部分に関しては、カスタムルールとは別の箇所でクエリが組み立てられているため、そのままでは時間指定までカスタマイズする事が難しいからです。

でも大丈夫。なんとかなります。なんとかなる様子を次の機会に紹介できればと思います。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした