C++
coroutine
C++Day 1

C++ で async/await をする

はじめに

この記事は C++ Advent Calendar 2018 の 1 日目の記事です。

前回では C++ でのコルーチンについてやっていきましたが、今回はいよいよ本題の async/await です。参考情報等は前回と同じです。

実装例の完全版は GitHub に上げていますのでこちらを参照してください。

Visual Studio を使っている場合は C++/WinRT のコードもとても参考になります。こんな書き方もありなんだ、と発見が結構ありました。

async/await とは

async/await は基本的にはコルーチンの応用機能で

  • await が記述されたところでコルーチンが一旦中断される。その際、デザインパターンで言うところの Future オブジェクトを返すようにする。
  • await は Future オブジェクトが完了したら、その戻り値を持って await で中断した位置から再開する。

といった機能です。コルーチンは再開のトリガーは未定義ですが、その再開の仕方を定義しているのが特徴と言えます。

await してみる

実は await をしてみるだけだったら簡単だったりします。 async/await も基本はコルーチンなので、

  • Corotine traits の条件を満たした関数内で
  • await 可能な値に対して "co_await" をする。

ことで await ができます。ちなみに C# では await を使うメソッドに "async" という予約語をつけますが、 C++ の場合はそれに対応する予約語はありません。

標準で std::future が await 可能な型になっていたので次のような感じで書く事で await を試すことができます。
比較用に C# で同様の動作をするコードを記事の最後に書いておきましたので比べて見てください。

std::future<int> TestAsync()
{
    std::cout << "TestAsync tid:" << std::this_thread::get_id() << std::endl;
    auto n = co_await std::async(std::launch::async, []() -> int
    {
        std::cout << "TestAsync - async tid:" << std::this_thread::get_id() << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));
        return 10;
    });

    std::cout << "TestAsync - await tid:" << std::this_thread::get_id() << " " << n << std::endl;

    return 20;
}

int main()
{
    std::cout << "main - before TestAsync tid:" << std::this_thread::get_id() << std::endl;

    auto f1 = TestAsync();
    std::cout << "main - after TestAsync tid:" << std::this_thread::get_id() << std::endl;

    std::cout << "main - result tid:" << std::this_thread::get_id() << " " << f1.get() << std::endl;

    return 0;
}

これを実行してみると次のようになっていました。

main - before TestAsync tid:20776
TestAsync tid:20776
TestAsync - async tid:19696
main - after TestAsync tid:20776
TestAsync - await tid:18412 10
main - result tid:20776 20

確かに std::async で書かれている非同期処理が別スレッドで実行された後に await で処理が復帰、継続しているようですが、ここでの注目のポイントがあります。

  • std::async の処理をした後、 await から復帰した処理は よくわからない新しいスレッドで実行されている。

.NET やっている人は「await 直後はメインスレッドに戻ってきてほしい」と考えるかもしれませんが、そんなのできるわけありません。 SynchronizationContext (に相当する) ものがないのですから。ちなみに .NET でもコンソールアプリでは (通常は) メインスレッドに戻ってくることはありません。

ということでメインスレッドに戻したいのであれば戻せるような Coroutine traits の実装をしなくてはいけなくなるわけです。

await 可能な型の実装

await を可能にする型には大きく二つの要素が必要になります。

  • コルーチン化するための Corotuine traits
  • await_ready, await_suspend, await_resume メソッドを持った型

後者はドラフトの中では awaiter という名前で実装していたので awaiter と呼びます。

co_await で受ける型 (式の戻り値型) は原則として awaiter である必要があります。

Corotine traits を実装する

基本的には 前回記事 と同じです。要点は promise_type の実装で

  • initial_suspend は suspend_never を返す。

とすることだけです。

initial_suspend は suspend_always を返すとコルーチンの初期状態が中断状態になるので指定したコルーチン関数が実行されません。

async/await を前提とする Coroutine traits では awaiter で中断、再開の制御をする事になるため co_yield 等が入ることは考えにくいです。そのため yield_value の実装は不要です。

同様に return_value の実装も必須ではなかったりします (co_await で指定した非同期処理の戻り値のやり取りは awaiter の実装で行います) が、 co_await を含んだコルーチン関数通しての戻り値として何か値を返したい場合は return_value を実装した方がよいでしょう。

また、前回の generator の実装では resume の制御を外部から行うために coroutine_handle の引き回しが必要でしたが、 co_await のために Coroutine traits を実装する場合は awaiter 側の方で coroutine_handle を適当な形でまわしてくれる (後述) ので、 promise_type の実装で coroutine_handle の引き回しをしなくても大丈夫です (というか awaiter の仕組みで resume を行うべきなので coroutine_handle を外に出すべきではない) 。

awaiter を実装する

awaiter となるクラスは 3 つのメソッドを実装する必要があります。

await_ready

await できるかどうか、というか非同期処理が実行済かどうかを返すメソッドです。 true を返すと実行済と判断され await_suspend はスキップします。

