この投稿について
TaskCompletionSourceについて
.NET 4.0以降には、TaskCompletionSource<T>
という非同期処理用のユーティリティがあります。イベントベースのAPIをラップしてasync/awaitの中に組み込むために使うもので、Future的な物です。Task<T> Task { get; }
とvoid SetResult(T result);
を持っており、tcs.SetResult("hoge")
した結果を別タスクでstring result = await tcs.Task;
として受け取るという調子で使います。
TaskCompletionSourceについての面白い記事
The danger of TaskCompletionSource class という記事がMicrosoft Devblogsに載っています。
結論だけ抜き出すと「TaskCompletionSource.Task
がデフォルト設定ではシングルスレッド寄りで、デッドロックすることがあるのでTaskCreationOptions.RunContinuationsAsynchronously
を指定したほうがよい」という記事です。解説がawait
の実装内部まで踏み込んでいて、面白い記事です。
記事中にサンプルがあるのですが、断片的な感じでコピペして実行ができなかったので、補いました、というのがこの投稿です。
Channelについて
TaskCompletionSource
より後発のユーティリティとしてChannel
があります(初出は .NET Core 3.0)。名前の表す通りキューの長さが指定できて、無限もOKという、TaskCompletionSourceの上位互換です。
こちらも上と同様、AllowSynchronousContinuations
というタスク生成オプションがあります。デフォルトはfalse
で、TaskCompletionSourceと逆です。つまり、taskCompletionSource.SetResult
はデフォルト設定ではスレッドが一本に戻りますが、await channel.WriteAsync(...)
では戻りません。
この点があるのでChannelでデッドロックしないことはわかっているのですが、一応試しました。これもソースを載せます。
TaskCompletionSourceがデッドロックするサンプル
まず、デッドロックするサンプルです。元記事になかったPrint関数を補って実行できるようにしました。
ソース
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using DatabaseFacade facade = new();
using Logger logger = new(facade);
logger.WriteLine("My message");
await Task.Delay(100);
await facade.SaveAsync("Another string");
Util.Print("The string is saved");
public static class Util
{
public static void Print(string msg)
{
Console.WriteLine($"[{Thread.CurrentThread.ManagedThreadId}] {msg}");
}
}
public class DatabaseFacade : IDisposable
{
private readonly BlockingCollection<(string item, TaskCompletionSource<string> result)> _queue;
private readonly Task _processItemsTask;
public DatabaseFacade()
{
_queue = new();
_processItemsTask = Task.Run(ProcessItems);
}
public void Dispose() => _queue.CompleteAdding();
public async Task SaveAsync(string command)
{
Util.Print($"Saving: {command}");
// ここで TaskCreationOptions.RunContinuationsAsynchronously を指定するとデッドロックしない
TaskCompletionSource<string> tcs = new();
_queue.Add((item: command, result: tcs));
string result = await tcs.Task;
Util.Print($"Saved: {result}");
}
private async Task ProcessItems()
{
foreach ((string item, TaskCompletionSource<string> result) in _queue.GetConsumingEnumerable())
{
Util.Print($"DatabaseFacade: executing '{item}'...");
// Waiting a bit to emulate some IO-bound operation
await Task.Delay(100);
result.SetResult($"OK ({item})");
Util.Print("DatabaseFacade: done.");
}
}
}
public class Logger : IDisposable
{
private readonly DatabaseFacade _facade;
private readonly BlockingCollection<string> _queue = new();
private readonly Task _saveMessageTask;
public Logger(DatabaseFacade facade)
{
_facade = facade;
_saveMessageTask = Task.Run(SaveMessage);
}
public void Dispose() => _queue.CompleteAdding();
public void WriteLine(string message) => _queue.Add(message);
private async Task SaveMessage()
{
foreach (string message in _queue.GetConsumingEnumerable())
{
// "Saving" message to the file
Util.Print($"Logging: {message}");
// And to our database through the facade
await _facade.SaveAsync(message);
}
}
}
実行結果
以下、ログ行頭の番号はスレッドIDです。
[5] Logging: My message
[5] Saving: My message
[4] DatabaseFacade: executing 'My message'...
[4] Saving: Another string
[4] Saved: OK (My message)
<ここで出力が止まって終了しない>
記事で解説されているように、TaskCreationOptions.RunContinuationsAsynchronously
を指定するとデッドロックしなくなります。
[5] Logging: My message
[5] Saving: My message
[4] DatabaseFacade: executing 'My message'...
[5] Saving: Another string
[4] DatabaseFacade: done.
[7] Saved: OK (My message)
[4] DatabaseFacade: executing 'Another string'...
[4] DatabaseFacade: done.
[5] Saved: OK (Another string)
[5] The string is saved
<終了>
Channelがデッドロックしないサンプル
上記サンプルでTaskCompletionSource
を使っているところに代わりにChannel
を使うとこうなります。1
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using DatabaseFacade facade = new();
using Logger logger = new(facade);
logger.WriteLine("My message");
await Task.Delay(100);
await facade.SaveAsync("Another string");
Util.Print("The string is saved");
public static class Util
{
public static void Print(string msg)
{
Console.WriteLine($"[{Thread.CurrentThread.ManagedThreadId}] {msg}");
}
}
public class DatabaseFacade : IDisposable
{
private readonly BlockingCollection<(string item, ChannelWriter<string> result)> _queue;
private readonly Task _processItemsTask;
public DatabaseFacade()
{
_queue = new();
_processItemsTask = Task.Run(ProcessItems);
}
public void Dispose() =>
_queue.CompleteAdding();
public async Task SaveAsync(string command)
{
Util.Print($"Saving: {command}");
// ここで new BoundedChannelOptions(1) { AllowSynchronousContinuations = true } を渡すとデッドロックするようになる
Channel<string> tcs = Channel.CreateBounded<string>(1);
_queue.Add((item: command, result: tcs));
string result = await tcs.Reader.ReadAsync();
Util.Print($"Saved: {result}");
}
private async Task ProcessItems()
{
foreach ((string item, ChannelWriter<string> result) in _queue.GetConsumingEnumerable())
{
Util.Print($"DatabaseFacade: executing '{item}'...");
// Waiting a bit to emulate some IO-bound operation
await Task.Delay(100);
await result.WriteAsync($"OK {item}");
result.Complete();
Util.Print("DatabaseFacade: done.");
}
Util.Print($"cancelled");
}
}
public class Logger : IDisposable
{
private readonly DatabaseFacade _facade;
private readonly BlockingCollection<string> _queue;
private readonly Task _saveMessageTask;
public Logger(DatabaseFacade facade)
{
_facade = facade;
_saveMessageTask = Task.Run(SaveMessage);
_queue = new();
}
public void Dispose() =>
_queue.CompleteAdding();
public void WriteLine(string message) =>
_queue.Add(message);
private async Task SaveMessage()
{
foreach (string message in _queue.GetConsumingEnumerable())
{
// "Saving" message to the file
Util.Print($"Logging: {message}");
// And to our database through the facade
await _facade.SaveAsync(message);
}
Util.Print($"cancelled");
}
}
実行結果
[5] Logging: My message
[5] Saving: My message
[4] DatabaseFacade: executing 'My message'...
[4] Saving: Another string
[5] Saved: OK (My message)
[4] DatabaseFacade: done.
[4] DatabaseFacade: executing 'Another string'...
[4] DatabaseFacade: done.
[7] Saved: OK (Another string)
[7] The string is saved
<終了>
標準でTaskCreationOptions.RunContinuationsAsynchronously
相当の動作をするようです。
シングルスレッド寄りの動作にするオプション(new BoundedChannelOptions(1) { AllowSynchronousContinuations = true }
)を渡したところ、しっかりデッドロックしました。
[5] Logging: My message
[5] Saving: My message
[4] DatabaseFacade: executing 'My message'...
[5] Saving: Another string
[5] Saved: OK (My message)
<出力がここで止まり終了しない>
感想
ChannelのデフォルトオプションはTaskCompletionSourceより安全寄りになっており、性能を追求したいときにオプションを指定して安全装置を外すという思想のようでよくできていると思います。
あと、よく使うConfigureAwait(false)
はこのケースでは役に立たないようでした(たぶん問題はGetConsumingEnumerable
やSetResult
にあるため)。
リンク
- The danger of TaskCompletionSource class
- System.Threading.Channels.Channel - .NET API browser
- System.Threading.Tasks.TaskCompletionSource - .NET API browser
-
TaskCompletionSource
だけでなくBlockingCollection
もChannel
で置き換えできますが、今回確認したい点については無関係だったのでなるべく元記事の形のままにしました ↩