LoginSignup
55
38

More than 3 years have passed since last update.

CoroutinesのlaunchとasyncとChannelとFlowの使い分け

Posted at

はじめに

Kotlinでは非同期処理(スレッド)の実装方法が色々でてきました。
Androidアプリ開発をやっている私としてはKotlinでアプリ開発をすると、正直何を使って良いのかわからない。

今一度、詳細を調べてみて、使い分けをはっきりさせようと思います。

動作環境

この記事の動作環境は以下のとおりです。

Kotln:1.4.10

目標

以下を目標とします。

** Coroutinesによる非同期処理の使い分けをはっきりさせる**

各種メソッドの定義

とりあえず、よく見かけるlaunch、async、flow関数(Flowビルダー)、Flow#collect、FlowCollector#emit、Channel#send、Channel#receiveの定義を確認してます。
サイトに少しまとまっているのでそこからまとめてみます。

メソッド メソッドの定義 戻り値 概要
launch fun CoroutineScope.launch(
 context: CoroutineContext = EmptyCoroutineContext,
 start: CoroutineStart = CoroutineStart.DEFAULT,
 block: suspend CoroutineScope.() -> Unit
): Job (source)
Job 任意の結果を返せないコルーチン
async fun CoroutineScope.async(
 context: CoroutineContext = EmptyCoroutineContext,
 start: CoroutineStart = CoroutineStart.DEFAULT,
 block: suspend CoroutineScope.() -> T
): Deferred (source)
Deferred 任意の単一の結果を返す
Channel#send abstract suspend fun send(element: E): Unit (source) なし Channelに結果を送信する。
Channel#receive abstract suspend fun receive(): E (source) Channelのインスタンス化した時にジェネリクスで指定した型 Channnelからデータを取得する。
flow関数(Flowビルダー) fun flow(
block: suspend FlowCollector.() -> Unit
): Flow (source)
Flowオブジェクト ビルダーでインスタンス化を生成する際にジェネリクスで指定した型のデータを返す
FlowCollector#emit abstract suspend fun emit(value: T): Unit (source) なし Flowに値を送信する。
Flow#collect abstract suspend fun collect(
 collector: FlowCollector
): Unit (source)
なし Flowがデータを受け取るやつ。collect拡張関数で収集した値を出力する。

とりあえず、並べてみたけど、一番最初の感想は、「よくわからない!!」です。
やっぱり、実際に動かして見ながら理解していきます。

サンプルプロジェクト

githubからダウンロード可能です。

launch

概要

launchをまず図にしてみました。

launchはスレッドを生成して、実行してくれます。
とうぜんメインの処理とは別スレッドになるので、launchしたスレッドが終わるまで待つ歯科差なさそうです。
スレッドの状態を確認するプロパティがあるので、うまく使って待ってあげる必要がありそうです。

状態を確認するプロパティは下記のようなものがあります。

メソッド 説明
Job#isActive キャンセルされておらず、スレッドが実行中の時にtrueを返す。
Job#isCancelled スレッドがキャンセルされている時にtrueを返す。
Job#isCompleted スレッドの実行が完了している時にtrueを返す。

サンプルコード

実際に、launchを使ったコードを確認します。
新しいスレッドでは、配列の文字列を標準出力で表示しています。

Launch.kt
package launch.basic

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

val stories = arrayOf(
    "赤ずきんちゃんの4行ストーリ",
    "1.赤ずきんちゃん、おばあさんの家に行く",
    "2.赤ずきんちゃん、オオカミと遭遇、おばあさんオオカミに食べられる",
    "3.赤ずきんちゃん、オオカミに食べられる",
    "4.赤ずきんちゃん、猟師に助けられる"
)

fun main() {
    val job = CoroutineScope(Dispatchers.Default).launch {
        repeat(5) { count ->

            delay(500.toLong())

            println(stories[count])
        }
    }

    // Coroutineが終わるまで待つ
    while (!job.isCompleted) {
        Thread.sleep(100)
    }
}

async

次は、よく見かけるasyncを確認していきます。

概要

asyncの特徴をまず図にしてみました。

