LoginSignup
5
4

More than 5 years have passed since last update.

Elasticsearch Java-API を使う (Aggregation編)

Last updated at Posted at 2018-01-20

この記事は、Elasticsearch Java-API を使う (BulkRequest 登録編) の続きです。
要件やデータ構造などは、上記を参照してください。

さて、Bulkで登録したデータを色々扱っていきたいと思います。

要件

  • 日別売上ランキングTOP10を表示したい。
  • 日付範囲は、日別・週別・月別など切り替えられるようにしたい。
  • 売上ランキングの件数は変更可能にしたい。
  • 総合とジャンル別でも切り分けて見れるようにしたい。

取得方法

Kibana

image.png

ある特定の1日のTOP5を表示できるようになりました。

クエリ

上記のような構成をクエリで表現すると、以下のようになります。

GET sample-ranking-qiita-*/_search
{
  "aggs": {
    "range": {
      "date_range": {
        "field": "execDate",
        "ranges": [
          {
            "from": "2018/01/15T00:00:00+09:00",
            "to": "2018/01/15T23:59:59+09:00"
          }
        ]
      },
      "aggs": {
        "title_rate": {
          "terms": {
            "field": "title.keyword",
            "size": 5,
            "order": {
              "sum_rate": "desc"
            }
          },
          "aggs": {
            "sum_rate": {
              "sum": {
                "field": "rate"
              }
            }
          }
        }
      }
    }
  }
}

aggsの入れ子がちょっと見づらいですかね。
以下の3つのaggsを定義しています。

  1. sum_rateという名前で、rateを合計する
  2. title_rateという名前で、title別にsum_rateを、降順に表示する
  3. title_rateの集計範囲は、execDatefromからtoの間とする。

SQL

参考までに、SQLですと、こんなイメージです。
(試してはいないので、間違ってたらすみません)

ranking.sql
SELECT
    ranking
    , title
    , sum_rate
FROM (
    select
        ROW_NUMBER() OVER (ORDER BY sum_rate DESC) AS ranking
        , tb1.title
        , tb1.sum_rate 
    FROM (
        SELECT
            title
            , SUM(rate) as sum_rate 
        FROM
            tbl_ranking
        WHERE
            execDate >= '2018/01/15T00:00:00+09:00'
            AND execDate <= '2018/01/15T23:59:59+09:00'
        GROUP BY
            title
    ) tb1
)
WHERE
    ranking <= 5

取得結果

クエリの結果を取得すると、aggregationsタグの中に、aggsの結果がネストされて入っています。
bucketsタグ内のリストが各aggsの結果リストとなります。
Kibanaで表示したのと同じタイトルが並んでいますね。レート合計値も合致しています。

{
  "took": 426,
  "timed_out": false,
  "_shards": {
    "total": 15,
    "successful": 15,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 24800,
    "max_score": 1,
    "hits": [
      (略)
    ]
  },
  "aggregations": {
    "range": {
      "buckets": [
        {
          // 3. title_rateの集計日付範囲 ------------------------------
          "key": "2018/01/14T15:00:00+0000-2018/01/15T14:59:59+0000",
          "from": 1515942000000,
          "from_as_string": "2018/01/14T15:00:00+0000",
          "to": 1516028399000,
          "to_as_string": "2018/01/15T14:59:59+0000",
          "doc_count": 400,
          // 2. title別sum_rate降順 ------------------------------
          "title_rate": {
            "doc_count_error_upper_bound": -1,
            "sum_other_doc_count": 380,
            "buckets": [
              {
                "key": "開幕の新た",
                "doc_count": 4,
                "sum_rate": {
                  // 1. rate合計 ------------
                  "value": 355
                }
              },
              {
                "key": "高校生",
                "doc_count": 4,
                "sum_rate": {
                  "value": 337
                }
              },
              {
                "key": "中間テスト",
                "doc_count": 4,
                "sum_rate": {
                  "value": 333
                }
              },
              {
                "key": "亭主の真相",
                "doc_count": 4,
                "sum_rate": {
                  "value": 292
                }
              },
              {
                "key": "試合の一瞬",
                "doc_count": 4,
                "sum_rate": {
                  "value": 292
                }
              }
            ]
          }
        }
      ]
    }
  }
}

Java-API

さて、それでは、Java-APIで上記クエリを取得していきましょう。

AggsSampleViewer.java
public class AggsSampleViewer {

    private static Logger logger = LoggerFactory.getLogger(AggsSampleViewer.class);

