LoginSignup
0
0

More than 3 years have passed since last update.

Elasticsearch > kotlin > es-kotlin-clinent マニュアル(意訳)8 - Asynchronous

Posted at

Elasticsearchをサーバーサイドで操作するKotlin版ライブラリ
がよく出来ていたのでその動作確認をしつつマニュアルを日本語にしつつメモしています。

全10回です。

es-kotlin-clinent マニュアル(意訳)1
es-kotlin-clinent マニュアル(意訳)2
es-kotlin-clinent マニュアル(意訳)3
es-kotlin-clinent マニュアル(意訳)4
es-kotlin-clinent マニュアル(意訳)5 - 楽観的ロックによる更新
es-kotlin-clinent マニュアル(意訳)6 - 検索
es-kotlin-clinent マニュアル(意訳)7 - Kotlin Query DSL
es-kotlin-clinent マニュアル(意訳)8 - Asynchronous <- ここ
es-kotlin-clinent マニュアル(意訳)9 - DSLs
es-kotlin-clinent マニュアル(意訳)10 - Example

Asynchronous IO with Co-routines

RestHighLevelClientは、レスポンスが返ってきたときにコールバックを受けて処理するほとんどのAPIの非同期バージョンを公開しています。これを使用するのは、ちょっとした手間がかかります。

幸いなことに、Kotlinには非同期プログラミングのためのco-routineがあり、このライブラリはこれらの関数のco-routineフレンドリーなバージョンを提供しています。
これらのsuspend関数は、suspendとしてマークされ、KotlinのsuspendCancellableCoroutineを使ったSuspendingActionListenerを使って、残りの高レベルクライアントが期待するコールバックをラップするという点を除けば、同期版とほとんど同じように動作します。

Elasticsearch 7.5.0の時点では、すべての非同期呼び出しはタスクをキャンセルできるCancellableオブジェクトを返すようになっています。
suspendCancellableCoRoutineを使用すると、何らかの障害が発生したり、コアーチンスコープを中止したりすると、実行中のタスクはすべてキャンセルされます。

KtorSpring Boot 2.xのような非同期サーバーフレームワークを使用している場合(リアクティブモードで)、非同期関数を使用したいと思うでしょう。

asyncメソッド

co-routinesをサポートするために、このプロジェクトではコード生成プラグインを使用して、Rest High Level async関数のそれぞれのco-routinesフレンドリーなバージョンを生成しています。
現時点では、それらのほとんどがカバーされています。その数は100以上あります。

jillesvangurp/コード生成プラグイン

例として、ここではreloadAnalyzers APIを使用する3つの方法を紹介します。

1. 同期バージョン

RestHighLevel クライアントによって提供される同期バージョン

val ic = esClient.indices()
val response = ic.reloadAnalyzers(
  ReloadAnalyzersRequest("myindex"), RequestOptions.DEFAULT
)

2. 非同期バージョン

RestHighLevel クライアントによって提供されるコールバックを使用した非同期バージョン

ic.reloadAnalyzersAsync(
  ReloadAnalyzersRequest("myindex"),
  RequestOptions.DEFAULT,
  object : ActionListener<ReloadAnalyzersResponse> {
    override fun onFailure(e: Exception) {
      println("it failed")
    }

    override fun onResponse(response: ReloadAnalyzersResponse) {
      println("it worked")
    }
  }
)

3. コアーチンフレンドリーなバージョン

co-routineフレンドリーなバージョンです。

コードジェネレーターのプラグインで生成された関数を使用したcoroutineフレンドリーなバージョンは、サスペンドバージョンなので、coroutineスコープを取得するためにrunBlockingに入れていますが、
もちろんあなた自身のアプリケーションでより適切なスコープを使用してください。

runBlocking {
  val response2 = ic.reloadAnalyzersAsync(
    ReloadAnalyzersRequest("myindex"), RequestOptions.DEFAULT
  )
}

AsyncIndexRepository

RestHighLevelClientのほとんどの関数のサスペンド版を持つことに加えて、IndexRepositoryにはAsyncIndexRepositoryのカウンター部分があります。

これのAPIは通常のリポジトリと似ています。

indexの作成

// 拡張関数を使用して新しいリポジトリを作成することができます。
val asyncRepo = esClient.asyncIndexRepository<Thing>("asyncthings")

// asyncRepo上のすべての関数はもちろんサスペンドされているので、
// それらをサスペンドさせるためにはサスペンドされた関数をサブルーチンで実行する必要があります。
runBlocking {
  asyncRepo.createIndex {
    source(
      """
        {
          "settings": {
          "index": {
            "number_of_shards": 3,
            "number_of_replicas": 0,
            "blocks": {
            "read_only_allow_delete": "false"
            }
          }
          },
          "mappings": {
          "properties": {
            "title": {
            "type": "text"
            }
          }
          }
        }
      """,
      XContentType.JSON
    )
  }
}

データ

// asyncRepo上のすべての関数はもちろんサスペンドされています。
// コ・ルーティン・スコープで実行する必要があります。
runBlocking {
  // これらはすべて非同期サスペンド関数を使用しています。
  asyncRepo.index("thing1", Thing("The first thing"))
  // これは `AsyncBulkIndexingSession` を使用します。
  asyncRepo.bulk {
    for (i in 2.rangeTo(10)) {
      index("thing_$i", Thing("thing $i"))
    }
  }
  asyncRepo.refresh()
  val count = asyncRepo.count { }
  println("indexed $count items")
}

Captured Output:

indexed 10 items

Asynchronous search

非同期検索APIは非常に似ています。
返されるAsyncSearchResults以外は似ています。
結果はKotlin Co-RoutinesライブラリのFlowAPIを利用しています。

import kotlinx.coroutines.flow.count

runBlocking {
  val results = asyncRepo.search(scrolling = true) {
    configure {
      query = matchAll()
    }
  }

  // hits is a Flow<Thing>
  println("Hits: ${results.mappedHits.count()}")
}

Captured Output:

Hits: 10
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0