Java
Elasticsearch

Elasticsearch Java-API を使う (Aggregation編)

この記事は、Elasticsearch Java-API を使う (BulkRequest 登録編) の続きです。
要件やデータ構造などは、上記を参照してください。

さて、Bulkで登録したデータを色々扱っていきたいと思います。

要件

  • 日別売上ランキングTOP10を表示したい。
  • 日付範囲は、日別・週別・月別など切り替えられるようにしたい。
  • 売上ランキングの件数は変更可能にしたい。
  • 総合とジャンル別でも切り分けて見れるようにしたい。

取得方法

Kibana

image.png

ある特定の1日のTOP5を表示できるようになりました。

クエリ

上記のような構成をクエリで表現すると、以下のようになります。

GET sample-ranking-qiita-*/_search
{
  "aggs": {
    "range": {
      "date_range": {
        "field": "execDate",
        "ranges": [
          {
            "from": "2018/01/15T00:00:00+09:00",
            "to": "2018/01/15T23:59:59+09:00"
          }
        ]
      },
      "aggs": {
        "title_rate": {
          "terms": {
            "field": "title.keyword",
            "size": 5,
            "order": {
              "sum_rate": "desc"
            }
          },
          "aggs": {
            "sum_rate": {
              "sum": {
                "field": "rate"
              }
            }
          }
        }
      }
    }
  }
}

aggsの入れ子がちょっと見づらいですかね。
以下の3つのaggsを定義しています。

  1. sum_rateという名前で、rateを合計する
  2. title_rateという名前で、title別にsum_rateを、降順に表示する
  3. title_rateの集計範囲は、execDatefromからtoの間とする。

SQL

参考までに、SQLですと、こんなイメージです。
(試してはいないので、間違ってたらすみません)

ranking.sql
SELECT
    ranking
    , title
    , sum_rate
FROM (
    select
        ROW_NUMBER() OVER (ORDER BY sum_rate DESC) AS ranking
        , tb1.title
        , tb1.sum_rate 
    FROM (
        SELECT
            title
            , SUM(rate) as sum_rate 
        FROM
            tbl_ranking
        WHERE
            execDate >= '2018/01/15T00:00:00+09:00'
            AND execDate <= '2018/01/15T23:59:59+09:00'
        GROUP BY
            title
    ) tb1
)
WHERE
    ranking <= 5

取得結果

クエリの結果を取得すると、aggregationsタグの中に、aggsの結果がネストされて入っています。
bucketsタグ内のリストが各aggsの結果リストとなります。
Kibanaで表示したのと同じタイトルが並んでいますね。レート合計値も合致しています。

{
  "took": 426,
  "timed_out": false,
  "_shards": {
    "total": 15,
    "successful": 15,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 24800,
    "max_score": 1,
    "hits": [
      (略)
    ]
  },
  "aggregations": {
    "range": {
      "buckets": [
        {
          // 3. title_rateの集計日付範囲 ------------------------------
          "key": "2018/01/14T15:00:00+0000-2018/01/15T14:59:59+0000",
          "from": 1515942000000,
          "from_as_string": "2018/01/14T15:00:00+0000",
          "to": 1516028399000,
          "to_as_string": "2018/01/15T14:59:59+0000",
          "doc_count": 400,
          // 2. title別sum_rate降順 ------------------------------
          "title_rate": {
            "doc_count_error_upper_bound": -1,
            "sum_other_doc_count": 380,
            "buckets": [
              {
                "key": "開幕の新た",
                "doc_count": 4,
                "sum_rate": {
                  // 1. rate合計 ------------
                  "value": 355
                }
              },
              {
                "key": "高校生",
                "doc_count": 4,
                "sum_rate": {
                  "value": 337
                }
              },
              {
                "key": "中間テスト",
                "doc_count": 4,
                "sum_rate": {
                  "value": 333
                }
              },
              {
                "key": "亭主の真相",
                "doc_count": 4,
                "sum_rate": {
                  "value": 292
                }
              },
              {
                "key": "試合の一瞬",
                "doc_count": 4,
                "sum_rate": {
                  "value": 292
                }
              }
            ]
          }
        }
      ]
    }
  }
}

Java-API

さて、それでは、Java-APIで上記クエリを取得していきましょう。

AggsSampleViewer.java
public class AggsSampleViewer {

    private static Logger logger = LoggerFactory.getLogger(AggsSampleViewer.class);

