2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

KotlinでReactorのハンズオン書いてみた

Posted at

Introduction

Kotlinの勉強がてら、ReactorのハンズオンをKotlinで書いてみました。
なお、Kotlinの環境構築については、以前の投稿を参照ください。

Gradle設定

Gradleの各設定ファイルです。

gradle.properties

これは、以前の投稿と同じです。

gradle.properties
org.gradle.jvmargs=-Djavax.net.ssl.trustStore=/etc/ssl/certs/java/cacerts -Djavax.net.ssl.trustStorePassword=changeit -Dorg.gradle.daemon=false

settings.gradle

これも同じです。プロジェクト名のみ今回の練習用に"reactorwork"としています。

settings.gradle
pluginManagement {
    repositories {
        mavenCentral()

        // Kotlin
        maven {
            url { 'https://dl.bintray.com/kotlin/kotlin-dev' }
        }

    }
}
rootProject.name = 'reactorwork'

build.gradle

これは以前の投稿とほぼ同じですが、以下の点が異なります。

  • Reactorのリポジトリ(https://repo/spring.io/milestone)が追加された
  • Reactor用のdependencyが追加された。なお、テスト用のSpecVerifierをmain側で使用しているため、testImplementationではなくimplementationになっている
  • 前回の投稿の間にkotlinのバージョンが1.2.50から1.2.51に変わったので引き上げられている
build.gradle
plugins {
    // Kotlin
    id 'org.jetbrains.kotlin.jvm' version '1.2.51'
}

group 'hogehoge'
version '1.0-SNAPSHOT'

repositories {
    mavenCentral()

    // Spek
    maven {
        url { 'http://dl.bintray.com/jetbrains/spek' }
    }

    // Reactor
    maven {
        url { 'https://repo.spring.io/milestone' }
    }
}

dependencies {
    // Kotlin
    implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8"

    // JUnit5
    testImplementation 'org.jetbrains.kotlin:kotlin-test-junit5:1.2.51'
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.2.0'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher:1.2.0'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.2.0'

    // Spek
    testImplementation 'org.jetbrains.spek:spek-api:1.1.5'
    implementation 'org.jetbrains.kotlin:kotlin-reflect:1.2.51'
    testRuntimeOnly 'org.jetbrains.spek:spek-junit-platform-engine:1.1.5'

    // Reactor
    implementation 'io.spring.gradle:dependency-management-plugin:1.0.5.RELEASE'
    implementation 'io.projectreactor:reactor-core:3.2.0.M2'
    implementation 'io.projectreactor:reactor-test:3.2.0.M2' // mainでSpecVerifierを使用しているため
    //testImplementation 'io.projectreactor:reactor-test:3.2.0.M2'
}

compileKotlin {
    kotlinOptions.jvmTarget = "1.8"
}
compileTestKotlin {
    kotlinOptions.jvmTarget = "1.8"
}

test {
    useJUnitPlatform()
}

実装開始

Part01

Main

一応自力で考えて、そのあと、回答を見て修正しました。
(配列やコレクションを使わずに"foo"と"bar"を含むFlux書けという問題で、最初、"foo"と"bar"のMonoをconcatWithしていたのは秘密ですw)。

なお、Reactor3.1より、reactor-kotlin-extensionはreactor-coreに統合されています。
これにより、HogehogeException().toFlux()のような書き方がCoreライブラリのみで可能になっています。

Part01Flux.kt
import reactor.core.publisher.Flux
import reactor.core.publisher.toFlux
import java.time.Duration

class Part01Flux {
    // Return an empty Flux
    fun emptyFlux(): Flux<String> = Flux.empty()

    // Return a Flux that contains 2 values "foo" and "bar" without using an array or a collection
    fun fooBarFluxFromValues(): Flux<String> = Flux.just("foo", "bar")

    // Create a Flux from List that contains 2 values "foo" and "bar"
    fun fooBarFluxFromList(): Flux<String> = listOf("foo", "bar").toFlux()

    // Create a Flux that emits an IllegalStateException
    fun errorFlux(): Flux<String> = IllegalStateException().toFlux()

    // Create a Flux that emits increasing values from 0 to 9 each 100ms
    fun counter(): Flux<Long> = Flux.interval(Duration.ofMillis(100)).take(10)
}

Test

テストをSpekで書いています。
なお、解答ではStepVerifierを使っていますが、ここではtest()メソッドに変更しています。

Part01FluxSpec.kt
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import reactor.test.test

object Part01FluxSpec: Spek({
    describe("Part01Flux") {
        val workshop = Part01Flux()

        on("emptyFlux") {
            val flux = workshop.emptyFlux()
            it("is empty") {
                flux.test().verifyComplete()
            }
        }

        on("fooBarFluxFromValues") {
            val flux = workshop.fooBarFluxFromValues()
            it("is created without list") {
                flux.test()
                        .expectNext("foo", "bar")
                        .verifyComplete()
            }
        }

        on("fooBarFluxFromList") {
            val flux = workshop.fooBarFluxFromList()
            it("is created from list") {
                flux.test()
                        .expectNext("foo", "bar")
                        .verifyComplete()
            }
        }

        on("errorFlux") {
            val flux = workshop.errorFlux()
            it("has exception") {
                flux.test()
                        .verifyError(IllegalStateException::class.java)
            }
        }

        on("counter") {
            val flux = workshop.counter()
            it("has sequential values") {
                flux.test()
                        .expectNext(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L)
                        .verifyComplete()
            }
        }
    }
})

Part02

Monoの基本的な使い方だけです。
また、テスト(Spek)の書き方は、Part01とそう変わりないので省略します。

Part02Mono.kt
import reactor.core.publisher.Mono
import reactor.core.publisher.toMono

class Part02Mono {
    // Return an empty Mono
    fun emptyMono(): Mono<String> = Mono.empty()

    // Return a Mono that never emits any signal
    fun monoWithNoSignal(): Mono<String> = Mono.never()

    // Return a Mono that contains a "foo" value
    fun fooMono(): Mono<String> = Mono.just("foo")

    // Create a Mono that emits an IllegalStateException
    fun errorMono(): Mono<String> = IllegalStateException().toMono()
}

Part03

StepVerifierの使い方。
expect3600Elementsがうまくできない。どうやら、withVirtualTimeの内部のVirtualTimeSchedulerでエラーが出ている模様。
不具合も絡んでいるような気もするけど、時間が無いので調査は断念。

Part03StepVerifier.kt
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
import reactor.test.scheduler.VirtualTimeScheduler
import java.time.Duration
import java.util.function.Supplier

class Part03StepVerifier {
    // Use StepVerifier to check that the flux parameter emits "foo" and "bar" elements then completes successfully.
    fun expectFooBarComplete(flux: Flux<String>) {
        StepVerifier.create(flux)
                .expectNext("foo", "bar")
                .verifyComplete()
    }

    // Use StepVerifier to check that the flux parameter emits "foo" and "bar" elements then a RuntimeException error.
    fun expectFooBarError(flux: Flux<String>) {
        StepVerifier.create(flux)
                .expectNext("foo", "bar")
                .expectError(RuntimeException::class.java)
    }

    // Use StepVerifier to check that the flux parameter emits a User with "swhite"username
    // and another one with "jpinkman" then completes successfully.
    fun expectSkylerJesseComplete(flux: Flux<User>) {
        StepVerifier.create(flux)
                .expectNext(User("swhite"), User("jpinkman"))
    }

    // Expect 10 elements then complete and notice how long the test takes.
    fun expect10Elements(flux: Flux<Long>) {
        val duration = StepVerifier.create(flux)
                .expectNextCount(10)
                .verifyComplete()
        System.out.println("Duration=${duration.seconds}")
    }

    // Expect 3600 elements at intervals of 1 second, and verify quicker than 3600s
    // by manipulating virtual time thanks to StepVerifier#withVirtualTime, notice how long the test takes
    fun expect3600Elements(supplier: Supplier<Flux<Long>>) {
        // VirtualTimeSchedulerのcreateWorkerまわりでうまくいっていない気がする
        /*
        val duration = StepVerifier.withVirtualTime(supplier)
                .thenAwait(Duration.ofSeconds(3600))
                .expectNextCount(3600)
                .verifyComplete()
        System.out.println("Duration=${duration.seconds}")
        */
    }
}
Part03StepVerifierSpec.kt(一部)
object Part03StepVerifierSpec: Spek({
    describe("Part03StepVerifier") {
        val workshop = Part03StepVerifier()

        on("countWithVirtualTime") {
            val flux = Flux.interval(Duration.ofSeconds(1)).take(3600)
            val supplier = Supplier { flux }
            it("can count with timer") {
                workshop.expect3600Elements(supplier)
            }
        }
    }
})

Part04

Transform処理です。ここから、データクラスUserを使用します。
firstname, lastnameはNullableにしました。

少しひっかかった点は、fluxMapの引数にSAM(Single Abstract Method)を使ったところあたりです。
あと、Nullableへのメソッド呼び出し(?.)。

User.kt
data class User(val username: String, val firstname: String? = null, val lastname: String? = null) {
    companion object {
        val SKYLER = User("swhite", "Skyler", "White")
        val JESSE = User("jpinkman", "Jesse", "Pinkman")
        val WALTER = User("wwhite", "Walter", "White")
        val SAUL = User("sgoodman", "Saul", "Goodman")
    }
}
Part04Transform.kt
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

class Part04Transform {
    // Capitalize the user username, firstname and lastname
    fun capitalizeOne(mono: Mono<User>): Mono<User> =
            mono.map {
                User(
                        it.username.toUpperCase(),
                        it.firstname?.toUpperCase(),
                        it.lastname?.toUpperCase()
                )
            }

    // Capitalize the users username, firstName and lastName
    fun capitalizeMany(flux: Flux<User>): Flux<User> =
            flux.map {
                User(
                        it.username.toUpperCase(),
                        it.firstname?.toUpperCase(),
                        it.lastname?.toUpperCase()
                )
            }

    // Capitalize the users username, firstName and lastName using #asyncCapitalizeUser
    fun asyncCapitalizeMany(flux: Flux<User>): Flux<User> =
            flux.flatMap { asyncCapitalizeUser(it) }

    private fun asyncCapitalizeUser(u: User): Mono<User> =
            Mono.just(User(u.username.toUpperCase(), u.firstname?.toUpperCase(), u.lastname?.toUpperCase()))
}

Part05

Main

二つの連結方法(mergeとconcat)の違いですね。

Part05Merge.kt
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

class Part05Merge {
    // Merge flux1 and flux2 values with interleave
    fun mergeFluxWithInterleave(flux1: Flux<User>, flux2: Flux<User>): Flux<User> =
            flux1.mergeWith(flux2)

    // Merge flux1 and flux2 values with no interleave (flux1 values and then flux2 values)
    fun mergeFluxWithNoInterleave(flux1: Flux<User>, flux2: Flux<User>): Flux<User> =
            flux1.concatWith(flux2)

    // Create a Flux containing the value of mono1 then the value of mono2
    fun createFluxFromMultipleMono(mono1: Mono<User>, mono2: Mono<User>): Flux<User> =
            Flux.concat(mono1, mono2)
}

Test

データ保管用に使用するクラスを定義します。まず、interface部分。

ReactiveRepository.kt
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

interface ReactiveRepository<T> {
    fun save(publisher: Publisher<T>): Mono<Void>
    fun findFirst(): Mono<T>
    fun findAll(): Flux<T>
    fun findById(id: String): Mono<T>
}

つづいて、Userデータクラスを使ったconcreate部分。

ReactiveUserRepository.kt
import java.time.Duration
import java.util.ArrayList
import java.util.Arrays
import java.util.function.BiFunction

import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

class ReactiveUserRepository(private val delayInMs: Long, vararg varUsers: User) : ReactiveRepository<User> {
    private val users: MutableList<User> = ArrayList(Arrays.asList(*varUsers))

    constructor(delayInMs: Long = DEFAULT_DELAY_IN_MS): this(delayInMs, User.SKYLER, User.JESSE, User.WALTER, User.SAUL)

    constructor(vararg users: User) : this(DEFAULT_DELAY_IN_MS, *users)

    override fun save(publisher: Publisher<User>): Mono<Void> {
        return withDelay(Flux.from(publisher)).doOnNext { u -> users.add(u) }.then()
    }

    override fun findFirst(): Mono<User> {
        return withDelay(Mono.just(users[0]))
    }

    override fun findAll(): Flux<User> {
        return withDelay(Flux.fromIterable(users))
    }

    override fun findById(id: String): Mono<User> {
        val user = users.stream().filter { (username1) -> username1 == id }
                .findFirst()
                .orElseThrow { IllegalArgumentException("No user with username $id found!") }
        return withDelay(Mono.just(user))
    }

    private fun withDelay(userMono: Mono<User>): Mono<User> {
        return Mono
                .delay(Duration.ofMillis(delayInMs))
                .flatMap { userMono }
    }

    private fun withDelay(userFlux: Flux<User>): Flux<User> {
        return Flux
                .interval(Duration.ofMillis(delayInMs))
                .zipWith<User, User>(userFlux, BiFunction { _: Long, user: User -> user })
    }

    companion object {
        const val DEFAULT_DELAY_IN_MS: Long = 100
    }
}

そして、テストコードです。

Part05MergeSpec.kt
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import reactor.test.test

object Part05MergeSpec: Spek({
    describe("Part05Merge") {
        val workshop = Part05Merge()

        val MARIE = User("mschrader", "Marie", "Schrader")
        val MIKE = User("mehrmantraut", "Mike", "Ehrmantraut")

        val repositoryWithDelay = ReactiveUserRepository(500)
        val repository          = ReactiveUserRepository(MARIE, MIKE)

        on("mergeWithInterleave") {
            val flux = workshop.mergeFluxWithInterleave(repositoryWithDelay.findAll(), repository.findAll())
            it("is merged") {
                flux.test()
                        .expectNext(MARIE, MIKE, User.SKYLER, User.JESSE, User.WALTER, User.SAUL)
                        .verifyComplete()
            }
        }

        on("mergeWithNoInterleave") {
            val flux = workshop.mergeFluxWithNoInterleave(repositoryWithDelay.findAll(), repository.findAll())
            it("is concatenated") {
                flux.test()
                        .expectNext(User.SKYLER, User.JESSE, User.WALTER, User.SAUL, MARIE, MIKE)
                        .verifyComplete()
            }
        }

        on("multipleMonoToFlux") {
            val skylerMono = repositoryWithDelay.findFirst()
            val marieMono = repository.findFirst()
            val flux = workshop.createFluxFromMultipleMono(skylerMono, marieMono)
            it("is converted") {
                flux.test()
                        .expectNext(User.SKYLER, MARIE)
                        .verifyComplete()
            }
        }
    }
})

Part06

残りはコードのみ。

Part06Request.kt
import reactor.core.publisher.Flux
import reactor.test.StepVerifier

class Part06Request {
    private val repository = ReactiveUserRepository()

    // Create a StepVerifier that initially requests all values and expect 4 values to be received
    fun requestAllExpectFour(flux: Flux<User>): StepVerifier {
        return StepVerifier.create(flux)
                .expectNextCount(4)
                .expectComplete()
    }

    // Create a StepVerifier that initially requests 1 value and expects User.SKYLER then requests another value and expects User.JESSE.
    fun requestOneExpectSkylerThenRequestOneExpectJesse(flux: Flux<User>): StepVerifier {
        return StepVerifier.create(flux, 1)
                .expectNext(User.SKYLER)
                .thenRequest(1)
                .expectNext(User.JESSE)
                .thenCancel()
    }

    // Return a Flux with all users stored in the repository that prints automatically logs for all Reactive Streams signals
    fun fluxWithLog(): Flux<User> {
        return repository
                .findAll()
                .log()
    }

    // Return a Flux with all users stored in the repository that prints "Starring:" on subscribe, "firstname lastname" for all values and "The end!" on complete
    fun fluxWithDoOnPrintln(): Flux<User> {
        return repository
                .findAll()
                .doOnSubscribe { println("Starring:") }
                .doOnNext { (_, firstname, lastname) -> println("$firstname $lastname") }
                .doOnComplete { println("The end!") }
    }
}
Part06RequestSpec.kt
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import reactor.test.StepVerifier

object Part06RequestSpec: Spek({
    describe("Part06Request") {
        val workshop = Part06Request()
        val repository = ReactiveUserRepository()

        on("requestAll") {
            val flux = repository.findAll()
            it("is requested") {
                workshop.requestAllExpectFour(flux).verify()
            }
        }

        on("requestOneByOne") {
            val flux = repository.findAll()
            it("is requested") {
                workshop.requestOneExpectSkylerThenRequestOneExpectJesse(flux).verify()
            }
        }

        on("experimentWithLog") {
            val flux = workshop.fluxWithLog()
            it("is ok") {
                StepVerifier.create(flux, 0)
                        .thenRequest(1)
                        .expectNextMatches {true}
                        .thenRequest(1)
                        .expectNextMatches {true}
                        .thenRequest(2)
                        .expectNextMatches {true}
                        .expectNextMatches {true}
                        .verifyComplete()
            }
        }

        on("experimentWithDoOn") {
            val flux = workshop.fluxWithDoOnPrintln()
            it("is ok") {
                StepVerifier.create(flux)
                        .expectNextCount(4)
                        .verifyComplete()
            }
        }
    }
})

Part07

Part07Errors.kt
import reactor.core.Exceptions
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

class Part07Errors {

    // Return a Mono<User> containing User.SAUL when an error occurs in the input Mono, else do not change the input Mono.
    fun betterCallSaulForBogusMono(mono: Mono<User>): Mono<User> {
        return mono.onErrorResume {
            Mono.just(User.SAUL)
        }
    }


    // Return a Flux<User> containing User.SAUL and User.JESSE when an error occurs in the input Flux, else do not change the input Flux.
    fun betterCallSaulAndJesseForBogusFlux(flux: Flux<User>): Flux<User> {
        return flux.onErrorResume {
            Flux.just(User.SAUL, User.JESSE)
        }
    }

    // Implement a method that capitalizes each user of the incoming flux using the
    // #capitalizeUser method and emits an error containing a GetOutOfHereException error
    fun capitalizeMany(flux: Flux<User>): Flux<User> {
        return flux.map {
            try {
                capitalizeUser(it)
            } catch (e: GetOutOfHereException) {
                throw Exceptions.propagate(e)
            }
        }
    }

    private fun capitalizeUser(user: User): User {
        if (user == User.SAUL) {
            throw GetOutOfHereException()
        }
        return User(user.username, user.firstname, user.lastname)
    }

    class GetOutOfHereException : Exception()
}
Part07ErrorsSpec.kt
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import reactor.test.test
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

object Part07ErrorsSpec: Spek({
    describe("Part07Errors") {
        val workshop = Part07Errors()

        on("monoWithValueInsteadOfErro") {
            val monoError = workshop.betterCallSaulForBogusMono(Mono.error(IllegalStateException()))
            val monoSuccess = workshop.betterCallSaulForBogusMono(Mono.just(User.SKYLER))

            it("is ok") {
                monoError.test()
                        .expectNext(User.SAUL)
                        .verifyComplete()

                monoSuccess.test()
                        .expectNext(User.SKYLER)
                        .verifyComplete()
            }
        }

        on("fluxWithValueInsteadOfError") {
            val fluxError = workshop.betterCallSaulAndJesseForBogusFlux(Flux.error(IllegalStateException()))
            val fluxSuccess = workshop.betterCallSaulAndJesseForBogusFlux(Flux.just(User.SKYLER, User.WALTER))

            it("is ok") {
                fluxError.test()
                        .expectNext(User.SAUL, User.JESSE)
                        .verifyComplete()

                fluxSuccess.test()
                        .expectNext(User.SKYLER, User.WALTER)
                        .verifyComplete()
            }
        }

        on("handleCheckedExceptions") {
            val flux = workshop.capitalizeMany(Flux.just(User.SAUL, User.JESSE))

            it("has error") {
                flux.test()
                        .verifyError(Part07Errors.GetOutOfHereException::class.java)
            }
        }
    }
})

Part08

Part08OtherOperations.kt
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

class Part08OtherOperations {

    // Create a Flux of user from Flux of username, firstname and lastname.
    fun userFluxFromStringFlux(usernameFlux: Flux<String>, firstnameFlux: Flux<String>, lastnameFlux: Flux<String>): Flux<User> {
        return Flux.zip(usernameFlux, firstnameFlux, lastnameFlux)
                .map { tuple -> User(tuple.t1, tuple.t2, tuple.t3) }
    }

    // Return the mono which returns its value faster
    fun useFastestMono(mono1: Mono<User>, mono2: Mono<User>): Mono<User> = Mono.first(mono1, mono2)

    // Return the flux which returns the first value faster
    fun useFastestFlux(flux1: Flux<User>, flux2: Flux<User>): Flux<User> = Flux.first(flux1, flux2)

    // Convert the input Flux<User> to a Mono<Void> that represents the complete signal of the flux
    fun fluxCompletion(flux: Flux<User>): Mono<Void> = flux.then()

    // Return a valid Mono of user for null input and non null input user (hint: Reactive Streams do not accept null values)
    fun nullAwareUserToMono(user: User?): Mono<User> = Mono.justOrEmpty(user)

    // Return the same mono passed as input parameter, expect that it will emit User.SKYLER when empty
    fun emptyToSkyler(mono: Mono<User>): Mono<User> = mono.defaultIfEmpty(User.SKYLER)
}
Part08OtherOperationsSpec.kt
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import reactor.test.test
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.test.publisher.PublisherProbe

object Part08OtherOperationsSpec: Spek({
    describe("Part08OtherOperations") {
        val workshop = Part08OtherOperations()

        val MARIE = User("mschrader", "Marie", "Schrader")
        val MIKE = User("mehrmantraut", "Mike", "Ehrmantraut")

        on("zipFirstNameAndLastName") {
            val usernameFlux = Flux.just(User.SKYLER.username, User.JESSE.username, User.WALTER.username, User.SAUL.username)
            val firstnameFlux = Flux.just<String>(User.SKYLER.firstname, User.JESSE.firstname, User.WALTER.firstname, User.SAUL.firstname)
            val lastnameFlux = Flux.just<String>(User.SKYLER.lastname, User.JESSE.lastname, User.WALTER.lastname, User.SAUL.lastname)

            val userFlux = workshop.userFluxFromStringFlux(usernameFlux, firstnameFlux, lastnameFlux)
            it("is ok") {
                userFlux.test()
                        .expectNext(User.SKYLER, User.JESSE, User.WALTER, User.SAUL)
                        .verifyComplete()
            }
        }

        on("fastestMono") {
            it("is marie") {
                val repository: ReactiveRepository<User> = ReactiveUserRepository(MARIE)
                val repositoryWithDelay: ReactiveRepository<User> = ReactiveUserRepository(250, MIKE)
                val mono = workshop.useFastestMono(repository.findFirst(), repositoryWithDelay.findFirst())
                mono.test()
                        .expectNext(MARIE)
                        .verifyComplete()
            }

            it("is mike") {
                val repository = ReactiveUserRepository(250, MARIE)
                val repositoryWithDelay = ReactiveUserRepository(MIKE)
                val mono = workshop.useFastestMono(repository.findFirst(), repositoryWithDelay.findFirst())
                mono.test()
                        .expectNext(MIKE)
                        .verifyComplete()
            }
        }

        on("fastestFlux") {
            it("is marie and mike") {
                val repository: ReactiveRepository<User> = ReactiveUserRepository(MARIE, MIKE)
                val repositoryWithDelay: ReactiveRepository<User> = ReactiveUserRepository(250)
                val flux = workshop.useFastestFlux(repository.findAll(), repositoryWithDelay.findAll())
                flux.test()
                        .expectNext(MARIE, MIKE)
                        .verifyComplete()
            }

            it("is not marie or mike") {
                val repository = ReactiveUserRepository(250, MARIE, MIKE)
                val repositoryWithDelay = ReactiveUserRepository()
                val flux = workshop.useFastestFlux(repository.findAll(), repositoryWithDelay.findAll())
                flux.test()
                        .expectNext(User.SKYLER, User.JESSE, User.WALTER, User.SAUL)
                        .verifyComplete()
            }
        }

        on("complete") {
            val repository = ReactiveUserRepository()
            val probe = PublisherProbe.of(repository.findAll())
            val completion = workshop.fluxCompletion(probe.flux())
            it("is ok") {
                completion.test()
                        .verifyComplete()
                probe.assertWasRequested()
            }
        }

        on("nullHandling") {
            it("is skyler") {
                val mono = workshop.nullAwareUserToMono(User.SKYLER)
                mono.test()
                        .expectNext(User.SKYLER)
                        .verifyComplete()
            }
            it("is null") {
                val mono = workshop.nullAwareUserToMono(null)
                mono.test()
                        .verifyComplete()
            }
        }

        on("emptyHandling") {
            it("is walter") {
                val mono = workshop.emptyToSkyler(Mono.just(User.WALTER))
                mono.test()
                        .expectNext(User.WALTER)
                        .verifyComplete()
            }

            it("is skyler") {
                val mono = workshop.emptyToSkyler(Mono.empty())
                mono.test()
                        .expectNext(User.SKYLER)
                        .verifyComplete()
            }
        }
    }
})

残り

Part09以降は力尽きたのでここでおしまい。

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?