Help us understand the problem. What is going on with this article?

Elasticsearch Java-API を使う (BulkRequest 登録編) 【追記あり】

More than 3 years have passed since last update.

Elasticsearch では、Java APIを使う事ができます。

Elasticsearch Java API 公式ドキュメント

これがなかなかに便利なのですが、あまり情報が出回ってないようなので、まとめてみました。

【2018/01/12】追記

本記事で使用しているTransportClientは、次のメジャーバージョン7から非推奨、8で削除されるそうです。
そのため、High Level REST Clientへの移行が推奨されております。
詳細はコメント欄をご確認ください。
johtani様 、ご指摘ありがとうございました。

要件

  • とある本屋チェーン店における、売り上げランキング推移が見たい
  • 支店情報や書籍情報など詳しい情報は別のDBで管理している
  • 「何冊売れたか」という実際の数値データも別のDBで管理している
  • ランキング推移や分析などを、ELS(+Kibana)で行いたい

こんなイメージで作っていきたいと思います。

開発環境

  • Elasticsearch 5.6.5 (gradle … 'org.elasticsearch.client:transport:5.6.5')
  • Kibana 5.6.5
  • Java 1.8

データ構成

  • ランキング情報
    • 支店名
    • 書名
    • 著者名
    • ジャンル
    • ランキング
    • レート
    • 登録日

RDBですと、支店情報と書名情報を別々のTBLに持たせて、売上TBLを作って繋げて…という流れになると思いますが、ELSの場合、1行1件のデータとなりますので、このようなレコードを複数件登録します。
ELSのレコードは、ビューのようなイメージを持つと作りやすい気がします。

このレコードで特徴的なのは、レート(重みづけ)でしょうか。
ランキングは通常、1位が最上位で、以下、数字が増えていくごとにランキングが低くなっていきます。
ですが、このランキングを折れ線グラフのY軸で表示しようとすると、Y軸の値が上にいくほどが小さい値という、通常とは逆転した表示をしなくてはならず、ELSでは表現が難しいです。
参考(というか私の過去の質問): 折れ線グラフでのランキング表示

なので、グラフの表示を行いやすいように、ランキングの値を重みづけという形で変換したのが、レートです。

レート算出方法:(ランキング総数) - (自身のランキング値) + 1

ランキング総数が動くと、グラフの表示が変になってしまうので、100なり1000なり、常に固定した値を用います。

任意の方法で、ELSにインポート可能な状態のJSONを作成します。

JSON

branch  title   author  genre   ranking rate    execDate
新宿  タイトルA   著者安   comic-girl  1   3   2018/01/11T13:00:00+09:00
新宿  タイトルB   著者以   comic-male  2   2   2018/01/11T13:00:00+09:00
新宿  タイトルC   著者宇   comic-girl  3   1   2018/01/11T13:00:00+09:00
丸の内   タイトルC   著者宇   comic-girl  1   3   2018/01/11T13:00:00+09:00
丸の内   タイトルA   著者安   comic-girl  2   2   2018/01/11T13:00:00+09:00
丸の内   タイトルD   著者衣   comic-boy   3   1   2018/01/11T13:00:00+09:00
品川  タイトルB   著者以   comic-male  1   3   2018/01/11T13:00:00+09:00
品川  タイトルC   著者宇   comic-girl  2   2   2018/01/11T13:00:00+09:00
品川  タイトルE   著者於   comic-femail    3   1   2018/01/11T13:00:00+09:00
ranking.json
[{"branch":"新宿","title":"タイトルA","author":"著者安","genre":"comic-girl","ranking":1,"rate":3,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"新宿","title":"タイトルB","author":"著者以","genre":"comic-male","ranking":2,"rate":2,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"新宿","title":"タイトルC","author":"著者宇","genre":"comic-girl","ranking":3,"rate":1,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"丸の内","title":"タイトルC","author":"著者宇","genre":"comic-girl","ranking":1,"rate":3,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"丸の内","title":"タイトルA","author":"著者安","genre":"comic-girl","ranking":2,"rate":2,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"丸の内","title":"タイトルD","author":"著者衣","genre":"comic-boy","ranking":3,"rate":1,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"品川","title":"タイトルB","author":"著者以","genre":"comic-male","ranking":1,"rate":3,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"品川","title":"タイトルC","author":"著者宇","genre":"comic-girl","ranking":2,"rate":2,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"品川","title":"タイトルE","author":"著者於","genre":"comic-femail","ranking":3,"rate":1,"execDate":"2018/01/11T13:00:00+09:00"}]

Bean

これに対応する、ランキング情報Beanも作成します。

RankingDataBean.java
public class RankingDataBean extends ElsDataBaseBean {

    // 支店名
    private String branch;
    // 書名
    private String title;
    // 著者名
    private String author;
    // ジャンル
    private String genre;
    // ランキング
    private int ranking;
    // レート
    private int rate;
    // 登録日
    private Date execDate;

    ()
}

ここでの特徴は、ElsDataBaseBean を継承している点です。
中身はこんな感じです。

ElsDataBaseBean.java
public class ElsDataBaseBean {
    // インデックス
    private String index;

    ()
}

index だけ自前で持っています。
このBeanを継承するBeanであれば何でも登録できるようにImporterを作っていきます。

ELSへのインポート処理

ElsImporter.java
/**
 * ELSへBulkでデータをインポートする
 */
public class ElsImporter<T extends ElsDataBaseBean> {

    private static Logger logger = LoggerFactory.getLogger(ElsImporter.class);

    // 年月インデックスのフォーマット(yyyyMM)
    private static DateTimeFormatter YM_INDEX_FORMATTER;

