はじめに
Kotlinでは非同期処理(スレッド)の実装方法が色々でてきました。
Androidアプリ開発をやっている私としてはKotlinでアプリ開発をすると、正直何を使って良いのかわからない。
今一度、詳細を調べてみて、使い分けをはっきりさせようと思います。
動作環境
この記事の動作環境は以下のとおりです。
Kotln:1.4.10
目標
以下を目標とします。
** Coroutinesによる非同期処理の使い分けをはっきりさせる**
各種メソッドの定義
とりあえず、よく見かけるlaunch、async、flow関数(Flowビルダー)、Flow#collect、FlowCollector#emit、Channel#send、Channel#receiveの定義を確認してます。
サイトに少しまとまっているのでそこからまとめてみます。
メソッド | メソッドの定義 | 戻り値 | 概要 |
---|---|---|---|
launch | fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job (source) |
Job | 任意の結果を返せないコルーチン |
async | fun CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T ): Deferred (source) |
Deferred | 任意の単一の結果を返す |
Channel#send | abstract suspend fun send(element: E): Unit (source) | なし | Channelに結果を送信する。 |
Channel#receive | abstract suspend fun receive(): E (source) | Channelのインスタンス化した時にジェネリクスで指定した型 | Channnelからデータを取得する。 |
flow関数(Flowビルダー) | fun flow( block: suspend FlowCollector.() -> Unit ): Flow (source) |
Flowオブジェクト | ビルダーでインスタンス化を生成する際にジェネリクスで指定した型のデータを返す |
FlowCollector#emit | abstract suspend fun emit(value: T): Unit (source) | なし | Flowに値を送信する。 |
Flow#collect | abstract suspend fun collect( collector: FlowCollector ): Unit (source) |
なし | Flowがデータを受け取るやつ。collect拡張関数で収集した値を出力する。 |
とりあえず、並べてみたけど、一番最初の感想は、「よくわからない!!」です。
やっぱり、実際に動かして見ながら理解していきます。
サンプルプロジェクト
githubからダウンロード可能です。
launch
概要
launchはスレッドを生成して、実行してくれます。
とうぜんメインの処理とは別スレッドになるので、launchしたスレッドが終わるまで待つ歯科差なさそうです。
スレッドの状態を確認するプロパティがあるので、うまく使って待ってあげる必要がありそうです。
状態を確認するプロパティは下記のようなものがあります。
メソッド | 説明 |
---|---|
Job#isActive | キャンセルされておらず、スレッドが実行中の時にtrueを返す。 |
Job#isCancelled | スレッドがキャンセルされている時にtrueを返す。 |
Job#isCompleted | スレッドの実行が完了している時にtrueを返す。 |
サンプルコード
実際に、launchを使ったコードを確認します。
新しいスレッドでは、配列の文字列を標準出力で表示しています。
package launch.basic
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
val stories = arrayOf(
"赤ずきんちゃんの4行ストーリ",
"1.赤ずきんちゃん、おばあさんの家に行く",
"2.赤ずきんちゃん、オオカミと遭遇、おばあさんオオカミに食べられる",
"3.赤ずきんちゃん、オオカミに食べられる",
"4.赤ずきんちゃん、猟師に助けられる"
)
fun main() {
val job = CoroutineScope(Dispatchers.Default).launch {
repeat(5) { count ->
delay(500.toLong())
println(stories[count])
}
}
// Coroutineが終わるまで待つ
while (!job.isCompleted) {
Thread.sleep(100)
}
}
async
次は、よく見かけるasyncを確認していきます。
概要
launchとの違いは、戻り値を返せるということです。
戻り値の型に特に制限が無いため、任意の値を返せます。
また、launchの場合、処理が終わったかどうかをプロパティで判断していました。asyncの場合は、戻り値であるDeferredに処理が終わるまで待つメソッドが用意されています。また、キャンセルも行えます。協力的なキャンセルになっています。協力的なキャンセルについてはこの記事を参照してください。
メソッドの終了と戻り値を受け取るメソッドは以下のとおりです。
メソッド | 説明 |
---|---|
Deferred#await | asyncの処理が終わるまでまちます。 |
Deferred#getComplete | 戻り値を取得できます。戻り値の型に制約はないので、任意の型を指定できます。 |
サンプルコード
基本
package async.basic
import kotlinx.coroutines.*
import launch.basic.stories
val stories = arrayOf(
"赤ずきんちゃんの4行ストーリ",
"1.赤ずきんちゃん、おばあさんの家に行く",
"2.赤ずきんちゃん、オオカミと遭遇、おばあさんオオカミに食べられる",
"3.赤ずきんちゃん、オオカミに食べられる",
"4.赤ずきんちゃん、猟師に助けられる"
)
fun main() {
val job = CoroutineScope(Dispatchers.Default).launch {
repeat(5) { count ->
val deffer = async {
delay(500)
stories[count]
}
println("${count + 1}回目のasyncが終わるまで待つ")
deffer.await()
println(deffer.getCompleted())
}
}
// Coroutineが終わるまで待つ
while (!job.isCompleted) {
Thread.sleep(100)
}
}
cancel
続いて、キャンセルを実装したコードも確認しておきましょう。
package async.cancel
import kotlinx.coroutines.*
val stories = arrayOf(
"赤ずきんちゃんの4行ストーリ",
"1.赤ずきんちゃん、おばあさんの家に行く",
"2.赤ずきんちゃん、オオカミと遭遇、おばあさんオオカミに食べられる",
"3.赤ずきんちゃん、オオカミに食べられる",
"4.赤ずきんちゃん、猟師に助けられる"
)
fun main() {
val job = CoroutineScope(Dispatchers.Default).launch {
repeat(5) { count ->
val deffer = async {
// 2回目でキャンセル
if (count == 2) {
cancel("キャンセル")
}
delay(500)
launch.basic.stories[count]
}
println("${count + 1}回目のasyncが終わるまで待つ")
deffer.await()
println(deffer.getCompleted())
}
}
// Coroutineが終わるまで待つ
while (!job.isCompleted || !job.isCancelled) {
Thread.sleep(100)
}
}
Channelの基本
いよいよ1.4で追加されたFlowやChannelについて説明していきます。
まずはChannelから確認していきます。
概要
Channelはホットストリームなんて言われています。
なんとなくのChannelのイメージですが、入れ物的なものでしょうか。
ChannelはメインからもCoroutineからでも使える入れ物で、単なる入れ物ではなく、取り出す(受信する)スレッドに待ってもらったり、Channelにデータを入れる(送信する)スレッドには、データが入っているので送信を待ってもらうなどの交通整理を行います。
Channelがホットストリームといわれる所以であり、特徴的として「 受信者(値を取り出す処理)があるかどうかに関わらず送信処理(データを入れる)を行う。 」ということです。
そのため以下の注意が必要です。
送信した回数分、受信しないとメモリーリークを起こす。言い換えると、受け取ってもらえるまで送信側の処理が動き続ける!!
送受信のメソッドについてサンプルコードを見る前に確認しましょう。
メソッド | 説明 |
---|---|
Channel#send | Channelにデータを送信する。 |
Channel#recieve | Channelからデータを受け取る。 |
サンプルコード
package channel.basic
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
val stories = arrayOf(
"赤ずきんちゃんの4行ストーリ",
"1.赤ずきんちゃん、おばあさんの家に行く",
"2.赤ずきんちゃん、オオカミと遭遇、おばあさんオオカミに食べられる",
"3.赤ずきんちゃん、オオカミに食べられる",
"4.赤ずきんちゃん、猟師に助けられる"
)
fun main() = runBlocking {
// Channel生成(Stringのみを送受信できる)
val channel = Channel<String>()
// 非同期処理の起動
launch {
stories.forEach {
// Channelに文字列を送信
channel.send(it)
println("送信:$it")
// 1行待ち合わせ
delay(1000)
}
}
// Channelに5回送信されるため5回繰り返す
repeat(5) {
// Channelから受信
val story = channel.receive()
println("受信:$story")
}
// 他に下記のようにも記述可能
// for (story in channel) {
// // Channelから受信
// val story = channel.receive()
// println("受信:$story")
// }
println("おわり")
}
データの受取る方法として、forを使っても問題ありません。
for (story in channel) {
// Channelから受信
val story = channel.receive()
println("受信:$story")
}
ChannelのBuffer
概要
Channelはデータの入れ物と説明しましたが、これまでは単一のデータしか格納できませんでした。しかし、バッファーを設定することによって、複数のでーたを格納できます。
まず図にしてみました。
サンプルコード
Channelをインスタンス化する際、コンストラクターに整数を渡すことによりバッファーを設定できます。
package channel.buffer
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
val stories = arrayOf(
"赤ずきんちゃんの4行ストーリ",
"1.赤ずきんちゃん、おばあさんの家に行く",
"2.赤ずきんちゃん、オオカミと遭遇、おばあさんオオカミに食べられる",
"3.赤ずきんちゃん、オオカミに食べられる",
"4.赤ずきんちゃん、猟師に助けられる"
)
fun main() = runBlocking {
// Channel生成(Stringのみを送受信できる)
val channel = Channel<String>(5)
// 非同期処理の起動
launch {
stories.forEach {
// Channelに文字列を送信
channel.send(it)
println("送信:$it")
}
}
// Channelに5回送信されるため5回繰り返す
repeat(5) {
Thread.sleep(1000)
// Channelから受信
val story = channel.receive()
println("受信:$story")
}
println("おわり")
}
Channelのcloseとキャンセル
概要
Channelはメモリーリークしてしまう可能性があると説明しましたが、回避する方法として、closeやcacelがあります。
まず図にしてみました。
それぞれ特徴が異なります。
メソッドの説明は以下のとおりです。
メソッド | 説明 |
---|---|
Channel#canel | 要素の受信をキャンセルします。Channelを閉じ、バッファリングされた送信済み要素をすべて削除します。キャンセルをすると例外(java.util.concurrent.CancellationException)が発生します。 |
Channel#close | Channelを閉じる。今後は呼び出してもfalseを返す。 |
サンプルコード
cancel
package channel.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
val stories = arrayOf(
"赤ずきんちゃんの4行ストーリ",
"1.赤ずきんちゃん、おばあさんの家に行く",
"2.赤ずきんちゃん、オオカミと遭遇、おばあさんオオカミに食べられる",
"3.赤ずきんちゃん、オオカミに食べられる",
"4.赤ずきんちゃん、猟師に助けられる"
)
fun main() = runBlocking {
// Channel生成(Stringのみを送受信できる)
val channel = Channel<String>()
// 非同期処理の起動
launch {
stories.forEach {
// Channelに文字列を送信
channel.send(it)
println("送信:$it")
// 1行待ち合わせ
delay(500)
// 途中で中断
if (stories.indexOf(it) == 2) {
channel.cancel()
}
}
}
// Channelに5回送信されるため5回繰り返す
repeat(5) {
// Channelから受信
val story = channel.receive()
println("受信:$story")
}
println("おわり")
}
close
package channel.close
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
val stories = arrayOf(
"赤ずきんちゃんの4行ストーリ",
"1.赤ずきんちゃん、おばあさんの家に行く",
"2.赤ずきんちゃん、オオカミと遭遇、おばあさんオオカミに食べられる",
"3.赤ずきんちゃん、オオカミに食べられる",
"4.赤ずきんちゃん、猟師に助けられる"
)
fun main() = runBlocking {
// Channel生成(Stringのみを送受信できる)
val channel = Channel<String>()
// 非同期処理の起動
launch {
stories.forEach {
// Channelに文字列を送信
channel.send(it)
println("送信:$it")
// 1行待ち合わせ
delay(500)
// 途中で中断
if (stories.indexOf(it) == 2) {
channel.close()
}
}
}
// Channelに5回送信されるため5回繰り返す
repeat(5) {
// Channelから受信
val story = channel.receive()
println("受信:$story")
}
println("おわり")
}
Flowの基本
概要
Flowはコールドストリームと呼ばれています。ホットストリームであるChannelと大きく動きが異なります。
まず図にしてみました。
FlowはChannelと違い、受信する処理(collectメソッド)が確定していないと、送信処理(emitメソッド)を行いません。結果として、Flowのは実行されません。
そのため、メモリーリークは発生しません。
また、キャンセルは協力的なキャンセルを実装しています。
ここでは、基本的な呼び出しとキャンセルのサンプルコードを確認します。
サンプルコード
基本的なFlowの使い方
package flow.basic
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
val stories = arrayOf(
"赤ずきんちゃんの4行ストーリ",
"1.赤ずきんちゃん、おばあさんの家に行く",
"2.赤ずきんちゃん、オオカミと遭遇、おばあさんオオカミに食べられる",
"3.赤ずきんちゃん、オオカミに食べられる",
"4.赤ずきんちゃん、猟師に助けられる"
)
fun teller() = flow<String> {
repeat(stories.count()) {
Thread.sleep(1000)
emit(stories[it])
println("${it+1}回目のemitを呼び出したよ")
}
}
fun main() = runBlocking<Unit> {
launch {
for (i in 1..3) {
println("${i}mainメソッドで回目の遅延処理")
delay(100)
}
}
val collector = teller()
collector.collect { value ->
println(value)
Thread.sleep(100)
}
println("おわり")
}
協力的なキャンセルを実装した例
package flow.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull
val stories = arrayOf(
"赤ずきんちゃんの4行ストーリ",
"1.赤ずきんちゃん、おばあさんの家に行く",
"2.赤ずきんちゃん、オオカミと遭遇、おばあさんオオカミに食べられる",
"3.赤ずきんちゃん、オオカミに食べられる",
"4.赤ずきんちゃん、猟師に助けられる"
)
fun teller() = flow<String> {
repeat(stories.count()) {
Thread.sleep(1000)
emit(stories[it])
println("${it + 1}回目のemitを呼び出したよ")
}
}
fun main() = runBlocking<Unit> {
val collector = teller()
// 2.5秒以上かかる場合は、キャンセルする
withTimeoutOrNull(2500) {
collector.collect { value ->
println(value)
Thread.sleep(100)
}
}
println("おわり")
}
まとめ
ここまで説明した内容をまとめた上で、使い分けを考えるとしたら、以下のようになります。
種類 | 使い分け |
---|---|
launch | 戻り値は不要でコルーチンを起動したいとき |
async | 任意の戻り値が必要で、複数の処理を並列で実行して、処理を待ち合わせたいとき |
Channel | 同じ型の複数の戻り値でリアルタイムで処理を行いとき |
Flow | 同じ型の複数の戻り値で必ず処理を完結させる必要があるとき |
となります。
Androidで考えるとlaunchでコルーチンを起動して、async、Channel、Flowを子コルーチンとして実行することが現実的な気がしてきました。