167
175

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

各言語の非同期処理の仕組みまとめ

Last updated at Posted at 2024-03-19

はじめに

非同期処理はアプリケーション開発においてほぼ必須となっていますが、プログラミング言語やライブラリによってその実現方法は大きく異なります。
この記事では、以下の言語における非同期処理の実現方法を調査し、分類した結果をまとめたものです。

  • JavaScript/TypeScript(Promise)
  • Kotlin(Coroutines)
  • Java(Reactor Core/Completable Future)
  • Python(asyncio)
  • Golang(goroutine/conc)
  • Haskell(async)
  • Rust(tokio)

非同期処理の分類

構文

async/await

採用している言語: JavaScript/TypeScript/Python/Kotlin/Haskell/Rust

もっとも代表的な非同期処理の構文です。
asyncawaitという対になる二つのキーワードを使って非同期処理を表現します。

  • async e:eを非同期で実行するという意味(実際には式や文の構文ではなく、宣言時のアノテーションとして使われることが多い)
  • await e: 非同期処理eの結果を得るまで待機し、結果を得たら実行を再開する

コード例:

async function hello(): Promise<string> {
  ...
}

async function world(): Promise<string> {
  ...
}

async function helloworld(): Promise<string> {
  const task1 = hello();
  const task2 = world();
  try {
    return await task1 + ", " + await task2
  } catch(e) {
    console.log("error", e);
    throw e;
  }
}

利点

  • 構文上どこが非同期処理なのか明確
  • 既存の構文と直交しており、例外処理(リソース解放)やループがシンプルに書ける

欠点

  • 一部の関数をasync/awaitスタイルに変更するとそれを呼び出しているコード全体に修正を加えなくてはならない(asyncの伝播)
  • 直列的な処理しか書けないので、並列的な処理を書くプリミティブが別途必要
    • select/race: どれかの処理が終了するまで待つ
    • all: すべての処理が終了するまで待つが、例外が発生したら即時に戻る

callback chain

採用している言語: JavaScript/TypeScript/Java

非同期処理の対象にcallback関数を設定することで非同期処理を表現する方法です。

  • .then(callback): 正常終了時のコールバック
  • .catch(callback): 例外発生時のコールバック
  • .finally(callback): 終了時に必ず実行されるコールバック

コード例:

// typescript
async function helloWorld(): Promise<string> {
    return Promise.all([hello(), world()]).then([result, result2] => 
        Promise.resolve(result + result2)
    ).catch(e => {
        console.log(e);
        return Promise.reject(e);
    })
}
// java
public Mono<String> helloWorld() {
  return Mono.zip(hello(), world()).flatMap( tuple -> 
    Mono.just(tuple.getT1() + tuple.getT2())
  ).doOnError( e -> {
    System.out.println(e);
  });
}

利点

  • 構文を追加しないので第一級関数がサポートされている言語なら何でも採用できる
  • GUIアプリなどで、ボタンにコールバックを登録するなど自在な処理が書ける

欠点

  • 例外処理の漏れやコールバックの解決忘れなどのバグを埋め込みやすい
  • コールバックのネストが深くなると非常に読みにくい(callback hell)
  • ループが簡単に書けない

fork/join

採用している言語: Java/Kotlin/Golang/Haskell/Rust

これはどちらかというと非同期処理というよりはマルチスレッド処理というべきですが、スレッドを使って非同期処理を書いていくスタイルです。

  • fork action: actionを実行するスレッドを起動し、起動したスレッドIDを返す
  • join threadId: threadIdを持つスレッドが終了するまで待機する

コード例:

fun main() = runBlocking {
  val ch = Channel<String>()
  val job = launch {
      ch.send("hello")
  }
  val job2 = launch {
  	  ch.send("world")
  }
  val m1 = ch.receive()
  val m2 = ch.receive()
  
  job.join()
  job2.join()
  println(m1 + " " + m2)
}

利点

  • 通常の関数をそのまま非同期関数にすることができる
  • 専用の構文がなくても実現可能

欠点

  • スレッド単独では値やエラーを返せないので別途スレッド間通信などの仕組みが必要
  • デッドロックやデータレースなどマルチスレッド特有のバグを埋め込みやすい

非同期処理のプリミティブ

Promise/Future

採用している言語: JavaScript/TypeScript/Python/Java

