LoginSignup
57
48

More than 5 years have passed since last update.

System.Threading.Channelsを使う

Last updated at Posted at 2018-05-15

はじめに

マルチスレッドでの非同期データ受け渡しライブラリのSystem.Threading.Channels(corefxlabにあったころはSystem.Threading.Tasks.Channels)が、corefxに統合され、この度4.5.0-rc1としてリリースされたので、
さすがに大きな変更はないだろうと踏んで使い方などを書く。

何ができるようになるか

非同期でのプロデューサー・コンシューマーパターンを作るのがより容易になる。

特徴としては以下のようになる

  • 順序は必ずFIFO(先入れ先出し)
  • 読み:書き=M:1、1:N、M:Nのパターンに対応
  • asyncと親和的な設計
  • パフォーマンスに配慮
    • netcoreapp2.1では更に特化実装で速い

注意: 現在netcoreapp2.1で、ConcurrentQueueが特定のケースでnetcoreapp2.0よりも遅くなるという現象が発生しており、System.Threading.Channelsもこの影響を受けるとのこと。
具体的には読み込み側スレッドが複数いた場合に性能がかなり不安定になる。(平均自体は1-2倍遅くなる程度だが、分散はかなり大きい)
2.1.0ではおそらく修正されず、2.1.1以降に修正される見込みなので、使用する際は必ずパフォーマンスを計測し、許容範囲に収まるかどうか、確認することをお勧めする。

関連issue: https://github.com/dotnet/corefx/issues/29595

類似ライブラリとの比較

BlockingCollection<T>

MSによる解説

こちらも同パターンを実現するためのもの、とされている。

しかし、こちらはasyncで運用するのにやや難がある実装になっていること、また、パフォーマンスもやや劣っている。
ただし、ChannelsはFIFO固定なのに対し、任意のコレクションを扱うことができるという利点がある。

System.Threading.Tasks.DataFlow

こちらは単純なプロデューサー・コンシューマーパターンというよりは、よりデータの流れというものを意識した要求を実現するためのライブラリとなっている。

ただし、こちらの方は処理ブロックを定義してつなげる処理が必要なことなど、やや準備が煩雑で、気軽に使えるかというとそうでもない。

ConcurrentQueue<T> + セマフォ(またはミューテックス)

ConcurrentQueueを介して読み書きをする。ただし、待機処理等はないため、セマフォ等を使ってそれを実現する。
こちらは自分で実装する範囲が大きく、面倒ごとも多いため、それらをやってくれるChannelの方が便利である。

導入

導入はnugetで行う。パッケージ名はSystem.Threading.Channels

System.Threading.Channelsで提供している機能

以下二つのクラスを起点として使用する

  • System.Threading.Channels.Channel
  • System.Threading.Channels.Channel<T>

生成

System.Threading.Channels.Channel.CreateBounded<T>あるいはSystem.Threading.Channels.Channel.CreateUnbounded<T>を使用する

CreateUnbounded<T>

長さ上限無しのチャンネルを作成する。作成時に追加で以下のようなオプションを追加することができる

  • AllowSynchronousContinuations(bool)
    • チャンネル読み/書き/待ち操作を非同期で行った場合の継続タスクを同期的に実行するかどうか
    • trueの方が性能が出るとのことだが、falseの方が何か性能でるような?
    • デフォルトはfalse
  • SingleReader/SingleWriter(bool)
    • それぞれ読み書きが同時実行される可能性が無い場合にTrueに設定すれば、性能向上が見込める
    • 同時実行される可能性があるときにTrueに設定すると、予期しない動作になることがある
    • デフォルトはfalse

多くの場合はこちらで問題ないが、それでも読み出し側の処理が追い付かないと、メモリ消費量が大変なことになるので、シビアな環境で使う場合は要注意

CreateBounded<T>

長さ上限付きのチャンネルを作成する。上限値のみ渡すこともできるが、作成時に追加でオプションを渡すことができる。
この時に指定できるオプションには、UnboundedChannelOptionsで指定できるものに加え、以下二つが追加指定可能。

  • Capacity: 上限値(int)
  • FullMode: 上限値に達した時にどう振る舞うか(BoundedChannelFullMode)
    • DropNewest: キューの中で一番新しいものを捨てて新規値と入れ替える
    • DropOldest: キューの中で一番古いものを捨てて新規と入れ替える
    • DropWrite: 新規値を捨てる
    • Wait: 追加可能になるまで待たせる(デフォルト)

仕様で動作が決まっているならばWait以外の選択肢もあるが、黙って値を破棄する動作になるので、基本的にはWait推奨

読み書き

生成されたインスタンスには、(ChannelReader<T>)Reader及び(ChannelWriter<T>)Writerがあるので、これを通して読み書きを行う。

書き込み

