はじめに
今回はマニアックですが、便利な機能について簡単にお話したいと思います。
Elasticsearch のインデックスの派生で Data Streams という概念が出てきた事をご存知の方は多いと思います。Data Streams は時系列データに特化したもので、ILMの操作等がよりシンプルになるため、Elastic の integrations も Data Streams を使うようになってきています。
バージョン 9.1 で、Data Streams に新しい Failure Store という機能がリリースされましたので簡単にご紹介したいと思います。(英語ブログ)
Failure Store
Failure Store は Data Streams の書き込み失敗時のログを Elasticsearch 自体に残す画期的な機能です。更に書き込みが失敗した場合でもレスポンスは 200 を返すので、何事もなく処理が進みます(失敗した事はレスポンスに書いています)。
詳細はドキュメントにありますが、Failure Store に書き込まるパターン、書き込まれないパターンがあります。
Failure Store に書き込まれるパターン
- インジェストパイプラインの失敗
- スキーマのマッピングの不整合による書き込み失敗
Failure Store に書き込まれないパターン
- バックプレッシャーによる書き込み失敗
- ドキュメントのバージョンコンフリクトによる書き込みの失敗
仕組みは簡単で、専用の index (Data Streams の裏のインデックスが .ds-* で始まるように、failure store は .fs-* というインデックス) が作成されます。
Failure Store に入っているデータを参照する場合はインデックス名の最後に ::failures と追記すると、Failure Store を検索します。
色々説明するより実際の動きを見る方がわかりやすいと思いますので、スニペットと実行結果で挙動を説明しようと思います。試した環境 Elastic Cloud Hosted v9.2.2 です。
実際の挙動
大前提として、 Index pattern で Data Stream を指定し作成します。そこで、マッピングも指定していますが、id フィールドを integer に指定しています。数値型以外を入れると失敗するはずです。
次に ingest pipeline を作成しています。ここでは、存在しないフィールドを rename する処理を行うように設定しました。
最後に _bulk を使って複数ドキュメントを書く込みます。わざと "id": "invalid_text" というマッピングの不整合を起こすようなドキュメントを1つ仕込んであります。
実験1 - 今まで通りの Data Streams
前提
- Failure Store 無効
- Ingest pipeline 無効
PUT _index_template/index-template
{
"index_patterns": [
"datastream-*"
],
"data_stream": {},
"template": {
"data_stream_options": {
"failure_store": {
"enabled": false
}
},
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"id": {
"type": "integer"
}
}
}
}
}
PUT _ingest/pipeline/test_pipeline
{
"processors": [
{
"rename": {
"field": "nonexistent",
"target_field": "exists"
}
}
]
}
POST datastream-001/_bulk
{"create":{}}
{"@timestamp": "2025-05-01T00:00:00Z", "id": 1234}
{"create":{}}
{"@timestamp": "2025-05-01T00:00:00Z", "id": 1235}
{"create":{}}
{"@timestamp": "2025-05-01T00:00:00Z", "id": "invalid_text"}
{"create":{}}
{"@timestamp": "2025-05-01T00:00:00Z", "id": 1236}
結果
POST datastream-001/_bulk
{
"errors": true,
"took": 0,
"items": [
{
"create": {
"_index": ".ds-datastream-001-2025.12.10-000001",
"_id": "fZkvB5sBuCFYC3yynYBX",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 0,
"_primary_term": 1,
"status": 201
}
},
{
"create": {
"_index": ".ds-datastream-001-2025.12.10-000001",
"_id": "fpkvB5sBuCFYC3yynYBX",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 1,
"_primary_term": 1,
"status": 201
}
},
{
"create": {
"_index": ".ds-datastream-001-2025.12.10-000001",
"_id": "f5kvB5sBuCFYC3yynYBX",
"status": 400,
"failure_store": "not_enabled",
"error": {
"type": "document_parsing_exception",
"reason": "[1:46] failed to parse field [id] of type [integer] in document with id 'f5kvB5sBuCFYC3yynYBX'. Preview of field's value: 'invalid_text'",
"caused_by": {
"type": "number_format_exception",
"reason": "For input string: \"invalid_text\""
}
}
}
},
{
"create": {
"_index": ".ds-datastream-001-2025.12.10-000001",
"_id": "gJkvB5sBuCFYC3yynYBX",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 2,
"_primary_term": 1,
"status": 201
}
}
]
}
GET datastream-001/_search
{
"took": 0,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 3,
"relation": "eq"
},
"max_score": 1,
"hits": [
{
"_index": ".ds-datastream-001-2025.12.10-000001",
"_id": "aJkZB5sBuCFYC3yy74Cd",
"_score": 1,
"_source": {
"@timestamp": "2025-05-01T00:00:00Z",
"id": 1234
}
},
{
"_index": ".ds-datastream-001-2025.12.10-000001",
"_id": "aZkZB5sBuCFYC3yy74Cd",
"_score": 1,
"_source": {
"@timestamp": "2025-05-01T00:00:00Z",
"id": 1235
}
},
{
"_index": ".ds-datastream-001-2025.12.10-000001",
"_id": "a5kZB5sBuCFYC3yy74Cd",
"_score": 1,
"_source": {
"@timestamp": "2025-05-01T00:00:00Z",
"id": 1236
}
}
]
}
}
結果としてドキュメントが3つ書き込まれています。失敗したドキュメントのレスポンスは status: 400 となっています。
書き込みを失敗した情報がどこにあるかというと、Elasticsearch のログに吐き出されています。
[Elastic Cloud のコンソールのログ画面から抜粋]