Promise(あるいはFuture)は非同期計算の計算結果を保持するオブジェクトです。
正式な定義はありませんが、単にFutureといったときは、.get()のようなメソッドで計算結果(計算途中、値または例外)が取得できるインターフェースを指し、Promiseといったときには、Futureの中でも、「.get()の結果が値かエラーに確定したらもうそれ以降は変更できない」という性質をもつものを指すようです。

狭義のPromiseは計算結果を保持するだけなので、どのように計算を行うかは非同期処理のライブラリによって変わりますが、多くのライブラリ(JavaScript/TypeScript/Python/Java(CompletableFuture))ではPromiseにcallbackを設定することで、計算を進める仕組みをとっています。

Suspending function (coroutine)

採用している言語: Python/Kotlin/Rust

通常の関数は呼び出してから結果が返ってくるまで呼び出し元に制御が戻らないものですが、Suspending functionは実行の途中で呼び出し元に制御を返したり、中断した状態から再開することができる関数のことを指します。
このsuspending functionを用いるとブロックする処理を実行する際に、実行を中断して別の処理をすすめることができます。スケジューラと組み合わせることで非同期処理を実現することができます。

Lightweight (green) threads

採用している言語: Golang/Haskell

JavaなどのマルチスレッドプログラミングはOSが実行スケジュールを制御するため、スレッド数があまりスケールしません。
OSスレッドの代わりに言語ランタイムが実行スケジュールを制御する方式をlightweight threadとかgreen threadといいます。
lightweightスレッドは、OSスレッドに比べるとメモリサイズやコンテキストスイッチのコストが小さいので、非常にたくさんのスレッドを実行することで、大規模な非同期計算を実現できます。

Publisher/Subscriber

採用している言語: Java(Reactor core)

これは厳密にいうと非同期計算というよりストリーム処理なのですが、データの生成を担うPublisherとデータの消費を担うSubscriberを組み合わせる仕組みです。(実際には中間にPublisherかつSubscriberの役割を持つProcessorもあります)

これだけだと直列の処理しか書けないですが、ストリームの要素ごとで並行に処理を行う仕組みと組み合わせることで非同期処理を表現できます。

非同期処理のランタイムによる分類

シングルスレッド方式(イベントループ)

採用している言語: JavaScript/TypeScript

シングルスレッドで全ての非同期処理をスケジュールする方式です。
イベントループと呼ばれるオブジェクトが非同期処理の制御を行います。

シングルスレッドの利点は同期処理を(ほとんど)考えなくても良いところですが、
CPUバウンドな処理を行うとスレッドを占有して性能が出ないという欠点があります。

マルチスレッド方式

採用している言語: Golang/Haskell

こちらはマルチスレッドを用いて非同期処理をスケジュールします。マルチスレッドなのでスケールしやすいですが、スレッド間の同期処理や通信処理に気をつけないといけません。非同期処理毎にスレッドを作成するため、この方式を採用する場合はその言語がLight weight threadsをサポートしていなければなりません。

ハイブリッド方式

採用している言語: Python/Java/Kotlin/Java/Rust

基本的にはメインスレッドで非同期処理をスケジュールするのですが、プログラマー側で必要に応じてマルチスレッドを使用するなどスケジューラーを変更できる方式です。同期処理が必要な個所のみ(スレッドプールを用いた)マルチスレッドにすることで二つの方式のバランスをとることができます。

キャンセル方法

非同期処理は通常長い時間がかかるので、ユーザのリクエストに応じてキャンセルできる必要があります。

例外によるキャンセル

採用している言語: Python/Kotlin/Java/Haskell

実行しているスレッドやSuspending functionで例外を発生させることで非同期処理をキャンセルする方式です。
この方式ではキャンセルが発生すると通常の例外と同じように処理されるため、
ライブラリの使用者は特別な処理を書かなくても安全にキャンセルできるのが利点です。
ただし、雑な例外処理を書いていると、キャンセルの例外を途中でcatchして例外の伝播を中断させてしまうことがあります。

シグナルによるキャンセル

採用している言語: JavaScript/TypeScript/Java/Golang/Rust

非同期処理のプリミティブにキャンセル状態を通知するシグナルを明示的に渡すことでキャンセルできるようにします。
キャンセルが発生した場合は、シグナルから中断イベントが通知され、非同期処理が中断されます。

この方式ではキャンセルを制御するのはライブラリの使用者の責任です。
柔軟にキャンセル処理を記述できる一方で、シグナルを渡し忘れることでキャンセルできないバグを埋め込みやすくなります。