    // 年月インデックスのフォーマット(yyyyMM)
    private static DateTimeFormatter YM_INDEX_FORMATTER;
    // 実行日付のフォーマット(yyyy/MM/dd'T'HH:mm:ss+09:00)
    private static DateTimeFormatter DT_INDEX_FORMATTER;

    // 各種設定情報
    QiitaSettingBean setting;


    /**
     * コンストラクタ
     * @param setting 設定情報
     */
    public AggsSampleViewer(QiitaSettingBean setting) {
        super();
        this.setting = setting;

        DT_INDEX_FORMATTER = DateTimeFormatter.ofPattern(setting.getElasticearch().getExecDateFormat());
        YM_INDEX_FORMATTER = DateTimeFormatter.ofPattern(setting.getElasticearch().getIndexYmFormat());
    }

    /**
     * 取得処理実行
     *
     * @param subIndex サブインデックス名(全体取得時はnull)
     * @throws Exception
     */
    public void execute(String subIndex) throws Exception {

        // ELSをJavaで扱うためのクライアントを作成する
        // setting.getElasticearch().getAddress(): IPアドレス
        // setting.getElasticearch().getPort(): ポート番号(通常は9300)
        TransportClient client = new PreBuiltTransportClient(Settings.EMPTY).addTransportAddress(
                new InetSocketTransportAddress(InetAddress.getByName(setting.getElasticearch().getAddress()),
                        setting.getElasticearch().getPort()));

        // 検索用Index生成(サブインデックス指定時はサブも入れる)
        String searchIndex = setting.getElsImport().getIndex();
        if (StringUtils.isNotEmpty(subIndex)) {
            searchIndex = searchIndex + "-" + subIndex;
        }
        searchIndex += "*";

        logger.debug("■■■ 検索対象インデックス: " + searchIndex);

        // sum_rate用 Aggs:rateの合計値を算出する
        AggregationBuilder sumRateAggs = AggregationBuilders.sum("sum_rate").field("rate");

        // title_rate用 Aggs:title別rate降順表示
        AggregationBuilder titleRateAggs = AggregationBuilders.terms("title_rate")
                // title別に集計する
                .field("title.keyword")
                // setting.getRankingLimit(): ランキング個数
                .size(setting.getRankingLimit())
                // 表示順は、sum_rateの降順
                .order(Order.aggregation("sum_rate", false))
                // subaggsに、sum_rate用 Aggsを追加
                .subAggregation(sumRateAggs)
                ;

        // 日付範囲用Aggs
        AggregationBuilder dateRangeAggs = AggregationBuilders.dateRange("range")
                // 日付範囲の対象は、処理実行日
                .field("execDate")
                // 日付範囲を、処理対象日の0時ピッタリの前日~当日0時ピッタリに設定する
                // 例) setting.getNow(): 2018/01/16 13:20:35
                //      from:2018/01/15 00:00:00
                //      to  :2018/01/16 00:00:00
                .addRange(
                        setting.getNow().minusDays(1).truncatedTo(ChronoUnit.DAYS).format(DT_INDEX_FORMATTER)
                        , setting.getNow().truncatedTo(ChronoUnit.DAYS).format(DT_INDEX_FORMATTER)
                )
                // subaggsに、title_rate用 Aggsを追加
                .subAggregation(titleRateAggs)
                ;

        // 全体クエリ
        SearchRequestBuilder requestBuilder = client.prepareSearch(searchIndex)
                .addAggregation(dateRangeAggs);


        // 検索結果を取得する
        SearchResponse res = requestBuilder.get();

        // 全体クエリのAggsの結果取得
        for (Aggregation aggs : res.getAggregations()) {
            logger.debug("aggs name: "+ aggs.getName());
        }

        // 【日付範囲用Aggs】(タイプ: date_range) ----------------
        InternalDateRange resDateRangeAggs = res.getAggregations().get("range");
        // 日付範囲用Aggs内のbucketを確認する
        for (InternalDateRange.Bucket dateRangeBucket : resDateRangeAggs.getBuckets()) {
            // keyとdoc_countはどのBucketでも取得可能
            logger.debug("* resDateRangeAggs bucket key: "+ dateRangeBucket.getKey());
            logger.debug("* resDateRangeAggs bucket doc_count: "+ dateRangeBucket.getDocCount());
            // 日付範囲用Bucketを参照しているため、FromとToが取得可能
            logger.debug("* resDateRangeAggs bucket from: "+ dateRangeBucket.getFromAsString());
            logger.debug("* resDateRangeAggs bucket to: "+ dateRangeBucket.getToAsString());

            // 日付範囲用Aggs内のAggs結果確認
            for (Aggregation aggs : dateRangeBucket.getAggregations()) {
                logger.debug("* resDateRangeAggs bucket aggs: "+ aggs.getName());
            }

            // 【title_rate用 Aggs】(タイプ: terms) ----------------
            Terms resTitleRateAggs = dateRangeBucket.getAggregations().get("title_rate");

            // title_rate用 Aggs内のbucketを確認する
            for (Terms.Bucket termBucket : resTitleRateAggs.getBuckets()) {
                logger.debug("** resTitleRateAggs bucket key: "+ termBucket.getKey());
                logger.debug("** resTitleRateAggs bucket doc_count: "+ termBucket.getDocCount());

                // title_rate用 Aggs結果確認
                for (Aggregation aggs : termBucket.getAggregations()) {
                    logger.debug("** resTitleRateAggs bucket aggs: "+ aggs.getName());
                }

                // 【sum_rate用 Aggs】(タイプ: sum) ----------------
                Sum resSumRateAggs = termBucket.getAggregations().get("sum_rate");

                // Sumの中にAggsはなく、sum結果が取得できる
                logger.debug("*** resSumRateAggs sum name: "+ resSumRateAggs.getName());
                logger.debug("*** resSumRateAggs sum value: "+ resSumRateAggs.getValueAsString());
            }
        }
    }
}

