LoginSignup
16

More than 1 year has passed since last update.

IValueTaskSourceを使用した非同期処理(.NET)

Last updated at Posted at 2019-05-16

はじめに

dotnet core 3.0より、IValueTaskSourceの素として使えそうなManualResetValueTaskSourceCoreなるクラスが追加されたようなので、使い方等を書く。

IValueTaskSource自体については、 http://tooslowexception.com/implementing-custom-ivaluetasksource-async-without-allocations/ が詳しい。

この記事は、単純にライブラリから渡されたValueTaskをawaitするという場合には特に意識するところではないが、
例えば非同期Queueの処理結果を通知したい場合等、今まではTaskCompletionSourceを使っていたような場面で参考になると思う。
今回使ったソースは https://github.com/itn3000/valuetasklabs にまとめてある。

登場の経緯

C#にはasync awaitという非同期処理のための機能が存在する。
ここでクラスとしてTask<T>が主に使われているが、定義としてはクラスのため、どうしても実行の度にアロケーションが行われる。

アロケーションコスト

アロケーションのコストは、個々で見ると大きくはないが、これが数十万以上となってくると、GCやヒープアロケーションの負荷が無視できなくなってきて、効率的な実行を阻害してしまう場合がある。
これ自体は、非同期処理という性質上どうしてもコンテキストが変わるタイミングで必要な情報をスタック以外に確保する必要があるなど、仕方なかった所もある。

しかし、dotnet coreの進歩に伴い、パフォーマンスアップというのがdotnet coreの大きな目標の一つとなった(主にasp.net)ため、このオーバーヘッドをどうにかしたいという要望が出てきた。

ValueTaskの登場

そこで登場したのがValueTask<T>である。
これは従来クラスだったTask<T>に対し、構造体として定義されたTaskのようなオブジェクトとなる。
これにより、awaitを通らないケースが大半の場合の処理で大幅なパフォーマンスアップが見込めることになった。

// ValueTaskで性能向上が見込める場合
async ValueTask<int> XAsync()
{
  if([条件分岐)
  {
    // ここに入るケースはあまりないものとする
    return await YAsync();
  }
  // 大半はこちらのルートになるものとする
  return 0;
}

しかし、awaitのタイミングで結局Task<T>をアロケーションしてしまう+ボクシングが発生してしまうため、awaitの回数が多いと却って遅くなる場合があるという問題点があった。

ValueTask問題の解決

ValueTaskからコンテキスト情報を追い出し、少なくともValueTask内部では完全に新規アロケーションが発生しないようにするため、IValueTaskSourceというものが導入された。
IValueTaskSourceを実装したインスタンスを使い回すことにより、大量に非同期処理を行っても、使い回しの分だけしかアロケーションは発生しないので、より効率的にリソースを使用できるという仕組みになる。
これにより、System.Threading.Channels等のValueTaskを使用するプログラムでは、性能向上とアロケーションフリーが同時に達成された。
だが、 解説ページを見てもわかる通り、実装の仕方がややこしく、うまく実装しないと結局使わないより性能が劣化するということもあった。

そのため、パフォーマンスを重視する人たち(Kestrelを実装している人やStackOverflowの中の人等)は、このIValueTaskSourceを早速駆使して性能向上を図っていたようだが、一般人にはどうにも扱いが難しい代物だった。
そこで、実装を簡単にするべくnetcoreapp3.0及びnetstandard2.1から導入されるのが、冒頭で書いたManualResetValueTaskSourceCoreとなる。

なお、ManualResetValueTaskSourceCoreのソースはここにあるが、内部APIを利用しているためそのままでは実装はできない(特にSynchronizationContextにQueueするところ)。
しかし、参考にできる場所も多いと思う。

使い方

クラス定義

さて、このManualResetValueTaskSourceCore、直接IValueTaskSourceを実装しているわけではない
ManualResetValueTaskSourceCoreをメンバーに持つようなクラスを定義する必要がある。具体例としては以下。

    // using Microsoft.Extensions.ObjectPool;
    // from https://github.com/dotnet/corefx/blob/master/src/Common/tests/System/Threading/Tasks/Sources/ManualResetValueTaskSource.cs
    sealed class ManualResetValueTaskSource<T> : IValueTaskSource<T>, IValueTaskSource
    {
        private ManualResetValueTaskSourceCore<T> _core; // mutable struct; do not make this readonly

        public bool RunContinuationsAsynchronously { get => _core.RunContinuationsAsynchronously; set => _core.RunContinuationsAsynchronously = value; }
        public short Version => _core.Version;
        public void Reset() => _core.Reset();
        public void SetResult(T result) => _core.SetResult(result);
        public void SetException(Exception error) => _core.SetException(error);
        public void SetPool(ObjectPool<ManualResetValueTaskSource<T>> pool)
        {
            _pool = pool;
        }
        ObjectPool<ManualResetValueTaskSource<T>> _pool;

        public T GetResult(short token)
        {
            try
            {
                var ret = _core.GetResult(token);
                return ret;
            }
            finally
            {
                var pool = _pool;
                _pool = null;
                pool?.Return(this);
            }
        }
        void IValueTaskSource.GetResult(short token)
        {
            try
            {
                _core.GetResult(token);
            }
            finally
            {
                var pool = _pool;
                _pool = null;
                pool?.Return(this);
            }
        }

        public ValueTaskSourceStatus GetStatus(short token) => _core.GetStatus(token);
        public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) => _core.OnCompleted(continuation, state, token, flags);
    }