Structured Concurrency

採用している言語:Kotlin/Python/Java/Golang/Haskell

並行プログラミングでは非同期処理のリークがしばしば問題になります。これは、起動した非同期処理のawait/join/cancelの呼び出し漏れがあり、非同期処理やその中で取得したリソースが停止できなくなってしまうバグです。例えばJavaScriptのPromiseでは起動したPromiseをawaitしなければ、呼び出し元が終了してもタスクが残り続けます。

例えば、以下の例では、typoによりpromise2のawait漏れがあるため、subTask2がリークしています。

async function subTask1() {
    return new Promise((resolve, reject) => {
      setTimeout(() => {
        resolve(42);
      }, 1000);
    })
}
async function subTask2() {
    return new Promise((resolve, reject) => {
      setTimeout(() => {
        resolve(57);
      }, 5000);
    })
}

async function main() {
  const promise1 = subTask1();
  const promise2 = subTask2();
  const ret1 = await promise1;
  const ret2 = await promise1; //typo
  console.log(ret1 + ret2);
}

このようなバグを構文的に防ごうというのがstructured concurrencyという言語機能です。標準化された定義があるわけではないですが、おおむね以下の性質を持つものを指します。

  • 非同期処理の呼び出し元が親となる(あるいは子タスクのライフサイクルを管理するオブジェクトが存在する)
  • 子タスクがすべて終了するまで親は終了しない
  • 子のキャッチされない例外(キャンセル以外)を親に伝播する
  • 親がキャンセルされた場合は、子のタスクをすべてキャンセルする

Kotlinではスコープを用いてstructured concurrencyを実現しています。

import kotlinx.coroutines.*

suspend fun mainAsync() = coroutineScope {
    launch {
        delay(1000L)
        println("world!")
    }
    println("hello")
}

fun main() = runBlocking {
    var job = launch {
        mainAsync()
    }
    job.join()
    println("Main end")
}

この例では、mainAsync内でlaunchを使って起動している非同期処理はリークしていそうに見えます。しかし実際に実行してみると、以下のようにmainAsyncの実行が終了するのはworld!を出力した後になります。

hello
world!
Main end

Kotlinではこれを実現するためにcoroutineScopeを使って子タスクのライフサイクルを管理しています。coroutineScopeのブロック内で起動したタスクは自動的にCoroutineContextに登録され、ブロック終了時にまとめてjoinされるようになっています。また、子タスク内で発生した例外は親コンテキストに伝播されます。

Structured Concurrencyは近年様々な言語でサポートされつつありますが、言語により様々な名前で呼ばれているようです。

まとめ

観点 JS/TS Python Java(reactor core) Java(CompletableFuture) Kotlin Golang Haskell(async) Rust
async/await
callback chain
fork/join
プリミティブ Promise Suspending function and Promise Publisher/Subscriber Promise Suspending function LW thread LW thread Suspending function and Future
ランタイム Single Thread 1 Hybrid Hybrid Hybrid Hybrid Multi Thread Multi Thread Hybrid
キャンセル シグナル 例外 シグナル 例外 例外 シグナル 例外 シグナル
Structured Concurrency TaskGroup Subscription StructuredTaskScope CoroutineContext ContextPool withAsync

言語ごとの比較

実際に各言語で非同期処理の書き方がどのように変わるのかをサンプルプログラムを書いてみて比較してみました。

サンプルプログラムの仕様

  • heavy_taskという重い処理を100回非同期で実行する
  • heavy_taskの同時実行数は10までとする
  • heavy_taskは時々失敗するので、3回までリトライする
  • heavy_taskが3回連続で失敗したら全体の実行を中断する
  • Ctrl-Cで安全にキャンセルする

JavaScript/TypeScript

import process from "process";

async function sleep(ms: number, signal: AbortSignal) {
    return new Promise((resolve, reject) => {
        if (signal.aborted) {
            reject(new Error("aborted"));
            return;
        }
        const timeout = setTimeout(() => {
            resolve(null);
            signal.removeEventListener("abort", onAbort);
        }, ms);
        const onAbort = () => {
            reject(new Error("aborted"));
            clearTimeout(timeout);
        };
        signal.addEventListener("abort", onAbort, { once: true });
    });
}

