C#

MCP 70-483 Programming in C# のお勉強 「1-1 マルチスレッド処理と非同期処理を実装する」

MCP試験 70-483 Programming in C# の学習材料。

目次はこちら

1-1 マルチスレッド処理と非同期処理を実装する

CPUを有効活用してレスポンスタイムやスループットを改善するための仕組み。

スレッド:System.Threading.Thread

生のスレッドを作る。

public static void Main()
{
    Thread thread = new Thread(() =>
    {
        // とても(重い|長い)処理
    });

    // スレッドを開始
    thread.Start();

    // スレッドの完了を待つ
    thread.Join();
}

スレッドプール:System.Threading.ThreadPool

生のスレッドの生成と破棄はコストが大きい。スレッドプールは予め用意されたスレッドを利用・再利用する仕組み。

public static void Main()
{
    ThreadPool.QueueUserWorkItem((state) =>
    {
        Console.WriteLine("とても(重い|長い)処理");
    });
}

終了を待ちたいときはシグナル(Auto/ManualResetEvent)を使う。

public static void Main()
{
    ManualResetEventSlim ev = new ManualResetEventSlim();
    ThreadPool.QueueUserWorkItem((state) =>
    {
        Console.WriteLine("とても(重い|長い)処理");
        ev.Set();
    });

    ev.Wait();
    Console.WriteLine("とても(重い|長い)処理が終わった");
}

パフォーマンス検証:スレッド vs スレッドプール

検証コード

static int tryCount = 10000;

public static void Main()
{
    Stopwatch sw;

    sw = Stopwatch.StartNew();
    RunCreateThread();
    Console.WriteLine($"スレッド: {sw.ElapsedMilliseconds} millisecs.");

    sw = Stopwatch.StartNew();
    RunThreadPool();
    Console.WriteLine($"スレッドプール: {sw.ElapsedMilliseconds} millisecs.");
}

// スレッドを自前で生成して使う
static void RunCreateThread()
{
    for (int i = 0; i < tryCount; i++)
    {
        Thread t = new Thread(() => { });
        t.Start();
        t.Join();
    }
}

// スレッドプールを使う
static void RunThreadPool()
{
    for (int i = 0; i < tryCount; i++)
    {
        ManualResetEventSlim ev = new ManualResetEventSlim();
        ThreadPool.QueueUserWorkItem((state) =>
        {
            ev.Set();
        });
        ev.Wait();
    }
}

検証結果

スレッド: 1583 millisecs.
スレッドプール: 17 millisecs.

スレッドを何回も使用する場合は、スレッドプールを使う方が何倍も速い。

Parallel:System.Threading.Tasks.Parallel

お手軽並列処理。Parallel.Invoke(), Parallel.For(), Parallel.ForEach() が使える。

public static void Main()
{
    Console.WriteLine("Parallel.Invoke()");
    Parallel.Invoke(
        () => { Console.WriteLine("100000007 is prime number? " + IsPrime(100000007)); },
        () => { Console.WriteLine("100000009 is prime number? " + IsPrime(100000009)); }
    );

    Console.WriteLine("Parallel.For()");
    Parallel.For(100000005, 100000010, (i) =>
    {
        Console.WriteLine($"{i} is prime number? " + IsPrime(i)); 
    });

    Console.WriteLine("Parallel.ForEach()");
    int[] nums = new int[] { 100000006, 100000007, 100000008, 100000009 };
    Parallel.ForEach(nums, (i) =>
    {
        Console.WriteLine($"{i} is prime number? " + IsPrime(i));
    });
}

static bool IsPrime(int n)
{
    for (int i = 2; i < n; i++)
    {
        if (n % i == 0) return false;
    }
    return true;
}

PLINQ:System.Linq.ParallelEnumerable

お手軽並列LINQ。ParallelとPLINQをTPL

public static void Main()
{
    // ParallelEnumerable を使う方法
    var primeNums = ParallelEnumerable.Range(100_000_000, 100)
                        .Where(IsPrime)
                        .ToArray();

    // AsParallel() を使う方法
    var primeNums2 = Enumerable.Range(100_000_000, 100)
                        .AsParallel()
                        .Where(IsPrime)
                        .ToArray();

    // ParallelEnumerable/AsParallel は基本的に順序を保証しないが、
    // AsOrdered() すると順序を保ってくれる。
    var primeNums3 = Enumerable.Range(100_000_000, 100)
                        .AsParallel()
                        .AsOrdered()
                        .Where(IsPrime)
                        .ToArray();

    foreach (var n in primeNums3)
    {
        Console.WriteLine(n);
    }
}

static bool IsPrime(int n)
{
    for (int i = 2; i < n; i++)
    {
        if (n % i == 0) return false;
    }
    return true;
}

UIスレッドをブロックしない

UIスレッドをブロックするボタンクリックイベントハンドラ

// www.microsoft.com の応答が遅いとUI応答が停止する(ウィンドウが固まるアレ)。
// これは画面を描画するスレッド(UIスレッドと言う)がダウンロード処理に占有されてしまうため。
private void button1_Click(object sender, EventArgs e)
{
    WebClient wc = new WebClient();
    string html = wc.DownloadString("http://www.microsoft.com/");
    textBox1.Text = html;
}