co_await で指定した非同期処理がタイミングによってはすでに実行済だった場合、 suspend をする必要もないのでそのような場合に true を返します。

std::future<int> _future;
bool await_ready() const
{
    return _future.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}

この場合は future が set_value 済かどうかで判断しています。

await_suspend

await_ready が false だった場合、コルーチンが中断し await_suspend が呼ばれます。 await_suspend ではコルーチンを再開する仕込みをする必要があります。

std::future<int> _future;
void await_suspend(std::experimental::coroutine_handle<> h)
{
    std::thread([this, h]()
    {
        _future.wait();
        h.resume();
    }).detach();
}

await_suspend の引数で coroutine_handle が渡されるので、渡された coroutine_handle を用いて非同期処理の完了待機からコルーチン再開 (resume) まで行うようにする必要があります。

ちなみに VC++ での std::future に対する awaiter の実装はまさに上記例のような感じで、 ワーカースレッドで future に値がセットされるまで待機し、値がセットされたら resume してコルーチンを再開するようにしています。 future を監視しているワーカースレッドでそのまま resume してしまうので、このワーカースレッドでコルーチンが再開することになります。これが先に書いた「よくわからないスレッドで await 後の処理が実行される」理由です。

await_resume

resume される際に await_resume が呼ばれます。 co_await の戻り値として await_resume の戻り値が使われます。

std::future<int> _future;
int await_resume()
{
    return _future.get();
}

任意の型を await できるようにする

co_await で受けられるのは前述のように awaiter を実装している型である必要がありますが、 awaiter を実装していない既存型を無改造で await したくなることもあるはずです。そのような場合、次のような方法で任意の型を実装した awaiter に変換するコードを記述することでスマートに await することが可能になります。

promise_type に "await_transform" メソッドを実装する

promise_type に await_transform というメソッドを実装しておくと任意の入力を awaiter に変換するコードを実装することができます。

auto await_transform(int n)
{
    struct Awaiter
    {
        Awaiter(int n)
        {
            //  略
        }

        //  略
    };

    return Awaiter(n);
}

co_await を operator オーバーロードする

co_await は演算子扱いで operator オーバーロードが可能です。 operator オーバーロードでも変換処理を実装することが可能なようです。

auto operator co_await(int n)
{
    //  上記と同じ
}

await 後にメインスレッドに戻す

await 後にメインスレッドに戻したい場合は非同期処理の完了後にメインスレッドにスイッチし、そこでコルーチンを再開すればよいわけです。例えば

  • Windows の UI アプリでは GetMessage/TranslateMessage API によるイベントループでメインスレッドのイベント待機をしているので、ここにユーザーイベントメッセージ (WM_USER) 等を通知して、そのイベントハンドラの中でコルーチンを再開する。
  • いわゆるゲームループの中で非同期処理の完了通知を受けたらその中でコルーチンを再開する。

などが考えられます。

ここでは前者のパターンを想定し、 .NET の SynchronizationContext の簡易版のようなものを作って対応をしてみました。

SynchronizationContext _sync;
std::future<int> _future;
void await_suspend(std::experimental::coroutine_handle<> h)
{
    std::thread([this, h]()
    {
        _future.wait();
        _sync.Post([h]()
        {
            h.resume();
        });
    }).detach();
}

Post メソッドは指定の関数をキューに積むメソッドです。

while (true)
{
    _sync.Main();
}

Main メソッドは Post の通知を受けるまで待機し、通知を受けると通知で渡された関数を実行します。これをメインスレッドで行う事でメインスレッドで処理の継続ができるようになります。上記は説明のために簡略化していますが、ループから抜けるための条件は別途設定する必要があるでしょう。

awaiter 実装例

await_suspend からの resume はそのままワーカースレッドで実行になっています。

template<typename ValueType>
struct Awaitable
{
    struct promise_type
    {
        std::promise<ValueType> _value;

        std::experimental::suspend_never initial_suspend() { return {}; }
        std::experimental::suspend_always final_suspend() { return {}; }

        auto get_return_object()
        {
            return Awaitable(_value.get_future(), HandleType::from_promise(*this));
        }

        void unhandled_exception()
        {
            std::terminate();
        }

        std::experimental::suspend_never return_value(ValueType value)
        {
            _value.set_value(value);
            return {};
        }

        template<typename FutureValueType>
        auto await_transform(std::future<FutureValueType> f)
        {
            struct Awaiter
            {
                std::future<FutureValueType> _future;

                bool await_ready() const
                {
                    return _future.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
                }

                void await_suspend(HandleType h)
                {
                    std::thread([this, h]()
                    {
                        _future.wait();
                        h.resume();
                    }).detach();
                }

                FutureValueType await_resume()
                {
                    return _future.get();
                }
            };

            return Awaiter{ std::move(f) };
        }
    };

private:
    using HandleType = std::experimental::coroutine_handle<promise_type>;
    std::future<ValueType> _future;
    HandleType _coroutineHandle;

public:
    explicit Awaitable(std::future<ValueType>&& future, HandleType h) :
        _future(std::move(future)),
        _coroutineHandle(h)
    {
    }

