C#
.NET
非同期処理
マルチスレッド

C# 並行・並列プログラミング パターン集

More than 1 year has passed since last update.

Task, Parallel, PLINQの使い方集。ついでに.NETの非同期APIの歴史。

2017/4/20:
- 非同期Taskの並行実行 を追記
- Parallel, PLINQの最大同時並列数について追記
- C# Taskの待ちかた集 へのリンクを追記

Task

Taskの使い方

同期コードをTaskを使って非同期コードにする

同期バージョン(ボタンを押した後ダウンロードが終わるまで画面が固まる)

private void button1_Click(object sender, EventArgs e)
{
    WebClient wc = new WebClient();
    var html = wc.DownloadString("http://www.google.co.jp/");
    textBox1.Text = html;
}

非同期バージョン(画面が固まらない)

private async void button1_Click(object sender, EventArgs e)
{
    WebClient wc = new WebClient();
    var html = await wc.DownloadStringTaskAsync("http://www.google.co.jp/");
    textBox1.Text = html;
}
  • メソッドに async を付ける。
  • TaskAsyncメソッドを使い、await を付ける。
  • メソッドの戻り値を Task にする ※上記のコードはイベントハンドラーなので仕方なくvoidのまま

非同期Taskを並行実行する

複数のWebサイトから同時にダウンロードするサンプル。

async Task ConcurrentDownload()
{
    var urls = new[]{
        "http://www.amazon.com/",
        "http://www.apple.com/",
        "http://www.facebook.com/",
        "http://www.google.com/",
        "http://www.microsoft.com/",
        "http://www.twitter.com/",
    };
    var hc = new HttpClient();
    var downloadTasks = urls.Select(url => hc.GetStringAsync(url));
    string[] htmls = await Task.WhenAll(downloadTasks);
}

最大同時並行数を抑えたい場合は、Semaphoreの仕組みが便利。

async Task ConcurrentDownloadThrottle()
{
    var urls = new[]{
        "http://www.amazon.com/",
        "http://www.apple.com/",
        "http://www.facebook.com/",
        "http://www.google.com/",
        "http://www.microsoft.com/",
        "http://www.twitter.com/",
    };
    var hc = new HttpClient();
    var sem = new SemaphoreSlim(2); // 最大同時実行数:2
    var downloadTasks = urls.Select(async url =>
    {
        await sem.WaitAsync();
        Debug.WriteLine($"Start: {url}");
        try
        {
            return await hc.GetStringAsync(url);
        }
        finally
        {
            Debug.WriteLine($"Completed: {url}");
            sem.Release();
        }
    });
    string[] htmls = await Task.WhenAll(downloadTasks);
}

Taskの待ち方

C# Taskの待ちかた集 に書いた

Taskの作り方

CPU主体の処理

CPU主体の処理には Task.Run() が便利。

