Elasticsearchの6.3.0からRollupというインデックスのドキュメントを集計して別のインデックスに保存することで、保存しておくドキュメントを圧縮する機能が追加されました。
例えばサーバーのアクセスログやメトリックデータ等の時系列データについて、直近のデータは1秒間隔で参照したいけど1ヵ月前のデータは1時間間隔で集計されたデータが参照出来れば十分、といった場合にRollup機能が活用できます。
なお、RollupはX-PackのBASICライセンスで利用可能なので、Elasticsearchのバージョンを6.3.0以降にアップグレードするだけで利用可能です。
環境
Elasticsearch 6.3.2
サンプルデータ
以下のように、日付、URL、ステータスコード、レスポンス時間を持っている簡素なサーバーログが1日分存在します。
{
"date": "2018-07-01T00:00:00",
"response_time": 591,
"status_code": 200,
"url": "/test/huga/"
},
{
"date": "2018-07-01T00:00:10",
"response_time": 632,
"status_code": 404,
"url": "/test/hoge/"
},
{
"date": "2018-07-01T00:00:20",
"response_time": 1237,
"status_code": 500,
"url": "/test/piyo/"
}
Rollupジョブの作成
Create Job APIを利用して、Rollup Jobを作成できます。
PUT _xpack/rollup/job/{任意のジョブID}
{
"index_pattern": "access-log*",
"rollup_index": "rollup_access-log",
"cron": "* * * * * ?",
"page_size": 1000,
"groups": {
"date_histogram": {
"field": "date",
"interval": "1h"
},
"terms": {
"fields": [
"url",
"status_code"
]
}
},
"metrics": [
{
"field": "response_time",
"metrics": [
"avg",
"max"
]
}
]
}
リクエストボディに以下のような内容を設定します。(Rollupジョブ設定内容の公式ドキュメント)
項目 | 設定内容 |
---|---|
index_pattern | Rollup対象となるインデックスのパターン |
rollup_index | Rollup後のドキュメントを保存するインデックス |
cron | cron形式のジョブ実行間隔(設定したインデックス名が既存のマッピング設定にマッチしてしまうとそのマッピングが反映されてしまうので注意) |
page_size | 1回のAggregation毎に処理するバケット数(値が大きいほど処理速度とメモリ使用量が増加します) |
groups | グルーピングするフィールド名やグルーピング方法 |
metrics | 収集するメトリック |
Groupsの設定
groups
には、Rollupする際のグルーピング方法を設定します。
グルーピング方法として以下が利用可能です。
-
Date Histogram
日付データに対する集計を行います。date_histogram
の設定は必須です。
項目 | 必須 | 設定内容 |
---|---|---|
field | 〇 | 集約する日付型フィールド名 |
interval | 〇 | 集約する時間間隔 |
delay | Rollup対象ドキュメントの上限(既定値は全データがRollup対象) | |
time_zone | フィールドのタイムゾーン(既定値はUTC) |
interval
とdelay
にはElasticsearchで利用可能な日付フォーマットで設定を行います。
delay
に値を設定した場合、前回のジョブ実行時から現在時刻 - 設定値の日付データまでがRollup対象となります。
(例えば"1d"
を設定した場合、now - 1d
までのデータがRollup対象となります)
この設定は、例えばドキュメントの更新間隔がランダムでリアルタイムにはRollup出来ないとき、毎日00:00に前日分までのドキュメントをRollupさせるようなジョブを作成する場合に利用します。
-
Terms
keyword型か数値型のフィールド値に対する集計を行います。
項目 | 必須 | 設定内容 |
---|---|---|
fields | 〇 | 集約するkeyword型か数値型フィールド名 |
-
Histogram
数値データに対する集計を行います。
項目 | 必須 | 設定内容 |
---|---|---|
fields | 〇 | 集約する数値型フィールド名 |
interval | 〇 | 集約する間隔 |
interval
にはヒストグラムの階級の間隔を設定します。
例えば5を設定した場合、0-5, 5-10...
のような階級となります。
この値はfields
で設定した全フィールドで共通です。
Metricsの設定
metrics
には、Rollup後インデックスに集計して保存しておきたい集計データを設定します。
項目 | 必須 | 設定内容 |
---|---|---|
field | 〇 | 集計する数値型フィールド名 |
metrics | 〇 | 集計方法(min/max/sum/avg/value_count) |
avg
を選択した場合、groups
に設定した集約間隔単位での平均値となります。
そのため、その集約間隔単位をさらにまとめた単位での平均を取得したい(例えば1h単位での集約結果を元に3h単位での平均を取得したい)場合、avg
を利用すると平均の平均となってしまい正しい平均値が取得できません。
こういった場合はsum
とvalue_count
の集計結果を保存しておくことで、任意の単位での平均値を求めることが出来ます。
Rollupジョブの開始
Create Job APIを実行してジョブを作成した直後はRollupジョブが停止状態になっています。
Start Job APIを利用することでジョブを開始できます。
POST _xpack/rollup/job/{開始するジョブID}/_start
ジョブが実行されるとRollup後インデックスに以下のようなドキュメントが追加されます。
{
"_index": "rollup_access-log_14",
"_type": "_doc",
"_id": "674066619",
"_score": 1,
"_source": {
"url.terms._count": 37,
"url.terms.value": "/test/hoge/",
"date.date_histogram._count": 37,
"status_code.terms.value": 200,
"response_time.avg._count": 37,
"response_time.max.value": 1472,
"date.date_histogram.timestamp": 1530403200000,
"date.date_histogram.interval": "1h",
"response_time.avg.value": 28563,
"_rollup.version": 1,
"date.date_histogram.time_zone": "UTC",
"status_code.terms._count": 37,
"_rollup.id": "rollup_test_14"
}
},
{
"_index": "rollup_access-log_14",
"_type": "_doc",
"_id": "1469377731",
"_score": 1,
"_source": {
"url.terms._count": 25,
"url.terms.value": "/test/hoge/",
"date.date_histogram._count": 25,
"status_code.terms.value": 404,
"response_time.avg._count": 25,
"response_time.max.value": 1450,
"date.date_histogram.timestamp": 1530403200000,
"date.date_histogram.interval": "1h",
"response_time.avg.value": 17739,
"_rollup.version": 1,
"date.date_histogram.time_zone": "UTC",
"status_code.terms._count": 25,
"_rollup.id": "rollup_test_14"
}
}
ドキュメントの取得
Rollup後ドキュメントのみを取得
Rollup後ドキュメントを検索するには、以下のエンドポイントへリクエストを行います。
Rollup後ドキュメントは集計済みのデータになるため、リクエストボディに集計条件を指定する必要があります。
また、size
は0を指定しておく必要があります。
GET {インデックスパターン}/_rollup_search
{
"size": 0,
"aggs": {
"date_hist": {
"date_histogram": {
"field": "date",
"interval": "1h"
},
"aggs": {
"avg_response_time": {
"avg": {
"field": "response_time"
}
}
}
}
}
}
{
"aggregations": {
"date_hist": {
"meta": {},
"buckets": [
{
"key_as_string": "2018-07-01T00:00:00.000Z",
"key": 1530403200000,
"doc_count": 261,
"avg_response_time": {
"value": 730.3371647509579
}
},
{
"key_as_string": "2018-07-01T01:00:00.000Z",
"key": 1530406800000,
"doc_count": 360,
"avg_response_time": {
"value": 738.6527777777778
}
},
・・・
]
}
}
}
インデックスパターンの適用対象には、通常のインデックスとRollup後インデックスの両方が含まれますが、Rollup後インデックスは1つのみになるようにする必要があります。
(複数含まれていると上手く検索できません)
また、集計条件に指定出来るのはRollupジョブの作成時に設定した条件が対象となるので、それ以外の条件は指定出来ません。
例えば今回の例ではresponse_time
はavg
とmax
のみをジョブ設定で指定しているため、min
の集計を実行しようとするとエラーになってしまいます。
{
"size": 0,
"aggs": {
"min_response_time": {
"min": {
"field": "response_time"
}
}
}
}
{
"error": {
"root_cause": [
{
"type": "illegal_argument_exception",
"reason": "There is not a rollup job that has a [min] agg with name [min_response_time] which also satisfies all requirements of query."
}
],
"type": "illegal_argument_exception",
"reason": "There is not a rollup job that has a [min] agg with name [min_response_time] which also satisfies all requirements of query."
},
"status": 400
}
通常のドキュメントとRollup後ドキュメントを取得
以下のように通常のインデックスとRollup後インデックスを,区切りで指定すれば、それぞれのインデックスからドキュメントを取得することが出来ます。
リクエストボディに設定する条件は、Rollup後ドキュメントのみを取得する場合と同様です。
GET {通常のインデックスのパターン},{Rollup後インデックスのパターン}/_rollup_search
Rollup後ドキュメントの圧縮量
Rollup後ドキュメントがどのくらい圧縮されるのかは、Rollupジョブの設定によります。
groups
で細かくグルーピングの設定を行っているとその分圧縮率は減りますが、ある程度元データに近い状態で保存しておくことが出来ます。
逆にグルーピングの設定量が少なければ圧縮率はあがりますが、元データから欠損する情報量は増えます。
この辺りは利用目的に合わせて適宜設定してください。
(参考までに今回の例だと 522.8KB -> 34.1KB に圧縮されました)
その他
Rollupジョブの検索
Get Rollup Jobs APIを利用することでジョブIDからRollupジョブを検索できます。
(ジョブIDの代わりに_all
を指定することで、全てのRollupジョブを取得できます)
GET _xpack/rollup/job/{ジョブID}
また、以下のエンドポイントから指定したインデックスパターンで有効なRollupジョブを取得することも出来ます。
(インデックスパターンの代わりに_all
を指定することで、全ての有効なRollupジョブを取得できます)
GET _xpack/rollup/data/{インデックスパターン}
Rollupジョブの停止
Stop Job APIを利用することでジョブを停止できます。
POST _xpack/rollup/job/{停止するジョブID}/_stop
Rollupジョブの削除
Delete Job APIを利用することでジョブを削除できます。
DELETE _xpack/rollup/job/{削除するジョブID}
ジョブを削除してもメタデータにジョブIDが残っているようで、もう一度同じジョブIDでジョブを作成しようとするとエラーになってしまいます。
Kibana上での表示
Rollup後インデックスに対するIndex Patternsを作成することで、Kibana上でRollup後ドキュメントを可視化することが可能です。
ただし、Visualizeを利用する際に通常のインデックスとRollup後インデックスを同じフィールド名で1つにまとめることができないので、DashboardにそれぞれのVisualizeを並べて表示したり、Rollup後インデックスに最新のドキュメントまで格納しておく等の工夫が必要になります。