    // 各種設定情報
    SettingBaseBean setting;

    // インポートするデータのクラス
    Class<T> clazz;

    /**
     * コンストラクタ
     * @param setting 各種設定情報
     * @param clazz インポートするデータのクラス
     */
    public ElsImporter(SettingBaseBean setting, Class<T> clazz) {
        this.setting = setting;
        this.clazz = clazz;

        YM_INDEX_FORMATTER = DateTimeFormatter.ofPattern(setting.getElasticearch().getIndexYmFormat());
    }

    /**
     * インポートの実行
     */
    public boolean execute() throws Exception {

        logger.info("ElsImporter インポート処理開始 -----------------------------------------");

        // ELSをJavaで扱うためのクライアントを作成する
        // setting.getElasticearch().getAddress(): IPアドレス
        // setting.getElasticearch().getPort(): ポート番号(通常は9300)
        TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(
                        setting.getElasticearch().getAddress()), setting.getElasticearch().getPort()));

        // bulk準備
        BulkRequestBuilder bulkRequest = client.prepareBulk();

        // Gson準備
        // setting.getElasticearch().getExecDateFormat(): 
        //  登録日のフォーマット(yyyy/MM/dd'T'HH:mm:ss+09:00)
        Gson gson = new GsonBuilder().setDateFormat(setting.getElasticearch().getExecDateFormat()).create();

        // JSONの入っているディレクトリ内ファイル全対象とする
        File dir = new File(setting.getRootDir(), setting.getSubDir().getJsonDir());
        for (File file : dir.listFiles()) {

            logger.debug("■■ Import: "+ file.getName());

            // JSONデータをリストに変換
            List<T> list = gson.fromJson(FileUtils.readFileToString(file, "UTF-8"), new ListOfSomething<T>(clazz));

            // ランキング情報を1件ずつ取得してbulkに追加する
            for (T data : list) {
                // 登録先の index
                // setting.getElasticearch().getIndex(): ELSのindex(共通部分)
                // setting.getNow(): 処理実行時のLocalDateTime
                // 例1: setting.getElasticearch().getIndex()   : ranking
                //      data.getIndex()                     : comic-girl
                //      setting.getNow()                    : 2018/01/18 12:00:00
                //      生成index                         : ranking-comic-girl-201801
                // 例2: setting.getElasticearch().getIndex()   : ranking
                //      data.getIndex()                     : null
                //      setting.getNow()                    : 2018/01/18 12:00:00
                //      生成index                         : ranking-201801
                String index;
                if (StringUtils.isEmpty(data.getIndex())) {
                    // サブインデックスなしバージョン
                    index = setting.getElasticearch().getIndex()
                            + "-" + YM_INDEX_FORMATTER.format(setting.getNow());
                }
                else {
                    // サブインデックスありバージョン
                    index = setting.getElasticearch().getIndex()
                            + "-" + data.getIndex()
                            + "-" + YM_INDEX_FORMATTER.format(setting.getNow());
                }

                // BulkにJSON形式のデータを追加する
                // setting.getElasticearch().getType(): ELSのtype
                bulkRequest.add(client.prepareIndex(index
                        , setting.getElasticearch().getType())
                    .setSource(gson.toJson(data), XContentType.JSON));
            }
        }

        // Bulk実行
        BulkResponse bulkResponse = bulkRequest.execute().get();

        // Bulkが失敗しているか(失敗しているとtrue)
        if (bulkResponse.hasFailures()) {
            logger.error("ElsImporter bulk失敗");

            // クライアントを終了する
            client.close();

            return false;
        }

        logger.info("ElsImporter インポート処理終了 -----------------------------------------");

        // クライアントを終了する
        client.close();

        return true;
    }

    /**
     * Gsonでリスト内のオブジェクトを指定した型に変換する
     */
    class ListOfSomething<X> implements ParameterizedType {

        private Class<?> wrapped;

        public ListOfSomething(Class<X> wrapped) {
            this.wrapped = wrapped;
        }

        public Type[] getActualTypeArguments() {
            return new Type[] {wrapped};
        }

        public Type getRawType() {
            return List.class;
        }

        public Type getOwnerType() {
            return null;
        }

    }
}

参考: Java Type Generic as Argument for GSON

インポート対象のデータクラス情報自身はImporterには持たせていません。
ジェネリクスとクラス情報のみで対応しているので、今回のランキング情報以外にも流用可能です。
ElsDataBaseBeanで持っていたindexはサブインデックスのような役割を果たします。
Kibanaで表示したり、Java-APIで取得する粒度に応じて付与するといいと思います。
ELSのインデックスの粒度はlogstashだと年月日がデフォルトですが、これもどのくらいの期間にまたがってみる事が多いか、に基づいて検討するといいでしょう。

exec.java
ElsImporter<RankingDataBean> importer 
    = new ElsImporter<RankingDataBean>(setting, RankingDataBean.class);
boolean result = importer.execute();

呼び出しはこのような方法で行います。
RankingDataBeanを3か所も記載するのは、ちょっとアレなのですが、gson対応のため、諦めました。

所感

BulkRequestのインポート処理をできるだけ汎用的にできるよう、頑張ってみました。
次回以降、この登録したデータをゴリゴリしてみたいと思います。

miz21358
仕事やら趣味でプログラミングとか周辺をごそごそやってます。 「手を抜く為の手間は惜しまない」がモットー。 Java, Python, Sikuli, Elasticsearch, Kibana, Monaca, JavaScript, bot, Windowsバッチ, VBScript, Excel マクロ
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away