Help us understand the problem. What is going on with this article?

Kotlin コルーチンでネットワーク I/O 処理実験

Kotlin のコルーチンを勉強中。

Kotlin のコルーチンは、Python のコルーチンのようにシングルスレッドで実行されるわけではなく、スレッドプールを用いた M:N スレッドモデルが採用されている。

Java を使うことは近年ほとんどなかったので余り知識がないところではあるが、Java のネットワーク I/O 周りの処理は基本的にブロッキングするものだと捉えている。著名な HTTP ライブラリもそのはず。
そのようなブロックするネットワーク処理を同時に多数実行しようとした場合、コルーチンを使って非同期に扱ったとしても、バックグラウンドのスレッドがブロックしてしまうので、結局は多くのスレッドを生成しなくてはいけないのではないか? スレッドは高価なのでリソースが枯渇してしまうのではないか? そのような疑問が生じてきた。

Go の Goroutine も M:N スレッドモデルだが、ブロックするようなシステムコールが実行された際にそれを検知し、Goroutine から実行スレッドを切り離してその Goroutine は待ち行列的なキューに戻る仕組みだとどこかで読んだことがある。

ライブラリとして実装されている Kotlin コルーチンにはそこまでの仕組みはないだろうけど、どのような挙動になるのか実際に多数の HTTP リクエストを同時に送信するサンプルコードを書いて確認してみることにした。

実験に使った HTTP サーバ実装

Python の aiohttp を使ってローカル環境に HTTP サーバを作成し、このサーバに対してリクエストを送信する。PyPy 3.5 で実行。

  • 時間が掛かるリモート処理をエミュレートするため、GET リクエストを送ると 3 秒待ってレスポンスを返すようにした。
  • num クエリパラメーターに指定された値を Hello {num} のようにレスポンステキストに含める。
import argparse
import asyncio

from aiohttp import web


async def handle(request):
    name = request.query.get('num', 'World')
    text = "Hello " + name
    await asyncio.sleep(3)
    return web.Response(text=text)


app = web.Application()


def main(port: int):
    app.add_routes([
        web.get('/', handle),
    ])
    web.run_app(app, port=port)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Run HTTP server.')
    parser.add_argument('-p', '--port',
                        metavar='port',
                        dest='port', action='store', type=int, default=7000,
                        help='Port')
    args = parser.parse_args()
    port = args.port
    main(port)

実験に使ったクライアント実装

Kotlin、Python、Go でコルーチン(Go の場合は Goroutine)を使って、上記の HTTP サーバに対して同時に 200 個のリクエストを送信する HTTP クライアントとなるサンプルコードを書いた。

HTTP サーバ、HTTP クライアント共に同一の MacBook Pro (Core i7 / 論理コア 4 / メモリ 16GB) 上で実行した。

Kotlin (ブロッキング I/O) の場合

CoroutineDispatcher に Dispatchers.IO を使用して、ブロッキングする HTTP リクエスト送信処理を実行した。

fun main() = runBlocking {
    println("Start of main() [${Thread.currentThread().name}]\tThread count: ${Thread.activeCount()}")
    val time = measureTimeMillis {
        val jobs = mutableListOf<Job>()
        val count = 200
        repeat(count) { n ->
            jobs += launch(Dispatchers.IO) {
                callSlowRemoteServiceWithBlocking(n + 1)
            }
        }
        jobs.forEach {
            it.join()
        }
    }
    println("End of main() [${Thread.currentThread().name}]\tTime: $time ms\tThread count: ${Thread.activeCount()}")
}

fun callSlowRemoteServiceWithBlocking(n: Int) {
    URL("http://127.0.0.1:7000/?num=$n").openStream().use {
        val text = it.bufferedReader().readLine()
        println("$text [${Thread.currentThread().name}]")
    }
}

実行結果

Start of main() [main]  Thread count: 3
Hello 1 [DefaultDispatcher-worker-1]
Hello 13 [DefaultDispatcher-worker-13]
Hello 11 [DefaultDispatcher-worker-11]
(略)
Hello 198 [DefaultDispatcher-worker-7]
Hello 200 [DefaultDispatcher-worker-6]
Hello 199 [DefaultDispatcher-worker-12]
End of main() [main]    Time: 12097 ms  Thread count: 71

Dispatchers.IO が使うスレッドプールのスレッド最大数がデフォルトでは 64 なので、200 リクエストのうち 64 コずつ処理されていた。試していないが、このスレッドプールの上限を増やせば一気に処理できそう。

