ElasticSearchはRDBではないので基本的にはJOINできません。
しかし、それっぽい操作を実現する方法がいくつかあります。
- join field typeを使う
- enrich processor pipelineを使う
- transformを使う
この記事では、この「3. transformを使う」を試してみたいと思います。
(参考)他の方法
join field typeを使った方法
enrich processorを使った方法
使用するインデックスの作成
elastic discussに投稿されていた、「ドライバーインデックスに、車インデックスの情報を付与する」 というシチュエーションで考えてみたいと思います。Dev Toolsにコピペすれば実行できる形になっているので、ぜひ試してみてください。
まずは車インデックスを作成し、ダミーデータを入力します。将来的にownerIdをドライバーインデックスにおけるdriverIdと結び付けたいので、copy_to
を用いて同名のdriverIdフィールドを作成しています。transformの中でruntime_fieldを使うことでも実現できるかもしれません。
PUT /test_transform_car/
{
"mappings": {
"properties": {
"carId":{"type": "keyword"},
"model":{"type": "keyword"},
"modelId":{"type": "keyword"},
"ownerId":{"type": "keyword",
"copy_to": "driverId"
},
"driverId":{"type":"keyword"}
}
}
}
POST /test_transform_car/_bulk
{"index":{}}
{"carId":"car001","model":"model001","ownerId":"001"}
{"index":{}}
{"carId":"car002","model":"model002","ownerId":"001"}
続いて、ドライバーインデックスを作成。
PUT /test_transform_driver/
{
"mappings":{
"properties": {
"driverId":{"type":"keyword"},
"name":{"type":"keyword"},
"country":{"type":"keyword"},
"countryId":{"type":"keyword"}
}
}
}
POST /test_transform_driver/_bulk
{"index":{}}
{"driverId":"001","name":"foo"}
{"index":{}}
{"driverId":"002","name":"baa"}
宛先のインデックスも作成しておきます。cars.all
はnested field type
にしておくのが良いでしょう。一方でdriver.all
はidで一意に定まる想定でnestedにはせず通常のobjectです。他のキーについても適宜設定します。
PUT /test_transform_join/
{
"mappings": {
"properties": {
"cars.all":{
"type":"nested",
"properties": {
"carId":{"type": "keyword"},
"model":{"type": "keyword"},
"modelId":{"type": "keyword"}
}
},
"driver.all":{
"properties": {
"name":{"type":"keyword"},
"country":{"type":"keyword"},
"countryId":{"type":"keyword"}
}
},
"driverId":{"type":"keyword"}
}
}
}
Transformの作成
この記事のメインです。elastic dicussにおけるこちらの回答を基にscript metric aggregation
を用いてpivot transformを作成します。なお一工夫入れて、JOINに使ったフィールドなど不要なフィールドを削除できるようにしました。
PUT _transform/test_car/
{
"dest":{
"index":"test_transform_join"
},
"pivot":{
"aggs":{
"cars":{
"filter": {
"term":{
"_index":"test_transform_car"
}
},
"aggs":{
"all":{
"scripted_metric": {
"init_script": "state.docs = []",
"map_script": "HashMap map=new HashMap(params['_source']); for(field in params['unnecessaryFields']) {map.remove(field)} state.docs.add(map)",
"combine_script": "return state.docs",
"reduce_script": "def ret = []; for (s in states) {for (d in s) { ret.add(d);}}return ret",
"params":{
"unnecessaryFields":["ownerId"]
}
}
}
}
},
"driver":{
"filter": {
"term":{
"_index":"test_transform_driver"
}
},
"aggs":{
"all":{
"scripted_metric": {
"init_script": "state.docs = []",
"map_script": "HashMap map=new HashMap(params['_source']); for(field in params['unnecessaryFields']) {map.remove(field)} state.docs.add(map)",
"combine_script": "return state.docs",
"reduce_script": "def ret = []; for (s in states) {for (d in s) { ret.add(d);}}return ret",
"params":{
"unnecessaryFields":["driverId"]
}
}
}
}
}
},
"group_by": {
"driverId": {"terms": {
"field": "driverId"
}}
}
},
"source":{
"index":["test_transform_car", "test_transform_driver"]
}
}
transformを作成したら、previewで確認してください。
GET _transform/test_car/_preview
{
"preview" : [
{
"cars" : {
"all" : [
{
"model" : "model001",
"carId" : "car001"
},
{
"model" : "model002",
"carId" : "car002"
}
]
},
"driverId" : "001",
"driver" : {
"all" : [
{
"name" : "foo"
}
]
}
},
{
"cars" : {
"all" : [
{
"model" : "model003",
"carId" : "car003"
}
]
},
"driverId" : "002",
"driver" : {
"all" : [
{
"name" : "baa"
}
]
}
}
],
"generated_dest_index" : {
"mappings" : {
"_meta" : {
"_transform" : {
"transform" : "test_car",
"version" : {
"created" : "7.16.2"
},
"creation_date_in_millis" : 1642769227904
},
"created_by" : "transform"
},
"properties" : {
"driverId" : {
"type" : "keyword"
}
}
},
"settings" : {
"index" : {
"number_of_shards" : "1",
"auto_expand_replicas" : "0-1"
}
},
"aliases" : { }
}
}
preview
の中に、生成されるdocumentがarrayとして格納されています。driverIdに対して、driverの情報とcarの情報が格納されていることが分かるかと思います。
ここまで確認したら、transformを開始させ、JOINされた結果が生成されることを確認しましょう。
POST _transform/test_car/_start
GET /test_transform_join/_search?filter_path=hits.hits._source,hits.hits._id
{
"hits" : {
"hits" : [
{
"_id" : "MPc7S9g1I0p7BDqjrqr0eWAAAAAAAAAA",
"_source" : {
"cars" : {
"all" : [
{
"model" : "model001",
"carId" : "car001"
},
{
"model" : "model002",
"carId" : "car002"
}
]
},
"driverId" : "001",
"driver" : {
"all" : [
{
"name" : "foo"
}
]
}
}
},
{
"_id" : "MNs3WLav1eXtZTHwvhPNvlEAAAAAAAAA",
"_source" : {
"cars" : {
"all" : [
{
"model" : "model003",
"carId" : "car003"
}
]
},
"driverId" : "002",
"driver" : {
"all" : [
{
"name" : "baa"
}
]
}
}
}
]
}
}
うまいこと2つのindexがJOINされたindexが作成されました😀
driverやcarを更新して、JOINも追って更新されることを確認してください。
bucket selector aggregationを組み合わせることで見かけ上はINNER JOINを作成できますが、INDEXサイズの圧縮にはなるものの計算負荷の軽減にはならないことに注意してください。
transformを利用する際の注意点
transformにはいくつかの注意点もあるようです。こちらもご確認ください。
https://www.elastic.co/guide/en/elasticsearch/reference/current/transform-limitations.html