launchとの違いは、戻り値を返せるということです。
戻り値の型に特に制限が無いため、任意の値を返せます。
また、launchの場合、処理が終わったかどうかをプロパティで判断していました。asyncの場合は、戻り値であるDeferredに処理が終わるまで待つメソッドが用意されています。また、キャンセルも行えます。協力的なキャンセルになっています。協力的なキャンセルについてはこの記事を参照してください。

メソッドの終了と戻り値を受け取るメソッドは以下のとおりです。

メソッド 説明
Deferred#await asyncの処理が終わるまでまちます。
Deferred#getComplete 戻り値を取得できます。戻り値の型に制約はないので、任意の型を指定できます。

サンプルコード

基本

AsyncBasic.kt
package async.basic

import kotlinx.coroutines.*
import launch.basic.stories

val stories = arrayOf(
    "赤ずきんちゃんの4行ストーリ",
    "1.赤ずきんちゃん、おばあさんの家に行く",
    "2.赤ずきんちゃん、オオカミと遭遇、おばあさんオオカミに食べられる",
    "3.赤ずきんちゃん、オオカミに食べられる",
    "4.赤ずきんちゃん、猟師に助けられる"
)

fun main() {
    val job = CoroutineScope(Dispatchers.Default).launch {
        repeat(5) { count ->

            val deffer = async {
                delay(500)
                stories[count]
            }

            println("${count + 1}回目のasyncが終わるまで待つ")
            deffer.await()
            println(deffer.getCompleted())
        }
    }

    // Coroutineが終わるまで待つ
    while (!job.isCompleted) {
        Thread.sleep(100)
    }
}

cancel

続いて、キャンセルを実装したコードも確認しておきましょう。

AsyncCancel.kt
package async.cancel

import kotlinx.coroutines.*

val stories = arrayOf(
    "赤ずきんちゃんの4行ストーリ",
    "1.赤ずきんちゃん、おばあさんの家に行く",
    "2.赤ずきんちゃん、オオカミと遭遇、おばあさんオオカミに食べられる",
    "3.赤ずきんちゃん、オオカミに食べられる",
    "4.赤ずきんちゃん、猟師に助けられる"
)

fun main() {
    val job = CoroutineScope(Dispatchers.Default).launch {
        repeat(5) { count ->

            val deffer = async {
                // 2回目でキャンセル
                if (count == 2) {
                    cancel("キャンセル")
                }
                delay(500)
                launch.basic.stories[count]
            }

            println("${count + 1}回目のasyncが終わるまで待つ")
            deffer.await()
            println(deffer.getCompleted())
        }
    }

    // Coroutineが終わるまで待つ
    while (!job.isCompleted || !job.isCancelled) {
        Thread.sleep(100)
    }
}

Channelの基本

いよいよ1.4で追加されたFlowやChannelについて説明していきます。
まずはChannelから確認していきます。

概要

channelをまず図にしてみました。

Channelはホットストリームなんて言われています。
なんとなくのChannelのイメージですが、入れ物的なものでしょうか。
ChannelはメインからもCoroutineからでも使える入れ物で、単なる入れ物ではなく、取り出す(受信する)スレッドに待ってもらったり、Channelにデータを入れる(送信する)スレッドには、データが入っているので送信を待ってもらうなどの交通整理を行います。

Channelがホットストリームといわれる所以であり、特徴的として「 受信者(値を取り出す処理)があるかどうかに関わらず送信処理(データを入れる)を行う。 」ということです。

そのため以下の注意が必要です。

送信した回数分、受信しないとメモリーリークを起こす。言い換えると、受け取ってもらえるまで送信側の処理が動き続ける!!

送受信のメソッドについてサンプルコードを見る前に確認しましょう。

メソッド 説明
Channel#send Channelにデータを送信する。
Channel#recieve Channelからデータを受け取る。

サンプルコード

ChannelBasic.kt
package channel.basic

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

