はじめに
1つのスレッドの中で処理Aをした後、イベント通知(例えばユーザーが「確認」ボタンを押すとか)をもらって、処理Bを行いたいときが、たまにあります。処理Aと処理Bでスレッドを分ければ await
使えるだろ、という意見もあると思いますが、処理A+処理Bを1つのメソッドで書きたい(例えばライブラリとか)場合もあると思います。このようなイベント通知をもらってスレッドを再開、というケースではawait
を使えず、シグナル(Signal)と呼ばれるメカニズムを使う必要があります。今回はそういう話です。
例:コンテンツのダウンロードを再開する
例として、インターネット上のあるコンテンツ http://hoge.example.com/contents.dat をダウンロードするWindowsアプリケーションがあるが、コンテンツが大きいのでまず1行目だけをダウンロード&表示し、ユーザーがそれを確認して「OK」を押した後、全体をダウンロードする、という処理があったとしよう。正直言って、こんな処理に一体どういう実用上の意味、価値があるか分かりませんが、他に良い例が思いつかなかったので、勘弁してください。
プログラム例としては、こんな感じになるでしょうか。エラー処理はすべて省きます。
public async Task<string> DownloadAsync()
{
using (var client = new HttpClient())
{
var response = await client.GetAsync(@"http://hoge.example.com/contents.dat");
using (var stream = new StreamReader(await response.Content.ReadAsStreamAsync()))
{
var first = stream.ReadLine();
FirstLineDownloaded?.Invoke(this, first);
}
// ①ユーザーから「OK」ボタンが押されるまで待ちたい
return await response.Content.ReadAsStringAsync();
}
}
public void DownloadAcceppted(object sender, EventArgs args)
{
// ②「OK」が押されたら、DownloadAsync()を再開したい
}
FirstLineDownload
は、1行目をダウンロードしたらどこかに通知するイベントハンドラで、今回の話では本質的ではないのでどうでもいいです。問題点は、コメント①②の部分をどう実装すれば良いか、という話になります。ちなみに「キャンセル」を押したらどう中断するのか、は何も考えていません。
System.Threading.CountdownEvent
①のように、イベントを受け取るまで処理を待つ(「待ち受け(await
)」ではない)には、イベント待機ハンドラー(Event Wait Handler)と呼ばれているものを使います。ハンドラ―とか名前が付いていますが、イベントハンドラ―とは関係がありません。イベント待機ハンドラ―は、C#にはいくつか種類がありますが、今回のケースでは、というかほとんどのケースでは System.Threading.CountdownEvent
を使えば良いです。
使い方は、プログラム例を見てもらった方が早いでしょう。
private readonly CountdownEvent condition = new CountdownEvent(1);
public async Task<string> DownloadAsync()
{
using (var client = new HttpClient())
{
var response = await client.GetAsync(@"http://hoge.example.com/contents.dat");
using (var stream = new StreamReader(await response.Content.ReadAsStreamAsync()))
{
var first = stream.ReadLine();
FirstLineDownloaded?.Invoke(this, first);
}
// ①ユーザーから「OK」ボタンが押されるまで待ちたい
condition.Wait();
return await response.Content.ReadAsStringAsync();
}
}
public void DownloadAcceppted(object sender, EventArgs args)
{
// ②「OK」が押されたら、DownloadAsync()を再開したい
condition.Signal();
Thread.Sleep(1);
condition.Reset();
}
①は、CountdownEvent#Wait()
で待つことができます。②で CountdownEvent#Signal()
でシグナル状態にすると、CountdownEvent#Wait()
で待っていたスレッドは再開をします。CountdownEvent
は一度シグナル状態になると、そのままシグナル状態になりっぱなしのため、CountdownEvent#Reset()
で非シグナル状態に戻します。CountdownEvent
を1回限りの使い捨ての場合は、Reset()
は不要です。Thread.Sleep(1)
を挟んでいるのは、Signal()
した後即座に Reset()
すると、Wait()
しているスレッドが再開する前に、非シグナル状態になってしまうときがあるので、ちょっと待つようにしていますが、本来はいらない処理のハズです。
シグナルの説明に関してはこれで全部なのですが、CountdownEvent
のコンストラクタで渡している 1
ってなんだ、が気になる人に補足すると、これは、シグナル状態になるまでの回数を指定できます。1
は1回で、つまり Signal()
1回でシグナル状態になります。3
なら Signal()
3回でシグナル状態です。Reset()
を呼ぶと、内部のカウンターが初期値に戻ります。また、1
以外の値を指定したいケースというのは、2つ以上のスレッドからシグナルをもらったときに処理を再開したい場合に使います。
AutoResetEventとManualResetEvent
イベント待機ハンドラ―は複数ある、と言ったのでもう少し書きます。C#のシグナル処理で古いページをみると、サンプルは AutoResetEvent
を使っているものが多いのではないでしょうか。AutoResetEvent
も、カウント1のCountdownEvent
と使い方はほとんど変わりません。
private readonly AutoResetEvent condition = new AutoResetEvent();
public async Task<string> DownloadAsync()
{
using (var client = new HttpClient())
{
var response = await client.GetAsync(@"http://hoge.example.com/contents.dat");
using (var stream = new StreamReader(await response.Content.ReadAsStreamAsync()))
{
var first = stream.ReadLine();
FirstLineDownloaded?.Invoke(this, first);
}
// ①ユーザーから「OK」ボタンが押されるまで待ちたい
condition.WaitOne();
return await response.Content.ReadAsStringAsync();
}
}
public void DownloadAcceppted(object sender, EventArgs args)
{
// ②「OK」が押されたら、DownloadAsync()を再開したい
condition.Set();
}
AutoResetEvent
の何が Auto
なのかと言うと、シグナル状態になった後、何かのスレッドが再開すれば自動的に非シグナル状態になるからです。つまり、Set()
でシグナル状態にした後、Reset()
を呼ぶ必要が無いということです。
ManualResetEvent
は、想像がつくとおり、手動で Reset()
を呼んで非シグナル状態にします。それ以外には、複数スレッドへのシグナルを通知とかができます。
EventWaitHandle
はどこ行ったんだ、というこの記事を読む必要が無い人に補足しておくと、EventWaitHandle
は AutoResetEvent
と ManualResetEvent
のスーパークラス(ベースクラス? C#は何て呼ぶんだ?)なので、同じものです。
CountdownEvnet v.s. AutoResetEvent
CountdownEvent
と AutoResetEvent
は何が違うのでしょうか。それはオーバーヘッドが違います。CountdownEvent
の方が、オーバーヘッドが小さいです(1/20くらい)。また、CountdownEvent
は .NET 4.0以降から使用可能です。つまり今の環境(多分 .NET 4.5以上)であれば、ほぼCountdownEvent
を使えば良い、ということになります。もし古いシステムで .NET 3.5 である場合は、AutoResetEvent
を使わなければならくなります(そもそも await/async と Task<T>
が使えませんが)。
ちなみに ManualResetEventSlim
という ManualResetEvent
の亜種がいるのですが、これは ManualResetEvent
のオーバーヘッド小さい版です。ManualResetEvent
を使おうとしている人は、ManualResetEventSlim
を使うことをお勧めします(ただし、.NET 4.0以降です)。
Producer-Consumer
他言語の経験がある人だと、C#のシグナル処理に何か違和感を感じるかもしれません。それは、ロックが全く絡んでいないからです。そういう訳で、ロックが絡むシグナル処理として、超有名な Producer-Consumer パターンを例にして説明します。
Producer-Consumer パターンとは?
ググれば山ほどヒットするので、いまさら私の下手くそな説明は不要な気がしますが、簡単に説明します。
データを生成する処理A(Producer)と、データを使用する処理B(Consumer)があって、別々に非同期でマルチスレッドで処理しようぜ、というパターンです。処理A(Producer)で生成したデータをどこかに置いておく必要があるのですが、それがバッファで通常はキューで実装されます。そしてこのバッファですが、複数スレッドからアクセスされるので、同期が必要で、同期オブジェクトに何を使ってどう同期すればいいでしょうか、という話です。
ここでもしかしたら、処理A+処理Bを1つのスレッドで処理してしまえば、バッファも同期も不要なんじゃない、と思った人、そこに気付いてしまったか。えぇ、その通りですとも! では、それでもなぜ処理Aと処理Bで別スレッドにしたいかというと、俺スレッド使いこなせるから、とドヤ顔できる 効率よくコンピューターリソースが使えるようになるからです。
コンピューターの世界では、入出力が絡むと処理Aと処理Bがほとんど同じ程度の処理時間なることはほとんどないです。例えばデータベースからSELECTして(処理A)、何かチェック処理する(処理B)場合、大抵外部I/Oがある処理Aは処理Bに比べ 10 倍くらい遅いです。この場合、処理Aにスレッドを多く割り当てることで、効率良く処理できるようになる訳です。とは言っても、スレッドをたくさん作ることでコストが上がるので、一概に効率良くなる訳ではありませんが(テストが難しくなる、という問題もある)。パフォーマンスがクリティカルな処理では、Producer-Consumerパターンを検討してみてもいいと思います。
プログラム例
Monitor.Wait()
で待ち、Monitor.Pulse()
or Monitor.PulseAll()
でシグナルを送ります。メソッドの引数はロックオブジェクトを指定します。スレッドの中断(キャンセル)を考えないと、次のような感じになるでしょうか。ちなみにキューに Queue<T>
ではなく List<T>
を使っているのは … Queue<T>
を使うと後悔するからです。
public class MyQueue<T>
{
private readonly List<T> queue = new List<T>();
private readonly int capacity;
private readonly object locker = new object();
public MyQueue(int capacity)
{
this.capacity = capacity;
}
public void Put(T data)
{
lock (locker)
{
while (queue.Count >= capacity)
Monitor.Wait(locker);
queue.Add(data);
Monitor.PulseAll(locker);
}
}
public T Poll()
{
lock (locker)
{
while (queue.Count <= 0)
Monitor.Wait(locker);
var data = queue[0];
queue.RemoveAt(0);
Monitor.PulseAll(locker);
return data;
}
}
}
ポイントとなるのは、Monitor.Wait()
の部分で、処理がここに入るとロックを解放して待ちに入ります。そして Monitor.Pulse()
or Monitor.PulseAll()
が呼ばれると、引数で指定したロックオブジェクトで Wait()
しているスレッドに通知し、ロックを獲得してから続きの処理をします。もし、Monitor.Wait()
ではなくイベント待機ハンドルである CountdownEvent#Wait()
を使ってしまうと、ロックを解放せず待ちに入ってしまうので、デッドロックになってしまいます。自前でロックの解放と獲得、すなわち Monitor.Enter()
と Monitor.Exit()
を CountdownEvent#Wait()
の前後で呼び出す、というやり方もありますが、ロックというリソースを手動で管理するのは結構大変です。Monitor.Enter()
はともかく Monitor.Exit()
は確実に呼び出されるようにしないと、ロック解放漏れというリソースリークを起こし、デッドロックになるからです。
また、Monitor
の Wait()
、 PulseAll()
の使い方は定型句があって、次のようになります。この形は丸暗記でもいいと思います。
lock (ロックオブジェクト)
{
while (ガード条件)
Monitor.Wait(ロックオブジェクト);
本体処理
}
(ガード条件)とは、(not 本体処理をする条件)で、例えばキューから取り出す処理(Poll
)ならば、キューが空だと処理ができないので、(not queue.Count > 0)
→ queue.Count <= 0
になります。
ガード条件の判定文が if
ではなく while
なのは、…忘れました。
ProducerとConsumerは、スレッド起動してPut()
したりPoll()
したりするだけなので、スレッドの終了・中断を考えなければ、特に難しいところはありません。
// キュー生成
var sharedQueue = new MyQueue<string>(5);
// Producerスレッド生成&開始
Task.Run(() => {
var data = new string[] {"foo", "bar", "baz"};
var index = 0;
// 適当な回数、dataを投入
for (int i=0; i<10; ++i)
{
sharedQueue.Put(data[index++ % 3]);
Thread.Sleep(100); // ちょっと待つ
}
});
// Consumerスレッド生成&開始
Task.Run(() => {
// スレッド終了を何も考えていない while (true)
// 実際は、キーが入力されたらスレッドを終了する、などが必要
while (true)
{
var data = sharedQueue.Poll();
Console.WriteLine($"Poll: {data}");
}
});
まとめ
C#の記事や書籍には、スレッドというと await/async & Task<T>
という非同期の話であり、同期オブジェクトの話が全然無いように思ったので、記事を書いてみました。また同期オブジェクトの話に集中したかったので、ロックの注意点については何も書かなかったのですが、C#の人々はどの程度ロックに慣れているのでしょうか?(私はあまりC#をやらないので、その辺の加減が良く分からない) この記事を見ての通り、同期オブジェクトは基本的な使い方であれば、何も難しいところはありません。
参考サイト
サイトというか本(E-book)なので、これ1冊読んでおけばいいんじゃないかな、という気にさせられる本。著者はC#の本を書いている有名な人のようです(アマゾンに沢山本があります)。日本語訳は、残念ながら Part1だけしかありません。
Javaの本ですが、スレッドの使い方やパターンがまとまっている良い本だと思います。C#版も誰か書いてくれ、というかC#のスレッドの本が少なすぎる。