Elasticsearch では、Java APIを使う事ができます。
Elasticsearch Java API 公式ドキュメント
これがなかなかに便利なのですが、あまり情報が出回ってないようなので、まとめてみました。
【2018/01/12】追記
本記事で使用しているTransportClient
は、次のメジャーバージョン7から非推奨、8で削除されるそうです。
そのため、High Level REST Client
への移行が推奨されております。
詳細はコメント欄をご確認ください。
johtani様 、ご指摘ありがとうございました。
要件
- とある本屋チェーン店における、売り上げランキング推移が見たい
- 支店情報や書籍情報など詳しい情報は別のDBで管理している
- 「何冊売れたか」という実際の数値データも別のDBで管理している
- ランキング推移や分析などを、ELS(+Kibana)で行いたい
こんなイメージで作っていきたいと思います。
開発環境
- Elasticsearch 5.6.5 (gradle … 'org.elasticsearch.client:transport:5.6.5')
- Kibana 5.6.5
- Java 1.8
データ構成
- ランキング情報
- 支店名
- 書名
- 著者名
- ジャンル
- ランキング
- レート
- 登録日
RDBですと、支店情報と書名情報を別々のTBLに持たせて、売上TBLを作って繋げて…という流れになると思いますが、ELSの場合、1行1件のデータとなりますので、このようなレコードを複数件登録します。
ELSのレコードは、ビューのようなイメージを持つと作りやすい気がします。
このレコードで特徴的なのは、レート(重みづけ)でしょうか。
ランキングは通常、1位が最上位で、以下、数字が増えていくごとにランキングが低くなっていきます。
ですが、このランキングを折れ線グラフのY軸で表示しようとすると、Y軸の値が上にいくほどが小さい値という、通常とは逆転した表示をしなくてはならず、ELSでは表現が難しいです。
参考(というか私の過去の質問): 折れ線グラフでのランキング表示
なので、グラフの表示を行いやすいように、ランキングの値を重みづけという形で変換したのが、レートです。
レート算出方法:(ランキング総数) - (自身のランキング値) + 1
ランキング総数が動くと、グラフの表示が変になってしまうので、100なり1000なり、常に固定した値を用います。
任意の方法で、ELSにインポート可能な状態のJSONを作成します。
JSON
branch title author genre ranking rate execDate
新宿 タイトルA 著者安 comic-girl 1 3 2018/01/11T13:00:00+09:00
新宿 タイトルB 著者以 comic-male 2 2 2018/01/11T13:00:00+09:00
新宿 タイトルC 著者宇 comic-girl 3 1 2018/01/11T13:00:00+09:00
丸の内 タイトルC 著者宇 comic-girl 1 3 2018/01/11T13:00:00+09:00
丸の内 タイトルA 著者安 comic-girl 2 2 2018/01/11T13:00:00+09:00
丸の内 タイトルD 著者衣 comic-boy 3 1 2018/01/11T13:00:00+09:00
品川 タイトルB 著者以 comic-male 1 3 2018/01/11T13:00:00+09:00
品川 タイトルC 著者宇 comic-girl 2 2 2018/01/11T13:00:00+09:00
品川 タイトルE 著者於 comic-femail 3 1 2018/01/11T13:00:00+09:00
[{"branch":"新宿","title":"タイトルA","author":"著者安","genre":"comic-girl","ranking":1,"rate":3,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"新宿","title":"タイトルB","author":"著者以","genre":"comic-male","ranking":2,"rate":2,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"新宿","title":"タイトルC","author":"著者宇","genre":"comic-girl","ranking":3,"rate":1,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"丸の内","title":"タイトルC","author":"著者宇","genre":"comic-girl","ranking":1,"rate":3,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"丸の内","title":"タイトルA","author":"著者安","genre":"comic-girl","ranking":2,"rate":2,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"丸の内","title":"タイトルD","author":"著者衣","genre":"comic-boy","ranking":3,"rate":1,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"品川","title":"タイトルB","author":"著者以","genre":"comic-male","ranking":1,"rate":3,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"品川","title":"タイトルC","author":"著者宇","genre":"comic-girl","ranking":2,"rate":2,"execDate":"2018/01/11T13:00:00+09:00"},{"branch":"品川","title":"タイトルE","author":"著者於","genre":"comic-femail","ranking":3,"rate":1,"execDate":"2018/01/11T13:00:00+09:00"}]
Bean
これに対応する、ランキング情報Beanも作成します。
public class RankingDataBean extends ElsDataBaseBean {
// 支店名
private String branch;
// 書名
private String title;
// 著者名
private String author;
// ジャンル
private String genre;
// ランキング
private int ranking;
// レート
private int rate;
// 登録日
private Date execDate;
(略)
}
ここでの特徴は、ElsDataBaseBean を継承している点です。
中身はこんな感じです。
public class ElsDataBaseBean {
// インデックス
private String index;
(略)
}
index だけ自前で持っています。
このBeanを継承するBeanであれば何でも登録できるようにImporterを作っていきます。
ELSへのインポート処理
/**
* ELSへBulkでデータをインポートする
*/
public class ElsImporter<T extends ElsDataBaseBean> {
private static Logger logger = LoggerFactory.getLogger(ElsImporter.class);
// 年月インデックスのフォーマット(yyyyMM)
private static DateTimeFormatter YM_INDEX_FORMATTER;
// 各種設定情報
SettingBaseBean setting;
// インポートするデータのクラス
Class<T> clazz;
/**
* コンストラクタ
* @param setting 各種設定情報
* @param clazz インポートするデータのクラス
*/
public ElsImporter(SettingBaseBean setting, Class<T> clazz) {
this.setting = setting;
this.clazz = clazz;
YM_INDEX_FORMATTER = DateTimeFormatter.ofPattern(setting.getElasticearch().getIndexYmFormat());
}
/**
* インポートの実行
*/
public boolean execute() throws Exception {
logger.info("ElsImporter インポート処理開始 -----------------------------------------");
// ELSをJavaで扱うためのクライアントを作成する
// setting.getElasticearch().getAddress(): IPアドレス
// setting.getElasticearch().getPort(): ポート番号(通常は9300)
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(
setting.getElasticearch().getAddress()), setting.getElasticearch().getPort()));
// bulk準備
BulkRequestBuilder bulkRequest = client.prepareBulk();
// Gson準備
// setting.getElasticearch().getExecDateFormat():
// 登録日のフォーマット(yyyy/MM/dd'T'HH:mm:ss+09:00)
Gson gson = new GsonBuilder().setDateFormat(setting.getElasticearch().getExecDateFormat()).create();
// JSONの入っているディレクトリ内ファイル全対象とする
File dir = new File(setting.getRootDir(), setting.getSubDir().getJsonDir());
for (File file : dir.listFiles()) {
logger.debug("■■ Import: "+ file.getName());
// JSONデータをリストに変換
List<T> list = gson.fromJson(FileUtils.readFileToString(file, "UTF-8"), new ListOfSomething<T>(clazz));
// ランキング情報を1件ずつ取得してbulkに追加する
for (T data : list) {
// 登録先の index
// setting.getElasticearch().getIndex(): ELSのindex(共通部分)
// setting.getNow(): 処理実行時のLocalDateTime
// 例1: setting.getElasticearch().getIndex() : ranking
// data.getIndex() : comic-girl
// setting.getNow() : 2018/01/18 12:00:00
// 生成index : ranking-comic-girl-201801
// 例2: setting.getElasticearch().getIndex() : ranking
// data.getIndex() : null
// setting.getNow() : 2018/01/18 12:00:00
// 生成index : ranking-201801
String index;
if (StringUtils.isEmpty(data.getIndex())) {
// サブインデックスなしバージョン
index = setting.getElasticearch().getIndex()
+ "-" + YM_INDEX_FORMATTER.format(setting.getNow());
}
else {
// サブインデックスありバージョン
index = setting.getElasticearch().getIndex()
+ "-" + data.getIndex()
+ "-" + YM_INDEX_FORMATTER.format(setting.getNow());
}
// BulkにJSON形式のデータを追加する
// setting.getElasticearch().getType(): ELSのtype
bulkRequest.add(client.prepareIndex(index
, setting.getElasticearch().getType())
.setSource(gson.toJson(data), XContentType.JSON));
}
}
// Bulk実行
BulkResponse bulkResponse = bulkRequest.execute().get();
// Bulkが失敗しているか(失敗しているとtrue)
if (bulkResponse.hasFailures()) {
logger.error("ElsImporter bulk失敗");
// クライアントを終了する
client.close();
return false;
}
logger.info("ElsImporter インポート処理終了 -----------------------------------------");
// クライアントを終了する
client.close();
return true;
}
/**
* Gsonでリスト内のオブジェクトを指定した型に変換する
*/
class ListOfSomething<X> implements ParameterizedType {
private Class<?> wrapped;
public ListOfSomething(Class<X> wrapped) {
this.wrapped = wrapped;
}
public Type[] getActualTypeArguments() {
return new Type[] {wrapped};
}
public Type getRawType() {
return List.class;
}
public Type getOwnerType() {
return null;
}
}
}
参考: Java Type Generic as Argument for GSON
インポート対象のデータクラス情報自身はImporterには持たせていません。
ジェネリクスとクラス情報のみで対応しているので、今回のランキング情報以外にも流用可能です。
ElsDataBaseBean
で持っていたindex
はサブインデックスのような役割を果たします。
Kibana
で表示したり、Java-API
で取得する粒度に応じて付与するといいと思います。
ELSのインデックスの粒度はlogstashだと年月日がデフォルトですが、これもどのくらいの期間にまたがってみる事が多いか、に基づいて検討するといいでしょう。
ElsImporter<RankingDataBean> importer
= new ElsImporter<RankingDataBean>(setting, RankingDataBean.class);
boolean result = importer.execute();
呼び出しはこのような方法で行います。
RankingDataBean
を3か所も記載するのは、ちょっとアレなのですが、gson対応のため、諦めました。
所感
BulkRequestのインポート処理をできるだけ汎用的にできるよう、頑張ってみました。
次回以降、この登録したデータをゴリゴリしてみたいと思います。