Writerメンバの以下のメソッドを使用する

  • ValueTask WaitToWriteAsync(CancellationToken ct)
    • 書き込み可能になるまで待機する
    • これがtrueを返しても、書き込みを割り込まれる可能性があるので注意すること
    • ctがキャンセル状態になるとOperationCancelledExceptionを投げる
  • bool TryWrite(T item)
    • 値を書き込む
    • 制限付きでFullMode==Waitの場合、いっぱいだった場合はfalseになるが、それ以外はtrueになる
  • ValueTask WriteAsync(T item, CancellationToken ct)
    • 値を非同期で書き込む
    • WaitToWriteAsync+TryWriteを行っているイメージ
  • void Complete(Exception e),bool TryComplete(Exception e)
    • 書き込み完了マークを付加し、以後全ての書き込み操作で例外を出すようにする
    • Disposeは実装していないので、終了を表現したい場合はこれを使う
    • Completeした後に更にTryComplete/Completeを呼んだ場合、TryCompleteはfalseを返し、Completeは例外を出す
    • 引数のExceptionは通常nullだが、ここに例外を指定すると、後の読み側のCompletionがFaulted状態になる

読み込み

Readerメンバの以下を使用する

  • ValueTask<bool> WaitToReadAsync(CancellationToken ct)
    • 読み込めるようになるまで(値が書き込まれるまで)待つ
    • 書き込みの方で完了マークがついており、かつもう読み込めるものが無い場合は、falseが返ってくる
  • bool TryRead(out T item)
    • 値の読出しを行う
    • 読み込めるものが無かった場合はfalseを返す
  • ValueTask<T> ReadAsync(CancellationToken ct)
    • 値の読出しを非同期で行う
    • WaitToReadAsync + TryReadを行うイメージ
  • Task Completion(プロパティ)
    • 書き込み側が完了しており(Complete/TryCompleteが呼ばれている)、かつ全ての要素が全て読みだされると完了状態になる
    • 書き込み側のCompleteで例外を指定していると、Faulted状態になる = await等でエラーになる
    • 終了監視用?

使用例

一般的にどういう手順で処理するべきかを書く。なお、基本的にはWaitで待ち、Tryで一気に読み出すのが効率が良いらしい。

書き込み側

  1. ChannelWriter<T>.WaitToWriteAsync()で書き込み可能になるまで待つ(大抵一瞬)
  2. ChannelWriter.TryWrite()で書き込み
    • この時、falseが戻ってきて、回復不能と判断したらエラー処理を行う(Complete(Exception)を実行する)
  3. アプリシャットダウン時や、全てのデータが書き終わった後はCompleteを実行する

読み込み側

  1. ChannelReader<T>.WaitToReadAsync()で読み込み可能、またはキューに何か入ってくるのを待つ
    • falseが来た場合はCompleteが呼ばれ、かつ残データが空ということなので終了
  2. TryReadがfalseになるまでデータ読み出し→処理を実行する
  3. 最後にCompletionを見てエラー処理

コード例

リクエストは任意のスレッドから投げたいけど、実際の処理はシングルスレッドに集約するというやつ。
大して短くなってないじゃないかと思われるかもしれないけど、これでも結構考慮事項が減って楽になっていると思う。
後、複数タスクの処理もやろうと思えば簡単に移行できる。

namespace channelstest
{
    using System;
    using System.Threading.Channels;
    using System.Threading;
    using System.Threading.Tasks;
    sealed class ConsumerAsync : IDisposable
    {
        Task _ConsumerThread;
        public ConsumerAsync()
        {
            _Channel = Channel.CreateUnbounded<TaskCompletionSource<int>>();
            _ConsumerThread = Worker();
        }

        public async Task<int> EnqueueAndGetValue()
        {
            // net461 or before, RunContinuationsAsynchronously may be ignored because of 
            // https://github.com/dotnet/coreclr/issues/2021 and https://blog.stephencleary.com/2012/12/dont-block-in-asynchronous-code.html
            var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
            await _Channel.Writer.WriteAsync(tcs).ConfigureAwait(false);
            return await tcs.Task.ConfigureAwait(false);
        }

        async Task Worker()
        {
            await Task.Yield();
            int currentValue = 0;
            try
            {
                while(await _Channel.Reader.WaitToReadAsync().ConfigureAwait(false))
                {
                    while (_Channel.Reader.TryRead(out var item))
                    {
                        // net461 or before
                        // var t1 = Task.Run(() => item.TrySetResult(Interlocked.Increment(ref currentValue)));
                        // item.Task.Wait();
                        item.TrySetResult(Interlocked.Increment(ref currentValue));
                    }
                }
            }
            catch (Exception e)
            {
                Console.WriteLine($"unexpected exception:{e}");
                _Channel.Writer.TryComplete(e);
            }
        }

        Channel<TaskCompletionSource<int>> _Channel;

        bool _Disposed = false;

        public void Dispose()
        {
            if (!_Disposed)
            {
                var completeSuccess = _Channel.Writer.TryComplete();
                if (_ConsumerThread != null)
                {
                    _ConsumerThread.Wait();
                    _ConsumerThread = null;
                }
                _Disposed = true;
            }
        }
    }
}

終りに

corefxlabにあったころと比べると、随分機能削ったなーという印象。まあ基幹に入る場合はなるべくシンプルな方が良いと思うのでこれはこれで。

実際IOが絡むと、単一スレッドに集約みたいなことはよくやるので、このようなちょうどいいライブラリが出たというのは大変うれしい。
Spanみたいに、どうしてもこれでなければ実現できない機能という事でもないけど、やっぱり便利なツールは使っていきたい。

パフォーマンス等の話はこちらの記事で

後、より重要な機能と思われるSystem.IO.Pipelinesも4.5.0-rc1が出たけど、これは誰か他の人がより詳しい解説を書いてくれると思う。

57
48
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
57
48