async function heavyTask(taskname: string, signal: AbortSignal) {
    console.log(`Starting ${taskname}`);
    const duration = Math.floor(Math.random() * 1000) + 1000;
    if (duration % 10 === 0) {
        throw new Error(`Failed ${taskname}`);
    }
    await sleep(duration, signal);
    console.log(`Finished ${taskname}`);
}

async function retryHeavyTask(taskname: string, signal: AbortSignal) {
    let error = undefined;
    for (let i = 0; i < 3; i++) {
        try {
            await heavyTask(taskname, signal);
            return;
        } catch (e) {
            console.error(`failed ${taskname} retry count=${i} error=${e}`);
            error = e;
            if (e instanceof Error && e.message === "aborted") {
                break;
            }
        }
    }
    throw error;
}

async function worker(tasks: string[], controller: AbortController) {
    try {
        while (tasks.length > 0) {
            const task = tasks.pop();
            if (task !== undefined) {
                await retryHeavyTask(task, controller.signal);
            }
        }
    } catch (e) {
        if (!controller.signal.aborted) {
            controller.abort();
        }
        throw e;
    }
}

async function asyncMain() {
    console.log("Main start");
    let abortController: AbortController = new AbortController();
    process.on("SIGINT", () => {
        abortController.abort();
    });

    const tasks: string[] = [];
    for (let i = 0; i < 100; i++) {
        tasks.push(`task${i + 1}`);
    }
    tasks.reverse();

    const workers: Promise<void>[] = [];
    for (let i = 0; i < 10; i++) {
        workers.push(worker(tasks, abortController));
    }

    await Promise.allSettled(workers);
    console.log("Main end");
}

asyncMain();

TypeScriptでは、非同期処理を記述するためにPromiseを使用して、async/awaitの記法やcallback chainを使用します。ここでは、ワーカーを10個作成し、それぞれがキューからタスクを一つずつ取り出して処理するようにしています。また、retry処理もtry-catchとループを組み合わせることで自然に実装できます。

JavaScriptにはなぜかPromise版のsleep処理が存在しないため、ここでは自作しています。callbackベースのコードをPromiseに変換する際にはやや複雑な書き方になります。また、Promiseは作成した時点で実行がスケジュールされるため、注意が必要です。

TypeScriptはシングルスレッド方式ですので、キューからタスクを取り出す際にはロックを取る必要がありません。これは便利で良いですね。ただし、キャンセル処理はAbortControllerを使用して行うため、やや面倒です。キャンセルしたい非同期処理にAbortSignalを渡し、キャンセルを制御したい処理にはAbortControllerを渡します。AbortControllerのabort()メソッドを呼ぶと、そのAbortSignalが渡っていた箇所がすべてキャンセルされます。

Python

import asyncio
import collections
import random


async def heavy_task(taskname):
    try:
        print(f"start {taskname}")
        if random.random() < 0.1:
            raise Exception("random error")
        await asyncio.sleep(1 + random.random())
        print(f"end {taskname}")
    except asyncio.CancelledError:
        print(f"cancelled {taskname}")
        raise


async def retry_heavy_task(taskname):
    error = None
    for i in range(3):
        try:
            await heavy_task(taskname)
            break
        except Exception as e:
            error = e
            print(f"failed {taskname} retry count= {i} error= {e}")
    else:
        if error:
            raise error


async def executor(queue: collections.deque):
    while queue:
        taskname = queue.popleft()
        await retry_heavy_task(taskname)


async def main():
    try:
        queue = collections.deque(f"task{i+1}" for i in range(100))
        async with asyncio.TaskGroup() as group:
            for j in range(10):
                group.create_task(executor(queue))
    except asyncio.CancelledError:
        print("main cancelled")
    except Exception as e:
        print(f"main error= {e}")


if __name__ == "__main__":
    asyncio.run(main())

Pythonではasync defasync withといった記法で非同期処理を記述します。
非同期処理はコルーチンとも呼ばれ、内部的にはFutureを返すGenerator関数として実装されています。
コルーチンはシングルスレッドで処理されるので、基本的には同期処理を考える必要はありません。(一部run_in_executorで実行したタスクはマルチスレッドで処理されるものもあります)

JavaScriptとの違いはキャンセルの方式です。Pythonではキャンセルが発生するとコルーチンにCancelledErrorが送出されます。例外ハンドラやwithを使ったリソース解放処理を書いていれば安全にキャンセルさせることができます。
また、3.11バージョンにて導入されたTaskGroupでグループ化したタスクは、そのうちのいずれかが例外(キャンセル以外)で異常終了した場合、ほかの兄弟タスクをキャンセルさせる仕様になっています。いわゆる構造化された並列性(Structured Concurrency)と呼ばれるものです。

