LoginSignup
0
2

More than 3 years have passed since last update.

Elasticsearch : text になったfieldを date にする

Posted at

pipelineとは

fieldを追加したり、convertしたりできる。logstashで言う filter{} セクション。

pipelineの使い方

  1. 事前にpipelineを定義
  2. indexするとき(dataをinsertするとき)にpipelineを指定しながらindexing
  3. Viola!

さくっとおためし Pipeline

*elastic 6.3なので古いです。 POSTでエラーになる場合は _doc を外してください

# 既存のpipelineリスト
get _ingest/pipeline

# my_add_my_text パイプラインを新規定義
put _ingest/pipeline/my_add_my_text
{
  "description": "adds added_my_text field",
  "processors": [
    {
      "set" : {
        "field" : "added_my_text",
        "value" : "{{answer.my_text}} is my_text"
      }
    }
  ]
}


delete test

# my_add_my_textパイプラインを使いながらdataを入れる
POST test/_doc/1?pipeline=my_add_my_text
{
  "answer": {
    "my_text": 20191115
  }
}


# 作ったpipelineの削除
DELETE _ingest/pipeline/my_add_my_text


開発方法

_simulate を使うと便利。事前にindexをPOSTしておく必要がない。(pipelineの定義と一緒にダミーデータも渡せる)

ただmappingがverboseにしても出ないから、何の data typeで入ったかはわからなそう