クエリと同じネスト構造でAggsを定義していきます。
どのAggsを使って、どのBucketが返ってくるかを調べるのがちょっと大変ですかね ^^;
この辺は公式を確認するのが一番です。
#公式はどう定義するかは書いてあるのですが、その結果どういった情報が返ってくるかは書かれてません…自動補完等を駆使して検討をつけていくとかですかね…

https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-aggs.html

実行はこのように行います。

main.java
AggsSampleViewer viewer = new AggsSampleViewer(setting);

// 全体ランキング情報生成
logger.debug("■ 全体ランキング ----------------------------------------");
viewer.execute(null);

// 少年ランキング情報生成
logger.debug("■ 少年ランキング ----------------------------------------");
viewer.execute("boy");

実行結果

■ 全体ランキング ---------------------------------------- 
■■■ 検索対象インデックス: sample-ranking-qiita* 
aggs name: range 
* resDateRangeAggs bucket key: 2018/01/14T15:00:00+0000-2018/01/15T15:00:00+0000 
* resDateRangeAggs bucket doc_count: 400 
* resDateRangeAggs bucket from: 2018/01/14T15:00:00+0000 
* resDateRangeAggs bucket to: 2018/01/15T15:00:00+0000 
* resDateRangeAggs bucket aggs: title_rate 
** resTitleRateAggs bucket key: 開幕の新た 
** resTitleRateAggs bucket doc_count: 4 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 355.0 
** resTitleRateAggs bucket key: 高校生 
** resTitleRateAggs bucket doc_count: 4 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 337.0 
** resTitleRateAggs bucket key: 中間テスト 
** resTitleRateAggs bucket doc_count: 4 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 333.0 
** resTitleRateAggs bucket key: 亭主の真相 
** resTitleRateAggs bucket doc_count: 4 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 292.0 
** resTitleRateAggs bucket key: 試合の一瞬 
** resTitleRateAggs bucket doc_count: 4 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 292.0 
■ 少年ランキング ---------------------------------------- 
■■■ 検索対象インデックス: sample-ranking-qiita-boy* 
aggs name: range 
* resDateRangeAggs bucket key: 2018/01/14T15:00:00+0000-2018/01/15T15:00:00+0000 
* resDateRangeAggs bucket doc_count: 141 
* resDateRangeAggs bucket from: 2018/01/14T15:00:00+0000 
* resDateRangeAggs bucket to: 2018/01/15T15:00:00+0000 
* resDateRangeAggs bucket aggs: title_rate 
** resTitleRateAggs bucket key: 高校生 
** resTitleRateAggs bucket doc_count: 4 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 337.0 
** resTitleRateAggs bucket key: ノックアウトと2話 
** resTitleRateAggs bucket doc_count: 4 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 287.0 
** resTitleRateAggs bucket key: 成績のフォワードと様子 
** resTitleRateAggs bucket doc_count: 4 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 278.0 
** resTitleRateAggs bucket key: 心 
** resTitleRateAggs bucket doc_count: 4 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 267.0 
** resTitleRateAggs bucket key: 選手権 
** resTitleRateAggs bucket doc_count: 3 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 260.0 