Java (Reactor Core)

package async;

import java.io.IOException;
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.misc.Signal;
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class Sample {
    private static Logger logger = LoggerFactory.getLogger(Sample.class);

    public static void main(String[] args) throws IOException {
        logger.info("Start main");

        AtomicBoolean stop = new AtomicBoolean(false);
        Disposable d = new Sample().run(stop);
        // Ctrl-Cでキャンセルする処理
        Signal.handle(new Signal("INT"), signal -> {
            logger.info("Signal {}", signal);
            d.dispose();
        });
        while (!stop.get()) {
            synchronized (Sample.class) {
                try {
                    Sample.class.wait();
                } catch (InterruptedException e) {
                }
            }
        }
        logger.info("End main");
    }

    public Mono<Void> heavyTask(String taskname) {
        return Mono.just(taskname)
                .doOnNext(name -> logger.info("start {}", name))
                .flatMap(name -> {
                    long durationMillis = 1000 + (long) (Math.random() * 1000);
                    if (durationMillis % 5 == 0) {
                        return Mono.error(new RuntimeException("error"));
                    } else {
                        return Mono.delay(Duration.ofMillis(durationMillis), Schedulers.parallel()).then();
                    }
                })
                .doOnSuccess(delay -> logger.info("end {} {}", taskname, delay))
                .doOnError(e -> logger.error("error {}", taskname, e))
                .doOnCancel(() -> logger.info("cancelled {}", taskname));
    }

    public Disposable run(AtomicBoolean stop) {
        Scheduler scheduler = Schedulers.parallel();
        Thread mainThread = Thread.currentThread();
        return Flux.range(1, 100)
                .map(i -> "task" + i)
                .flatMap(taskname -> heavyTask(taskname).retry(3), 10)
                .subscribeOn(scheduler)
                .doOnError(e -> logger.error("aborted by error", e))
                .doOnComplete(() -> logger.info("all tasks completed"))
                .doFinally(signalType -> {
                    logger.info("finally {}", signalType);
                    stop.set(true);
                    synchronized (Sample.class) {
                        Sample.class.notifyAll();
                    }
                })
                .subscribe();

    }

}

Javaでは非同期処理のライブラリがいくつかあるのですが、今回はReactor Coreを用いて書いてみました。
Reactor coreではMono<T>Flux<T>というデータ型を用いてメソッドチェーンによって非同期処理を記述します。

  • Mono<T>: 0個または1個のT型の値を返す非同期処理を表すデータ型
  • Flux<T>: N個のT型のストリームを表すデータ型

これらは.map(f).flatMap(f)で直列的な処理を書いたり、doOnErrorなどで例外時のフックを設定することができます。メソッドチェーンで使えるメソッドの数は非常に多い()ので、大体の非同期処理パターンをメソッドチェーンのみで書くことができます。
今回だと「10個のワーカーに分割してheavyTaskを呼び出す、ただし、3回までリトライ処理をする。」というのが以下の一行で表現できます。

.flatMap(taskname -> heavyTask(taskname).retry(3), 10)

また、ReactorではsubscribeOn(scheduler)publishOn(scheduler)を使って実行スケジューラも柔軟に変えることができます。

ただ、メソッドの数が多いので習得コストが高いのと、Publisher/Subscriber実行モデルが複雑なのでデバッグが難しいというのがデメリットです。

Kotlin

package async
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import sun.misc.Signal

// define Exception
class MyException : Exception()

suspend fun heavyTask(taskname: String) {
    println("Starting $taskname")
    try {
        val durationMillis = 1000L + (1000 * Math.random()).toLong()
        if (durationMillis % 3 == 0L) {
            throw MyException()
        }
        delay(durationMillis)
    } finally {
        println("Ending $taskname")
    }
}

suspend fun retryHeavyTask(taskname: String) {
    // capture error to a variable
    var error: Throwable? = null

    // retry 3 times
    for (i in 0..3) {
        try {
            heavyTask(taskname)
            return
        } catch (e: MyException) {
            println("During: $taskname Caught exception: $e retry count: $i")
            error = e
        }
    }
    if (error != null) {
        throw error
    }
}