UIスレッドをブロックしないボタンクリックイベントハンドラ

// www.microsoft.com の応答が遅くてもウィンドウが固まらない。
// Task(async/await) を使うとダウンロード処理をUIスレッドではない別のスレッドで実行してくれる。
private async void button1_Click(object sender, EventArgs e)
{
    WebClient wc = new WebClient();

    // WebClient.DownloadStringTaskAsync() の戻り値の型は Task<string>
    // await キーワードを付けると Task の完了を待つと同時に string 型に変換してくれる。
    // Task の完了を待っている間、UIスレッドはブロックされない(ウィンドウは固まらない)。
    string html = await wc.DownloadStringTaskAsync("http://www.microsoft.com/");
    textBox1.Text = html;
}

ポイント:
- メソッドに async キーワードを付ける (付けないと await キーワードが使えない)
- 非同期IOメソッドに await キーワードを付ける (付けると、Task<string> ⇒ string )

Task:System.Threading.Tasks.Task

Task.Run() を使ってお手軽 Task 開始

private async void button1_Click(object sender, EventArgs e)
{
    // テキストボックスに入力された数値が素数かどうかを表示する。
    // 入力された数値が非常に大きい場合、
    // 計算に時間がかかりUIスレッドがブロックされてしまうので
    // Task.Run()を使って別スレッドに処理させる。
    bool isPrime = await Task.Run(() =>
    {
        long n = Int64.Parse(textBox1.Text);
        for (long i = 2; i < n; i++)
        {
            if (n % i == 0) return false;
        }
        return true;
    });

    textBox2.Text = isPrime ? "素数だね" : "素数じゃない";
}

Task.ContinueWith() を使って複数の Task を継続して実行させる。

private async void button1_Click(object sender, EventArgs e)
{
    // ダウンロードしたWebコンテンツからリンクURLを抜き出して表示する。
    // ダウンロードとスクレイピングには時間がかかる可能性がある。
    // ダウンロードが終わったらスクレイピング処理を実行させたい。
    Task<string> downloadTask = new WebClient().DownloadStringTaskAsync("https://msdn.microsoft.com/ja-jp/library/dd460693(v=vs.110).aspx");
    Task<string> scrapingTask = downloadTask.ContinueWith(dlTask =>
    {
        StringBuilder sb = new StringBuilder();
        foreach (Match m in Regex.Matches(dlTask.Result, "href=\"(.*?)\""))
        {
            sb.AppendLine(m.Groups[1].Value);
        }
        return sb.ToString();
    });

    textBox1.Text = await scrapingTask;
}

ConcurrentCollection:System.Collections.Concurrent.*

スレッドセーフでないオブジェクトに対し、複数のスレッドから同時に書き込もうとすると、内部のデータがおかしな状態になってしまうことがある。これをレース(競合)とか言ったりする。

List<T> はスレッドセーフなデータ構造ではない。

// マルチコアCPUなPCで実行すると毎回結果が変わる不思議なプログラム
public static void Main()
{
    int num = 1000000;

    List<int> oddNumbers = new List<int>();
    Parallel.For(0, num, i =>
    {
        if (i % 2 == 1) oddNumbers.Add(i);
    });
    Console.WriteLine($"List: {num}以下の奇数は{oddNumbers.Count}個です。");
}

ConcurrentBag<T> はスレッドセーフなデータ構造。

// 毎回正確な結果が得られる安全なプログラム。
public static void Main()
{
    int num = 1000000;

    ConcurrentBag<int> oddNumbers = new ConcurrentBag<int>();
    Parallel.For(0, num, i =>
    {
        if (i % 2 == 1) oddNumbers.Add(i);
    });
    Console.WriteLine($"ConcurrentBag: {num}以下の奇数は{oddNumbers.Count}個です。");
}

ConcurrentBag<T>のほか、BlockingCollection<T>, ConcurrentDictionary<TKey, TValue>, ConcurrentQueue<T>, ConcurrentStack<T> などもある。

その他覚えておいた方がよさそうなキーワード

  • TPL, TAP
  • CPU-bound, IO-bound

リソース

Quiz

  1. 要素数10000程度の文字列配列の各文字列のSHA-1ハッシュ値を計算したい。CPUを有効活用しようとするとき、どのAPIを使うのが良いか。

  2. Paralell または PLINQ で実行される同時並列数を制限したい。どうすればよいか。

  3. async キーワードを付けたメソッドの戻り値は、Task, Task<T>, void のいずれかである。ただし void は基本的に避けるべきとされている。なぜ避けるべきか?また void を使うのはどんなときか?

  4. スレッドを何度も使う場合、Thread よりも ThreadPool を使う方がパフォーマンスが良い。そして Thread よりも Task のほうが多機能。では Thread を使うはどんなとき?

  5. 下記のキーワードを整理しろ。

    • スレッド
    • 並列
    • 並行
    • 非同期