val stories = arrayOf(
    "赤ずきんちゃんの4行ストーリ",
    "1.赤ずきんちゃん、おばあさんの家に行く",
    "2.赤ずきんちゃん、オオカミと遭遇、おばあさんオオカミに食べられる",
    "3.赤ずきんちゃん、オオカミに食べられる",
    "4.赤ずきんちゃん、猟師に助けられる"
)

fun main() = runBlocking {
    // Channel生成(Stringのみを送受信できる)
    val channel = Channel<String>()

    // 非同期処理の起動
    launch {
        stories.forEach {

            // Channelに文字列を送信
            channel.send(it)
            println("送信:$it")
            // 1行待ち合わせ
            delay(1000)
        }
    }

    // Channelに5回送信されるため5回繰り返す
    repeat(5) {
        // Channelから受信
        val story = channel.receive()
        println("受信:$story")
    }

    // 他に下記のようにも記述可能
//    for (story in channel) {
//        // Channelから受信
//        val story = channel.receive()
//        println("受信:$story")
//    }

    println("おわり")
}

データの受取る方法として、forを使っても問題ありません。

    for (story in channel) {
        // Channelから受信
        val story = channel.receive()
        println("受信:$story")
    }

ChannelのBuffer

概要

Channelはデータの入れ物と説明しましたが、これまでは単一のデータしか格納できませんでした。しかし、バッファーを設定することによって、複数のでーたを格納できます。
まず図にしてみました。

サンプルコード

Channelをインスタンス化する際、コンストラクターに整数を渡すことによりバッファーを設定できます。

ChannelBuffer.kt
package channel.buffer

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

val stories = arrayOf(
    "赤ずきんちゃんの4行ストーリ",
    "1.赤ずきんちゃん、おばあさんの家に行く",
    "2.赤ずきんちゃん、オオカミと遭遇、おばあさんオオカミに食べられる",
    "3.赤ずきんちゃん、オオカミに食べられる",
    "4.赤ずきんちゃん、猟師に助けられる"
)

fun main() = runBlocking {
    // Channel生成(Stringのみを送受信できる)
    val channel = Channel<String>(5)

    // 非同期処理の起動
    launch {
        stories.forEach {
            // Channelに文字列を送信
            channel.send(it)
            println("送信:$it")
        }
    }

    // Channelに5回送信されるため5回繰り返す
    repeat(5) {
        Thread.sleep(1000)
        // Channelから受信
        val story = channel.receive()
        println("受信:$story")
    }
    println("おわり")
}

Channelのcloseとキャンセル

概要

Channelはメモリーリークしてしまう可能性があると説明しましたが、回避する方法として、closeやcacelがあります。
まず図にしてみました。

それぞれ特徴が異なります。
メソッドの説明は以下のとおりです。

メソッド 説明
Channel#canel 要素の受信をキャンセルします。Channelを閉じ、バッファリングされた送信済み要素をすべて削除します。キャンセルをすると例外(java.util.concurrent.CancellationException)が発生します。
Channel#close Channelを閉じる。今後は呼び出してもfalseを返す。

サンプルコード

cancel

ChannelCancel.kt
package channel.cancel


import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

val stories = arrayOf(
    "赤ずきんちゃんの4行ストーリ",
    "1.赤ずきんちゃん、おばあさんの家に行く",
    "2.赤ずきんちゃん、オオカミと遭遇、おばあさんオオカミに食べられる",
    "3.赤ずきんちゃん、オオカミに食べられる",
    "4.赤ずきんちゃん、猟師に助けられる"
)

fun main() = runBlocking {
    // Channel生成(Stringのみを送受信できる)
    val channel = Channel<String>()

    // 非同期処理の起動
    launch {
        stories.forEach {

            // Channelに文字列を送信
            channel.send(it)
            println("送信:$it")
            // 1行待ち合わせ
            delay(500)

            // 途中で中断
            if (stories.indexOf(it) == 2) {
                channel.cancel()
            }
        }
    }

    // Channelに5回送信されるため5回繰り返す
    repeat(5) {
        // Channelから受信
        val story = channel.receive()
        println("受信:$story")
    }
    println("おわり")
}

close

ChannelClose.kt
package channel.close

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

