5
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

NTTテクノクロスAdvent Calendar 2024

Day 22

Elasticsearchのデータを抜け漏れなく全件エクスポートする方法

Last updated at Posted at 2024-12-21

はじめに

この記事は、NTTテクノクロス Advent Calendar 2024 シリーズ2の22日目の記事です。

こんにちは、NTTテクノクロスの長江です。Elasticsearchを活用したログ基盤の設計/構築/保守などを担当しています。過去にはビッグデータの活用/分析のPythonエンジニアもやっていました。

最近Elasticsearchの導入や更改にあたって、「Elasticsearchのドキュメント (データ1件のこと) を全件取得し、外部へエクスポートしたい」という要望をよく耳にします。収集しているデータを外部の分析ツールで加工・集計し、活用していきたいケースが増加しているようです。

Elasticsearchには、標準で多くの可視化機能機械学習機能が備わっています。最近ではクエリ言語「ES|QL」がリリースされ、加工や集計もラクになりました。

ですが、どうしても生ドキュメントを抜け漏れなく全件取得してエクスポートしたいシチュエーションも存在するため、ご紹介します。動作確認した環境はElasticsearch 8.13です。

本来Elasticsearchへドキュメントを投入することを「インデックス」(動詞) 、取り出すことを「検索」と呼びますが、本記事ではユースケースも踏まえ、よくあるデータ連携の言い方として「投入」 「取得」と表現します。

結論

最初に結論を述べます。

  • ドキュメントをソートして取得するクエリを実行します。1回で取得できなかった残り分を取得するための search_after オプションを付与し、空になるまで繰り返し実行することで全件取得可能です。
  • ドキュメントが順次投入されている中で「この瞬間の全件を取得する」には「Point in time」機能が有効です。
  • 一方、定期投入されるデータを重複なく定期取得していくには、Elasticsearchへ投入された瞬間の時刻を付与しておいて、時間範囲を指定しながら定期取得する方法が有効です。

全件取得の難しさ

まずは普通にElasticsearchからデータを取得する方法を見てみましょう。普通の _search クエリだと、デフォルトで10件までしか取得しません。

取得のクエリ
GET some-index/_search
出力結果 (重要な部分のみ抜粋)
{
  "hits": {
    "hits": [
      {
        "_source": {
          "@timestamp": "2024-01-01T09:00:00.000Z",
          "host": {
            "name": "aaa-server-01"
          },
          "system": {
            "cpu": {
              "total": {
                "pct": 0.125
              }
            }
          }
        }
      },
      (中略)
      {
        "_source": {
          "@timestamp": "2024-01-02T09:00:00.000Z",
          "host": {
            "name": "aaa-server-01"
          },
          "system": {
            "cpu": {
              "total": {
                "pct": 0.25
              }
            }
          }
        }
      },
    ]
  }
}

size パラメータを指定すると取得件数を変更できますが、次のように上限10000を超えるとエラーになってしまいます。

取得ドキュメント数を指定した取得のクエリ
GET some-index/_search
{
  "size": 20000
}
出力結果 (重要な部分のみ抜粋)
{
  "error": {
    "root_cause": [
      {
        "type": "illegal_argument_exception",
        "reason": "Result window is too large, from + size must be less than or equal to: [10000] but was [20000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting."
      },
      (中略)
    ]
    (中略)
  }
  (中略)
}

回避策として、エラーメッセージの2文目にも書かれている「Scroll API」がかつてはよく使われていました。しかし、本記事執筆時の最新版である8.17では、この用途でのScroll APIの使用は推奨されていません

また、3文目に書かれているIndexの設定項目 index.max_result_window を変更することで上限を大きくできますが、経験上パフォーマンス影響が大きいためあまりオススメではありません。

search_afterを使ったドキュメントの連続取得

そこで、現在推奨されている全件取得の方法をご紹介します。

指定したフィールドでソートした状態でドキュメントを取得し、1回のクエリで取得しきれなかった「残り」を取得するクエリを繰り返し実行することで、ドキュメントを少しずつ全件取得することが可能です。

まずは、ソートする設定を含めたドキュメント取得を実行します。例では @timestamp, host.name で昇順ソートし、1000件取得します。