    // 年月インデックスのフォーマット(yyyyMM)
    private static DateTimeFormatter YM_INDEX_FORMATTER;
    // 実行日付のフォーマット(yyyy/MM/dd'T'HH:mm:ss+09:00)
    private static DateTimeFormatter DT_INDEX_FORMATTER;

    // 各種設定情報
    QiitaSettingBean setting;


    /**
     * コンストラクタ
     * @param setting 設定情報
     */
    public AggsSampleViewer(QiitaSettingBean setting) {
        super();
        this.setting = setting;

        DT_INDEX_FORMATTER = DateTimeFormatter.ofPattern(setting.getElasticearch().getExecDateFormat());
        YM_INDEX_FORMATTER = DateTimeFormatter.ofPattern(setting.getElasticearch().getIndexYmFormat());
    }

    /**
     * 取得処理実行
     *
     * @param subIndex サブインデックス名(全体取得時はnull)
     * @throws Exception
     */
    public void execute(String subIndex) throws Exception {

        // ELSをJavaで扱うためのクライアントを作成する
        // setting.getElasticearch().getAddress(): IPアドレス
        // setting.getElasticearch().getPort(): ポート番号(通常は9300)
        TransportClient client = new PreBuiltTransportClient(Settings.EMPTY).addTransportAddress(
                new InetSocketTransportAddress(InetAddress.getByName(setting.getElasticearch().getAddress()),
                        setting.getElasticearch().getPort()));

        // 検索用Index生成(サブインデックス指定時はサブも入れる)
        String searchIndex = setting.getElsImport().getIndex();
        if (StringUtils.isNotEmpty(subIndex)) {
            searchIndex = searchIndex + "-" + subIndex;
        }
        searchIndex += "*";

        logger.debug("■■■ 検索対象インデックス: " + searchIndex);

        // sum_rate用 Aggs:rateの合計値を算出する
        AggregationBuilder sumRateAggs = AggregationBuilders.sum("sum_rate").field("rate");

        // title_rate用 Aggs:title別rate降順表示
        AggregationBuilder titleRateAggs = AggregationBuilders.terms("title_rate")
                // title別に集計する
                .field("title.keyword")
                // setting.getRankingLimit(): ランキング個数
                .size(setting.getRankingLimit())
                // 表示順は、sum_rateの降順
                .order(Order.aggregation("sum_rate", false))
                // subaggsに、sum_rate用 Aggsを追加
                .subAggregation(sumRateAggs)
                ;

        // 日付範囲用Aggs
        AggregationBuilder dateRangeAggs = AggregationBuilders.dateRange("range")
                // 日付範囲の対象は、処理実行日
                .field("execDate")
                // 日付範囲を、処理対象日の0時ピッタリの前日~当日0時ピッタリに設定する
                // 例) setting.getNow(): 2018/01/16 13:20:35
                //      from:2018/01/15 00:00:00
                //      to  :2018/01/16 00:00:00
                .addRange(
                        setting.getNow().minusDays(1).truncatedTo(ChronoUnit.DAYS).format(DT_INDEX_FORMATTER)
                        , setting.getNow().truncatedTo(ChronoUnit.DAYS).format(DT_INDEX_FORMATTER)
                )
                // subaggsに、title_rate用 Aggsを追加
                .subAggregation(titleRateAggs)
                ;

        // 全体クエリ
        SearchRequestBuilder requestBuilder = client.prepareSearch(searchIndex)
                .addAggregation(dateRangeAggs);


        // 検索結果を取得する
        SearchResponse res = requestBuilder.get();

        // 全体クエリのAggsの結果取得
        for (Aggregation aggs : res.getAggregations()) {
            logger.debug("aggs name: "+ aggs.getName());
        }

        // 【日付範囲用Aggs】(タイプ: date_range) ----------------
        InternalDateRange resDateRangeAggs = res.getAggregations().get("range");
        // 日付範囲用Aggs内のbucketを確認する
        for (InternalDateRange.Bucket dateRangeBucket : resDateRangeAggs.getBuckets()) {
            // keyとdoc_countはどのBucketでも取得可能
            logger.debug("* resDateRangeAggs bucket key: "+ dateRangeBucket.getKey());
            logger.debug("* resDateRangeAggs bucket doc_count: "+ dateRangeBucket.getDocCount());
            // 日付範囲用Bucketを参照しているため、FromとToが取得可能
            logger.debug("* resDateRangeAggs bucket from: "+ dateRangeBucket.getFromAsString());
            logger.debug("* resDateRangeAggs bucket to: "+ dateRangeBucket.getToAsString());

            // 日付範囲用Aggs内のAggs結果確認
            for (Aggregation aggs : dateRangeBucket.getAggregations()) {
                logger.debug("* resDateRangeAggs bucket aggs: "+ aggs.getName());
            }

            // 【title_rate用 Aggs】(タイプ: terms) ----------------
            Terms resTitleRateAggs = dateRangeBucket.getAggregations().get("title_rate");

            // title_rate用 Aggs内のbucketを確認する
            for (Terms.Bucket termBucket : resTitleRateAggs.getBuckets()) {
                logger.debug("** resTitleRateAggs bucket key: "+ termBucket.getKey());
                logger.debug("** resTitleRateAggs bucket doc_count: "+ termBucket.getDocCount());

                // title_rate用 Aggs結果確認
                for (Aggregation aggs : termBucket.getAggregations()) {
                    logger.debug("** resTitleRateAggs bucket aggs: "+ aggs.getName());
                }

                // 【sum_rate用 Aggs】(タイプ: sum) ----------------
                Sum resSumRateAggs = termBucket.getAggregations().get("sum_rate");

                // Sumの中にAggsはなく、sum結果が取得できる
                logger.debug("*** resSumRateAggs sum name: "+ resSumRateAggs.getName());
                logger.debug("*** resSumRateAggs sum value: "+ resSumRateAggs.getValueAsString());
            }
        }
    }
}

