12
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

TaskCompletionSourceの代わりにChannelを使えばデフォルト設定でもデッドロックしない話追試

Last updated at Posted at 2021-05-19

この投稿について

TaskCompletionSourceについて

.NET 4.0以降には、TaskCompletionSource<T>という非同期処理用のユーティリティがあります。イベントベースのAPIをラップしてasync/awaitの中に組み込むために使うもので、Future的な物です。Task<T> Task { get; }void SetResult(T result);を持っており、tcs.SetResult("hoge")した結果を別タスクでstring result = await tcs.Task;として受け取るという調子で使います。

TaskCompletionSourceについての面白い記事

The danger of TaskCompletionSource class という記事がMicrosoft Devblogsに載っています。
結論だけ抜き出すと「TaskCompletionSource.Taskがデフォルト設定ではシングルスレッド寄りで、デッドロックすることがあるのでTaskCreationOptions.RunContinuationsAsynchronouslyを指定したほうがよい」という記事です。解説がawaitの実装内部まで踏み込んでいて、面白い記事です。

記事中にサンプルがあるのですが、断片的な感じでコピペして実行ができなかったので、補いました、というのがこの投稿です。

Channelについて

TaskCompletionSourceより後発のユーティリティとしてChannelがあります(初出は .NET Core 3.0)。名前の表す通りキューの長さが指定できて、無限もOKという、TaskCompletionSourceの上位互換です。

こちらも上と同様、AllowSynchronousContinuations
というタスク生成オプションがあります。デフォルトはfalseで、TaskCompletionSourceと逆です。つまり、taskCompletionSource.SetResultはデフォルト設定ではスレッドが一本に戻りますが、await channel.WriteAsync(...)では戻りません。

この点があるのでChannelでデッドロックしないことはわかっているのですが、一応試しました。これもソースを載せます。

TaskCompletionSourceがデッドロックするサンプル

まず、デッドロックするサンプルです。元記事になかったPrint関数を補って実行できるようにしました。

ソース

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

using DatabaseFacade facade = new();
using Logger logger = new(facade);

logger.WriteLine("My message");
await Task.Delay(100);

await facade.SaveAsync("Another string");
Util.Print("The string is saved");

public static class Util
{
    public static void Print(string msg)
    {
        Console.WriteLine($"[{Thread.CurrentThread.ManagedThreadId}] {msg}");
    }
}

public class DatabaseFacade : IDisposable
{
    private readonly BlockingCollection<(string item, TaskCompletionSource<string> result)> _queue;
    private readonly Task _processItemsTask;

    public DatabaseFacade()
    {
        _queue = new();
        _processItemsTask = Task.Run(ProcessItems);
    }

    public void Dispose() => _queue.CompleteAdding();

    public async Task SaveAsync(string command)
    {
        Util.Print($"Saving: {command}");

        // ここで TaskCreationOptions.RunContinuationsAsynchronously を指定するとデッドロックしない
        TaskCompletionSource<string> tcs = new();
        _queue.Add((item: command, result: tcs));
        string result = await tcs.Task;
        Util.Print($"Saved: {result}");
    }

    private async Task ProcessItems()
    {
        foreach ((string item, TaskCompletionSource<string> result) in _queue.GetConsumingEnumerable())
        {
            Util.Print($"DatabaseFacade: executing '{item}'...");

            // Waiting a bit to emulate some IO-bound operation
            await Task.Delay(100);
            result.SetResult($"OK ({item})");

            Util.Print("DatabaseFacade: done.");
        }
    }
}

public class Logger : IDisposable
{
    private readonly DatabaseFacade _facade;
    private readonly BlockingCollection<string> _queue = new();

    private readonly Task _saveMessageTask;

    public Logger(DatabaseFacade facade)
    {
        _facade = facade;
        _saveMessageTask = Task.Run(SaveMessage);
    }

    public void Dispose() => _queue.CompleteAdding();

    public void WriteLine(string message) => _queue.Add(message);

    private async Task SaveMessage()
    {
        foreach (string message in _queue.GetConsumingEnumerable())
        {
            // "Saving" message to the file
            Util.Print($"Logging: {message}");

            // And to our database through the facade
            await _facade.SaveAsync(message);
        }
    }
}

実行結果

以下、ログ行頭の番号はスレッドIDです。

[5] Logging: My message
[5] Saving: My message
[4] DatabaseFacade: executing 'My message'...
[4] Saving: Another string
[4] Saved: OK (My message)
<ここで出力が止まって終了しない>