ソートを追加した全件取得クエリ
GET some-index/_search
{
  "size": 1000,
  "sort": [
    {
      "@timestamp": "asc"
    },
    {
      "host.name": "asc"
    }
  ]
}
出力結果 (重要な部分のみ抜粋)
{
  "hits": {
    "hits": [
      {
        "_source": {
          "@timestamp": "2024-01-01T09:00:00.000Z",
          "host": {
            "name": "aaa-server-01"
          },
          "system": {
            "cpu": {
              "total": {
                "pct": 0.125
              }
            }
          }
        },
        "sort": [
          1704034800000,
          "aaa-server-01"
        ]
      },
      (中略)
      {
        "_source": {
          "@timestamp": "2024-02-01T09:00:00.000Z",
          "host": {
            "name": "aaa-server-01"
          },
          "system": {
            "cpu": {
              "total": {
                "pct": 0.25
              }
            }
          }
        },
        "sort": [
          1706713200000,
          "aaa-server-01"
        ]
      },
    ]
  }
}

取得結果の最後のドキュメントに含まれる sort フィールドを、 search_after に指定して続きを取得します。

sortフィールド部分の抜粋
        "sort": [
          1706713200000,
          "aaa-server-01"
        ]
続きの取得クエリ
GET some-index/_search
{
  "size": 1000,
  "sort": [
    {
      "@timestamp": "asc"
    },
    {
      "host.name": "asc"
    }
  ],
  "search_after": [
    1706713200000,
    "aaa-server-01"
  ]
}
出力結果 (重要な部分のみ抜粋)
{
  "hits": {
    "hits": [
      {
        "_source": {
          "@timestamp": "2024-02-01T09:10:00.000Z",
          "host": {
            "name": "aaa-server-01"
          },
          "system": {
            "cpu": {
              "total": {
                "pct": 0.125
              }
            }
          }
        },
        "sort": [
          1704035400000,
          "aaa-server-01"
        ]
      },
      (中略)
      {
        "_source": {
          "@timestamp": "2024-03-01T09:00:00.000Z",
          "host": {
            "name": "aaa-server-01"
          },
          "system": {
            "cpu": {
              "total": {
                "pct": 0.25
              }
            }
          }
        },
        "sort": [
          1709218800000,
          "aaa-server-01"
        ]
      },
    ]
  }
}

これを、取得結果が空になるまで繰り返すことで全件取得可能です。

ドキュメントの取得アプローチ

ここまででドキュメントの全件取得は出来るようになりました。しかし、データとは得てして流動的なものです。ドキュメント投入が完了したIndexから一度取得しておしまいではなく、ドキュメント投入と並行して取得したり、取得を何度も行ったりする必要があることがほとんどです。

例えば、

  • ある時点の状態の完全なコピーを取り出して外部で分析したいような「今あるドキュメントだけを全て取得したい」ケース
  • Elasticsearchをデータ連携の中継地点のようにみなして「定期投入されるドキュメントを重複なく定期取得したい」ケース

等が考えられます。

どちらも search_after を用いて実現しますが、それぞれもう少し考慮が必要です。

今あるドキュメントだけを全て取得したい

多くの場合、Elasticsearchへは継続的にドキュメントが投入され続けています。投入の裏で「この瞬間の全件を取得する」際は注意が必要です。 search_after はあくまで「ソートした時のこの値より後ろを取得する」という指定であるため、

  • 最初のクエリ結果: 同時刻に別で投入された、新規ドキュメントを含まない
  • 続きのクエリ結果: 最初のクエリのタイミングで投入された、新規ドキュメントを含む

という一貫性のない状態で取得を行ってしまう可能性があります。

そんな時は、Point in time (以下PIT) が有効です。PITは、その瞬間のIndexの状態を一時保存し、たとえ実際には新しいドキュメントの投入が行われていても、元の状態を保って取得できるようにする機構です。

まずはPITを作成します。作成時には有効期間の指定が必要で、例では2分間を指定しています。

