この記事は、Elasticsearch Java-API を使う (BulkRequest 登録編) の続きです。
要件やデータ構造などは、上記を参照してください。
さて、Bulkで登録したデータを色々扱っていきたいと思います。
要件
- 日別売上ランキングTOP10を表示したい。
- 日付範囲は、日別・週別・月別など切り替えられるようにしたい。
- 売上ランキングの件数は変更可能にしたい。
- 総合とジャンル別でも切り分けて見れるようにしたい。
取得方法
Kibana
ある特定の1日のTOP5を表示できるようになりました。
クエリ
上記のような構成をクエリで表現すると、以下のようになります。
GET sample-ranking-qiita-*/_search
{
"aggs": {
"range": {
"date_range": {
"field": "execDate",
"ranges": [
{
"from": "2018/01/15T00:00:00+09:00",
"to": "2018/01/15T23:59:59+09:00"
}
]
},
"aggs": {
"title_rate": {
"terms": {
"field": "title.keyword",
"size": 5,
"order": {
"sum_rate": "desc"
}
},
"aggs": {
"sum_rate": {
"sum": {
"field": "rate"
}
}
}
}
}
}
}
}
aggsの入れ子がちょっと見づらいですかね。
以下の3つのaggsを定義しています。
-
sum_rate
という名前で、rate
を合計する -
title_rate
という名前で、title
別にsum_rate
を、降順に表示する -
title_rate
の集計範囲は、execDate
のfrom
からto
の間とする。
SQL
参考までに、SQLですと、こんなイメージです。
(試してはいないので、間違ってたらすみません)
SELECT
ranking
, title
, sum_rate
FROM (
select
ROW_NUMBER() OVER (ORDER BY sum_rate DESC) AS ranking
, tb1.title
, tb1.sum_rate
FROM (
SELECT
title
, SUM(rate) as sum_rate
FROM
tbl_ranking
WHERE
execDate >= '2018/01/15T00:00:00+09:00'
AND execDate <= '2018/01/15T23:59:59+09:00'
GROUP BY
title
) tb1
)
WHERE
ranking <= 5
取得結果
クエリの結果を取得すると、aggregations
タグの中に、aggsの結果がネストされて入っています。
buckets
タグ内のリストが各aggsの結果リストとなります。
Kibanaで表示したのと同じタイトルが並んでいますね。レート合計値も合致しています。
{
"took": 426,
"timed_out": false,
"_shards": {
"total": 15,
"successful": 15,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 24800,
"max_score": 1,
"hits": [
(略)
]
},
"aggregations": {
"range": {
"buckets": [
{
// 3. title_rateの集計日付範囲 ------------------------------
"key": "2018/01/14T15:00:00+0000-2018/01/15T14:59:59+0000",
"from": 1515942000000,
"from_as_string": "2018/01/14T15:00:00+0000",
"to": 1516028399000,
"to_as_string": "2018/01/15T14:59:59+0000",
"doc_count": 400,
// 2. title別sum_rate降順 ------------------------------
"title_rate": {
"doc_count_error_upper_bound": -1,
"sum_other_doc_count": 380,
"buckets": [
{
"key": "開幕の新た",
"doc_count": 4,
"sum_rate": {
// 1. rate合計 ------------
"value": 355
}
},
{
"key": "高校生",
"doc_count": 4,
"sum_rate": {
"value": 337
}
},
{
"key": "中間テスト",
"doc_count": 4,
"sum_rate": {
"value": 333
}
},
{
"key": "亭主の真相",
"doc_count": 4,
"sum_rate": {
"value": 292
}
},
{
"key": "試合の一瞬",
"doc_count": 4,
"sum_rate": {
"value": 292
}
}
]
}
}
]
}
}
}
Java-API
さて、それでは、Java-APIで上記クエリを取得していきましょう。
public class AggsSampleViewer {
private static Logger logger = LoggerFactory.getLogger(AggsSampleViewer.class);
// 年月インデックスのフォーマット(yyyyMM)
private static DateTimeFormatter YM_INDEX_FORMATTER;
// 実行日付のフォーマット(yyyy/MM/dd'T'HH:mm:ss+09:00)
private static DateTimeFormatter DT_INDEX_FORMATTER;
// 各種設定情報
QiitaSettingBean setting;
/**
* コンストラクタ
* @param setting 設定情報
*/
public AggsSampleViewer(QiitaSettingBean setting) {
super();
this.setting = setting;
DT_INDEX_FORMATTER = DateTimeFormatter.ofPattern(setting.getElasticearch().getExecDateFormat());
YM_INDEX_FORMATTER = DateTimeFormatter.ofPattern(setting.getElasticearch().getIndexYmFormat());
}
/**
* 取得処理実行
*
* @param subIndex サブインデックス名(全体取得時はnull)
* @throws Exception
*/
public void execute(String subIndex) throws Exception {
// ELSをJavaで扱うためのクライアントを作成する
// setting.getElasticearch().getAddress(): IPアドレス
// setting.getElasticearch().getPort(): ポート番号(通常は9300)
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY).addTransportAddress(
new InetSocketTransportAddress(InetAddress.getByName(setting.getElasticearch().getAddress()),
setting.getElasticearch().getPort()));
// 検索用Index生成(サブインデックス指定時はサブも入れる)
String searchIndex = setting.getElsImport().getIndex();
if (StringUtils.isNotEmpty(subIndex)) {
searchIndex = searchIndex + "-" + subIndex;
}
searchIndex += "*";
logger.debug("■■■ 検索対象インデックス: " + searchIndex);
// sum_rate用 Aggs:rateの合計値を算出する
AggregationBuilder sumRateAggs = AggregationBuilders.sum("sum_rate").field("rate");
// title_rate用 Aggs:title別rate降順表示
AggregationBuilder titleRateAggs = AggregationBuilders.terms("title_rate")
// title別に集計する
.field("title.keyword")
// setting.getRankingLimit(): ランキング個数
.size(setting.getRankingLimit())
// 表示順は、sum_rateの降順
.order(Order.aggregation("sum_rate", false))
// subaggsに、sum_rate用 Aggsを追加
.subAggregation(sumRateAggs)
;
// 日付範囲用Aggs
AggregationBuilder dateRangeAggs = AggregationBuilders.dateRange("range")
// 日付範囲の対象は、処理実行日
.field("execDate")
// 日付範囲を、処理対象日の0時ピッタリの前日~当日0時ピッタリに設定する
// 例) setting.getNow(): 2018/01/16 13:20:35
// from:2018/01/15 00:00:00
// to :2018/01/16 00:00:00
.addRange(
setting.getNow().minusDays(1).truncatedTo(ChronoUnit.DAYS).format(DT_INDEX_FORMATTER)
, setting.getNow().truncatedTo(ChronoUnit.DAYS).format(DT_INDEX_FORMATTER)
)
// subaggsに、title_rate用 Aggsを追加
.subAggregation(titleRateAggs)
;
// 全体クエリ
SearchRequestBuilder requestBuilder = client.prepareSearch(searchIndex)
.addAggregation(dateRangeAggs);
// 検索結果を取得する
SearchResponse res = requestBuilder.get();
// 全体クエリのAggsの結果取得
for (Aggregation aggs : res.getAggregations()) {
logger.debug("aggs name: "+ aggs.getName());
}
// 【日付範囲用Aggs】(タイプ: date_range) ----------------
InternalDateRange resDateRangeAggs = res.getAggregations().get("range");
// 日付範囲用Aggs内のbucketを確認する
for (InternalDateRange.Bucket dateRangeBucket : resDateRangeAggs.getBuckets()) {
// keyとdoc_countはどのBucketでも取得可能
logger.debug("* resDateRangeAggs bucket key: "+ dateRangeBucket.getKey());
logger.debug("* resDateRangeAggs bucket doc_count: "+ dateRangeBucket.getDocCount());
// 日付範囲用Bucketを参照しているため、FromとToが取得可能
logger.debug("* resDateRangeAggs bucket from: "+ dateRangeBucket.getFromAsString());
logger.debug("* resDateRangeAggs bucket to: "+ dateRangeBucket.getToAsString());
// 日付範囲用Aggs内のAggs結果確認
for (Aggregation aggs : dateRangeBucket.getAggregations()) {
logger.debug("* resDateRangeAggs bucket aggs: "+ aggs.getName());
}
// 【title_rate用 Aggs】(タイプ: terms) ----------------
Terms resTitleRateAggs = dateRangeBucket.getAggregations().get("title_rate");
// title_rate用 Aggs内のbucketを確認する
for (Terms.Bucket termBucket : resTitleRateAggs.getBuckets()) {
logger.debug("** resTitleRateAggs bucket key: "+ termBucket.getKey());
logger.debug("** resTitleRateAggs bucket doc_count: "+ termBucket.getDocCount());
// title_rate用 Aggs結果確認
for (Aggregation aggs : termBucket.getAggregations()) {
logger.debug("** resTitleRateAggs bucket aggs: "+ aggs.getName());
}
// 【sum_rate用 Aggs】(タイプ: sum) ----------------
Sum resSumRateAggs = termBucket.getAggregations().get("sum_rate");
// Sumの中にAggsはなく、sum結果が取得できる
logger.debug("*** resSumRateAggs sum name: "+ resSumRateAggs.getName());
logger.debug("*** resSumRateAggs sum value: "+ resSumRateAggs.getValueAsString());
}
}
}
}
クエリと同じネスト構造でAggsを定義していきます。
どのAggsを使って、どのBucketが返ってくるかを調べるのがちょっと大変ですかね ^^;
この辺は公式を確認するのが一番です。
#公式はどう定義するかは書いてあるのですが、その結果どういった情報が返ってくるかは書かれてません…自動補完等を駆使して検討をつけていくとかですかね…
実行はこのように行います。
AggsSampleViewer viewer = new AggsSampleViewer(setting);
// 全体ランキング情報生成
logger.debug("■ 全体ランキング ----------------------------------------");
viewer.execute(null);
// 少年ランキング情報生成
logger.debug("■ 少年ランキング ----------------------------------------");
viewer.execute("boy");
実行結果
■ 全体ランキング ----------------------------------------
■■■ 検索対象インデックス: sample-ranking-qiita*
aggs name: range
* resDateRangeAggs bucket key: 2018/01/14T15:00:00+0000-2018/01/15T15:00:00+0000
* resDateRangeAggs bucket doc_count: 400
* resDateRangeAggs bucket from: 2018/01/14T15:00:00+0000
* resDateRangeAggs bucket to: 2018/01/15T15:00:00+0000
* resDateRangeAggs bucket aggs: title_rate
** resTitleRateAggs bucket key: 開幕の新た
** resTitleRateAggs bucket doc_count: 4
** resTitleRateAggs bucket aggs: sum_rate
*** resSumRateAggs sum name: sum_rate
*** resSumRateAggs sum value: 355.0
** resTitleRateAggs bucket key: 高校生
** resTitleRateAggs bucket doc_count: 4
** resTitleRateAggs bucket aggs: sum_rate
*** resSumRateAggs sum name: sum_rate
*** resSumRateAggs sum value: 337.0
** resTitleRateAggs bucket key: 中間テスト
** resTitleRateAggs bucket doc_count: 4
** resTitleRateAggs bucket aggs: sum_rate
*** resSumRateAggs sum name: sum_rate
*** resSumRateAggs sum value: 333.0
** resTitleRateAggs bucket key: 亭主の真相
** resTitleRateAggs bucket doc_count: 4
** resTitleRateAggs bucket aggs: sum_rate
*** resSumRateAggs sum name: sum_rate
*** resSumRateAggs sum value: 292.0
** resTitleRateAggs bucket key: 試合の一瞬
** resTitleRateAggs bucket doc_count: 4
** resTitleRateAggs bucket aggs: sum_rate
*** resSumRateAggs sum name: sum_rate
*** resSumRateAggs sum value: 292.0
■ 少年ランキング ----------------------------------------
■■■ 検索対象インデックス: sample-ranking-qiita-boy*
aggs name: range
* resDateRangeAggs bucket key: 2018/01/14T15:00:00+0000-2018/01/15T15:00:00+0000
* resDateRangeAggs bucket doc_count: 141
* resDateRangeAggs bucket from: 2018/01/14T15:00:00+0000
* resDateRangeAggs bucket to: 2018/01/15T15:00:00+0000
* resDateRangeAggs bucket aggs: title_rate
** resTitleRateAggs bucket key: 高校生
** resTitleRateAggs bucket doc_count: 4
** resTitleRateAggs bucket aggs: sum_rate
*** resSumRateAggs sum name: sum_rate
*** resSumRateAggs sum value: 337.0
** resTitleRateAggs bucket key: ノックアウトと2話
** resTitleRateAggs bucket doc_count: 4
** resTitleRateAggs bucket aggs: sum_rate
*** resSumRateAggs sum name: sum_rate
*** resSumRateAggs sum value: 287.0
** resTitleRateAggs bucket key: 成績のフォワードと様子
** resTitleRateAggs bucket doc_count: 4
** resTitleRateAggs bucket aggs: sum_rate
*** resSumRateAggs sum name: sum_rate
*** resSumRateAggs sum value: 278.0
** resTitleRateAggs bucket key: 心
** resTitleRateAggs bucket doc_count: 4
** resTitleRateAggs bucket aggs: sum_rate
*** resSumRateAggs sum name: sum_rate
*** resSumRateAggs sum value: 267.0
** resTitleRateAggs bucket key: 選手権
** resTitleRateAggs bucket doc_count: 3
** resTitleRateAggs bucket aggs: sum_rate
*** resSumRateAggs sum name: sum_rate
*** resSumRateAggs sum value: 260.0
Kibanaやクエリと同じ結果を取得することができました!
あ、集計クエリとして当たり前ですが、aggs
では集計に使用したフィールドのみ取得可能で、それ以外(今回ですと著者等)は取得できません。
aggs
内のフィールドと紐付けさせた結果を、query
で一発取得できたら楽なのですが、どうもそんな風にはなってくれません。
query
のsize
を増やせば、取得できる可能性は大きくなりますが、query
のsize
は10000件
が確か上限だったと思いますので、この範囲内で確実に取れる、という確信がなければ、おとなしく別クエリで取得した方がよいでしょう。
書籍のジャンルをサブインデックスとして定義したのは、検索範囲の絞り込みを容易にするためです。
インデックスとして絞り込みを入れると、長期範囲の検索をするときに、少しでも負担軽くなるかなー、と。
もちろん、クエリ条件に入れる事も可能です。
// ジャンル絞り込み用クエリ
QueryBuilder queryBuilder = QueryBuilders.termQuery("index", subIndex);
// 全体クエリ
SearchRequestBuilder requestBuilder = client.prepareSearch(searchIndex)
.addAggregation(dateRangeAggs);
// サブインデックスが指定されている場合、クエリを追加する
if (StringUtils.isNotEmpty(subIndex)) {
requestBuilder.setQuery(queryBuilder);
}
こんな感じで、QueryBuilder
を使用しても絞り込みできます。
日付範囲の切替
日付範囲を週単位や月単位など、変更したい場合には、dateRangeAggs
のaddRange
内を切り替えてあげます。
設定から自由に変えられるようにしたいので、範囲定義情報も作成しました。
// 日付範囲用Aggs
AggregationBuilder dateRangeAggs = AggregationBuilders.dateRange("range")
// 日付範囲の対象は、処理実行日
.field("execDate")
// 日付範囲を、検索範囲のFromからToまでの間を設定する
// 例) 実行日10日前 から 実行日当日0時まで
//"searchRange": {
// "from": "-10",
// "fromChrono": "days",
// "to": "0",
// "toChrono": "days"
//}
.addRange(
DT_INDEX_FORMATTER.format(
// 実行日に検索範囲設定のFromを、指定ユニット単位で加算する
setting.getNow()
.plus(setting.getSearchRange().getFrom()
, setting.getSearchRange().findFromChronoUnit())
// 日以降を切り落とす(0時ぴったりにする)
.truncatedTo(ChronoUnit.DAYS))
, DT_INDEX_FORMATTER.format(
// 実行日に検索範囲設定のToを、指定ユニット単位で加算する
setting.getNow()
.plus(setting.getSearchRange().getTo()
, setting.getSearchRange().findToChronoUnit())
// 日以降を切り落とす(0時ぴったりにする)
.truncatedTo(ChronoUnit.DAYS))
)
// subaggsに、title_rate用 Aggsを追加
.subAggregation(titleRateAggs)
;
public class RangeBean {
private int from;
private String fromChrono;
private int to;
private String toChrono;
public ChronoUnit findFromChronoUnit() {
return findChronoUnit(fromChrono);
}
public ChronoUnit findToChronoUnit() {
return findChronoUnit(toChrono);
}
/**
* Chrono文字列をユニットに変更
*
* @param chrono
* @return
*/
public ChronoUnit findChronoUnit(String chrono) {
if (StringUtils.equals(chrono, "days")) {
return ChronoUnit.DAYS;
}
if (StringUtils.equals(chrono, "weeks")) {
return ChronoUnit.WEEKS;
}
if (StringUtils.equals(chrono, "months")) {
return ChronoUnit.MONTHS;
}
if (StringUtils.equals(chrono, "years")) {
return ChronoUnit.YEARS;
}
return null;
}
//(略)
}
これで、一ヶ月単位でも、三年単位でも好きに範囲を指定することができます。
次回はJava-APIでのクエリの指定と取得方法を書いてみたいと思います。