    Awaitable(const Awaitable&) = delete;
    Awaitable(Awaitable&& rhs) :
        _future(std::move(rhs._future)),
        _coroutineHandle(rhs._coroutineHandle)
    {
        rhs._coroutineHandle = nullptr;
    }

    ~Awaitable()
    {
        if (_coroutineHandle != nullptr)
        {
            _coroutineHandle.destroy();
        }
        _coroutineHandle = nullptr;
    }

    ValueType GetReturnValue()
    {
        return _future.get();
    }

    bool IsCompleted() const
    {
        return _future.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
    }
};

Awaitable<int> TestAsync()
{
    auto n = co_await std::async(std::launch::async, []() -> int
    {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        return 10;
    });

    return n;
}

std::experimental::coroutine_traits<R, Args...> で Coroutine traits を実装してみる

前回記事の b) パターンを実装してみます。

  • b) std::experimental::coroutine_traits<R, Args...> テンプレートクラスを定義し、そのインナークラスとして所定のメソッドを実装した promise_type を定義する

VC++ の std::future に対する実装はこれで行われています。ここでも std::future に対する実装で解説していきます。

namespace std
{
namespace experimnetal

template<typename RetType, typename Args...>
struct coroutine_traits<future<RetType>, Args...>
{
    struct promise_type
    {
        promise<RetType> _promise;

        suspend_never initial_suspend() { return {}; }
        suspend_always final_suspend() { return {}; }

        auto get_return_object()
        {
            return _promise.get_future();
        }

        void return_value(ValueType value)
        {
            _promise.set_value(value);
        }
    };
};

}
}

a) パターンと違ってコルーチン関数の戻り値型の実装が不要で promise_type のみの実装になっているのでシンプルになっています。

戻り値型は promise - future パターンの future 側で、 promise_type が promise 側の実装になるわけですが、 std::future, std::promise はまさにそのためのクラスなので、それをそのまま使っているため余計な実装がないところもシンプルな要因です。

この実装の要点は

  • get_return_object は戻り値型のものを返す必要がありますが、ここでは promise から get_future で取得した future をそのまま返す。
  • return_value でコルーチン関数を抜けた時の値を get_return_object で返したインスタンスに通知する必要があるわけですが、これは promise にセットすればよい。

一方、この b) パターンで generator を実装しようとした場合、何かしらの方法で coroutine_handle を引き回してコルーチン関数の戻り値型と関連付けしなくてはなりません。さらに coroutine_handle は promise_type を知っておかなくてはいけない・・・と定義順まで考えるとちょっと難しいかなという感じです。

そういったわけでこの std::experimental::coroutine_traits を実装するパターンは「こういったやり方もある」という事を認知しておけばとりあえずはよいかなあという感じです。

おわりに

C++ でのコルーチン、 async/await のやり方を見ていきました。

現状では experimental という事もあり、使おうとした場合は自前で Coroutine traits や awaiter の実装をしないといけない場面も多そうです。 Coroutine traits の実装はいろいろやり方がありそうな感じですが、前回と今回の記事のやり方でおおよそ目的は達せられそうです。

async/await に関しては await_suspend の実装でどのように await からの復帰を行うかが結構重要なポイントになると思います。今回の実装では実開発にはあまり参考にならないと思いますのでいろいろ考えてやってみてください。

参考: C# での async/await

最初の std::future での await 実装例とほぼ同じ動作をする C# コードを比較用として書いておきます。

public static async Task<int> TestAsync()
{
    Console.WriteLine("TestAsync tid:{0}", Thread.CurrentThread.ManagedThreadId);

    var n = await Task.Run(() =>
    {
        Console.WriteLine("TestAsync - async tid:{0}", Thread.CurrentThread.ManagedThreadId);
        Thread.Sleep(1000);
        return 10;
    });

    Console.WriteLine("TestAsync - await tid:{0} {1}", Thread.CurrentThread.ManagedThreadId, n);

    return 20;
}

static void Main()
{
    Console.WriteLine("main - before TestAsync tid:{0}", Thread.CurrentThread.ManagedThreadId);
    var f = TestAsync();
    Console.WriteLine("main - after TestAsync tid:{0}", Thread.CurrentThread.ManagedThreadId);
    Console.WriteLine("main - result TestAsync tid:{0} {1}", Thread.CurrentThread.ManagedThreadId, f.Result);
}

コンソールアプリとして実行すると次のようになります。 await からの復帰後も Task.Run で実行しているスレッドと同じスレッドで動いています。
(ContinueWith で継続しているため)

main - before TestAsync tid:1
TestAsync tid:1
main - after TestAsync tid:1
TestAsync - async tid:3
TestAsync - await tid:3 10
main - result TestAsync tid:1 20

Windows Form アプリでイベントハンドラーから TestAsync だけ実行した場合は次の通り。 await 後はメインスレッドで実行されています。 SynchronizationContext を通しているためです。

TestAsync tid:1
TestAsync - async tid:3
TestAsync - await tid:1 10