LoginSignup
5

More than 5 years have passed since last update.

ElasticsearchのRollup機能を使ってドキュメントを圧縮する

Last updated at Posted at 2018-07-29

Elasticsearchの6.3.0からRollupというインデックスのドキュメントを集計して別のインデックスに保存することで、保存しておくドキュメントを圧縮する機能が追加されました。
例えばサーバーのアクセスログやメトリックデータ等の時系列データについて、直近のデータは1秒間隔で参照したいけど1ヵ月前のデータは1時間間隔で集計されたデータが参照出来れば十分、といった場合にRollup機能が活用できます。
なお、RollupはX-PackのBASICライセンスで利用可能なので、Elasticsearchのバージョンを6.3.0以降にアップグレードするだけで利用可能です。

環境

Elasticsearch 6.3.2

サンプルデータ

以下のように、日付、URL、ステータスコード、レスポンス時間を持っている簡素なサーバーログが1日分存在します。

{
    "date": "2018-07-01T00:00:00",
    "response_time": 591,
    "status_code": 200,
    "url": "/test/huga/"
},
{
    "date": "2018-07-01T00:00:10",
    "response_time": 632,
    "status_code": 404,
    "url": "/test/hoge/"
},
{
    "date": "2018-07-01T00:00:20",
    "response_time": 1237,
    "status_code": 500,
    "url": "/test/piyo/"
}

Rollupジョブの作成

Create Job APIを利用して、Rollup Jobを作成できます。

エンドポイント
PUT _xpack/rollup/job/{任意のジョブID}
リクエストボディの例
{
  "index_pattern": "access-log*",
  "rollup_index": "rollup_access-log",
  "cron": "* * * * * ?",
  "page_size": 1000,
  "groups": {
    "date_histogram": {
      "field": "date",
      "interval": "1h"
    },
    "terms": {
      "fields": [
        "url",
        "status_code"
      ]
    }
  },
  "metrics": [
    {
      "field": "response_time",
      "metrics": [
        "avg",
        "max"
      ]
    }
  ]
}