Task task = Task.Run(() =>
            {
                HeavyWork();
            );

IO待ち主体の処理

IO主体の処理には別の手段が用意されている。※Task.Run() を使うとスレッドプールを有効活用できなくなる

/// 時刻 time になったら起こしてくれる Task
public Task<string> WakeOnTime(DateTime time)
{
    if (time < DateTime.Now)
    {
        // すぐに終わらせられる(完了している)なら Task.FromResult() を使う
        return Task.FromResult("起きろ!"); 
    }

    // 待ち時間が発生するようなタスクには TaskCompletionSource<string> を使う
    var tcs = new TaskCompletionSource<string>();

    Timer timer = null;
    timer = new Timer(delegate
            {
                timer.Dispose();
                tcs.TrySetResult("起きて");
            });

    int waitMilliseconds = (int)(time - DateTime.Now).TotalMilliseconds;
    timer.Change(waitMilliseconds, Timeout.Infinite);
    return tcs.Task;
}

ほかの便利なもの:

  • Task.FromException() や Task.FromCanceled() もある
  • TaskCompletionSource<TResult>も同じように SetCanceled() や SetException() がある
  • 「完了しているタスク」Task.CompletedTask なんてのがあったり

Parallel

CPU主体の処理を並列実行したいときに有用。

Parallel.Invoke(
    new ParallelOptions() { MaxDegreeOfParallelism = 4 }, // 最大同時並列数:4
    HeavyWorkA, // Invoke の引数は Action
    () => { HeavyWorkB(100); }, // メソッドがActionでない場合は置き換えればよい
    () => 
    {
        // ここに直接処理を書いても良い
        int n = 400;
        HeavyWorkC(n);
    }
);

上記のコードを実行すると、HeavyWorkA,B,Cが同時並行に動く。

※CPUのコア数やスレッドプールの空き状況によっては、同時に動かないこともある。

なお、Parallel クラスには Parallel.Invoke のほかに、Parallel.For や Parallel.ForEach もある。また、第一引数の ParallelOptions は省略可能。

ParallelEnumerable (PLINQ)

LINQで並列処理ができる。

// ファイルのハッシュ値を計算するプログラム
var files = Directory.GetFiles(Environment.SystemDirectory, "*.exe");
var filehash = files
               .Select(f => new { File = f, Data = File.ReadAllBytes(f) }) // A
               .AsParallel()
               .WithDegreeOfParallelism(4) // 最大同時並列数:4
               .Select(f => new { File = f.File, Hash = SHA256.Create().ComputeHash(f.Data)}) // B
               .ToArray();

PLINQではAsParallel()以降のコードが並列化される。上記のコードでは、A(ファイル読み)はシングルスレッドで実行され、B(ハッシュ値計算)はマルチスレッドで実行される。

なお、AsParallel() のあと のWithDegreeOfParallelism() は省略可能。

例外処理

TaskやParallelの中で発生した例外は AggregateException としてcatchできる。

try
{
    Parallel.Invoke(
        () => throw new ArgumentException(),
        () => throw new InvalidOperationException(),
        () => throw new FormatException()
    );
}
catch (AggregateException ae)
{
    // 原因となった Exception を知りたい場合は
    // Flatten() や InnerExceptions が便利
    var exceptions = ae.Flatten().InnerExceptions;
    foreach (var ex in exceptions)
    {
        Debug.WriteLine(ex.GetType());
    }
}

ただし Task を await する場合は元の Exception で catch できる。

try
{
    var addresses = await Dns.GetHostAddressesAsync("example.jp");
}
catch(SocketException se)
{
    Debug.WriteLine(se.ToString());
}

Taskの中で発生した(catch されてない)例外をまとめて処理したいときは TaskScheduler.UnobservedTaskException

TaskScheduler.UnobservedTaskException += (sender, e) =>
{
    AggregateException ae = e.Exception;
    Debug.WriteLine(ae.ToString());
    e.SetObserved(); // .NET4.0だとこれをしないとアプリが死ぬようになってた
};

非同期パターンの歴史

.NETには古い非同期APIも残されている。間違って使ってしまうことがないよう、簡単に紹介しておく。
詳細は MSDN:Asynchronous Programming Patterns で。

.NET1.0時代:Asynchronous Programming Model (APM)

Beginなんとか~Endなんとか。使うな

var fs = new FileStream(@"test.jpg", FileMode.Open);
byte[] buf = new byte[65536];

fs.BeginRead(buf, 0, buf.Length, 
(ar) =>
{
    int readbytes = fs.EndRead(ar);
    buf.Dump();
}, null);

.NET2.0~3.5時代:Event-based Asynchronous Pattern (EAP)

なんとかCompletedイベントとか。使うな

WebClient wc = new WebClient();
wc.DownloadStringCompleted += (sender, args) =>
{
    var html = args.Result;
    Console.WriteLine(html);
};
wc.DownloadStringAsync(new Uri("http://www.google.co.jp/"));

.NET4.5以降:Task-based Asynchronous Pattern (TAP)

async/await. 今から始めるならこれを使えばよい。 上2つのコードはTAPではこうなる。

var fs = new FileStream(@"test.jpg", FileMode.Open);
byte[] buf = new byte[65536];

int readbytes = await fs.ReadAsync(buf, 0, buf.Length);
string html = await wc.DownloadDataTaskAsync("http://www.google.co.jp/");
Console.WriteLine(html);