Java
Elasticsearch

Elasticsearch Java-API を使う (BulkRequest 登録編) 【追記あり】

Elasticsearch では、Java APIを使う事ができます。

Elasticsearch Java API 公式ドキュメント

これがなかなかに便利なのですが、あまり情報が出回ってないようなので、まとめてみました。

【2018/01/12】追記

本記事で使用しているTransportClientは、次のメジャーバージョン7から非推奨、8で削除されるそうです。
そのため、High Level REST Clientへの移行が推奨されております。
詳細はコメント欄をご確認ください。
johtani様 、ご指摘ありがとうございました。

要件

  • とある本屋チェーン店における、売り上げランキング推移が見たい
  • 支店情報や書籍情報など詳しい情報は別のDBで管理している
  • 「何冊売れたか」という実際の数値データも別のDBで管理している
  • ランキング推移や分析などを、ELS(+Kibana)で行いたい

こんなイメージで作っていきたいと思います。

開発環境

  • Elasticsearch 5.6.5 (gradle … 'org.elasticsearch.client:transport:5.6.5')
  • Kibana 5.6.5
  • Java 1.8

データ構成

  • ランキング情報
    • 支店名
    • 書名
    • 著者名
    • ジャンル
    • ランキング
    • レート
    • 登録日

RDBですと、支店情報と書名情報を別々のTBLに持たせて、売上TBLを作って繋げて…という流れになると思いますが、ELSの場合、1行1件のデータとなりますので、このようなレコードを複数件登録します。
ELSのレコードは、ビューのようなイメージを持つと作りやすい気がします。

このレコードで特徴的なのは、レート(重みづけ)でしょうか。
ランキングは通常、1位が最上位で、以下、数字が増えていくごとにランキングが低くなっていきます。
ですが、このランキングを折れ線グラフのY軸で表示しようとすると、Y軸の値が上にいくほどが小さい値という、通常とは逆転した表示をしなくてはならず、ELSでは表現が難しいです。
参考(というか私の過去の質問): 折れ線グラフでのランキング表示

なので、グラフの表示を行いやすいように、ランキングの値を重みづけという形で変換したのが、レートです。

レート算出方法:(ランキング総数) - (自身のランキング値) + 1

ランキング総数が動くと、グラフの表示が変になってしまうので、100なり1000なり、常に固定した値を用います。

任意の方法で、ELSにインポート可能な状態のJSONを作成します。

JSON

branch  title   author  genre   ranking rate    execDate
新宿  タイトルA   著者安   comic-girl  1   3   2018/01/11T13:00:00+09:00
新宿  タイトルB   著者以   comic-male  2   2   2018/01/11T13:00:00+09:00
新宿  タイトルC   著者宇   comic-girl  3   1   2018/01/11T13:00:00+09:00
丸の内   タイトルC   著者宇   comic-girl  1   3   2018/01/11T13:00:00+09:00
丸の内   タイトルA   著者安   comic-girl  2   2   2018/01/11T13:00:00+09:00
丸の内   タイトルD   著者衣   comic-boy   3   1   2018/01/11T13:00:00+09:00
品川  タイトルB   著者以   comic-male  1   3   2018/01/11T13:00:00+09:00
品川  タイトルC   著者宇   comic-girl  2   2   2018/01/11T13:00:00+09:00
品川  タイトルE   著者於   comic-femail    3   1   2018/01/11T13:00:00+09:00
ranking.json
[{"branch":"新宿","title":"タイトルA","author":"著者安","genre":"comic-girl","ranking":1,"rate":3,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"新宿","title":"タイトルB","author":"著者以","genre":"comic-male","ranking":2,"rate":2,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"新宿","title":"タイトルC","author":"著者宇","genre":"comic-girl","ranking":3,"rate":1,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"丸の内","title":"タイトルC","author":"著者宇","genre":"comic-girl","ranking":1,"rate":3,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"丸の内","title":"タイトルA","author":"著者安","genre":"comic-girl","ranking":2,"rate":2,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"丸の内","title":"タイトルD","author":"著者衣","genre":"comic-boy","ranking":3,"rate":1,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"品川","title":"タイトルB","author":"著者以","genre":"comic-male","ranking":1,"rate":3,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"品川","title":"タイトルC","author":"著者宇","genre":"comic-girl","ranking":2,"rate":2,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"品川","title":"タイトルE","author":"著者於","genre":"comic-femail","ranking":3,"rate":1,"execDate":"2018/01/11T13:00:00+09:00"}]

Bean

これに対応する、ランキング情報Beanも作成します。

RankingDataBean.java
public class RankingDataBean extends ElsDataBaseBean {

    // 支店名
    private String branch;
    // 書名
    private String title;
    // 著者名
    private String author;
    // ジャンル
    private String genre;
    // ランキング
    private int ranking;
    // レート
    private int rate;
    // 登録日
    private Date execDate;

    ()
}

ここでの特徴は、ElsDataBaseBean を継承している点です。
中身はこんな感じです。

ElsDataBaseBean.java
public class ElsDataBaseBean {
    // インデックス
    private String index;

    ()
}

index だけ自前で持っています。
このBeanを継承するBeanであれば何でも登録できるようにImporterを作っていきます。

ELSへのインポート処理

ElsImporter.java
/**
 * ELSへBulkでデータをインポートする
 */
public class ElsImporter<T extends ElsDataBaseBean> {

