Posted at

RestHighLevelClientでBulkAPIを使う


前回記事と記事の内容

前回記事のTransportClientからRestHighLevelClientに移行するで作成したサンプルアプリから今回はBulkAPIを使用したJavaClientからElasticsaerchへのBulkインサート処理についてまとめています。


環境

macOS

Elasticsearch6.5.2

Java8

Spring Boot 2.1.1


サンプルアプリ

作成したアプリケーションはGitHubに挙げています。

https://github.com/ohanamisan/Elasticsearch_on_Java

Elasticsearchのバージョンが異なる場合は適宜gradleファイルのjarインポート箇所を変更してください。

その他詳細はREADME参照でお願いします。


実装

今回はBulkAPIの中のBulkProcessorを使ってデータのインサート処理を実装していきます。

特別大きな違いもなくドキュメントを参考にすればすらすら出来ちゃうと思いますがTransportClientとRestHighLevelClientの比較程度のメモだと思ってください。

本家のBulkProcessorのドキュメントはこちら↓


RestHighLevelClient

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high-document-bulk.html


TransportClient

https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html


TransportClientでのBulkProcessorの生成

BulkProcessor processor = BulkProcessor.builder(transportClient, new BulkProcessor.Listener() {

@Override
public void beforeBulk(long l, BulkRequest bulkRequest) {
System.out.println("bulkRequest = " + bulkRequest.numberOfActions());
}

@Override
public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
throwable.printStackTrace();
}

@Override
public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
System.out.println(
"bulkResponse = " + bulkResponse.hasFailures() + " " + bulkResponse.buildFailureMessage());
}
}).setBulkActions(20)
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(5))
.setConcurrentRequests(0)
.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
.build();


RestHighLevelClientでのBulkProcessorの生成

BulkProcessor processor =  BulkProcessor.builder((request, bulkListener) ->

client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() {

@Override
public void beforeBulk(long l, BulkRequest bulkRequest) {
System.out.println("bulkRequest = " + bulkRequest.numberOfActions());
}

@Override
public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
throwable.printStackTrace();
}

@Override
public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
System.out.println(
"bulkResponse = " + bulkResponse.hasFailures() + " " + bulkResponse.buildFailureMessage());
}
}).setBulkActions(20)
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(5))
.setConcurrentRequests(0)
.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
.build();

どちらも大きな違いはなくBulkProcessorのbuilderメソッドの第一引数でTransportClientではそのまま生成したclientを渡し、RestHighLevelClientではラムダでbulkAsyncメソッドを使ってclient情報を渡してあげます。

第二引数にはどちらも実行前、実行後の処理を実装したListenerを定義します。


インサート処理

インサート時の処理はどちらも変わらずaddメソッドを使うことができます。

processor.add(new IndexRequest("投入先index").source("投入データ"));

processor.add(new IndexRequest("投入先index", "投入先type").source("投入データ"));
processor.add(new IndexRequest("投入先index", "投入先type", "一意のid").source("投入データ"));


さいごに

ドキュメント通りListenerのインスタンスを生成した後、BulkProcessorのbuilderで定義を分けることももちろんできます。  

実行前、実行後の処理をがっつり実装するときはその辺りの処理を分けると責務がはっきりして良い感じになるんですかね。  

次はScrollAPIを使ってサンプルアプリにページング機能をつけてみたいと思います。