大量に同時実行するコルーチンにブロックする処理を入れてはいけないのだと認識した。

Kotlin (ノンブロッキング I/O) の場合

Java 11 の HttpClient で非同期 API が使えると読んだので、先のコードを次のように書き換えて実行してみた。

fun main() = runBlocking {
    println("Start of main() [${Thread.currentThread().name}]\tThread count: ${Thread.activeCount()}")
    val time = measureTimeMillis {
        val jobs = mutableListOf<Job>()
        val count = 200
        repeat(count) { n ->
            jobs += launch {
                callSlowRemoteServiceWithNonBlocking(n + 1)
            }
        }
        jobs.forEach {
            it.join()
        }
    }
    println("End of main() [${Thread.currentThread().name}]\tTime: $time ms\tThread count: ${Thread.activeCount()}")
}

val client = HttpClient.newBuilder().build()!!

suspend fun callSlowRemoteServiceWithNonBlocking(n: Int) {
    val req = HttpRequest.newBuilder().uri(URI.create("http://127.0.0.1:7000/?num=$n")).build()
    val res = client.sendAsync(req, HttpResponse.BodyHandlers.ofString())
    val text = res.await().body()
    println("$text [${Thread.currentThread().name}]")
}

ここでは launch {...} で CoroutineDispacher を指定せず、メインスレッドの CoroutineContext のまま実行している。

実行結果

Start of main() [main]  Thread count: 3
Hello 41 [main]
Hello 42 [main]
Hello 132 [main]
(略)
Hello 103 [main]
Hello 129 [main]
Hello 73 [main]
End of main() [main]    Time: 5072 ms   Thread count: 215

HttpClient の使い方がよく分かってないので、もっと上手く書けるのかも知れないが、現状だとどこかでスレッドが沢山生成されている。

Kotlin (Ktor Http Client) の場合

やっぱりコルーチンネイティブな API を用いないとダメなんじゃないかと、Ktor の HTTP クライアントを使って試してみることにした。

fun main(args: Array<String>) = runBlocking {
    println("Start of main() [${Thread.currentThread().name}]\tThread count: ${Thread.activeCount()}")
    val time = measureTimeMillis {
        val jobs = mutableListOf<Job>()
        val count = 200
        repeat(count) { n ->
            jobs += launch {
                callSlowRemoteServiceWithKtor(n + 1)
            }
        }
        jobs.forEach {
            it.join()
        }
    }
    println("End of main() [${Thread.currentThread().name}]\tTime: $time ms\tThread count: ${Thread.activeCount()}")
}

val client = HttpClient(CIO)

suspend fun callSlowRemoteServiceWithKtor(i: Int) {
    val text: String = client.get("http://127.0.0.1:7000/?num=$i")
    println("$text [${Thread.currentThread().name}]")
}

Python や JavaScript の async/await に慣れていると、client.get() の戻り値に対して await() を呼びたい気持ちになるが、直接レスポンス内容が返ってくるようだ。

実行結果

Start of main() [main]  Thread count: 5
Hello 21 [main]
Hello 23 [main]
Hello 9 [main]
(略)
Hello 152 [main]
Hello 15 [main]
Hello 148 [main]
End of main() [main]    Time: 3655 ms   Thread count: 8

Ktor のセットアップのため、先の二つのものとは異なるプロジェクト構成になってしまっているので厳密な比較はできないが、メインスレッドだけで実行されており、生成されているスレッドも少なくて良い感じ。

  • 500 リクエストでも 4 秒前後で 8 スレッド。
  • 1000 リクエストでも実行できた場合は 5 秒前後の 8 スレッド。
    但し、 Exception in thread "main" java.net.SocketException: Connection reset by peer で落ちてしまうことの方が多かった。同一マシンだし、HTTP サーバ側の限界か。

Python (asyncio) の場合

Python のコルーチンを使ってシングルスレッドでノンブロッキングなネットワーク I/O 処理を実行。HTTP サーバと同じく aiohttp を使用して HTTP クライアントを実装した。PyPy 3.5 で実行。

import threading
import time
from asyncio import gather, get_event_loop

from aiohttp import ClientSession


async def main():
    print('Start of main() [{}]'.format(threading.current_thread().name))
    start = time.time()

    coros = []
    count = 200  # 同時リクエスト数
    for i in range(count):
        coro = call_slow_remote_service(i + 1)
        coros.append(coro)

    await gather(*coros)

    end = time.time()
    elapsed = int((end - start) * 1000)
    print('End of main() [{}]\tTime: {} ms'.format(threading.current_thread().name, elapsed))