実験2 - Failure Stores を有効化してみる
前提
- Failure Store 有効
- Ingest pipeline 無効
PUT _index_template/index-template
{
"index_patterns": [
"datastream-*"
],
"data_stream": {},
"template": {
"data_stream_options": {
"failure_store": {
"enabled": true
}
},
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"id": {
"type": "integer"
}
}
}
}
}
PUT _ingest/pipeline/test_pipeline
{
"processors": [
{
"rename": {
"field": "nonexistent",
"target_field": "exists"
}
}
]
}
POST datastream-001/_bulk
{"create":{}}
{"@timestamp": "2025-05-01T00:00:00Z", "id": 1234}
{"create":{}}
{"@timestamp": "2025-05-01T00:00:00Z", "id": 1235}
{"create":{}}
{"@timestamp": "2025-05-01T00:00:00Z", "id": "invalid_text"}
{"create":{}}
{"@timestamp": "2025-05-01T00:00:00Z", "id": 1236}
結果
POST datastream-001/_bulk
{
"errors": false,
"took": 200,
"items": [
{
"create": {
"_index": ".ds-datastream-001-2025.12.10-000001",
"_id": "gZkxB5sBuCFYC3yyJIDd",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 0,
"_primary_term": 1,
"status": 201
}
},
{
"create": {
"_index": ".ds-datastream-001-2025.12.10-000001",
"_id": "gpkxB5sBuCFYC3yyJIDd",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 1,
"_primary_term": 1,
"status": 201
}
},
{
"create": {
"_index": ".fs-datastream-001-2025.12.10-000002",
"_id": "hZkxB5sBuCFYC3yyJYBR",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 0,
"_primary_term": 1,
"failure_store": "used",
"status": 201
}
},
{
"create": {
"_index": ".ds-datastream-001-2025.12.10-000001",
"_id": "hJkxB5sBuCFYC3yyJIDd",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 2,
"_primary_term": 1,
"status": 201
}
}
]
}
GET datastream-001/_search
{
"took": 0,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 3,
"relation": "eq"
},
"max_score": 1,
"hits": [
{
"_index": ".ds-datastream-001-2025.12.10-000001",
"_id": "bJkhB5sBuCFYC3yybICZ",
"_score": 1,
"_source": {
"@timestamp": "2025-05-01T00:00:00Z",
"id": 1234
}
},
{
"_index": ".ds-datastream-001-2025.12.10-000001",
"_id": "bZkhB5sBuCFYC3yybICZ",
"_score": 1,
"_source": {
"@timestamp": "2025-05-01T00:00:00Z",
"id": 1235
}
},
{
"_index": ".ds-datastream-001-2025.12.10-000001",
"_id": "b5khB5sBuCFYC3yybICZ",
"_score": 1,
"_source": {
"@timestamp": "2025-05-01T00:00:00Z",
"id": 1236
}
}
]
}
}
GET datastream-001::failures/_search
{
"took": 0,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 1,
"relation": "eq"
},
"max_score": 1,
"hits": [
{
"_index": ".fs-datastream-001-2025.12.10-000002",
"_id": "cJkhB5sBuCFYC3yybID9",
"_score": 1,
"_source": {
"@timestamp": "2025-12-10T07:19:49.591Z",
"document": {
"id": "bpkhB5sBuCFYC3yybICZ",
"index": "datastream-001",
"source": {
"@timestamp": "2025-05-01T00:00:00Z",
"id": "invalid_text"
}
},
"error": {
"type": "document_parsing_exception",
"message": "[1:46] failed to parse field [id] of type [integer] in document with id 'bpkhB5sBuCFYC3yybICZ'. Preview of field's value: 'invalid_text'",
"stack_trace": """o.e.i.m.DocumentParsingException: [1:46] failed to parse field [id] of type [integer] in document with id 'bpkhB5sBuCFYC3yybICZ'. Preview of field's value: 'invalid_text'
at o.e.i.m.FieldMapper.rethrowAsDocumentParsingException(FieldMapper.java:241)
at o.e.i.m.FieldMapper.parse(FieldMapper.java:194)
... 24 more
Caused by: j.l.NumberFormatException: For input string: "invalid_text"
at j.i.m.FloatingDecimal.check(FloatingDecimal.java:2324)
at j.i.m.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1928)
... 33 more
"""
}
}
}
]
}
}
結果としてドキュメントが3つ書き込まれています。失敗したドキュメントの _bulk のレスポンスには "failure_store": "used", という記載がありますが通常の成功と同じ形で返ってきています。
GET datastream-001::failures/_search を検索すると、失敗した内容が残されています。書き込みを失敗した _source も残っているので、実際にどのドキュメントがどのように失敗したのかがわかります。
実験3 - 今まで通りの Data Streams で ingest pipeline を有効化
前提
- Failure Store 無効
- Ingest pipeline 有効
PUT _index_template/index-template
{
"index_patterns": [
"datastream-*"
],
"data_stream": {},
"template": {
"data_stream_options": {
"failure_store": {
"enabled": false
}
},
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"id": {
"type": "integer"
}
}
}
}
}
PUT _ingest/pipeline/test_pipeline
{
"processors": [
{
"rename": {
"field": "nonexistent",
"target_field": "exists"
}
}
]
}
POST datastream-001/_bulk?pipeline=test_pipeline
{"create":{}}
{"@timestamp": "2025-05-01T00:00:00Z", "id": 1234}
{"create":{}}
{"@timestamp": "2025-05-01T00:00:00Z", "id": 1235}
{"create":{}}
{"@timestamp": "2025-05-01T00:00:00Z", "id": "invalid_text"}
{"create":{}}
{"@timestamp": "2025-05-01T00:00:00Z", "id": 1236}
結果
POST datastream-001/_bulk?pipeline=test_pipeline
{
"errors": true,
"took": 0,
"ingest_took": 0,
"items": [
{
"create": {
"_index": "datastream-001",
"_id": "auto-generated",
"status": 400,
"failure_store": "not_enabled",
"error": {
"type": "illegal_argument_exception",
"reason": "field [nonexistent] doesn't exist"
}
}
},
{
"create": {
"_index": "datastream-001",
"_id": "auto-generated",
"status": 400,
"failure_store": "not_enabled",
"error": {
"type": "illegal_argument_exception",
"reason": "field [nonexistent] doesn't exist"
}
}
},
{
"create": {
"_index": "datastream-001",
"_id": "auto-generated",
"status": 400,
"failure_store": "not_enabled",
"error": {
"type": "illegal_argument_exception",
"reason": "field [nonexistent] doesn't exist"
}
}
},
{
"create": {
"_index": "datastream-001",
"_id": "auto-generated",
"status": 400,
"failure_store": "not_enabled",
"error": {
"type": "illegal_argument_exception",
"reason": "field [nonexistent] doesn't exist"
}
}
}
]
}
GET datastream-001/_search
{
"error": {
"root_cause": [
{
"type": "index_not_found_exception",
"reason": "no such index [datastream-001]",
"resource.type": "index_or_alias",
"resource.id": "datastream-001",
"index_uuid": "_na_",
"index": "datastream-001"
}
],
"type": "index_not_found_exception",
"reason": "no such index [datastream-001]",
"resource.type": "index_or_alias",
"resource.id": "datastream-001",
"index_uuid": "_na_",
"index": "datastream-001"
},
"status": 404
}
結果として、すべての書き込みが失敗しています。そもそも書き込みすらされていないので Data Stream すら作成されていません。Ingest pipeline も失敗時の処理を指定していないのでログもありません。
実験4 - Failure Stores と ingest pipeline を有効化
前提
- Failure Store 有効
- Ingest pipeline 有効
PUT _index_template/index-template
{
"index_patterns": [
"datastream-*"
],
"data_stream": {},
"template": {
"data_stream_options": {
"failure_store": {
"enabled": true
}
},
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"id": {
"type": "integer"
}
}
}
}
}
PUT _ingest/pipeline/test_pipeline
{
"processors": [
{
"rename": {
"field": "nonexistent",
"target_field": "exists"
}
}
]
}
POST datastream-001/_bulk?pipeline=test_pipeline
{"create":{}}
{"@timestamp": "2025-05-01T00:00:00Z", "id": 1234}
{"create":{}}
{"@timestamp": "2025-05-01T00:00:00Z", "id": 1235}
{"create":{}}
{"@timestamp": "2025-05-01T00:00:00Z", "id": "invalid_text"}
{"create":{}}
{"@timestamp": "2025-05-01T00:00:00Z", "id": 1236}
結果
{
"errors": false,
"took": 200,
"ingest_took": 0,
"items": [
{
"create": {
"_index": ".fs-datastream-001-2025.12.10-000001",
"_id": "hpk1B5sBuCFYC3yy6oB9",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 0,
"_primary_term": 1,
"failure_store": "used",
"status": 201
}
},
{
"create": {
"_index": ".fs-datastream-001-2025.12.10-000001",
"_id": "h5k1B5sBuCFYC3yy6oB9",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 1,
"_primary_term": 1,
"failure_store": "used",
"status": 201
}
},
{
"create": {
"_index": ".fs-datastream-001-2025.12.10-000001",
"_id": "iJk1B5sBuCFYC3yy6oB9",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 2,
"_primary_term": 1,
"failure_store": "used",
"status": 201
}
},
{
"create": {
"_index": ".fs-datastream-001-2025.12.10-000001",
"_id": "iZk1B5sBuCFYC3yy6oB9",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 3,
"_primary_term": 1,
"failure_store": "used",
"status": 201
}
}
]
}
GET datastream-001/_search
{
"took": 0,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 0,
"relation": "eq"
},
"max_score": null,
"hits": []
}
}
GET datastream-001::failures/_search
{
"took": 0,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 4,
"relation": "eq"
},
"max_score": 1,
"hits": [
{
"_index": ".fs-datastream-001-2025.12.10-000001",
"_id": "hpk1B5sBuCFYC3yy6oB9",
"_score": 1,
"_source": {
"@timestamp": "2025-12-10T07:42:12.416Z",
"document": {
"index": "datastream-001",
"source": {
"@timestamp": "2025-05-01T00:00:00Z",
"id": 1234
}
},
"error": {
"type": "illegal_argument_exception",
"message": "field [nonexistent] doesn't exist",
"stack_trace": """j.l.IllegalArgumentException: field [nonexistent] doesn't exist
at o.e.i.c.RenameProcessor.execute(RenameProcessor.java:71)
at o.e.i.CompoundProcessor.innerExecute(CompoundProcessor.java:171)
... 21 more
""",
"pipeline_trace": [
"test_pipeline"
],
"pipeline": "test_pipeline",
"processor_type": "rename"
}
}
},
{
"_index": ".fs-datastream-001-2025.12.10-000001",
"_id": "h5k1B5sBuCFYC3yy6oB9",
"_score": 1,
"_source": {
"@timestamp": "2025-12-10T07:42:12.417Z",
"document": {
"index": "datastream-001",
"source": {
"@timestamp": "2025-05-01T00:00:00Z",
"id": 1235
}
},
"error": {
"type": "illegal_argument_exception",
"message": "field [nonexistent] doesn't exist",
"stack_trace": """j.l.IllegalArgumentException: field [nonexistent] doesn't exist
at o.e.i.c.RenameProcessor.execute(RenameProcessor.java:71)
at o.e.i.CompoundProcessor.innerExecute(CompoundProcessor.java:171)
... 21 more
""",
"pipeline_trace": [
"test_pipeline"
],
"pipeline": "test_pipeline",
"processor_type": "rename"
}
}
},
{
"_index": ".fs-datastream-001-2025.12.10-000001",
"_id": "iJk1B5sBuCFYC3yy6oB9",
"_score": 1,
"_source": {
"@timestamp": "2025-12-10T07:42:12.417Z",
"document": {
"index": "datastream-001",
"source": {
"@timestamp": "2025-05-01T00:00:00Z",
"id": "invalid_text"
}
},
"error": {
"type": "illegal_argument_exception",
"message": "field [nonexistent] doesn't exist",
"stack_trace": """j.l.IllegalArgumentException: field [nonexistent] doesn't exist
at o.e.i.c.RenameProcessor.execute(RenameProcessor.java:71)
at o.e.i.CompoundProcessor.innerExecute(CompoundProcessor.java:171)
... 21 more
""",
"pipeline_trace": [
"test_pipeline"
],
"pipeline": "test_pipeline",
"processor_type": "rename"
}
}
},
{
"_index": ".fs-datastream-001-2025.12.10-000001",
"_id": "iZk1B5sBuCFYC3yy6oB9",
"_score": 1,
"_source": {
"@timestamp": "2025-12-10T07:42:12.417Z",
"document": {
"index": "datastream-001",
"source": {
"@timestamp": "2025-05-01T00:00:00Z",
"id": 1236
}
},
"error": {
"type": "illegal_argument_exception",
"message": "field [nonexistent] doesn't exist",
"stack_trace": """j.l.IllegalArgumentException: field [nonexistent] doesn't exist
at o.e.i.c.RenameProcessor.execute(RenameProcessor.java:71)
at o.e.i.CompoundProcessor.innerExecute(CompoundProcessor.java:171)
... 21 more
""",
"pipeline_trace": [
"test_pipeline"
],
"pipeline": "test_pipeline",
"processor_type": "rename"
}
}
}
]
}
}
結果として、すべての書き込みが失敗していますが、レスポンスは書き込み成功した場合とほぼ同等で、Data Stream も作成されています。Ingest Pipeline の処理が失敗したのですが、Failure Store 側に処理の失敗内容が残っています。
まとめ
簡単に 4パターンほど試してみましたが、失敗がちゃんと残るというのはデバッグがかなり楽になりますし、すべて Elasticsearch 内にあるので、通常のインデックス同様に検索や集計処理もかけられます。
例えば下記のように、pipeline毎に何度失敗したか確認出来たりします。
GET datastream-001::failures/_search
{
"size": 0,
"aggs": {
"count": {
"terms": {
"field": "error.pipeline"
}
}
}
}
ただし一点だけ大きな弱点があるとすると、Elasticsearch に書き込まれるということは、その分だけディスクを使用するといく事になります。大量の失敗が続くとディスクに利用がとんでもない事になるかもしれません。少し注意が必要そうですね。
とはいえ、「ログを集めるのが面倒」から「設定したら勝手にElasticsearchに溜まる」に出来る事はかなり良い事ではないでしょうか。今まであえてログを収集しなければ見る事すら出来なかったのでより運用しやすくなっていくのではないでしょうか。
関連URL