LoginSignup
1
0

More than 3 years have passed since last update.

RestHighLevelClientでBulkAPIを使う

Last updated at Posted at 2018-12-26

前回記事と記事の内容

前回記事のTransportClientからRestHighLevelClientに移行するで作成したサンプルアプリから今回はBulkAPIを使用したJavaClientからElasticsaerchへのBulkインサート処理についてまとめています。

環境

macOS
Elasticsearch6.5.2
Java8
Spring Boot 2.1.1

サンプルアプリ

作成したアプリケーションはGitHubに挙げています。
https://github.com/ohanamisan/Elasticsearch_on_Java

Elasticsearchのバージョンが異なる場合は適宜gradleファイルのjarインポート箇所を変更してください。
その他詳細はREADME参照でお願いします。

実装

今回はBulkAPIの中のBulkProcessorを使ってデータのインサート処理を実装していきます。
特別大きな違いもなくドキュメントを参考にすればすらすら出来ちゃうと思いますがTransportClientとRestHighLevelClientの比較程度のメモだと思ってください。
本家のBulkProcessorのドキュメントはこちら↓

RestHighLevelClient

TransportClient

TransportClientでのBulkProcessorの生成


BulkProcessor processor = BulkProcessor.builder(transportClient, new BulkProcessor.Listener() {

            @Override
            public void beforeBulk(long l, BulkRequest bulkRequest) {
                System.out.println("bulkRequest = " + bulkRequest.numberOfActions());
            }

            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
                System.out.println(
                        "bulkResponse = " + bulkResponse.hasFailures() + " " + bulkResponse.buildFailureMessage());
            }
        }).setBulkActions(20)
          .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
          .setFlushInterval(TimeValue.timeValueSeconds(5))
          .setConcurrentRequests(0)
          .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
          .build();

RestHighLevelClientでのBulkProcessorの生成


BulkProcessor processor =  BulkProcessor.builder((request, bulkListener) ->
                client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() {

            @Override
            public void beforeBulk(long l, BulkRequest bulkRequest) {
                System.out.println("bulkRequest = " + bulkRequest.numberOfActions());
            }

            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
                System.out.println(
                        "bulkResponse = " + bulkResponse.hasFailures() + " " + bulkResponse.buildFailureMessage());
            }
        }).setBulkActions(20)
          .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
          .setFlushInterval(TimeValue.timeValueSeconds(5))
          .setConcurrentRequests(0)
          .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
          .build();

どちらも大きな違いはなくBulkProcessorのbuilderメソッドの第一引数でTransportClientではそのまま生成したclientを渡し、RestHighLevelClientではラムダでbulkAsyncメソッドを使ってclient情報を渡してあげます。
第二引数にはどちらも実行前、実行後の処理を実装したListenerを定義します。

インサート処理

インサート時の処理はどちらも変わらずaddメソッドを使うことができます。

processor.add(new IndexRequest("投入先index").source("投入データ"));
processor.add(new IndexRequest("投入先index", "投入先type").source("投入データ"));
processor.add(new IndexRequest("投入先index", "投入先type", "一意のid").source("投入データ"));

さいごに

ドキュメント通りListenerのインスタンスを生成した後、BulkProcessorのbuilderで定義を分けることももちろんできます。  
実行前、実行後の処理をがっつり実装するときはその辺りの処理を分けると責務がはっきりして良い感じになるんですかね。  

次はScrollAPIを使ってサンプルアプリにページング機能をつけてみたいと思います。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0