PIT作成クエリ
POST some-index/_pit?keep_alive=2m
出力結果
{
  "id": "uMyMBAEnLmRzLW1ldHJpY2JlYXQtOC4xMy4yLTIwMjQuMDQuMjUtMDAwMDAxFkZ5Zk5MTi16UW1DM3FDRGVRTXNYZEEAFjJhVU0tRGdmU2I2SzVkV0diZlR1a1EAAAAAAABjQwcWbGlmQ0ptV3JUWnlSZzBIa2l3MVc0QQABFkZ5Zk5MTi16UW1DM3FDRGVRTXNYZEEAAA=="
}

得られたPITのIDを使ってクエリを実行します。一通りドキュメント取得が終わるまでは同じPIT IDを指定し続けます。

PITは作成時に指定したIndex (複数やDataStreamも可) に紐づいているため、取得クエリでは対象Indexを指定せず、代わりにPIT IDを指定して実行します。また、クエリ連続実行中にPITが失効してしまわないよう、有効期間の延長設定も指定します。

PITを指定したクエリ
GET _search
{
  "size": 1000,
  "sort": [
    {
      "@timestamp": "asc"
    },
    {
      "host.name": "asc"
    }
  ],
  "pit": {
    "id": "uMyMBAEnLmRzLW1ldHJpY2JlYXQtOC4xMy4yLTIwMjQuMDQuMjUtMDAwMDAxFkZ5Zk5MTi16UW1DM3FDRGVRTXNYZEEAFjJhVU0tRGdmU2I2SzVkV0diZlR1a1EAAAAAAABjQwcWbGlmQ0ptV3JUWnlSZzBIa2l3MVc0QQABFkZ5Zk5MTi16UW1DM3FDRGVRTXNYZEEAAA==",
    "keep_alive": "2m"
  }
}
続きの取得クエリ
GET _search
{
  "size": 1000,
  "sort": [
    {
      "@timestamp": "asc"
    },
    {
      "host.name": "asc"
    }
  ],
  "search_after": [
    1706713200000,
    "aaa-server-01"
  ],
  "pit": {
    "id": "uMyMBAEnLmRzLW1ldHJpY2JlYXQtOC4xMy4yLTIwMjQuMDQuMjUtMDAwMDAxFkZ5Zk5MTi16UW1DM3FDRGVRTXNYZEEAFjJhVU0tRGdmU2I2SzVkV0diZlR1a1EAAAAAAABjQwcWbGlmQ0ptV3JUWnlSZzBIa2l3MVc0QQABFkZ5Zk5MTi16UW1DM3FDRGVRTXNYZEEAAA==",
    "keep_alive": "2m"
  }
}

終わったらPITを削除します。

PIT削除クエリ
DELETE _pit
{
    "id" : "uMyMBAEnLmRzLW1ldHJpY2JlYXQtOC4xMy4yLTIwMjQuMDQuMjUtMDAwMDAxFkZ5Zk5MTi16UW1DM3FDRGVRTXNYZEEAFjJhVU0tRGdmU2I2SzVkV0diZlR1a1EAAAAAAABjQwcWbGlmQ0ptV3JUWnlSZzBIa2l3MVc0QQABFkZ5Zk5MTi16UW1DM3FDRGVRTXNYZEEAAA=="
}

PITの実装背景等詳細は、Elasticsearch公式ブログ記事「Get a consistent view of your data over time with the Elasticsearch point-in-time reader」をご参照ください。

定期投入されるドキュメントを重複なく定期取得したい

続いて、Elasticsearchをデータ連携の中継地点のようにみなし、定期的に投入されてくるドキュメントを重複なく定期的に取得していきたい場合を考えます。

基本的にはタイムスタンプのフィールドでソートして、直近過去一定期間の取得を定期実行してしまえばよいのですが、次のとおり注意すべきパターンが存在します。

  • ドキュメントのタイムスタンプと、実際にElasticsearchへ投入される時刻とに乖離がある (投入されるまでに時間がかかる)
  • ドキュメントの投入がタイムスタンプ順とは限らず、逆転して古いドキュメントが後から投入されることがある

このような場合、そのままではドキュメントの取得漏れが起きてしまう可能性があります。

