ElasticsearchでMySQLから自動取り込みする時に、Elasticsearch側ドキュメントのフィールドに配列として取り込みたい。
やっぱできないかなーと思ったら、できたのでメモ 。
環境
- Elasticsearch -> 1.1.0
- MySQL JDBC ドライバ -> 5.1.30
- river-jdbc プラグイン -> 1.1.0.0
以下の前提で。
- 取り込み対象MySQLテーブル名: myTable
- Elasticsearchインデックス名: myIndex
- Elasticsearchタイプ名: myType
- Elasticsearchのriver名: myRiver
インデックスのマッピング設定例
(例なので Kuromoji 等は設定していません)
curl -XPUT localhost:9200/myIndex -d '↓下のJSON'
{
"settings": {
"analysis": {
"analyzer": {
"ngram_analyzer": {
"tokenizer": "ngram_tokenizer"
}
},
"filter": {
"greek_lowercase_filter": {
"type": "lowercase",
"language": "greek"
}
},
"tokenizer": {
"ngram_tokenizer": {
"type": "nGram",
"max_gram": 3,
"min_gram": 2,
"token_chars": [
"letter",
"digit"
]
}
}
}
},
"mappings": {
"hospitals": {
"_all": {
"enabled": true
},
"_source": {
"enabled": true
},
"properties": {
//integer型
"id": {
"type": "integer",
"index": "not_analyzed"
},
//date型
"created": {
"type": "date",
"index": "not_analyzed"
},
//string型
"name": {
"type": "string",
"analyzer": "ngram_analyzer",
"index": "analyzed"
},
//object型
"location": {
"type": "object",
"properties": {
"country": {
"type": "string",
"index": "not_analyzed"
},
"prefecture": {
"type": "string",
"index": "not_analyzed"
},
"city": {
"type": "string",
"index": "not_analyzed"
}
}
},
//array型(stringでOK)
"tags": {
"type": "string"
}
}
}
}
}
river の設定
定期実行の例として1分ごとに取り込む設定(「schedule」のとこ)にしてあります
curl -XPUT http://localhost:9200/_river/myRiver/_meta -d '↓下のJSON'
{
"type" : "jdbc",
"jdbc" : {
"driver" : "com.mysql.jdbc.Driver",
"url" : "jdbc:mysql://localhost:3306/myTable",
"user" : "test",
"password" : "test",
"schedule" : "0 0-59 0-23 ? * *",
"sql" : "SELECT .... 後述"
}
}
↑のSQLの部分(実際には改行やインデントはナシ)
SELECT
`id` AS `_id`,
`id`,
DATE_FORMAT(`created`, '%Y-%m-%dT%H:%i:%s') AS `created`,
`name`,
`country` AS `location.country`,
`prefecture` AS `location.prefecture`,
`city` AS `location.city`,
(
SELECT
GROUP_CONCAT(`tag` SEPARATOR ',')
FROM `myTagsTable`
WHERE `id` = `myTable`.`id`
) AS `tags[]`
FROM `myTable`
各型の説明としては、
- 「 _id 」を与えるとElasticsearch側の _id として設定される
- date型は
yyyy-mm-dd hh:ii:ss
ではダメ(parse error)で、yyyy-mm-ddThh:ii:ss
というふうに「 T 」が必要 - object型はドットで繋いで階層を表現可能
- array型はカンマ区切りの文字列を作り、それを「カラム名[]」と配列っぽい名前で出力(数値は文字列として格納される。けど検索には影響なし)
みたいな感じにすればOK。
エラーなく入ってるか確認するには、ログ /var/log/elasticsearch/erasticsearch.log を眺めてみる。
$ sudo tail -f /var/log/elasticsearch/elasticsearch.log
エラーがあればどこのエラーかがログに書いてある。
(SQL文はあらかじめMySQL側でエラーが出ないことを確認しておく)
データが入ったみたいなら、確認してみる。
curl -XGET http://localhost:9200/myIndex/MyType -d '{"query":{"match_all":{}}}'
{
"took": 1,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 3,
"max_score": 1,
"hits": [
{
"_index": "myIndex",
"_type": "myType",
"_id": "1",
"_score": 1,
"_source": {
"id": 1,
"created": "2009-08-07T17:51:53",
"name": "ほげ割烹",
"locate": {
"country": "JP",
"prefecture": "東京都",
"city": "千代田区"
},
"tags": ""
}
},
{
"_index": "myIndex",
"_type": "myType",
"_id": "2",
"_score": 1,
"_source": {
"id": 2,
"created": "2012-05-13T13:25:02",
"name": "レストラン モゲ",
"locate": {
"country": "JP",
"prefecture": "東京都",
"city": "中野区"
},
"tags": [
"イタリアン",
"ランチ"
]
}
},
{
"_index": "myIndex",
"_type": "myType",
"_id": "3",
"_score": 1,
"_source": {
"id": 3,
"created": "2013-10-02T10:02:43",
"name": "すし野郎",
"locate": {
"country": "JP",
"prefecture": "東京都",
"city": "練馬区"
},
"tags": "寿司"
}
}
]
}
}
array型の配列は、空なら空文字、1つのみなら文字列、複数なら配列になる。
スキーマレスというやつですか。
また、この例では1分おきに取り込まれるので、MySQL側で編集したら最大1分後に反映される。
削除には対応してない のでアプリ側でなんとかする。
そのうちプラグインが対応してくれると助かる。