クエリと同じネスト構造でAggsを定義していきます。
どのAggsを使って、どのBucketが返ってくるかを調べるのがちょっと大変ですかね ^^;
この辺は公式を確認するのが一番です。
#公式はどう定義するかは書いてあるのですが、その結果どういった情報が返ってくるかは書かれてません…自動補完等を駆使して検討をつけていくとかですかね…

実行はこのように行います。

main.java
AggsSampleViewer viewer = new AggsSampleViewer(setting);

// 全体ランキング情報生成
logger.debug("■ 全体ランキング ----------------------------------------");
viewer.execute(null);

// 少年ランキング情報生成
logger.debug("■ 少年ランキング ----------------------------------------");
viewer.execute("boy");

実行結果

■ 全体ランキング ---------------------------------------- 
■■■ 検索対象インデックス: sample-ranking-qiita* 
aggs name: range 
* resDateRangeAggs bucket key: 2018/01/14T15:00:00+0000-2018/01/15T15:00:00+0000 
* resDateRangeAggs bucket doc_count: 400 
* resDateRangeAggs bucket from: 2018/01/14T15:00:00+0000 
* resDateRangeAggs bucket to: 2018/01/15T15:00:00+0000 
* resDateRangeAggs bucket aggs: title_rate 
** resTitleRateAggs bucket key: 開幕の新た 
** resTitleRateAggs bucket doc_count: 4 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 355.0 
** resTitleRateAggs bucket key: 高校生 
** resTitleRateAggs bucket doc_count: 4 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 337.0 
** resTitleRateAggs bucket key: 中間テスト 
** resTitleRateAggs bucket doc_count: 4 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 333.0 
** resTitleRateAggs bucket key: 亭主の真相 
** resTitleRateAggs bucket doc_count: 4 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 292.0 
** resTitleRateAggs bucket key: 試合の一瞬 
** resTitleRateAggs bucket doc_count: 4 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 292.0 
■ 少年ランキング ---------------------------------------- 
■■■ 検索対象インデックス: sample-ranking-qiita-boy* 
aggs name: range 
* resDateRangeAggs bucket key: 2018/01/14T15:00:00+0000-2018/01/15T15:00:00+0000 
* resDateRangeAggs bucket doc_count: 141 
* resDateRangeAggs bucket from: 2018/01/14T15:00:00+0000 
* resDateRangeAggs bucket to: 2018/01/15T15:00:00+0000 
* resDateRangeAggs bucket aggs: title_rate 
** resTitleRateAggs bucket key: 高校生 
** resTitleRateAggs bucket doc_count: 4 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 337.0 
** resTitleRateAggs bucket key: ノックアウトと2話 
** resTitleRateAggs bucket doc_count: 4 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 287.0 
** resTitleRateAggs bucket key: 成績のフォワードと様子 
** resTitleRateAggs bucket doc_count: 4 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 278.0 
** resTitleRateAggs bucket key: 心 
** resTitleRateAggs bucket doc_count: 4 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 267.0 
** resTitleRateAggs bucket key: 選手権 
** resTitleRateAggs bucket doc_count: 3 
** resTitleRateAggs bucket aggs: sum_rate 
*** resSumRateAggs sum name: sum_rate 
*** resSumRateAggs sum value: 260.0 