Kibanaやクエリと同じ結果を取得することができました!
あ、集計クエリとして当たり前ですが、aggsでは集計に使用したフィールドのみ取得可能で、それ以外(今回ですと著者等)は取得できません。
aggs内のフィールドと紐付けさせた結果を、queryで一発取得できたら楽なのですが、どうもそんな風にはなってくれません。
querysizeを増やせば、取得できる可能性は大きくなりますが、querysize10000件が確か上限だったと思いますので、この範囲内で確実に取れる、という確信がなければ、おとなしく別クエリで取得した方がよいでしょう。

書籍のジャンルをサブインデックスとして定義したのは、検索範囲の絞り込みを容易にするためです。
インデックスとして絞り込みを入れると、長期範囲の検索をするときに、少しでも負担軽くなるかなー、と。
もちろん、クエリ条件に入れる事も可能です。

AggsSampleViewer2.java
        // ジャンル絞り込み用クエリ
        QueryBuilder queryBuilder = QueryBuilders.termQuery("index", subIndex);


        // 全体クエリ
        SearchRequestBuilder requestBuilder = client.prepareSearch(searchIndex)
                .addAggregation(dateRangeAggs);

        // サブインデックスが指定されている場合、クエリを追加する
        if (StringUtils.isNotEmpty(subIndex)) {
            requestBuilder.setQuery(queryBuilder);
        }

こんな感じで、QueryBuilderを使用しても絞り込みできます。

日付範囲の切替

日付範囲を週単位や月単位など、変更したい場合には、dateRangeAggsaddRange内を切り替えてあげます。
設定から自由に変えられるようにしたいので、範囲定義情報も作成しました。

AggsSampleViewer3.java
        // 日付範囲用Aggs
        AggregationBuilder dateRangeAggs = AggregationBuilders.dateRange("range")
                // 日付範囲の対象は、処理実行日
                .field("execDate")
                // 日付範囲を、検索範囲のFromからToまでの間を設定する
                // 例) 実行日10日前 から 実行日当日0時まで
                //"searchRange": {
                //      "from": "-10",
                //      "fromChrono": "days",
                //      "to": "0",
                //      "toChrono": "days"
                //}
                .addRange(
                        DT_INDEX_FORMATTER.format(
                                // 実行日に検索範囲設定のFromを、指定ユニット単位で加算する
                                setting.getNow()
                                    .plus(setting.getSearchRange().getFrom()
                                            , setting.getSearchRange().findFromChronoUnit())
                                // 日以降を切り落とす(0時ぴったりにする)
                                .truncatedTo(ChronoUnit.DAYS))
                        , DT_INDEX_FORMATTER.format(
                                // 実行日に検索範囲設定のToを、指定ユニット単位で加算する
                                setting.getNow()
                                    .plus(setting.getSearchRange().getTo()
                                            , setting.getSearchRange().findToChronoUnit())
                                // 日以降を切り落とす(0時ぴったりにする)
                                .truncatedTo(ChronoUnit.DAYS))
                )
                // subaggsに、title_rate用 Aggsを追加
                .subAggregation(titleRateAggs)
                ;
RangeBean.java
public class RangeBean {
    private int from;
    private String fromChrono;
    private int to;
    private String toChrono;

    public ChronoUnit findFromChronoUnit() {
        return findChronoUnit(fromChrono);
    }

    public ChronoUnit findToChronoUnit() {
        return findChronoUnit(toChrono);
    }

    /**
     * Chrono文字列をユニットに変更
     *
     * @param chrono
     * @return
     */
    public ChronoUnit findChronoUnit(String chrono) {
        if (StringUtils.equals(chrono, "days")) {
            return ChronoUnit.DAYS;
        }
        if (StringUtils.equals(chrono, "weeks")) {
            return ChronoUnit.WEEKS;
        }
        if (StringUtils.equals(chrono, "months")) {
            return ChronoUnit.MONTHS;
        }
        if (StringUtils.equals(chrono, "years")) {
            return ChronoUnit.YEARS;
        }

        return null;
    }

    //(略)
}

これで、一ヶ月単位でも、三年単位でも好きに範囲を指定することができます。

次回はJava-APIでのクエリの指定と取得方法を書いてみたいと思います。