今日こんなコードに出会ってよく理解できなかった。しかし、ここから、Multithreaded execution was detected.
がスローされているのは間違いない。しかしやってることがよくわからなかった。
DurableOrchestrationContext.cs
private void ThrowIfInvalidAccess()
{
if (this.innerContext == null)
{
throw new InvalidOperationException("The inner context has not been initialized.");
}
// TODO: This should be considered best effort because it's possible that async work
// was scheduled and the CLR decided to run it on the same thread. The only guaranteed
// way to detect cross-thread access is to do it in the Durable Task Framework directly.
if (this.owningThreadId != -1 && this.owningThreadId != Thread.CurrentThread.ManagedThreadId)
{
throw new InvalidOperationException(
"Multithreaded execution was detected. This can happen if the orchestrator function previously resumed from an unsupported async callback.");
}
}
コードを調べていると結局 Thread.CurrentThread.ManagedThreadId
が現在のスレッドで、このクラスが初期化されたときに、owingThread
にスレッドIDが記入される。つまり、ここで、セットしたスレッドIDと違っていれば例外をスローする。つまり、シングルスレッドしか許されていないことを表現するためのコードの様子。
再現しようとしてみる
さて、じゃあ、マルチスレッドが発生するようなコードを試しに書いてみた。実はなかなかうまくいかなかった。スレッドプールは相当優秀な様子。しかし、Parallel.For
や、親子のスレッドを作ること再現できた。
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace MultiThreadingTesting
{
class Program
{
static void Main(string[] args)
{
var context = new OrchestratorContext();
Console.WriteLine("No threading");
context.AssignToCurrentThread();
context.Execute();
context.ExecuteAsync().Wait();
Console.ReadKey();
}
class OrchestratorContext
{
private int owingThreadId;
internal void AssignToCurrentThread()
{
this.owingThreadId = Thread.CurrentThread.ManagedThreadId;
}
public void Execute()
{
Console.WriteLine($"Execute(): OwingThreadId: {this.owingThreadId} Current: {Thread.CurrentThread.ManagedThreadId}");
var t = Task.Factory.StartNew(() =>
{
Console.WriteLine($"Parent: OwingThreadId: {owingThreadId} Current: {Thread.CurrentThread.ManagedThreadId}");
Task.Factory.StartNew(() =>
{
Console.WriteLine($"Children: OwingThreadId: {owingThreadId} Current: {Thread.CurrentThread.ManagedThreadId}");
}, TaskCreationOptions.AttachedToParent);
});
}
public async Task ExecuteAsync()
{
Console.WriteLine($"ExecuteAsync(): OwingThreadId: {this.owingThreadId} Current: {Thread.CurrentThread.ManagedThreadId}");
var tasks = new List<Task>();
for (int i = 0; i < 20; i++)
{
var task = HelloAsync();
tasks.Add(task);
}
await Task.WhenAll(tasks);
Parallel.For(0, 5, i =>
{
for (int j = 0; j < 5; j++)
{
Thread.Sleep(1);
Console.WriteLine($"Parallel.For(): OwingThreadId: {this.owingThreadId} Current: {Thread.CurrentThread.ManagedThreadId}");
}
});
}
public async Task HelloAsync()
{
Console.WriteLine($"HelloAsync(): OwingThreadId: {this.owingThreadId} Current: {Thread.CurrentThread.ManagedThreadId}");
await Hello2Async();
await Hello2Async();
}
public async Task Hello2Async()
{
Console.WriteLine($"Hello2Async(): OwingThreadId: {this.owingThreadId} Current: {Thread.CurrentThread.ManagedThreadId}");
}
}
}
}
この実行結果はこんな感じ。クラスのなかで、Async のクラスを何回も呼びまくっても、スレッドプールがうまくやっているため、スレッドが切り替わっていないのもわかる。親子や、Parallel.For だと様子が違うみたいだ。
No threading
Execute(): OwingThreadId: 1 Current: 1
Parent: OwingThreadId: 1 Current: 3
Children: OwingThreadId: 1 Current: 4
ExecuteAsync(): OwingThreadId: 1 Current: 1
HelloAsync(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
HelloAsync(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
HelloAsync(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
HelloAsync(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
HelloAsync(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
HelloAsync(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
HelloAsync(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
HelloAsync(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
HelloAsync(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
HelloAsync(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
HelloAsync(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
HelloAsync(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
HelloAsync(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
HelloAsync(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
HelloAsync(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
HelloAsync(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
HelloAsync(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
HelloAsync(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
HelloAsync(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
HelloAsync(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
Hello2Async(): OwingThreadId: 1 Current: 1
Parallel.For(): OwingThreadId: 1 Current: 3
Parallel.For(): OwingThreadId: 1 Current: 5
Parallel.For(): OwingThreadId: 1 Current: 3
Parallel.For(): OwingThreadId: 1 Current: 3
Parallel.For(): OwingThreadId: 1 Current: 1
Parallel.For(): OwingThreadId: 1 Current: 4
Parallel.For(): OwingThreadId: 1 Current: 6
Parallel.For(): OwingThreadId: 1 Current: 5
Parallel.For(): OwingThreadId: 1 Current: 5
Parallel.For(): OwingThreadId: 1 Current: 1
Parallel.For(): OwingThreadId: 1 Current: 6
Parallel.For(): OwingThreadId: 1 Current: 1
Parallel.For(): OwingThreadId: 1 Current: 3
Parallel.For(): OwingThreadId: 1 Current: 6
Parallel.For(): OwingThreadId: 1 Current: 3
Parallel.For(): OwingThreadId: 1 Current: 6
Parallel.For(): OwingThreadId: 1 Current: 5
Parallel.For(): OwingThreadId: 1 Current: 6
Parallel.For(): OwingThreadId: 1 Current: 4
Parallel.For(): OwingThreadId: 1 Current: 4
Parallel.For(): OwingThreadId: 1 Current: 5
Parallel.For(): OwingThreadId: 1 Current: 1
Parallel.For(): OwingThreadId: 1 Current: 1
Parallel.For(): OwingThreadId: 1 Current: 4
Parallel.For(): OwingThreadId: 1 Current: 4
考察
もともとはこのコードは、Durable Functions で、マルチスレッドのコードを禁止するためのコードだ。しかし、残念ながらスレッドプールがあるため、必ずしもマルチスレッドのコードが出てくとされるわけではない。だから、注意書きに次のように書いていたのだろう。
// TODO: This should be considered best effort because it's possible that async work
// was scheduled and the CLR decided to run it on the same thread. The only guaranteed
// way to detect cross-thread access is to do it in the Durable Task Framework directly.
TODO: これは、ベストエフォートの解決策。async の仕事がスケジューリングされても、同じスレッドで実行される可能性がある。Durable Functions から、クロススレッドを検出するための唯一の保証された方法。
この部分のテストはフリーキーだけど、それもうなづける内容。スレッドプールは本当に優秀だな。しかし、残念ながら、なぜ、このエラーが発生したのか知りたいところ。コードをもっと読んでみよう。