189
220

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

C# 並行・並列プログラミング パターン集

Last updated at Posted at 2017-04-15

Task, Parallel, PLINQの使い方集。ついでに.NETの非同期APIの歴史。

Task

Taskの使い方

同期コードをTaskを使って非同期コードにする

同期バージョン(ボタンを押した後ダウンロードが終わるまで画面が固まる)

private void button1_Click(object sender, EventArgs e)
{
    WebClient wc = new WebClient();
    var html = wc.DownloadString("http://www.google.co.jp/");
    textBox1.Text = html;
}

非同期バージョン(画面が固まらない)

private async void button1_Click(object sender, EventArgs e)
{
    WebClient wc = new WebClient();
    var html = await wc.DownloadStringTaskAsync("http://www.google.co.jp/");
    textBox1.Text = html;
}
  • メソッドに async を付ける。
  • TaskAsyncメソッドを使い、await を付ける。
  • メソッドの戻り値を Task にする ※上記のコードはイベントハンドラーなので仕方なくvoidのまま

非同期Taskを並行実行する

複数のWebサイトから同時にダウンロードするサンプル。

async Task ConcurrentDownload()
{
    var urls = new[]{
        "http://www.amazon.com/",
        "http://www.apple.com/",
        "http://www.facebook.com/",
        "http://www.google.com/",
        "http://www.microsoft.com/",
        "http://www.twitter.com/",
    };
    var hc = new HttpClient();
    var downloadTasks = urls.Select(url => hc.GetStringAsync(url));
    string[] htmls = await Task.WhenAll(downloadTasks);
}

最大同時並行数を抑えたい場合は、Semaphoreの仕組みが便利。

async Task ConcurrentDownloadThrottle()
{
    var urls = new[]{
        "http://www.amazon.com/",
        "http://www.apple.com/",
        "http://www.facebook.com/",
        "http://www.google.com/",
        "http://www.microsoft.com/",
        "http://www.twitter.com/",
    };
    var hc = new HttpClient();
    var sem = new SemaphoreSlim(2); // 最大同時実行数:2
    var downloadTasks = urls.Select(async url =>
    {
        await sem.WaitAsync();
        Debug.WriteLine($"Start: {url}");
        try
        {
            return await hc.GetStringAsync(url);
        }
        finally
        {
            Debug.WriteLine($"Completed: {url}");
            sem.Release();
        }
    });
    string[] htmls = await Task.WhenAll(downloadTasks);
}

Taskの待ち方

C# Taskの待ちかた集 に書いた

Taskの作り方

CPU主体の処理

CPU主体の処理には Task.Run() が便利。

