pipelineとは
fieldを追加したり、convertしたりできる。logstashで言う filter{} セクション。
pipelineの使い方
- 事前にpipelineを定義
- indexするとき(dataをinsertするとき)にpipelineを指定しながらindexing
- Viola!
さくっとおためし Pipeline
*elastic 6.3なので古いです。 POSTでエラーになる場合は _doc を外してください
# 既存のpipelineリスト
get _ingest/pipeline
# my_add_my_text パイプラインを新規定義
put _ingest/pipeline/my_add_my_text
{
"description": "adds added_my_text field",
"processors": [
{
"set" : {
"field" : "added_my_text",
"value" : "{{answer.my_text}} is my_text"
}
}
]
}
delete test
# my_add_my_textパイプラインを使いながら、dataを入れる
POST test/_doc/1?pipeline=my_add_my_text
{
"answer": {
"my_text": 20191115
}
}
# 作ったpipelineの削除
DELETE _ingest/pipeline/my_add_my_text
開発方法
_simulate を使うと便利。事前にindexをPOSTしておく必要がない。(pipelineの定義と一緒にダミーデータも渡せる)
ただmappingがverboseにしても出ないから、何の data typeで入ったかはわからなそう
POST _ingest/pipeline/_simulate?verbose
{
"pipeline" :
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "added_my_text",
"value" : "{{answer.my_text}} is my_text"
}
}
]
},
"docs": [
{
"_index": "index",
"_type": "_doc",
"_id": "id",
"_source": {
"answer": {
"answered_at": "2019-11-15 00:00:00"
}
}
}
]
}
Reindexでpipelineを使う方法
data typeを変えるときとかに使うとよさそう。
indexのaliasと組み合わせると便利。
アプリは index名: test を使っているとき
index a から b にコピー
bのエイリアスを test にすると、アプリは何も変更を加えず、新しいindexにアクセスできる
# 巣のデータ
post test_source/_doc/1
{
"answer": {
"my_text": 20191115
}
}
get test_source/_search <-----入れたデータが出る
# indexをコピー
POST _reindex
{
"source": {
"index": "test_source" <------これを
},
"dest": {
"index": "test_dest", <-----このindexにコピー(pipelineを通しながら)
"pipeline": "my_add_my_text"
}
}
# 新しいindex
get test_dest/_search <----- added_my_textが増えたデータが出る
# index名:test でアクセスできるようにaliasを設定する <---- 既存aliaがある場合は事前にdelete必要
POST /_aliases
{
"actions": [
{
"add": {
"index": "test_dest", <---これが本体
"alias": "test" <--こっちが虚体
}
}
]
}
# aliasのリスト
GET /_alias
GET _cat/aliases <---こっちのほうが見やすい
yyyy-mm-dd hh:ii:ss な text を date にしたい
これが今回の目的でした。pipelineにはさっき使った set というprocessorがある。
processorによってできることが違う。フィールドを増やしたり、typeを変えたり、正規表現したり、いろいろ。
種類は以下
Processors
https://www.elastic.co/guide/en/elasticsearch/reference/6.3/ingest-processors.html
- Append Processor
- Convert Processor
- Date Processor
- Date Index Name Processor
- Fail Processor
- Foreach Processor
- Grok Processor
- Gsub Processor
- Join Processor
- JSON Processor
- KV Processor
- Lowercase Processor
- Remove Processor
- Rename Processor
- Script Processor
- Set Processor
- Split Processor
- Sort Processor
- Trim Processor
- Uppercase Processor
- Dot Expander Processor
- URL Decode Processor
dateを変えようとして convert processorを使ったらエラー。名前的にdate processorか?
textになっているfield
yyyy-mm-dd hh:ii:ssがtextになる
delete test
post test/_doc/1
{
"answer": {
"answered_at": "2019-11-15 00:00:00" <------- yyyy-mm-dd hh:ii:ss 形式だと
}
}
get test/_mapping
# response
{
"test": {
"mappings": {
"_doc": {
"properties": {
"answer": {
"properties": {
"answered_at": {
"type": "text", <---------- textになってしまう。期間検索とかができなくなる
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}
}
}
}
}
}
うまくいかなかった Date processor
00:00:00 だと "Value 0 for clockhourOfHalfday must be in the range [1,12]" でエラー
# date processorを使う
PUT _ingest/pipeline/my-convert-date
{
"description": "converts text field '2019-11-15 00:00:00' to date",
"processors" : [
{
"date" : {
"field" : "answer.answered_at",
"target_field" : "answer.answered_at_date",
"formats" : ["yyyy-MM-dd hh:mm:ss"],
"timezone" : "Asia/Tokyo"
}
}
]
}
get test/_mapping
{
"error": {
"root_cause": [
{
"type": "exception",
"reason": "java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: unable to parse date [2019-11-15 00:00:00]",
"header": {
"processor_type": "date"
}
}
],
"type": "exception",
"reason": "java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: unable to parse date [2019-11-15 00:00:00]",
"caused_by": {
"type": "illegal_argument_exception",
"reason": "java.lang.IllegalArgumentException: unable to parse date [2019-11-15 00:00:00]",
"caused_by": {
"type": "illegal_argument_exception",
"reason": "unable to parse date [2019-11-15 00:00:00]",
"caused_by": {
"type": "illegal_field_value_exception",
"reason": "Cannot parse \"2019-11-15 00:00:00\": Value 0 for clockhourOfHalfday must be in the range [1,12]" <------ 内部が12Hモードになってるみたい
}
}
},
"header": {
"processor_type": "date"
}
},
"status": 500
}
うまくいった gsub filter
ISO8601の形式にしてからdateに変換すれば行くやろ、と思ったのでとりあえず 真ん中のスペースを T に置換 したら、それだけで dynamic_date_formats
のお眼鏡にかなったようで date 認識された。
delete test
## gsub processorで文字列置換する
# before: 2019-11-15 00:00:00
# after: 2019-11-15T00:00:00
PUT _ingest/pipeline/my-convert-date
{
"description": "converts text field '2019-11-15 00:00:00' to date",
"processors" : [
{
"gsub": {
"field": "answer.answered_at",
"pattern": " ", <------- スペースを
"replacement": "T" <-------- T にする
}
}
]
}
# data 投入
post test/_doc/1?pipeline=my-convert-date
{
"answer": {
"answered_at": "2019-11-15 00:00:00"
}
}
get test/_mapping
# response
{
"test": {
"mappings": {
"_doc": {
"properties": {
"answer": {
"properties": {
"answered_at": {
"type": "date" <------- dateになった
}
}
}
}
}
}
}
}
ちなみに date 判定される形式は ↓ に書いている(自動で型判定 = dynamic mapping という機能名)
The default value for dynamic_date_formats is:
[ "strict_date_optional_time","yyyy/MM/dd HHss Z||yyyy/MM/dd Z"]
https://www.elastic.co/guide/en/elasticsearch/reference/6.3/dynamic-field-mapping.html
reindex で text を date にする完全版
# 既存indexのbackup
POST _reindex
{
"source": {
"index": "myindex"
},
"dest": {
"index": "myindex_20200129"
}
}
# mappingを消すため、indexごとdelete
delete myindex
# dateにするpipelineの登録
PUT _ingest/pipeline/my-convert-date
{
"description": "converts text field '2019-11-15 00:00:00' to date '2019-11-15T00:00:00'",
"processors" : [
{
"gsub": {
"field": "answer.answered_at",
"pattern": " ",
"replacement": "T"
}
}
]
}
# joinしているindexのため、事前に必要なmappingをしておく
put myindex
{
"mappings": {
"_doc": {
"properties": {
"my_join_field": {
"type": "join",
"relations": {
"question": [
"answer"
]
}
}
}
}
}
}
# pipelineに指定しているfieldがないindexでエラーになるので、fieldがあるindex/ないindexで分けてreindexする。
# queryのテスト
POST myindex_20200129/_search
{
"query": {
"exists": {
"field": "question"
}
}
}
POST myindex_20200129/_search
{
"query": {
"exists": {
"field": "answer"
}
}
}
# pipelineなしのreindex
POST _reindex
{
"source": {
"index": "myindex_20200129",
"query": {
"exists": {
"field": "question"
}
}
},
"dest": {
"index": "myindex_20200129_reindexed"
}
}
# pipelineありのreindex
POST _reindex
{
"source": {
"index": "myindex_20200129",
"query": {
"exists": {
"field": "answer"
}
}
},
"dest": {
"index": "myindex_20200129_reindexed",
"pipeline": "my-convert-date"
}
}
# aliasで名前違ってもアクセスできるように
POST /_aliases
{
"actions": [
{
"add": {
"index": "myindex_20200129_reindexed",
"alias": "myindex"
}
}
]
}
GET /_alias
GET myindex/_search # dataチェック