基本的には委譲するだけであるが、注意点としては、GetResult(short token)の時に、オブジェクトプールに自分自身を返還しているという点である。この辺りは厳密に考えると、CompareExchange等で安全性を見た方が良いかもしれないが、サンプルコードなのでとりあえずこのまま。

ValueTask生成側

さて、上記を定義したら、今度はValueTaskを生成する側である。基本的には、

  1. IValueTaskSourceの実装クラスを確保
    • オブジェクトプールから取り出す
    • 初期化もこの時行う
  2. 別スレッド等に確保したインスタンスを渡す
  3. return new ValueTask<T>(vts)でValueTaskを返す
  4. 別スレッドに渡した処理の中で、処理が完了したらvts.SetResult(result)を実行する

というようになる。具体例としては以下のようになる。

        // ValueTaskの生成
        public ValueTask<int> Enqueue()
        {
            if (_Worker.IsCompleted || _Worker.IsCanceled)
            {
                throw new Exception("thread already stopped");
            }
            var vts = _Pool.Get();
            vts.SetPool(_Pool);
            vts.Reset();
            // System.Threading.ChannelsにIValueTaskSourceのインスタンスを渡す
            if (!_Channel.Writer.TryWrite(vts))
            {
                throw new Exception("failed to write to channel");
            }
            return new ValueTask<int>(vts, vts.Version);
        }
        // ワーカースレッド
        async ValueTask Worker()
        {
            try
            {
                while (!_Cts.IsCancellationRequested)
                {
                    if (!await _Channel.Reader.WaitToReadAsync().ConfigureAwait(false))
                    {
                        break;
                    }
                    while (!_Cts.IsCancellationRequested && _Channel.Reader.TryRead(out var item))
                    {
                        item.SetResult(1);
                    }
                }
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }
        }

ValueTaskを使う側

単純に、他のTaskやValueTaskと同じようにawaitすればいいだけである。

参考値

参考までに今回の方法で実装した非同期キューと、TaskCompletionSource<int>で実装したソースの比較値を書いておく。
ベンチマークコード


BenchmarkDotNet=v0.11.5, OS=Windows 10.0.17134.706 (1803/April2018Update/Redstone4)
Intel Core i7-4712MQ CPU 2.30GHz (Haswell), 1 CPU, 8 logical and 4 physical cores
Frequency=2240914 Hz, Resolution=446.2465 ns, Timer=TSC
.NET Core SDK=3.0.100-preview3-010431
  [Host]   : .NET Core 3.0.0-preview3-27503-5 (CoreCLR 4.6.27422.72, CoreFX 4.7.19.12807), 64bit RyuJIT
  ShortRun : .NET Core 3.0.0-preview3-27503-5 (CoreCLR 4.6.27422.72, CoreFX 4.7.19.12807), 64bit RyuJIT

Job=ShortRun  IterationCount=3  LaunchCount=1  
WarmupCount=3  

Method LoopNum TaskNum Mean Error StdDev Gen 0 Gen 1 Gen 2 Allocated
ValueTaskBench 100000 1 21.05 ms 0.9455 ms 0.0518 ms - - - 2.73 KB
TcsBench 100000 1 220.13 ms 7.0272 ms 0.3852 ms 5000.0000 - - 2.6 KB
ValueTaskBench 100000 10 224.37 ms 33.5831 ms 1.8408 ms - - - 4.84 KB
TcsBench 100000 10 592.02 ms 185.3798 ms 10.1613 ms 30000.0000 - - 4.91 KB
ValueTaskBench 100000 100 2,173.28 ms 444.9744 ms 24.3905 ms - - - 26.14 KB
TcsBench 100000 100 5,038.77 ms 5,215.3341 ms 285.8701 ms 308000.0000 - - 28.26 KB

dotnet-sdk-3.0pre5での注意点

さて、IValueTaskSourceだが、netstandard2.1のライブラリで、System.Threading.Tasks.Extensionsを参照するライブラリと一緒に使おうとすると、ビルド時に以下のようなエラーになる。