suspend fun executor(channel: Channel<String>) {
    while (true) {
        try {
            val task = channel.receive()
            retryHeavyTask(task)
        } catch (e: ClosedReceiveChannelException) {
            break
        }
    }
}

suspend fun main() = supervisorScope {
    println("Start main")
    val deferred = async {
        val channel = Channel<String>()
        repeat(10) {
            launch {
                executor(channel)
            }
        }

        try {
            for (task in 1..100) {
                println("Sending task$task ")
                channel.send("task$task")
            }
        } finally {
            channel.close()
        }
    }
    Signal.handle(Signal("INT")) {
        deferred.cancel()
    }
    try {
        deferred.await()
    } catch (e: CancellationException) {
        println("Cancelled")
    } catch (e: MyException) {
        println("Caught exception: $e")
    }
    println("End main")
}

Kotlinではsuspend funで定義したsuspending functionを使って非同期処理を記述します。
suspending functionのなかで非同期処理を呼び出すには launchasyncを用います。通常のfunctionからsuspending functionを呼び出す際はrunBlockingなどを用います。

実行モデルはおおむねPythonの仕組みと同じですが、
Pythonは原則シングルスレッドで一部マルチスレッドなのに対し、Kotlinはマルチスレッドよりなため、同期処理が必要な場面ではChannelやmutexを使う必要があります。今回はChannelを使って各Executorがタスクキューから要素を取得する処理を記述しています。

またKotlinはブロックを用いた構造化された並列性をサポートしており、Pythonのようにいちいちタスクグループを作成しなくても、ブロックスコープ単位で起動した非同期処理を構造化してくれるので、非同期ジョブがリークする心配はありません。

Golang

package main

import (
	"context"
	"fmt"
	"math/rand"
	"os"
	"os/signal"
	"time"

	"github.com/sourcegraph/conc/pool"
)

type MyError struct {
	Message string
}

func (e *MyError) Error() string {
	return fmt.Sprintf("MyError: %s", e.Message)
}

// return error
func heavyTask(ctx context.Context, taskname string) error {
	fmt.Printf("Task %s started\n", taskname)

	// generate random number range (0,1000)
	num := rand.Intn(1000)
	if num%5 == 0 {
		return &MyError{Message: "Random error"}
	}
	duration := time.Duration(1000+num) * time.Millisecond
	select {
	case <-ctx.Done():
		fmt.Printf("Task %s Cancelled\n", taskname)
		return ctx.Err()
	case <-time.After(duration):
		fmt.Printf("Task %s completed\n", taskname)
		return nil
	}
}

func retryHeavyTask(ctx context.Context, taskname string) error {
	var err error
	for i := 0; i < 3; i++ {
		err = heavyTask(ctx, taskname)
		if err == nil {
			return nil
		} else {
			switch e := err.(type) {
			case *MyError:
				fmt.Printf("Task %s failed: %s\n", taskname, e.Message)
				continue
			default:
				return err
			}
		}
	}
	return err
}

func main() {
	fmt.Println("Main started")
	var ctx, cancel = context.WithCancel(context.Background())
	pool := pool.New().WithMaxGoroutines(10)
	// Channel to receive SIGINT signal
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, os.Interrupt)

	go func() {
		<-quit
		fmt.Println("Received interrupt signal")
		cancel()
	}()

	for i := 0; i < 100; i++ {
		fmt.Printf("Scheduling Task-%d\n", i)
		pool.Go(func() {
			err := retryHeavyTask(ctx, fmt.Sprintf("Task-%d", i))
			if err != nil {
				cancel()
			}
		})
		if ctx.Err() != nil {
			break
		}
	}

	pool.Wait()
	fmt.Println("Main completed")
}

Go言語ではおなじみGo routineとチャンネルによるスレッド間通信を用いて非同期処理を記述していきます。
Go routineは実行スレッドをランタイムが制御するマルチスレッド方式を用いています。

それだけだとスレッドプールを表現するのが難しいので、
今回はconcというライブラリを用いて10並列での処理を記述しています。

pool := pool.New().WithMaxGoroutines(10)

Go言語ではContextを用いて非同期処理のキャンセルを処理します。Contextのcancel関数を呼ぶと、Context.Done()チャンネルにキャンセルのシグナルが送出されるので、非同期待ちをしている箇所でシグナルを受信してキャンセル処理を行う、という仕組みです。