POST _ingest/pipeline/_simulate?verbose
{
  "pipeline" :
  {
    "description": "_description",
    "processors": [
      {
        "set" : {
          "field" : "added_my_text",
          "value" : "{{answer.my_text}} is my_text"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "_doc",
      "_id": "id",
      "_source": {
          "answer": {
            "answered_at": "2019-11-15 00:00:00"
          }
      }
    }
  ]
}



Reindexでpipelineを使う方法

data typeを変えるときとかに使うとよさそう。

indexのaliasと組み合わせると便利。
アプリは index名: test を使っているとき
index a から b にコピー
bのエイリアスを test にすると、アプリは何も変更を加えず、新しいindexにアクセスできる

# 巣のデータ
post test_source/_doc/1
{
  "answer": {
    "my_text": 20191115
  }
}


get test_source/_search              <-----入れたデータが出る

# indexをコピー
POST _reindex
{
  "source": {
    "index": "test_source"           <------これを
  },
  "dest": {
    "index": "test_dest",           <-----このindexにコピーpipelineを通しながら
    "pipeline": "my_add_my_text"
  }
} 

# 新しいindex
get test_dest/_search                 <----- added_my_textが増えたデータが出る

# index名:test でアクセスできるようにaliasを設定する        <---- 既存aliaがある場合は事前にdelete必要
POST /_aliases
{
  "actions": [
    {
      "add": {  
        "index": "test_dest",          <---これが本体
        "alias": "test"                 <--こっちが虚体
      }
    }
  ]
}


# aliasのリスト
GET /_alias
GET _cat/aliases        <---こっちのほうが見やすい

yyyy-mm-dd hh:ii:ss な text を date にしたい

これが今回の目的でした。pipelineにはさっき使った set というprocessorがある。
processorによってできることが違う。フィールドを増やしたり、typeを変えたり、正規表現したり、いろいろ。

種類は以下

Processors

https://www.elastic.co/guide/en/elasticsearch/reference/6.3/ingest-processors.html
- Append Processor
- Convert Processor
- Date Processor
- Date Index Name Processor
- Fail Processor
- Foreach Processor
- Grok Processor
- Gsub Processor
- Join Processor
- JSON Processor
- KV Processor
- Lowercase Processor
- Remove Processor
- Rename Processor
- Script Processor
- Set Processor
- Split Processor
- Sort Processor
- Trim Processor
- Uppercase Processor
- Dot Expander Processor
- URL Decode Processor

dateを変えようとして convert processorを使ったらエラー。名前的にdate processorか?

textになっているfield

yyyy-mm-dd hh:ii:ssがtextになる


delete test

post test/_doc/1
{
  "answer": {
    "answered_at": "2019-11-15 00:00:00"          <------- yyyy-mm-dd hh:ii:ss 形式だと
  }
}

get test/_mapping

# response
{
  "test": {
    "mappings": {
      "_doc": {
        "properties": {
          "answer": {
            "properties": {
              "answered_at": {
                "type": "text",                  <---------- textになってしまう期間検索とかができなくなる
                "fields": {
                  "keyword": {
                    "type": "keyword",
                    "ignore_above": 256
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}

うまくいかなかった Date processor

00:00:00 だと "Value 0 for clockhourOfHalfday must be in the range [1,12]" でエラー

# date processorを使う
PUT _ingest/pipeline/my-convert-date
{
  "description": "converts text field '2019-11-15 00:00:00' to date",
  "processors" : [
    {
      "date" : {
        "field" : "answer.answered_at",
        "target_field" : "answer.answered_at_date",
        "formats" : ["yyyy-MM-dd hh:mm:ss"],
        "timezone" : "Asia/Tokyo"
      }
    }
  ]
}


get test/_mapping
{
  "error": {
    "root_cause": [
      {
        "type": "exception",
        "reason": "java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: unable to parse date [2019-11-15 00:00:00]",
        "header": {
          "processor_type": "date"
        }
      }
    ],
    "type": "exception",
    "reason": "java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: unable to parse date [2019-11-15 00:00:00]",
    "caused_by": {
      "type": "illegal_argument_exception",
      "reason": "java.lang.IllegalArgumentException: unable to parse date [2019-11-15 00:00:00]",
      "caused_by": {
        "type": "illegal_argument_exception",
        "reason": "unable to parse date [2019-11-15 00:00:00]",
        "caused_by": {
          "type": "illegal_field_value_exception",
          "reason": "Cannot parse \"2019-11-15 00:00:00\": Value 0 for clockhourOfHalfday must be in the range [1,12]"       <------ 内部が12Hモードになってるみたい
        }
      }
    },
    "header": {
      "processor_type": "date"
    }
  },
  "status": 500
}

うまくいった gsub filter

ISO8601の形式にしてからdateに変換すれば行くやろ、と思ったのでとりあえず 真ん中のスペースを T に置換 したら、それだけで dynamic_date_formats のお眼鏡にかなったようで date 認識された。


delete test

## gsub processorで文字列置換する
# before: 2019-11-15 00:00:00
# after: 2019-11-15T00:00:00
PUT _ingest/pipeline/my-convert-date
{
  "description": "converts text field '2019-11-15 00:00:00' to date",
  "processors" : [
    {
      "gsub": {
        "field": "answer.answered_at",
        "pattern": " ",                <------- スペースを
        "replacement": "T"               <-------- T にする
      }
    }
  ]
}

# data 投入
post test/_doc/1?pipeline=my-convert-date
{
  "answer": {
    "answered_at": "2019-11-15 00:00:00"
  }
}


get test/_mapping

# response
{
  "test": {
    "mappings": {
      "_doc": {
        "properties": {
          "answer": {
            "properties": {
              "answered_at": {
                "type": "date"      <------- dateになった
              }
            }
          }
        }
      }
    }
  }
}


ちなみに date 判定される形式は ↓ に書いている(自動で型判定 = dynamic mapping という機能名)

The default value for dynamic_date_formats is:
[ "strict_date_optional_time","yyyy/MM/dd HH:flag_mm:ss Z||yyyy/MM/dd Z"]
https://www.elastic.co/guide/en/elasticsearch/reference/6.3/dynamic-field-mapping.html

reindex で text を date にする完全版

# 既存indexのbackup
POST _reindex
{
  "source": {
    "index": "myindex"
  },
  "dest": {
    "index": "myindex_20200129"
  }
}

# mappingを消すためindexごとdelete
delete myindex

# dateにするpipelineの登録
PUT _ingest/pipeline/my-convert-date
{
  "description": "converts text field '2019-11-15 00:00:00' to date '2019-11-15T00:00:00'",
  "processors" : [
    {
      "gsub": {
        "field": "answer.answered_at",
        "pattern": " ",
        "replacement": "T"
      }
    }
  ]
}


# joinしているindexのため事前に必要なmappingをしておく
put myindex
{
  "mappings": {
    "_doc": {
      "properties": {
        "my_join_field": {
          "type": "join",
          "relations": {
            "question": [
              "answer"
            ]
          }
        }
      }
    }
  }
}

# pipelineに指定しているfieldがないindexでエラーになるのでfieldがあるindex/ないindexで分けてreindexする
# queryのテスト
POST myindex_20200129/_search
{
    "query": {
      "exists": {
        "field": "question"
      }
    }
}

POST myindex_20200129/_search
{
    "query": {
      "exists": {
        "field": "answer"
      }
    }
}

# pipelineなしのreindex
POST _reindex
{
  "source": {
    "index": "myindex_20200129",
    "query": {
      "exists": {
        "field": "question"
      }
    }
  },
  "dest": {
    "index": "myindex_20200129_reindexed"
  }
} 

# pipelineありのreindex
POST _reindex
{
  "source": {
    "index": "myindex_20200129",
    "query": {
      "exists": {
        "field": "answer"
      }
    }
  },
  "dest": {
    "index": "myindex_20200129_reindexed",
    "pipeline": "my-convert-date"
  }
} 

# aliasで名前違ってもアクセスできるように
POST /_aliases
{
  "actions": [
    {
      "add": {
        "index": "myindex_20200129_reindexed",
        "alias": "myindex"
      }
    }
  ]
}

GET /_alias
GET myindex/_search        # dataチェック


0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2