Introduction
前回までのあらすじ。
コード
Futureの基本
非同期APIから生成されたコマンドは、RedisFuture interfaceの実装インスタンスを返します。このinterfaceは、java.util.concurrent.CompletableFuture classと同様に、CompletionStageとFuture interfaceを実装しています。
参照 → https://lettuce.io/core/release/reference/#asynchronous-api.result-handles
- First steps with CompletableFuture
上記ドキュメント Example 6より。
import java.util.concurrent.CompletableFuture
val future = CompletableFuture<String>()
future.complete("hogehoge")
val isDone = future.isDone // getter.., setter.., is..(Booleanのみ)は、()が不要
val value = future.get()
- Using listeners with CompletableFuture
同じくExample7より。
import java.util.concurrent.CompletableFuture
val future = CompletableFuture<String>()
future.thenRun {
println("Got value: ${future.get()}") // Runnable
// 本当はここで例外処理が必要
}
println("Current state: ${future.isDone}")
future.complete("hogehoge")
println("Current state: ${future.isDone}")
// 以下のように表示されます
// Current state: false
// Got value: hogehoge
// Current state: true
- Using a Consumer future listner
Example8より。上記の例では、Runnable内でfuture.get()を呼び出していますが、この呼び出しはblockingで、また、例外を送出する可能性があります。より、洗練された方法として、Consumer interfaceを使う方法があります。
import java.util.concurrent.CompletableFuture
val future = CompletableFuture<String>()
future.thenAccept {
println("Got value: $it") // Consumer<String>
}
println("Current state: ${future.isDone}")
future.complete("hogehoge")
println("Current state: ${future.isDone}")
LettuceでFutureを使う
- Blocking synchronization
非同期APIコマンドからRedisFutureを取得した場合でも、それを同期的に使うこともできます。
参照 → https://lettuce.io/core/release/reference/#asynchronous-api.consuming-futures
Example9, 10より。
import io.lettuce.core.RedisClient
import java.util.concurrent.TimeUnit
val redisClient = RedisClient.create("redis://localhost:6379/0")
val connection = redisClient.connect()
val command = connection.async()
val future = command.get("key") // RedisFuture<String>
//val value = future.get() // 引数なしだと、取得できるまでブロックしてしまう
val value = future.get(10, TimeUnit.SECONDS) // タイムアウト付き(タイムアウト時にはTimeoutExceptionが発生)
println(value)
- Using a Consumer listener with GET
Example11, 12より。上記にCompletableFutureと同様に、Consumer listenerを使うこともできます。
import io.lettuce.core.RedisClient
val redisClient = RedisClient.create("redis://localhost:6379/0")
val connection = redisClient.connect()
val command = connection.async()
val future = command.get("key") // RedisFuture<String>
future.thenAccept {
println("Value: $it")
}
- Getting multiple keys asynchronously
Example14より。データのアクセスをまとめて行い、すべて完了するまで待つ例です。
import io.lettuce.core.RedisClient
import io.lettuce.core.LettuceFutures
import java.util.concurrent.TimeUnit
val redisClient = RedisClient.create("redis://localhost:6379/0")
val connection = redisClient.connect()
val command = connection.async()
val futures:MutableList<RedisFuture<String>> = mutableListOf()
for (i in 1..9) {
futures.add(command.get("key-$i"))
}
val futureArray = futures.toTypedArray()
// *futureArray: 可変引数に変換している
LettuceFutures.awaitAll(10, TimeUnit.SECONDS, *futureArray)
Example15のように、単独のデータアクセスでも同様にすることができます。get()メソッドにタイムアウトを指定する例と違い、この場合はTimeoutExceptionが送出されません。
import io.lettuce.core.RedisClient
import io.lettuce.core.LettuceFutures
import java.util.concurrent.TimeUnit
val redisClient = RedisClient.create("redis://localhost:6379/0")
val connection = redisClient.connect()
val command = connection.async()
val future = command.get("hogehoge")
if (!future.await(1, TimeUnit.MINUTES)) {
println("TIMEOUT")
}
- Future chaining
Example16より。RedisFutureには、futureをchainしたりtransformしたりするさまざまなメソッドがあります。
import io.lettuce.core.RedisClient
import io.lettuce.core.LettuceFutures
import java.util.concurrent.TimeUnit
val redisClient = RedisClient.create("redis://localhost:6379/0")
val connection = redisClient.connect()
val command = connection.async()
val future = command.get("hogehoge")
future.thenApply {
it.length
}.thenAccept {
println("Length is $it")
}