以下のコードではselectで囲まれたブロックは、以下の二つのシグナルのうち、先に受信したものいずれかに対してcase文の処理を実行します。

  • <-ctx.Done(): 非同期処理のキャンセルシグナルを受信したことを表す
  • <-time.After(duration): duration後に送られてくるシグナルを受信したことを表す
select {
case <-ctx.Done():
    fmt.Printf("Task %s Cancelled\n", taskname)
    return ctx.Err()
case <-time.After(duration):
    fmt.Printf("Task %s completed\n", taskname)
    return nil
}

Haskell

{-# LANGUAGE GHC2021 #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DerivingStrategies #-}

module Main where

import Control.Concurrent (Chan, newChan, readChan, threadDelay, writeList2Chan)
import Control.Concurrent.Async (AsyncCancelled, replicateConcurrently_)
import Control.Exception (BlockedIndefinitelyOnMVar (BlockedIndefinitelyOnMVar), Exception, catch, throwIO)
import Control.Monad (forever, when)
import Data.Data (Typeable)
import System.Random (randomRIO)

newtype MyException = MyException String
  deriving (Show, Typeable)
  deriving anyclass (Exception)

heavyTask :: String -> IO ()
heavyTask taskname = task `catch` handler
  where
    task = do
      putStrLn $ "Starting task: " ++ taskname
      r <- randomRIO (0, 1000000) :: IO Int
      when (r `mod` 5 == 0) $
        throwIO $
          MyException "random error"
      threadDelay $ 1000000 + r
      putStrLn $ "Ending task: " ++ taskname
    handler :: AsyncCancelled -> IO ()
    handler e = do
      putStrLn $ "Cancelled: " ++ taskname
      throwIO e

retryHeavyTask :: Int -> String -> IO ()
retryHeavyTask 1 taskname = heavyTask taskname
retryHeavyTask n taskname = heavyTask taskname `catch` handler
  where
    handler (e :: MyException) = do
      putStrLn $ "Failed " ++ taskname ++ ", Caught exception: " ++ show e ++ ", retrying"
      retryHeavyTask (n - 1) taskname

worker :: Chan String -> IO ()
worker ch = forever step `catch` handler
  where
    step = do
      task <- readChan ch
      retryHeavyTask 3 task
    handler BlockedIndefinitelyOnMVar =
      putStrLn "Channel is closed, exiting"

main :: IO ()
main = do
  ch <- newChan
  writeList2Chan ch $ map (\i -> "task" ++ show i) [1 .. 100 :: Int]
  replicateConcurrently_ 10 $ worker ch

HaskellではGo言語と同じようにforkIOでlight weight スレッドを起動できます。
今回は標準ライブラリだとfork/joinスタイルの書き方しかできないのですが、asyncライブラリを用いるとasync/awaitスタイルでも書けます。
(今回の例ではreplicateConcurrently_のみで十分なのでプログラム中には出現していませんが)

その他

すみません、力尽きましたのでChatGPT(GPT-4)で書いたコードを載せます。一部の仕様を満たしていないかもしれませんが、雰囲気はわかると思いますので、ご了承ください。

Java (Completable Future)

import java.util.concurrent.*;
import java.util.stream.*;

public class AsyncTasks {

    private static final int MAX_RETRIES = 3;
    private static final int TOTAL_TASKS = 100;
    private static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) {
        CompletableFuture<?>[] futures = IntStream.range(0, TOTAL_TASKS)
                .mapToObj(i -> CompletableFuture.supplyAsync(() -> runTaskWithRetry("Task " + i, 0), executor)
                        .exceptionally(ex -> {
                            System.out.println("Failed to complete task after retries: " + ex.getMessage());
                            executor.shutdownNow(); // 中断する
                            return null;
                        }))
                .toArray(CompletableFuture[]::new);

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            executor.shutdownNow(); // 安全にキャンセル
            System.out.println("Shutdown hook triggered.");
        }));

        CompletableFuture.allOf(futures).join(); // 全てのタスクが完了するまで待機
        executor.shutdown();
    }

    private static boolean runTaskWithRetry(String taskName, int retryCount) {
        try {
            System.out.println("start " + taskName);
            if (ThreadLocalRandom.current().nextDouble() < 0.1) {
                throw new RuntimeException("random error");
            }
            CompletableFuture<Void> delay = CompletableFuture.runAsync(() -> {}, CompletableFuture.delayedExecutor(1 + ThreadLocalRandom.current().nextInt(1000), TimeUnit.MILLISECONDS));
            delay.join(); // 非同期の遅延
            System.out.println("end " + taskName);
            return true;
        } catch (Exception e) {
            System.out.println(taskName + " failed: " + e.getMessage());
            if (retryCount < MAX_RETRIES) {
                System.out.println(taskName + " retrying...");
                return runTaskWithRetry(taskName, retryCount + 1);
            } else {
                throw new RuntimeException("Max retries reached for " + taskName);
            }
        }
    }
}

