Elasticsearch

ElasticsearchでMySQLから取り込み時に配列として入れる

More than 3 years have passed since last update.

ElasticsearchでMySQLから自動取り込みする時に、Elasticsearch側ドキュメントのフィールドに配列として取り込みたい。

やっぱできないかなーと思ったら、できたのでメモ


環境


以下の前提で。


  • 取り込み対象MySQLテーブル名: myTable

  • Elasticsearchインデックス名: myIndex

  • Elasticsearchタイプ名: myType

  • Elasticsearchのriver名: myRiver



インデックスのマッピング設定例

(例なので Kuromoji 等は設定していません)

curl -XPUT localhost:9200/myIndex -d '↓下のJSON'

{

"settings": {
"analysis": {
"analyzer": {
"ngram_analyzer": {
"tokenizer": "ngram_tokenizer"
}
},
"filter": {
"greek_lowercase_filter": {
"type": "lowercase",
"language": "greek"
}
},
"tokenizer": {
"ngram_tokenizer": {
"type": "nGram",
"max_gram": 3,
"min_gram": 2,
"token_chars": [
"letter",
"digit"
]
}
}
}
},

"mappings": {
"hospitals": {
"_all": {
"enabled": true
},
"_source": {
"enabled": true
},
"properties": {

//integer型
"id": {
"type": "integer",
"index": "not_analyzed"
},

//date型
"created": {
"type": "date",
"index": "not_analyzed"
},

//string型
"name": {
"type": "string",
"analyzer": "ngram_analyzer",
"index": "analyzed"
},

//object型
"location": {
"type": "object",
"properties": {
"country": {
"type": "string",
"index": "not_analyzed"
},
"prefecture": {
"type": "string",
"index": "not_analyzed"
},
"city": {
"type": "string",
"index": "not_analyzed"
}
}
},

//array型(stringでOK)
"tags": {
"type": "string"
}
}
}
}
}


river の設定

定期実行の例として1分ごとに取り込む設定(「schedule」のとこ)にしてあります

curl -XPUT http://localhost:9200/_river/myRiver/_meta -d '↓下のJSON'

{

"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/myTable",
"user" : "test",
"password" : "test",
"schedule" : "0 0-59 0-23 ? * *",
"sql" : "SELECT .... 後述"
}
}

↑のSQLの部分(実際には改行やインデントはナシ)

SELECT

`id` AS `_id`,
`id`,
DATE_FORMAT(`created`, '%Y-%m-%dT%H:%i:%s') AS `created`,
`name`,
`country` AS `location.country`,
`prefecture` AS `location.prefecture`,
`city` AS `location.city`,
(
SELECT
GROUP_CONCAT(`tag` SEPARATOR ',')
FROM `myTagsTable`
WHERE `id` = `myTable`.`id`
) AS `tags[]`
FROM `myTable`

各型の説明としては、


  • _id 」を与えるとElasticsearch側の _id として設定される

  • date型は yyyy-mm-dd hh:ii:ss ではダメ(parse error)で、yyyy-mm-ddThh:ii:ss というふうに「 T 」が必要

  • object型はドットで繋いで階層を表現可能

  • array型はカンマ区切りの文字列を作り、それを「カラム名[]」と配列っぽい名前で出力(数値は文字列として格納される。けど検索には影響なし)

みたいな感じにすればOK。

エラーなく入ってるか確認するには、ログ /var/log/elasticsearch/erasticsearch.log を眺めてみる。

$ sudo tail -f /var/log/elasticsearch/elasticsearch.log

エラーがあればどこのエラーかがログに書いてある。

(SQL文はあらかじめMySQL側でエラーが出ないことを確認しておく)

データが入ったみたいなら、確認してみる。

curl -XGET http://localhost:9200/myIndex/MyType -d '{"query":{"match_all":{}}}'

{

"took": 1,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 3,
"max_score": 1,
"hits": [
{
"_index": "myIndex",
"_type": "myType",
"_id": "1",
"_score": 1,
"_source": {
"id": 1,
"created": "2009-08-07T17:51:53",
"name": "ほげ割烹",
"locate": {
"country": "JP",
"prefecture": "東京都",
"city": "千代田区"
},
"tags": ""
}
},
{
"_index": "myIndex",
"_type": "myType",
"_id": "2",
"_score": 1,
"_source": {
"id": 2,
"created": "2012-05-13T13:25:02",
"name": "レストラン モゲ",
"locate": {
"country": "JP",
"prefecture": "東京都",
"city": "中野区"
},
"tags": [
"イタリアン",
"ランチ"
]
}
},
{
"_index": "myIndex",
"_type": "myType",
"_id": "3",
"_score": 1,
"_source": {
"id": 3,
"created": "2013-10-02T10:02:43",
"name": "すし野郎",
"locate": {
"country": "JP",
"prefecture": "東京都",
"city": "練馬区"
},
"tags": "寿司"
}
}
]
}
}

array型の配列は、空なら空文字、1つのみなら文字列、複数なら配列になる。

スキーマレスというやつですか。

また、この例では1分おきに取り込まれるので、MySQL側で編集したら最大1分後に反映される。

削除には対応してない のでアプリ側でなんとかする。

そのうちプラグインが対応してくれると助かる。