0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【Kotlin】「Kotlin I/O × Coroutine」 — 非同期ファイル読み書きと Channel の活用

Last updated at Posted at 2025-10-14

概要

従来の Kotlin I/O は同期的(blocking)な設計が多く、
大きなファイルやネットワーク I/O を扱うとスレッドがブロックされがちです。

しかし Kotlin の Coroutine(コルーチン) を組み合わせることで、
ノンブロッキングな非同期I/O をシンプルに実現できます。


1. なぜ Coroutine × I/O なのか?

比較項目 同期I/O Coroutine × I/O
スレッドブロッキング あり なし(suspendで中断再開)
同時処理 難しい 簡単(launch / async)
CPU利用効率 低い 高い
記述の簡潔さ 普通 高い(suspend関数)

2. Coroutine I/O の基本構造

Kotlin では Dispatchers.IO を使うことで
ファイルI/OやDBアクセスなどの I/O向けスレッドプール に自動切り替えが行われます。

import kotlinx.coroutines.*
import java.io.File

suspend fun readFileAsync(path: String): String =
    withContext(Dispatchers.IO) {
        File(path).readText()
    }

fun main() = runBlocking {
    val content = readFileAsync("data.txt")
    println(content)
}

withContext(Dispatchers.IO)
I/O専用スレッドプールで安全に実行 するためのブロック。


3. 複数ファイルを並列読み込み

import kotlinx.coroutines.*
import java.io.File

suspend fun readFile(path: String): String = withContext(Dispatchers.IO) {
    File(path).readText()
}

fun main() = runBlocking {
    val files = listOf("a.txt", "b.txt", "c.txt")

    val results = files.map { path ->
        async { readFile(path) }
    }.awaitAll()

    results.forEachIndexed { i, content ->
        println("File ${files[i]}: ${content.length} bytes")
    }
}

async + awaitAll()
3つのファイルを 同時非同期に読み込み 可能。


4. ファイルを逐次読み込み(ストリーム処理)

ファイルを1行ずつ処理する場合、
useLines を suspend 化するだけで高速・省メモリ処理が可能です。

import kotlinx.coroutines.*
import kotlin.io.path.*

suspend fun processLargeFile(path: String) = withContext(Dispatchers.IO) {
    path.useLines { lines ->
        lines.forEach { line ->
            // 重い処理を模擬
            println("処理中: $line")
        }
    }
}

fun main() = runBlocking {
    val path = Path("large.txt")
    processLargeFile(path)
}

メリット:

  • メモリ効率が高い(全体を読み込まない)
  • Dispatchers.IO により安全な非同期処理が可能

5. Channel を使ったリアクティブな I/O パイプライン

Channel は、コルーチン間でデータを安全に渡すための非同期キュー
ファイル→処理→出力 という流れを自然に作れます。

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import java.io.File

fun CoroutineScope.fileReader(path: String): ReceiveChannel<String> = produce {
    File(path).forEachLine { send(it) }
    close()
}

fun CoroutineScope.fileProcessor(input: ReceiveChannel<String>): ReceiveChannel<String> = produce {
    for (line in input) {
        send("Processed: ${line.uppercase()}")
    }
}

fun CoroutineScope.fileWriter(outputPath: String, input: ReceiveChannel<String>) = launch {
    File(outputPath).printWriter().use { writer ->
        for (line in input) {
            writer.println(line)
        }
    }
}

fun main() = runBlocking {
    val reader = fileReader("input.txt")
    val processor = fileProcessor(reader)
    fileWriter("output.txt", processor).join()
    println("完了しました!")
}

構造図:

この設計は「Producer → Transformer → Consumer」パターン。
巨大ファイルも逐次処理でき、メモリ効率・スループットともに高い


6. Flow を使ったストリームI/O

Kotlin Flow を使えば、Channelよりも高レベルなAPIで同様のことが可能です。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.io.File

fun fileFlow(path: String): Flow<String> = flow {
    File(path).forEachLine { emit(it) }
}

suspend fun writeProcessedFlow(output: String, flow: Flow<String>) {
    File(output).printWriter().use { writer ->
        flow.map { "Line: ${it.lowercase()}" }
            .collect { writer.println(it) }
    }
}

fun main() = runBlocking {
    fileFlow("input.txt")
        .let { writeProcessedFlow("out.txt", it) }
    println("Flow処理完了")
}

flow

  • Backpressure制御対応
  • suspend対応
  • map, filter, collect など高階関数利用可

7. 並列 + 非同期で大量ログ処理

import kotlinx.coroutines.*
import java.io.File

suspend fun processLogs(files: List<File>) = coroutineScope {
    files.map { file ->
        async(Dispatchers.IO) {
            file.useLines { lines ->
                lines.count { it.contains("ERROR") }
            }
        }
    }.awaitAll().sum()
}

fun main() = runBlocking {
    val files = (1..10).map { File("log$it.txt") }
    val errorCount = processLogs(files)
    println("合計エラー数: $errorCount")
}

各ファイルを独立スレッドで解析し、結果を合計。
CPUとI/Oをバランスよく活用できる


8. よく使うパターンまとめ

用途 構文例 解説
単一I/Oを安全に実行 withContext(Dispatchers.IO) I/Oスレッドに切替
同時並列処理 async { ... } + awaitAll() 非同期で複数タスク
ストリーム読み込み useLines {} 逐次処理で省メモリ
Channel パイプライン produce, send, receive 非同期データ連携
Flow ストリーム flow { emit() } 宣言的リアクティブI/O

まとめ

観点 同期I/O Coroutine I/O
処理方式 Blocking Non-blocking
スレッド管理 手動 自動 (Dispatchers.IO)
並列性 弱い 強い (async, Flow)
適用例 小規模処理 大規模ログ、バッチ処理、ETL 等

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?