Rust

use tokio::signal;
use tokio::sync::{Semaphore, broadcast};
use tokio::time::{sleep, Duration};
use rand::Rng;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(10)); // 同時に実行されるタスクの最大数を制限
    let (tx, _rx) = broadcast::channel(1); // キャンセルシグナルの送信用

    let handler = tokio::spawn(async move {
        signal::ctrl_c().await.expect("failed to listen for event");
        println!("Ctrl-C received, cancelling tasks...");
    });

    let tasks = 100;
    let futures: Vec<_> = (0..tasks).map(|i| {
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        let tx_clone = tx.clone();
        tokio::spawn(async move {
            let task_name = format!("Task {}", i);
            tokio::select! {
                _ = heavy_task(&task_name) => {
                    if let Err(_) = heavy_task(&task_name).await {
                        println!("{} failed after maximum retries.", task_name);
                    }
                },
                _ = tx_clone.subscribe().recv() => {
                    println!("{} was cancelled.", task_name);
                },
            }
            drop(permit);
        })
    }).collect();

    futures::future::join_all(futures).await;
    handler.await.unwrap();
}

async fn heavy_task(task_name: &str) -> Result<(), ()> {
    for _ in 0..3 {
        if attempt_task(task_name).await.is_ok() {
            return Ok(());
        }
    }
    Err(())
}

async fn attempt_task(task_name: &str) -> Result<(), ()> {
    println!("start {}", task_name);
    if rand::thread_rng().gen_bool(0.1) { // 10%の確率でエラー
        println!("{} encountered a random error", task_name);
        return Err(());
    }
    sleep(Duration::from_secs_f64(rand::thread_rng().gen_range(1.0..2.0))).await;
    println!("end {}", task_name);
    Ok(())
}

まとめ

この記事では、複数のプログラミング言語における非同期処理の実装方法とその特徴について概観しました。各言語ごとの非同期処理のアプローチを比較することで、言語設計における異なる哲学や、特定の問題領域において一部のアプローチがどのように適しているかを理解することができます。

  • JavaScript/TypeScriptはPromiseとasync/awaitを用いた非同期処理が一般的で、イベントループに基づくシングルスレッドのランタイムが特徴です
  • Pythonではasyncioライブラリを用いたasync/await構文が非同期I/O処理に広く使われています
  • JavaではCompletableFutureやReactor Coreを用いたリアクティブプログラミングが非同期処理の主流です
  • Kotlinではコルーチンを通じた非同期処理がシンプルかつ強力な制御構造を提供します
  • Go言語ではGoroutineとチャンネルを用いたシンプルな非同期処理が可能です
  • HaskellではasyncパッケージとLight weight threadsを用いて高度な非同期処理が実現されます
  • RustではFutureトレイトとtokioランタイムを使った非同期処理が、パフォーマンスと安全性を重視する場面で利用されます

特に、非同期処理の書きやすさという観点ではPythonKotlinがボイラープレートや学習コストが小さく、おすすめだと筆者は考えています。一方で大規模な非同期処理を高効率で処理したい場合は、Go言語HaskellRustなどが選択肢に入ってくるでしょう。

非同期処理の実現方法は、言語の特性やランタイムの設計に深く依存しています。そのため、各言語の非同期処理モデルを理解し、その上で適切なパターンやライブラリを選択することが、効率的かつ安全な非同期プログラミングを行う上での鍵となります。

また、キャンセル処理や例外処理の取り扱い、さらにはstructured concurrencyのような新しいプログラミングパラダイムの導入など、非同期処理に関わる概念は常に進化しています。これらの進化を追いかけることで、より堅牢で読みやすく、メンテナンスしやすいコードを書くことが可能になります。

非同期処理は多くのプログラミング言語で必要不可欠な機能です。各言語が提供する非同期処理のメカニズムを理解し、プロジェクトの要件に合わせて最適なものを選択することが重要です。

  1. worker threadsのことは無視するものとします。

167
175
5

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
167
175

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?