C:\Program Files\dotnet\sdk\3.0.100-preview5-011568\Sdks\Microsoft.NET.Sdk\targets\Microsoft.NET.RuntimeIdentifierInference.targets(157,5): message NETSDK1057: プレビュー版の .NET Core を使用しています。https://aka.ms/dotnet-core-preview をご覧ください [G:\src\gitrepos\dotnet-sandbox\valuetasklabs\valuetasklabs.lib\valuetasklabs.lib.csproj]
QueueWithValueTask.cs(11,50): error CS0433: 型 'IValueTaskSource<TResult>' が 'System.Threading.Tasks.Extensions, Version=4.2.0.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51' と 'netstandard, Version=2.1.0.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51' の両方に存在します。 [G:\src\gitrepos\dotnet-sandbox\valuetasklabs\valuetasklabs.lib\valuetasklabs.lib.csproj]
QueueWithValueTask.cs(11,71): error CS0433: 型 'IValueTaskSource' が 'System.Threading.Tasks.Extensions, Version=4.2.0.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51' と 'netstandard, Version=2.1.0.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51' の両方に存在します。 [G:\src\gitrepos\dotnet-sandbox\valuetasklabs\valuetasklabs.lib\valuetasklabs.lib.csproj]
QueueWithValueTask.cs(39,14): error CS0433: 型 'IValueTaskSource' が 'System.Threading.Tasks.Extensions, Version=4.2.0.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51' と 'netstandard, Version=2.1.0.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51' の両方に存在します。 [G:\src\gitrepos\dotnet-sandbox\valuetasklabs\valuetasklabs.lib\valuetasklabs.lib.csproj]
QueueWithValueTask.cs(57,15): error CS0433: 型 'ValueTask' が 'System.Threading.Tasks.Extensions, Version=4.2.0.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51' と 'netstandard, Version=2.1.0.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51' の両方に存在します。 [G:\src\gitrepos\dotnet-sandbox\valuetasklabs\valuetasklabs.lib\valuetasklabs.lib.csproj]
QueueWithValueTask.cs(78,16): error CS0433: 型 'ValueTask<TResult>' が 'System.Threading.Tasks.Extensions, Version=4.2.0.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51' と 'netstandard, Version=2.1.0.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51' の両方に存在します。 [G:\src\gitrepos\dotnet-sandbox\valuetasklabs\valuetasklabs.lib\valuetasklabs.lib.csproj]
QueueWithValueTask.cs(40,16): error CS0433: 型 'ValueTaskSourceStatus' が 'System.Threading.Tasks.Extensions, Version=4.2.0.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51' と 'netstandard, Version=2.1.0.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51' の両方に存在します。 [G:\src\gitrepos\dotnet-sandbox\valuetasklabs\valuetasklabs.lib\valuetasklabs.lib.csproj]
QueueWithValueTask.cs(41,89): error CS0433: 型 'ValueTaskSourceOnCompletedFlags' が 'System.Threading.Tasks.Extensions, Version=4.2.0.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51' と 'netstandard, Version=2.1.0.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51' の両方に存在します。 [G:\src\gitrepos\dotnet-sandbox\valuetasklabs\valuetasklabs.lib\valuetasklabs.lib.csproj]
QueueWithValueTask.cs(39,14): error CS0538: '明示的インターフェイス宣言の中の 'IValueTaskSource' はインターフェイスではありません。 [G:\src\gitrepos\dotnet-sandbox\valuetasklabs\valuetasklabs.lib\valuetasklabs.lib.csproj]

ビルドに失敗しました。

csprojは以下のようになる。

<Project Sdk="Microsoft.NET.Sdk">

  <PropertyGroup>
    <TargetFramework>netstandard2.1</TargetFramework>
    <LangVersion>8.0</LangVersion>
  </PropertyGroup>
  <ItemGroup>
    <PackageReference Include="System.Threading.Channels" Version="4.6.0-preview4.19212.13" />
    <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="2.2.0" />
  </ItemGroup>

</Project>

これは、netstandard2.1でIValueTaskSourceとValueTaskが標準入りしたため、nugetパッケージのSystem.Threading.Tasks.Extensionsのものと競合を起こしているためである。
netcoreapp3.0では特殊実装されているため問題は無いが、preview段階でのnetstandard2.1の実装の際は注意が必要。
同パッケージを使っているライブラリは多いため(NpgsqlとかSE.Redisとか)、割と致命的だと思うので、リリースまでには対処してくるとは思う。
3.0pre6で対応済み

終りに

今までTaskCompletionSourceを使用していた部分に採用すれば、より効率の良い処理が実装できると思うので、早くdotnet core 3.0リリースを願うばかりである。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
16