前回記事と記事の内容
前回記事のTransportClientからRestHighLevelClientに移行するで作成したサンプルアプリから今回はBulkAPIを使用したJavaClientからElasticsaerchへのBulkインサート処理についてまとめています。
環境
macOS
Elasticsearch6.5.2
Java8
Spring Boot 2.1.1
サンプルアプリ
作成したアプリケーションはGitHubに挙げています。
https://github.com/ohanamisan/Elasticsearch_on_Java
Elasticsearchのバージョンが異なる場合は適宜gradleファイルのjarインポート箇所を変更してください。
その他詳細はREADME参照でお願いします。
実装
今回はBulkAPIの中のBulkProcessorを使ってデータのインサート処理を実装していきます。
特別大きな違いもなくドキュメントを参考にすればすらすら出来ちゃうと思いますがTransportClientとRestHighLevelClientの比較程度のメモだと思ってください。
本家のBulkProcessorのドキュメントはこちら↓
RestHighLevelClient
TransportClient
TransportClientでのBulkProcessorの生成
BulkProcessor processor = BulkProcessor.builder(transportClient, new BulkProcessor.Listener() {
@Override
public void beforeBulk(long l, BulkRequest bulkRequest) {
System.out.println("bulkRequest = " + bulkRequest.numberOfActions());
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
System.out.println(
"bulkResponse = " + bulkResponse.hasFailures() + " " + bulkResponse.buildFailureMessage());
}
}).setBulkActions(20)
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(5))
.setConcurrentRequests(0)
.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
.build();
RestHighLevelClientでのBulkProcessorの生成
BulkProcessor processor = BulkProcessor.builder((request, bulkListener) ->
client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() {
@Override
public void beforeBulk(long l, BulkRequest bulkRequest) {
System.out.println("bulkRequest = " + bulkRequest.numberOfActions());
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
System.out.println(
"bulkResponse = " + bulkResponse.hasFailures() + " " + bulkResponse.buildFailureMessage());
}
}).setBulkActions(20)
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(5))
.setConcurrentRequests(0)
.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
.build();
どちらも大きな違いはなくBulkProcessorのbuilderメソッドの第一引数でTransportClientではそのまま生成したclientを渡し、RestHighLevelClientではラムダでbulkAsyncメソッドを使ってclient情報を渡してあげます。
第二引数にはどちらも実行前、実行後の処理を実装したListenerを定義します。
インサート処理
インサート時の処理はどちらも変わらずaddメソッドを使うことができます。
processor.add(new IndexRequest("投入先index").source("投入データ"));
processor.add(new IndexRequest("投入先index", "投入先type").source("投入データ"));
processor.add(new IndexRequest("投入先index", "投入先type", "一意のid").source("投入データ"));
さいごに
ドキュメント通りListenerのインスタンスを生成した後、BulkProcessorのbuilderで定義を分けることももちろんできます。
実行前、実行後の処理をがっつり実装するときはその辺りの処理を分けると責務がはっきりして良い感じになるんですかね。
次はScrollAPIを使ってサンプルアプリにページング機能をつけてみたいと思います。