    private static Logger logger = LoggerFactory.getLogger(ElsImporter.class);

    // 年月インデックスのフォーマット(yyyyMM)
    private static DateTimeFormatter YM_INDEX_FORMATTER;

    // 各種設定情報
    SettingBaseBean setting;

    // インポートするデータのクラス
    Class<T> clazz;

    /**
     * コンストラクタ
     * @param setting 各種設定情報
     * @param clazz インポートするデータのクラス
     */
    public ElsImporter(SettingBaseBean setting, Class<T> clazz) {
        this.setting = setting;
        this.clazz = clazz;

        YM_INDEX_FORMATTER = DateTimeFormatter.ofPattern(setting.getElasticearch().getIndexYmFormat());
    }

    /**
     * インポートの実行
     */
    public boolean execute() throws Exception {

        logger.info("ElsImporter インポート処理開始 -----------------------------------------");

        // ELSをJavaで扱うためのクライアントを作成する
        // setting.getElasticearch().getAddress(): IPアドレス
        // setting.getElasticearch().getPort(): ポート番号(通常は9300)
        TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(
                        setting.getElasticearch().getAddress()), setting.getElasticearch().getPort()));

        // bulk準備
        BulkRequestBuilder bulkRequest = client.prepareBulk();

        // Gson準備
        // setting.getElasticearch().getExecDateFormat(): 
        //  登録日のフォーマット(yyyy/MM/dd'T'HH:mm:ss+09:00)
        Gson gson = new GsonBuilder().setDateFormat(setting.getElasticearch().getExecDateFormat()).create();

        // JSONの入っているディレクトリ内ファイル全対象とする
        File dir = new File(setting.getRootDir(), setting.getSubDir().getJsonDir());
        for (File file : dir.listFiles()) {

            logger.debug("■■ Import: "+ file.getName());

            // JSONデータをリストに変換
            List<T> list = gson.fromJson(FileUtils.readFileToString(file, "UTF-8"), new ListOfSomething<T>(clazz));

            // ランキング情報を1件ずつ取得してbulkに追加する
            for (T data : list) {
                // 登録先の index
                // setting.getElasticearch().getIndex(): ELSのindex(共通部分)
                // setting.getNow(): 処理実行時のLocalDateTime
                // 例1: setting.getElasticearch().getIndex()   : ranking
                //      data.getIndex()                     : comic-girl
                //      setting.getNow()                    : 2018/01/18 12:00:00
                //      生成index                         : ranking-comic-girl-201801
                // 例2: setting.getElasticearch().getIndex()   : ranking
                //      data.getIndex()                     : null
                //      setting.getNow()                    : 2018/01/18 12:00:00
                //      生成index                         : ranking-201801
                String index;
                if (StringUtils.isEmpty(data.getIndex())) {
                    // サブインデックスなしバージョン
                    index = setting.getElasticearch().getIndex()
                            + "-" + YM_INDEX_FORMATTER.format(setting.getNow());
                }
                else {
                    // サブインデックスありバージョン
                    index = setting.getElasticearch().getIndex()
                            + "-" + data.getIndex()
                            + "-" + YM_INDEX_FORMATTER.format(setting.getNow());
                }

                // BulkにJSON形式のデータを追加する
                // setting.getElasticearch().getType(): ELSのtype
                bulkRequest.add(client.prepareIndex(index
                        , setting.getElasticearch().getType())
                    .setSource(gson.toJson(data), XContentType.JSON));
            }
        }

        // Bulk実行
        BulkResponse bulkResponse = bulkRequest.execute().get();

        // Bulkが失敗しているか(失敗しているとtrue)
        if (bulkResponse.hasFailures()) {
            logger.error("ElsImporter bulk失敗");

            // クライアントを終了する
            client.close();

            return false;
        }

        logger.info("ElsImporter インポート処理終了 -----------------------------------------");

        // クライアントを終了する
        client.close();

        return true;
    }

    /**
     * Gsonでリスト内のオブジェクトを指定した型に変換する
     */
    class ListOfSomething<X> implements ParameterizedType {

        private Class<?> wrapped;

        public ListOfSomething(Class<X> wrapped) {
            this.wrapped = wrapped;
        }

        public Type[] getActualTypeArguments() {
            return new Type[] {wrapped};
        }

        public Type getRawType() {
            return List.class;
        }

        public Type getOwnerType() {
            return null;
        }

    }
}

参考: Java Type Generic as Argument for GSON

インポート対象のデータクラス情報自身はImporterには持たせていません。
ジェネリクスとクラス情報のみで対応しているので、今回のランキング情報以外にも流用可能です。
ElsDataBaseBeanで持っていたindexはサブインデックスのような役割を果たします。
Kibanaで表示したり、Java-APIで取得する粒度に応じて付与するといいと思います。
ELSのインデックスの粒度はlogstashだと年月日がデフォルトですが、これもどのくらいの期間にまたがってみる事が多いか、に基づいて検討するといいでしょう。

exec.java
ElsImporter<RankingDataBean> importer 
    = new ElsImporter<RankingDataBean>(setting, RankingDataBean.class);
boolean result = importer.execute();

呼び出しはこのような方法で行います。
RankingDataBeanを3か所も記載するのは、ちょっとアレなのですが、gson対応のため、諦めました。

所感

BulkRequestのインポート処理をできるだけ汎用的にできるよう、頑張ってみました。
次回以降、この登録したデータをゴリゴリしてみたいと思います。