リクエストボディに以下のような内容を設定します。(Rollupジョブ設定内容の公式ドキュメント

項目 設定内容
index_pattern Rollup対象となるインデックスのパターン
rollup_index Rollup後のドキュメントを保存するインデックス
cron cron形式のジョブ実行間隔(設定したインデックス名が既存のマッピング設定にマッチしてしまうとそのマッピングが反映されてしまうので注意)
page_size 1回のAggregation毎に処理するバケット数(値が大きいほど処理速度とメモリ使用量が増加します)
groups グルーピングするフィールド名やグルーピング方法
metrics 収集するメトリック

Groupsの設定

groupsには、Rollupする際のグルーピング方法を設定します。
グルーピング方法として以下が利用可能です。

  • Date Histogram
    日付データに対する集計を行います。date_histogramの設定は必須です。
項目 必須 設定内容
field 集約する日付型フィールド名
interval 集約する時間間隔
delay Rollup対象ドキュメントの上限(既定値は全データがRollup対象)
time_zone フィールドのタイムゾーン(既定値はUTC)

intervaldelayにはElasticsearchで利用可能な日付フォーマットで設定を行います。
delayに値を設定した場合、前回のジョブ実行時から現在時刻 - 設定値の日付データまでがRollup対象となります。
(例えば"1d"を設定した場合、now - 1dまでのデータがRollup対象となります)
この設定は、例えばドキュメントの更新間隔がランダムでリアルタイムにはRollup出来ないとき、毎日00:00に前日分までのドキュメントをRollupさせるようなジョブを作成する場合に利用します。

  • Terms
    keyword型か数値型のフィールド値に対する集計を行います。
項目 必須 設定内容
fields 集約するkeyword型か数値型フィールド名
  • Histogram
    数値データに対する集計を行います。
項目 必須 設定内容
fields 集約する数値型フィールド名
interval 集約する間隔

intervalにはヒストグラムの階級の間隔を設定します。
例えば5を設定した場合、0-5, 5-10...のような階級となります。
この値はfieldsで設定した全フィールドで共通です。

Metricsの設定

metricsには、Rollup後インデックスに集計して保存しておきたい集計データを設定します。

項目 必須 設定内容
field 集計する数値型フィールド名
metrics 集計方法(min/max/sum/avg/value_count)

avgを選択した場合、groupsに設定した集約間隔単位での平均値となります。
そのため、その集約間隔単位をさらにまとめた単位での平均を取得したい(例えば1h単位での集約結果を元に3h単位での平均を取得したい)場合、avgを利用すると平均の平均となってしまい正しい平均値が取得できません。
こういった場合はsumvalue_countの集計結果を保存しておくことで、任意の単位での平均値を求めることが出来ます。

Rollupジョブの開始

Create Job APIを実行してジョブを作成した直後はRollupジョブが停止状態になっています。
Start Job APIを利用することでジョブを開始できます。

エンドポイント
POST _xpack/rollup/job/{開始するジョブID}/_start

ジョブが実行されるとRollup後インデックスに以下のようなドキュメントが追加されます。

ドキュメントの例
{
    "_index": "rollup_access-log_14",
    "_type": "_doc",
    "_id": "674066619",
    "_score": 1,
    "_source": {
      "url.terms._count": 37,
      "url.terms.value": "/test/hoge/",
      "date.date_histogram._count": 37,
      "status_code.terms.value": 200,
      "response_time.avg._count": 37,
      "response_time.max.value": 1472,
      "date.date_histogram.timestamp": 1530403200000,
      "date.date_histogram.interval": "1h",
      "response_time.avg.value": 28563,
      "_rollup.version": 1,
      "date.date_histogram.time_zone": "UTC",
      "status_code.terms._count": 37,
      "_rollup.id": "rollup_test_14"
    }
  },
  {
    "_index": "rollup_access-log_14",
    "_type": "_doc",
    "_id": "1469377731",
    "_score": 1,
    "_source": {
      "url.terms._count": 25,
      "url.terms.value": "/test/hoge/",
      "date.date_histogram._count": 25,
      "status_code.terms.value": 404,
      "response_time.avg._count": 25,
      "response_time.max.value": 1450,
      "date.date_histogram.timestamp": 1530403200000,
      "date.date_histogram.interval": "1h",
      "response_time.avg.value": 17739,
      "_rollup.version": 1,
      "date.date_histogram.time_zone": "UTC",
      "status_code.terms._count": 25,
      "_rollup.id": "rollup_test_14"
    }
}

ドキュメントの取得

Rollup後ドキュメントのみを取得

Rollup後ドキュメントを検索するには、以下のエンドポイントへリクエストを行います。
Rollup後ドキュメントは集計済みのデータになるため、リクエストボディに集計条件を指定する必要があります。
また、sizeは0を指定しておく必要があります。

エンドポイント
GET {インデックスパターン}/_rollup_search
リクエストボディの例
{
  "size": 0,
  "aggs": {
    "date_hist": {
      "date_histogram": {
        "field": "date",
        "interval": "1h"
      },
      "aggs": {
        "avg_response_time": {
          "avg": {
            "field": "response_time"
          }
        }
      }
    }
  }
}
response(aggregations)
{
  "aggregations": {
    "date_hist": {
      "meta": {},
      "buckets": [
        {
          "key_as_string": "2018-07-01T00:00:00.000Z",
          "key": 1530403200000,
          "doc_count": 261,
          "avg_response_time": {
            "value": 730.3371647509579
          }
        },
        {
          "key_as_string": "2018-07-01T01:00:00.000Z",
          "key": 1530406800000,
          "doc_count": 360,
          "avg_response_time": {
            "value": 738.6527777777778
          }
        },
        ・・・
      ]
    }
  }
}

インデックスパターンの適用対象には、通常のインデックスとRollup後インデックスの両方が含まれますが、Rollup後インデックスは1つのみになるようにする必要があります。
(複数含まれていると上手く検索できません)

また、集計条件に指定出来るのはRollupジョブの作成時に設定した条件が対象となるので、それ以外の条件は指定出来ません。
例えば今回の例ではresponse_timeavgmaxのみをジョブ設定で指定しているため、minの集計を実行しようとするとエラーになってしまいます。

エラーになるリクエストの例
{
  "size": 0,
  "aggs": {
    "min_response_time": {
      "min": {
        "field": "response_time"
      }
    }
  }
}
エラーレスポンス
{
  "error": {
    "root_cause": [
      {
        "type": "illegal_argument_exception",
        "reason": "There is not a rollup job that has a [min] agg with name [min_response_time] which also satisfies all requirements of query."
      }
    ],
    "type": "illegal_argument_exception",
    "reason": "There is not a rollup job that has a [min] agg with name [min_response_time] which also satisfies all requirements of query."
  },
  "status": 400
}

通常のドキュメントとRollup後ドキュメントを取得

以下のように通常のインデックスとRollup後インデックスを,区切りで指定すれば、それぞれのインデックスからドキュメントを取得することが出来ます。
リクエストボディに設定する条件は、Rollup後ドキュメントのみを取得する場合と同様です。

エンドポイント
GET {通常のインデックスのパターン},{Rollup後インデックスのパターン}/_rollup_search

Rollup後ドキュメントの圧縮量

Rollup後ドキュメントがどのくらい圧縮されるのかは、Rollupジョブの設定によります。
groupsで細かくグルーピングの設定を行っているとその分圧縮率は減りますが、ある程度元データに近い状態で保存しておくことが出来ます。
逆にグルーピングの設定量が少なければ圧縮率はあがりますが、元データから欠損する情報量は増えます。
この辺りは利用目的に合わせて適宜設定してください。
(参考までに今回の例だと 522.8KB -> 34.1KB に圧縮されました)

その他

Rollupジョブの検索

Get Rollup Jobs APIを利用することでジョブIDからRollupジョブを検索できます。
(ジョブIDの代わりに_allを指定することで、全てのRollupジョブを取得できます)

エンドポイント
GET _xpack/rollup/job/{ジョブID}

また、以下のエンドポイントから指定したインデックスパターンで有効なRollupジョブを取得することも出来ます。
(インデックスパターンの代わりに_allを指定することで、全ての有効なRollupジョブを取得できます)

エンドポイント
GET _xpack/rollup/data/{インデックスパターン}

Rollupジョブの停止

Stop Job APIを利用することでジョブを停止できます。

エンドポイント
POST _xpack/rollup/job/{停止するジョブID}/_stop

Rollupジョブの削除

Delete Job APIを利用することでジョブを削除できます。

エンドポイント
DELETE _xpack/rollup/job/{削除するジョブID}

ジョブを削除してもメタデータにジョブIDが残っているようで、もう一度同じジョブIDでジョブを作成しようとするとエラーになってしまいます。

Kibana上での表示

Rollup後インデックスに対するIndex Patternsを作成することで、Kibana上でRollup後ドキュメントを可視化することが可能です。
ただし、Visualizeを利用する際に通常のインデックスとRollup後インデックスを同じフィールド名で1つにまとめることができないので、DashboardにそれぞれのVisualizeを並べて表示したり、Rollup後インデックスに最新のドキュメントまで格納しておく等の工夫が必要になります。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5