記事で解説されているように、TaskCreationOptions.RunContinuationsAsynchronouslyを指定するとデッドロックしなくなります。

[5] Logging: My message
[5] Saving: My message
[4] DatabaseFacade: executing 'My message'...
[5] Saving: Another string
[4] DatabaseFacade: done.
[7] Saved: OK (My message)
[4] DatabaseFacade: executing 'Another string'...
[4] DatabaseFacade: done.
[5] Saved: OK (Another string)
[5] The string is saved
<終了>

Channelがデッドロックしないサンプル

上記サンプルでTaskCompletionSourceを使っているところに代わりにChannelを使うとこうなります。1

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

using DatabaseFacade facade = new();
using Logger logger = new(facade);

logger.WriteLine("My message");
await Task.Delay(100);

await facade.SaveAsync("Another string");
Util.Print("The string is saved");

public static class Util
{
    public static void Print(string msg)
    {
        Console.WriteLine($"[{Thread.CurrentThread.ManagedThreadId}] {msg}");
    }
}

public class DatabaseFacade : IDisposable
{
    private readonly BlockingCollection<(string item, ChannelWriter<string> result)> _queue;
    private readonly Task _processItemsTask;

    public DatabaseFacade()
    {
        _queue = new();
        _processItemsTask = Task.Run(ProcessItems);
    }

    public void Dispose() =>
        _queue.CompleteAdding();

    public async Task SaveAsync(string command)
    {
        Util.Print($"Saving: {command}");

        // ここで new BoundedChannelOptions(1) { AllowSynchronousContinuations = true } を渡すとデッドロックするようになる
        Channel<string> tcs = Channel.CreateBounded<string>(1);
        _queue.Add((item: command, result: tcs));

        string result = await tcs.Reader.ReadAsync();
        Util.Print($"Saved: {result}");
    }

    private async Task ProcessItems()
    {
        foreach ((string item, ChannelWriter<string> result) in _queue.GetConsumingEnumerable())
        {
            Util.Print($"DatabaseFacade: executing '{item}'...");

            // Waiting a bit to emulate some IO-bound operation
            await Task.Delay(100);
            await result.WriteAsync($"OK {item}");
            result.Complete();
            Util.Print("DatabaseFacade: done.");
        }
        Util.Print($"cancelled");
    }
}

public class Logger : IDisposable
{
    private readonly DatabaseFacade _facade;
    private readonly BlockingCollection<string> _queue;

    private readonly Task _saveMessageTask;

    public Logger(DatabaseFacade facade)
    {
        _facade = facade;
        _saveMessageTask = Task.Run(SaveMessage);
        _queue = new();
    }

    public void Dispose() =>
        _queue.CompleteAdding();

    public void WriteLine(string message) =>
         _queue.Add(message);

    private async Task SaveMessage()
    {
        foreach (string message in _queue.GetConsumingEnumerable())
        {
            // "Saving" message to the file
            Util.Print($"Logging: {message}");

            // And to our database through the facade
            await _facade.SaveAsync(message);
        }
        Util.Print($"cancelled");
    }
}

実行結果

[5] Logging: My message
[5] Saving: My message
[4] DatabaseFacade: executing 'My message'...
[4] Saving: Another string
[5] Saved: OK (My message)
[4] DatabaseFacade: done.
[4] DatabaseFacade: executing 'Another string'...
[4] DatabaseFacade: done.
[7] Saved: OK (Another string)
[7] The string is saved
<終了>

標準でTaskCreationOptions.RunContinuationsAsynchronously相当の動作をするようです。
シングルスレッド寄りの動作にするオプション(new BoundedChannelOptions(1) { AllowSynchronousContinuations = true })を渡したところ、しっかりデッドロックしました。

[5] Logging: My message
[5] Saving: My message
[4] DatabaseFacade: executing 'My message'...
[5] Saving: Another string
[5] Saved: OK (My message)
<出力がここで止まり終了しない>

感想

ChannelのデフォルトオプションはTaskCompletionSourceより安全寄りになっており、性能を追求したいときにオプションを指定して安全装置を外すという思想のようでよくできていると思います。

あと、よく使うConfigureAwait(false)はこのケースでは役に立たないようでした(たぶん問題はGetConsumingEnumerableSetResultにあるため)。

リンク

  1. TaskCompletionSourceだけでなくBlockingCollectionChannelで置き換えできますが、今回確認したい点については無関係だったのでなるべく元記事の形のままにしました

12
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?