val stories = arrayOf(
    "赤ずきんちゃんの4行ストーリ",
    "1.赤ずきんちゃん、おばあさんの家に行く",
    "2.赤ずきんちゃん、オオカミと遭遇、おばあさんオオカミに食べられる",
    "3.赤ずきんちゃん、オオカミに食べられる",
    "4.赤ずきんちゃん、猟師に助けられる"
)

fun main() = runBlocking {
    // Channel生成(Stringのみを送受信できる)
    val channel = Channel<String>()

    // 非同期処理の起動
    launch {
        stories.forEach {

            // Channelに文字列を送信
            channel.send(it)
            println("送信:$it")
            // 1行待ち合わせ
            delay(500)

            // 途中で中断
            if (stories.indexOf(it) == 2) {
                channel.close()
            }
        }
    }

    // Channelに5回送信されるため5回繰り返す
    repeat(5) {
        // Channelから受信
        val story = channel.receive()
        println("受信:$story")
    }
    println("おわり")
}

Flowの基本

概要

Flowはコールドストリームと呼ばれています。ホットストリームであるChannelと大きく動きが異なります。
まず図にしてみました。

FlowはChannelと違い、受信する処理(collectメソッド)が確定していないと、送信処理(emitメソッド)を行いません。結果として、Flowのは実行されません。
そのため、メモリーリークは発生しません。
また、キャンセルは協力的なキャンセルを実装しています。
ここでは、基本的な呼び出しとキャンセルのサンプルコードを確認します。

サンプルコード

基本的なFlowの使い方

FlowBasic.kt
package flow.basic

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

val stories = arrayOf(
    "赤ずきんちゃんの4行ストーリ",
    "1.赤ずきんちゃん、おばあさんの家に行く",
    "2.赤ずきんちゃん、オオカミと遭遇、おばあさんオオカミに食べられる",
    "3.赤ずきんちゃん、オオカミに食べられる",
    "4.赤ずきんちゃん、猟師に助けられる"
)

fun teller() = flow<String> {

    repeat(stories.count()) {
        Thread.sleep(1000)
        emit(stories[it])
        println("${it+1}回目のemitを呼び出したよ")
    }
}


fun main() = runBlocking<Unit> {
    launch {
        for (i in 1..3) {
            println("${i}mainメソッドで回目の遅延処理")
            delay(100)
        }
    }

    val collector = teller()

    collector.collect { value ->
        println(value)
        Thread.sleep(100)
    }
    println("おわり")
}

協力的なキャンセルを実装した例

FlowCancel.kt
package flow.cancel

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

val stories = arrayOf(
    "赤ずきんちゃんの4行ストーリ",
    "1.赤ずきんちゃん、おばあさんの家に行く",
    "2.赤ずきんちゃん、オオカミと遭遇、おばあさんオオカミに食べられる",
    "3.赤ずきんちゃん、オオカミに食べられる",
    "4.赤ずきんちゃん、猟師に助けられる"
)

fun teller() = flow<String> {

    repeat(stories.count()) {
        Thread.sleep(1000)
        emit(stories[it])
        println("${it + 1}回目のemitを呼び出したよ")
    }
}


fun main() = runBlocking<Unit> {

    val collector = teller()

    // 2.5秒以上かかる場合は、キャンセルする
    withTimeoutOrNull(2500) {
        collector.collect { value ->
            println(value)
            Thread.sleep(100)
        }
    }
    println("おわり")
}

まとめ

ここまで説明した内容をまとめた上で、使い分けを考えるとしたら、以下のようになります。

種類 使い分け
launch 戻り値は不要でコルーチンを起動したいとき
async 任意の戻り値が必要で、複数の処理を並列で実行して、処理を待ち合わせたいとき
Channel 同じ型の複数の戻り値でリアルタイムで処理を行いとき
Flow 同じ型の複数の戻り値で必ず処理を完結させる必要があるとき

となります。
Androidで考えるとlaunchでコルーチンを起動して、async、Channel、Flowを子コルーチンとして実行することが現実的な気がしてきました。

55
38
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
55
38