Kibanaやクエリと同じ結果を取得することができました!
あ、集計クエリとして当たり前ですが、aggsでは集計に使用したフィールドのみ取得可能で、それ以外(今回ですと著者等)は取得できません。
aggs内のフィールドと紐付けさせた結果を、queryで一発取得できたら楽なのですが、どうもそんな風にはなってくれません。
querysizeを増やせば、取得できる可能性は大きくなりますが、querysize10000件が確か上限だったと思いますので、この範囲内で確実に取れる、という確信がなければ、おとなしく別クエリで取得した方がよいでしょう。

書籍のジャンルをサブインデックスとして定義したのは、検索範囲の絞り込みを容易にするためです。
インデックスとして絞り込みを入れると、長期範囲の検索をするときに、少しでも負担軽くなるかなー、と。
もちろん、クエリ条件に入れる事も可能です。

AggsSampleViewer2.java
        // ジャンル絞り込み用クエリ
        QueryBuilder queryBuilder = QueryBuilders.termQuery("index", subIndex);


        // 全体クエリ
        SearchRequestBuilder requestBuilder = client.prepareSearch(searchIndex)
                .addAggregation(dateRangeAggs);

        // サブインデックスが指定されている場合、クエリを追加する
        if (StringUtils.isNotEmpty(subIndex)) {
            requestBuilder.setQuery(queryBuilder);
        }

こんな感じで、QueryBuilderを使用しても絞り込みできます。

日付範囲の切替

日付範囲を週単位や月単位など、変更したい場合には、dateRangeAggsaddRange内を切り替えてあげます。
設定から自由に変えられるようにしたいので、範囲定義情報も作成しました。

AggsSampleViewer3.java
        // 日付範囲用Aggs
        AggregationBuilder dateRangeAggs = AggregationBuilders.dateRange("range")
                // 日付範囲の対象は、処理実行日
                .field("execDate")
                // 日付範囲を、検索範囲のFromからToまでの間を設定する
                // 例) 実行日10日前 から 実行日当日0時まで
                //"searchRange": {
                //      "from": "-10",
                //      "fromChrono": "days",
                //      "to": "0",
                //      "toChrono": "days"
                //}
                .addRange(
                        DT_INDEX_FORMATTER.format(
                                // 実行日に検索範囲設定のFromを、指定ユニット単位で加算する
                                setting.getNow()
                                    .plus(setting.getSearchRange().getFrom()
                                            , setting.getSearchRange().findFromChronoUnit())
                                // 日以降を切り落とす(0時ぴったりにする)
                                .truncatedTo(ChronoUnit.DAYS))
                        , DT_INDEX_FORMATTER.format(
                                // 実行日に検索範囲設定のToを、指定ユニット単位で加算する
                                setting.getNow()
                                    .plus(setting.getSearchRange().getTo()
                                            , setting.getSearchRange().findToChronoUnit())
                                // 日以降を切り落とす(0時ぴったりにする)
                                .truncatedTo(ChronoUnit.DAYS))
                )
                // subaggsに、title_rate用 Aggsを追加
                .subAggregation(titleRateAggs)
                ;
RangeBean.java
public class RangeBean {
    private int from;
    private String fromChrono;
    private int to;
    private String toChrono;

    public ChronoUnit findFromChronoUnit() {
        return findChronoUnit(fromChrono);
    }

    public ChronoUnit findToChronoUnit() {
        return findChronoUnit(toChrono);
    }

    /**
     * Chrono文字列をユニットに変更
     *
     * @param chrono
     * @return
     */
    public ChronoUnit findChronoUnit(String chrono) {
        if (StringUtils.equals(chrono, "days")) {
            return ChronoUnit.DAYS;
        }
        if (StringUtils.equals(chrono, "weeks")) {
            return ChronoUnit.WEEKS;
        }
        if (StringUtils.equals(chrono, "months")) {
            return ChronoUnit.MONTHS;
        }
        if (StringUtils.equals(chrono, "years")) {
            return ChronoUnit.YEARS;
        }

        return null;
    }

    //(略)
}

これで、一ヶ月単位でも、三年単位でも好きに範囲を指定することができます。

次回はJava-APIでのクエリの指定と取得方法を書いてみたいと思います。

5
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
4