最もシンプルな対処方法は、ドキュメントに元から入っているタイムスタンプのフィールドではなく、Elasticsearchへの投入時刻のタイムスタンプを別途追加し、そちらをソートに使う方法です。フィールド名は任意ですが、Elasticの推奨フィールド定義一覧であるECS (Elastic Common Schema)event.ingested という名前で定義されているため、今回の例でもこのフィールドを使います。

まずは、Elasticsearchへドキュメントが投入された瞬間のタイムスタンプを追加するIngest pipelineを作成します。

Ingest pipeline作成クエリ
PUT _ingest/pipeline/ingested-timestamp
{
  "processors": [
    {
      "set": {
        "field": "event.ingested",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}

以降、投入側はこのIngest pipeline ingested-timestamp を指定して投入するようにします。取得側は、このElasticsearchへの投入タイムスタンプ event.ingested に基づき

  • 取得時間範囲を指定
  • ソートに使用

して取得します。

時間範囲を指定したクエリ
GET some-index/_search
{
  "size": 1000,
  "sort": [
    {
      "event.ingested": "asc"
    },
    {
      "host.name": "asc"
    }
  ],
  "query": {
    "range": {
      "event.ingested": {
        "gte": "2024-03-01T09:00:00.000Z",
        "lt": "2024-03-01T09:10:00.000Z"
      }
    }
  }
}
続きの取得クエリ
GET some-index/_search
{
  "size": 1000,
  "sort": [
    {
      "event.ingested": "asc"
    },
    {
      "host.name": "asc"
    }
  ],
  "search_after": [
    1706713200000,
    "aaa-server-01"
  ],
  "query": {
    "range": {
      "event.ingested": {
        "gte": "2024-03-01T09:00:00.000Z",
        "lt": "2024-03-01T09:10:00.000Z"
      }
    }
  }
}

なお、Elasticsearchはドキュメントの投入・取得ともに高速ですが、投入したドキュメントが取得できるよう反映されるまで少しタイムラグが存在します。実際の取得の際は、投入されてから数分空けた範囲を指定すると良いでしょう。

このタイムラグは、投入されたドキュメントを反映する「リフレッシュ」処理の定期実行頻度の設定項目 index.refresh_interval でチューニング可能です。しかし、高頻度にすると負荷が大きくなるため注意が必要です。

また、境界値の重複取得を防ぐため、期間指定は [以上, 未満) で指定する半開区間が望ましいです。上記の例では、 09:00以上, 09:10未満 を指定しています。次回実行時には、 09:10以上, 09:20未満 を指定します。

おわりに

Elasticsearchのクエリ内でソートして search_after を設定することで、ドキュメントを全件取得出来ることをご紹介しました。また、Point in timeを使ってある瞬間の状態を一時保存して全件を取得する方法や、Elasticsearchへ投入された時刻のタイムスタンプを用いて重複なく定期的に取得する方法もご紹介しました。

今回はElasticsearchだけで実現できるREST APIのクエリをご紹介しましたが、この他にもElasticsearchのドキュメントを取り出して活用するインタフェースやライブラリとして様々なものが存在します。例えば、エクスポートしたドキュメントをElasticsearchの外でデータ分析したいのであれば下記のようなライブラリ製品が存在しており、システム化を見据えているならば導入を検討してみるのも良さそうです。

  • Pandas互換のPython Elasticsearchライブラリ「Eland
    メソッド1つでCSV等への出力が可能です。
  • Hadoopへの連携インタフェースである「Elasticsearch-Hadoop
    Hadoopの入力元/出力先としてElasticsearchを指定可能です。

これらも全件取得が可能ですが、「ある瞬間の状態を一時保存して取得する」であったり、「タイムスタンプを用いて重複なく取得する」といった場合の土台となる考え方は共通です。今回ご紹介した考え方や設定が同様に適用できます。

私たちのチームはこれからもElasticsearchを活用し、より良いデータ収集/活用の基盤を提供していきたいと思います。何かElasticsearchでお困りのことがあればぜひお気軽にご相談ください。
Elasticsearch & 導入支援 | NTTテクノクロス株式会社

ご一読いただき、誠にありがとうございました。
NTTテクノクロス Advent Calendar 2024 シリーズ2 明日23日目は @korodroid さんです。お楽しみに!

5
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?