Task task = Task.Run(() =>
            {
                HeavyWork();
            );

IO待ち主体の処理

IO主体の処理には別の手段が用意されている。※Task.Run() を使うとスレッドプールを有効活用できなくなる

/// 時刻 time になったら起こしてくれる Task
public Task<string> WakeOnTime(DateTime time)
{
    if (time < DateTime.Now)
    {
        // すぐに終わらせられる(完了している)なら Task.FromResult() を使う
    	return Task.FromResult("起きろ!"); 
    }

    // 待ち時間が発生するようなタスクには TaskCompletionSource<string> を使う
    var tcs = new TaskCompletionSource<string>();

    Timer timer = null;
    timer = new Timer(delegate
            {
                timer.Dispose();
                tcs.TrySetResult("起きて");
            });
    
    int waitMilliseconds = (int)(time - DateTime.Now).TotalMilliseconds;
    timer.Change(waitMilliseconds, Timeout.Infinite);
    return tcs.Task;
}

ほかの便利なもの:

  • Task.FromException() や Task.FromCanceled() もある
  • TaskCompletionSource<TResult>も同じように SetCanceled() や SetException() がある
  • 「完了しているタスク」Task.CompletedTask なんてのがあったり

Parallel

CPU主体の処理を並列実行したいときに有用。

Parallel.Invoke(
    new ParallelOptions() { MaxDegreeOfParallelism = 4 }, // 最大同時並列数:4
    HeavyWorkA, // Invoke の引数は Action
    () => { HeavyWorkB(100); }, // メソッドがActionでない場合は置き換えればよい
    () => 
    {
        // ここに直接処理を書いても良い
        int n = 400;
        HeavyWorkC(n);
    }
);

上記のコードを実行すると、HeavyWorkA,B,Cが同時並行に動く。

※CPUのコア数やスレッドプールの空き状況によっては、同時に動かないこともある。

なお、Parallel クラスには Parallel.Invoke のほかに、Parallel.For や Parallel.ForEach もある。また、第一引数の ParallelOptions は省略可能。

ParallelEnumerable (PLINQ)

LINQで並列処理ができる。

// ファイルのハッシュ値を計算するプログラム
var files = Directory.GetFiles(Environment.SystemDirectory, "*.exe");
var filehash = files
               .Select(f => new { File = f, Data = File.ReadAllBytes(f) }) // A
               .AsParallel()
               .WithDegreeOfParallelism(4) // 最大同時並列数:4
               .Select(f => new { File = f.File, Hash = SHA256.Create().ComputeHash(f.Data)}) // B
               .ToArray();

PLINQではAsParallel()以降のコードが並列化される。上記のコードでは、A(ファイル読み)はシングルスレッドで実行され、B(ハッシュ値計算)はマルチスレッドで実行される。

なお、AsParallel() のあと のWithDegreeOfParallelism() は省略可能。

例外処理

TaskやParallelの中で発生した例外は AggregateException としてcatchできる。

try
{
    Parallel.Invoke(
        () => throw new ArgumentException(),
        () => throw new InvalidOperationException(),
        () => throw new FormatException()
    );
}
catch (AggregateException ae)
{
    // 原因となった Exception を知りたい場合は
    // Flatten() や InnerExceptions が便利
    var exceptions = ae.Flatten().InnerExceptions;
    foreach (var ex in exceptions)
    {
        Debug.WriteLine(ex.GetType());
    }
}

ただし Task を await する場合は元の Exception で catch できる。

try
{
    var addresses = await Dns.GetHostAddressesAsync("example.jp");
}
catch(SocketException se)
{
    Debug.WriteLine(se.ToString());
}

Taskの中で発生した(catch されてない)例外をまとめて処理したいときは TaskScheduler.UnobservedTaskException

TaskScheduler.UnobservedTaskException += (sender, e) =>
{
    AggregateException ae = e.Exception;
    Debug.WriteLine(ae.ToString());
    e.SetObserved(); // .NET Framework 4.0だとこれをしないとアプリが死ぬようになってた
};

非同期パターンの歴史

.NETには古い非同期APIも残されている。間違って使ってしまうことがないよう、簡単に紹介しておく。
詳細は MSDN:Asynchronous Programming Patterns で。

.NET Framework 1.0時代:Asynchronous Programming Model (APM)

Beginなんとか~Endなんとか。使うな

var fs = new FileStream(@"test.jpg", FileMode.Open);
byte[] buf = new byte[65536];

fs.BeginRead(buf, 0, buf.Length, 
(ar) =>
{
    int readbytes = fs.EndRead(ar);
    buf.Dump();
}, null);

.NET Framework 2.0~3.5時代:Event-based Asynchronous Pattern (EAP)

なんとかCompletedイベントとか。使うな

WebClient wc = new WebClient();
wc.DownloadStringCompleted += (sender, args) =>
{
    var html = args.Result;
    Console.WriteLine(html);
};
wc.DownloadStringAsync(new Uri("http://www.google.co.jp/"));

.NET Framework 4.5以降:Task-based Asynchronous Pattern (TAP)

async/await. 今から始めるならこれを使えばよい。 上2つのコードはTAPではこうなる。

var fs = new FileStream(@"test.jpg", FileMode.Open);
byte[] buf = new byte[65536];

int readbytes = await fs.ReadAsync(buf, 0, buf.Length);
string html = await wc.DownloadDataTaskAsync("http://www.google.co.jp/");
Console.WriteLine(html);
189
220
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
189
220

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?