やりたいこと:Source indexで「論理削除フラグ」をつけて履歴管理し、Dest indexには削除されていない有効な文書のみをインデックスしたい。
事前準備
DELETE test_latest_transform
PUT test_latest_transform
{
"mappings": {
"properties": {
"delete_flag_datetime":{
"type": "date"
},
"id": {
"type": "keyword"
}
}
},
"settings": {
"index":{
"default_pipeline": "last_updated_timestamp_pipeline"
}
}
}
POST test_latest_transform/_doc
{
"id": "a",
"delete_flag_datetime":"1970-01-01",
"delete_flag": true
}
POST test_latest_transform/_doc
{
"id": "a",
"delete_flag_datetime":"1970-01-01",
"delete_flag": true
}
POST test_latest_transform/_doc
{
"id": "b",
"delete_flag_datetime":"2200-01-01",
"delete_flag": false
}
POST test_latest_transform/_doc
{
"id": "c",
"delete_flag_datetime":"2200-01-01",
"delete_flag": false
}
GET test_latest_transform/_search
rentention_policyを転用する
ややトリッキーですが、retention_policyを活用すると目的を達成することができます。
retention_policy
(Optional, object) Defines a retention policy for the transform. Data that meets the defined criteria is deleted from the destination index.
- Properties of retention_policy
time
(Required, object) Specifies that the transform uses a time field to set the retention policy.
Properties of time
field
(Required, string) The date field that is used to calculate the age of the document.
max_age
(Required, time units) Specifies the maximum age of a document in the destination index. Documents that are older than the configured value are removed from the destination index.
ドキュメントを読むと、本来は「作成日」などのフィールドを指定してある一定期間以上経過したドキュメントを削除するような機能です。
これを削除したいドキュメントは古い日付、残したいactiveなドキュメントは新しい日付とすることで削除フラグとして利用することができます。
PUT _transform/test_retention_policy
{
"dest": {
"index": "test_latest_transform_dest"
},
"latest":{
"sort": "LastUpdatedTimestamp",
"unique_key": "id"
},
"source":{
"index": "test_latest_transform"
},
"retention_policy":{
"time":{
"field": "delete_flag_datetime",
"max_age": "10000d"
}
},
"frequency": "3s",
"sync": {
"time":{
"delay": "3s",
"field": "LastUpdatedTimestamp"
}
}
}
POST _transform/test_retention_policy/_start
GET test_latest_transform_dest/_search?filter_path=hits.hits._source.id,hits.hits._source.delete_flag_datetime
# response
{
"hits" : {
"hits" : [
{
"_source" : {
"id" : "b",
"delete_flag_datetime" : "2200-01-01"
}
},
{
"_source" : {
"id" : "c",
"delete_flag_datetime" : "2022-06-09"
}
}
]
}
}
POST test_latest_transform/_doc
{
"id": "c",
"delete_flag_datetime":"1970-06-10"
}
GET _transform/test_retention_policy/_stats
# 新たなcheckpointが作成され、完了したことを確認する
GET test_latest_transform_dest/_search?filter_path=hits.hits._source.id,hits.hits._source.delete_flag_datetime
# response
{
"hits" : {
"hits" : [
{
"_source" : {
"id" : "b",
"delete_flag_datetime" : "2200-01-01"
}
}
]
}
}
無事にid=cのドキュメントが宛先インデックスから削除されました。
「フラグ」のオン・オフが分かりづらいため、ソース側のインデックスでingest pluginなどを用いてdelete_flagフィールドからdelete_flag_dateフィールドを生成するといった対策を取った方が良いと思われます。
また、falseとしての日付は未来の日付でも問題なかったのでたいていの場合は大丈夫そうですが、max_age
とフラグにした日時によっては〇〇年問題が埋め込まれてしまう可能性もあります。
latest_transform自体が履歴インデックスと親和性の高い仕組みなので、そのうちにdelete_flagの機能が実装されるといいなと思います。
うまくいかなかったこと① source.query
query
(Optional, object) A query clause that retrieves a subset of data from the source index. See Query DSL.
Transform APIでは、source.query
で抽出するデータをフィルターすることができます。更新がうまくいかなそうですが、一応試してみます。
PUT _transform/test_query_with_transform
{
"dest": {
"index": "test_query_with_transform_dest"
},
"latest":{
"sort": "LastUpdatedTimestamp",
"unique_key": "id"
},
"source":{
"index": "test_latest_transform",
"query": {
"term":
{"delete_flag": false}
}
},
"frequency": "3s",
"sync": {
"time":{
"delay": "10s",
"field": "LastUpdatedTimestamp"
}
}
}
POST _transform/test_query_with_transform/_start
GET test_query_with_transform_dest/_search?filter_path=hits.hits._source.id,hits.hits._source.delete_flag
{
"hits" : {
"hits" : [
{
"_source" : {
"id" : "b",
"delete_flag" : false
}
},
{
"_source" : {
"id" : "c",
"delete_flag" : false
}
}
]
}
}
最初はうまくいっているようですが、
POST test_latest_transform/_doc
{
"id": "c",
"delete_flag": true
}
GET _transform/test_query_with_transform/_stats
# いつまでも{"operations_behind": 1}が残ります。
GET test_query_with_transform_dest/_search?filter_path=hits.hits._source.id,hits.hits._source.delete_flag
{
"hits" : {
"hits" : [
{
"_source" : {
"id" : "b",
"delete_flag" : false
}
},
{
"_source" : {
"id" : "c",
"delete_flag" : false
}
}
]
}
}
id=cの文書は削除されず、残ってしまいます。やはり、うまく削除できませんでした。おそらくtransformより先にqueryによるfilterが効いてしまうためだろうと思います。
うまくいかなかったこと② ingest plugin
続いて、dest index側のingest pluginでdropすることを試みます。
PUT _ingest/pipeline/drop_DELETE
{
"processors": [
{
"drop":{
"if": "if ((ctx.containsKey('delete_flag')) && (ctx?.delete_flag == true)){return true} else {return false}",
"ignore_failure": true
}
}
]
}
PUT test_transform_with_delete_processor_dest
{
"settings":{
"index":{
"default_pipeline": "drop_DELETE"
}
}
}
delete_flagをみてdropするフィルターを作り、dest_indexに設定しておきます。test_latest_transformもいったん削除してリセットします。
PUT _transform/test_transform_with_delete_processor
{
"dest": {
"index": "test_transform_with_delete_processor_dest"
},
"latest":{
"sort": "LastUpdatedTimestamp",
"unique_key": "id"
},
"source":{
"index": "test_latest_transform",
"query": {
"term":
{"delete_flag": false}
}
},
"frequency": "3s",
"sync": {
"time":{
"delay": "10s",
"field": "LastUpdatedTimestamp"
}
}
}
POST _transform/test_transform_with_delete_processor/_start
こちらも開始時はうまくいきますが、
POST test_latest_transform/_doc
{
"id": "c",
"delete_flag": true
}
GET _transform/test_query_with_transform/_stats
いつまでも{"operations_behind": 1}
が残り、id=cの文書が宛先インデックスから削除されません。