async def call_slow_remote_service(num: int):
    async with ClientSession() as session:
        async with session.get('http://127.0.0.1:7000/', params={'num': str(num)}) as resp:
            msg = await resp.text()
            print('{} [{}]'.format(msg, threading.current_thread().name))


if __name__ == '__main__':
    loop = get_event_loop()
    loop.run_until_complete(main())

実行結果

Start of main() [MainThread]
Hello 1 [MainThread]
Hello 2 [MainThread]
Hello 3 [MainThread]
(略)
Hello 198 [MainThread]
Hello 199 [MainThread]
Hello 200 [MainThread]
End of main() [MainThread]  Time: 3735 ms
  • 200 リクエストの場合、平均して 3 秒台後半。
  • 500 リクエストで 5 秒弱。
  • 1000 リクエストで 6 秒前後。

Python (スレッド) の場合

コルーチンを使っていないが、参考までに ThreadPoolExecutor で生成したスレッドを使った並行処理の実装も。

import threading
import time
from concurrent.futures import ThreadPoolExecutor
from urllib.request import urlopen


def main():
    print('Start of main() [{}]'.format(threading.current_thread().name))
    start = time.time()

    count = 200
    with ThreadPoolExecutor() as executor:
        for i in range(count):
            executor.submit(call_slow_remote_service, i + 1)

    end = time.time()
    elapsed = int((end - start) * 1000)
    print('End of main() [{}]\tTime: {} ms'.format(threading.current_thread().name, elapsed))


def call_slow_remote_service(num: int):
    with urlopen('http://127.0.0.1:7000/?num=' + str(num)) as resp:
        text = resp.read().decode()
        print('{} [{}]'.format(text, threading.current_thread().name))


if __name__ == '__main__':
    main()

実行結果

Start of main() [MainThread]
Hello 3 [Thread-3]
Hello 4 [Thread-4]
Hello 7 [Thread-7]
(略)
Hello 191 [Thread-18]
Hello 196 [Thread-6]
Hello 197 [Thread-19]
End of main() [MainThread]  Time: 30453 ms

遅い。
ThreadPoolExecutor が使う最大スレッド数はデフォルトで CPU コア数 × 5 なので、今回は 20 スレッドが使用されているため。

Python では GIL があるため同時に実行されるスレッドは CPU コア数に関わらず 1 つだが、ネットワーク I/O でスレッドがブロックされている間は GIL が解放されるので並行処理は可能だ。

ThreadPoolExecutor(max_workers=200) と、最大スレッド数をリクエスト数に合わせて実行すると結果は次のように高速化された。

(略)
Hello 199 [Thread-199]
Hello 198 [Thread-198]
Hello 200 [Thread-200]
End of main() [MainThread]  Time: 3608 ms

Go (Goroutine) の場合

func main() {
    println("Start of main()")
    wg := sync.WaitGroup{}
    start := time.Now()

    count := 200
    for i := 0; i < count; i++ {
        wg.Add(1)
        go callSlowRemoteService(i, &wg)
    }
    wg.Wait()

    end := time.Now()
    elapsed := end.Sub(start) / time.Millisecond
    println("End of main()\tTime: " , elapsed, "ms")
}

func callSlowRemoteService(i int, wg *sync.WaitGroup) {
    values := url.Values{}
    values.Add("num", strconv.Itoa(i))
    resp, err := http.Get("http://127.0.0.1:7000/?" + values.Encode())
    if err != nil {
        fmt.Println(err)
        wg.Done()
        return
    }
    defer resp.Body.Close()
    body, _ := ioutil.ReadAll(resp.Body)
    println(string(body))
    wg.Done()
}

実行結果

Start of main()
Hello 0
Hello 1
Hello 125
(略)
Hello 168
Hello 169
Hello 170
End of main()   Time:  3195 ms

実行時間は速い。

  • 500 リクエストの場合で 3 秒台半ば。
  • 1000 リクエスト前後で、以下のようなエラーが出始める。HTTP サーバ側の限界か。

考察

実験の最後には当初の目的が何だったのか分からなくなってきていたが、Kotlin のコルーチンでもブロッキングするネットワーク I/O 処理は基本的には入れてはいけない。

ただ、同時に数百とかの数にならないケースであれば、Dispatchers.IO のスレッドプールや newSingleThreadContext() で専用に生成したスレッドを使って対処できそうだ。

Why do not you register as a user and use Qiita more conveniently?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away