こちらは Sansan Advent Calendar 2017 の7日目の記事です。
とある業務上の処理で非同期処理を複数並列で走らせたいと思って Parallel.ForEach
使おうとしたら
- 戻り値が Task じゃなくて void 返ってくるから async できないやん....
- SelectでTaskにくるんでWhenAllでもできるけど並列数とかの細かい制御できないし不安...
みたいな事で悩んでたら弊社のエースエンジニアである @chocolamint 先生から.NETのDataflowの存在を教えてもらったので、ちょっと調べてみました。
System.Threading.Tasks.Dataflow ってなんぞや?
System.Threading.Tasks.Dataflow は .NET の TPL(Task Parallel Libilary)の機能の一部で、そのうち Actor をベースにした非同期データフローを提供してくれるライブラリです。
ちなみに通常の .NET とリリースサイクルが異なり、規定の.NETには含まれておりません。
Nugetからの追加が別途必要になります。
ちなみに似たような名前の Microsoft.Tpl.Dataflow は旧Versionです。
https://github.com/dotnet/core/blob/ab5314af29352e61aa31ad38fd5058f4738ab5d4/release-notes/Archived%20Change%20Lists/Microsoft.Tpl.Dataflow.md
This package has been renamed to System.Threading.Tasks.Dataflow.
Source と Targets
DataflowはDataflowブロックというデータをバッファして処理するデータ構造によって成り立っています。大別して以下の3種類のDataflowブロックが存在します。
種類 | 概要 |
---|---|
source blocks |
ISourceBlock<out TOutput> を継承するやつ。データのソース。 |
target blocks |
ITargetBlock<in TInput> を継承するやつ。データのターゲット。 |
propagator blocks |
ISourceBlock<out TOutput> と ITargetBlock<in TInput> の両方を継承するやつ。ソースにもターゲットにもなれる。 |
ソースがデータの読み取り、ターゲットがデータの受け取り側です。
Dataflow の作り方
Dataflow を作るにはそれぞれのブロックをつなげる事でできます。
MSDNではつなげかたとして以下の2タイプを上げてました。
- シーケンシャルな pipeline
- いわゆる処理Aが完了したらBをやってそれも完了したらCも... みたいなやつ
- network の1種とみなせる
- 順次実行なら普通に実装するのと変わらないかというと異なり処理AからB、BからCといった感じのpipelineではにAからBにいった処理をやっている最中に次のAを並列で行えるためスループットが上げる事ができる
- グラフな network
- Aが終わったらBとCを走らせ両方完了したらDとか複雑な処理の流れを組める
pipeline,network ではデータが利用可能になるとソースがターゲットにデータを渡していきます。
それを実現するには具体的には ISourceBlock<TOutput>.LinkTo
で Source を Target につなげます。
この時 Source, Target はお互いに 0..n で接続を持てます。
また一度つながった Source と Target は接続を解く事もできます。
当初の目的を果たしてみる
では当初の目的通り ParallelForEach ではできなかった async の並列化をやってみたいと思います。
S3からテキストのファイルを複数取りに行って出力するような処理です。なので上記のように Dataflow を繋げたりとかは特に不要です。(説明しておいてなんだけど)
今回は TagetSource である ActionBlock しか使いません。
[Test]
public async Task Test()
{
var cancellationToken = CancellationToken.None;
var keys = new List<string>
{
"0000000001",
"0000000002",
"0000000003"
};
var result = await GetTextFileFromS3Async(keys, cancellationToken);
var value = string.Join("\n", result);
Console.WriteLine(value);
}
private static async Task<IEnumerable<string>> GetTextFileFromS3Async(IEnumerable<string> keys, CancellationToken cancellationToken)
{
var concurrentBag = new ConcurrentBag<string>();
// ITargetBlock<T> を継承する ActionBlock で key を受け取ってファイルの中身を読み取る
var actionBlock = new ActionBlock<string>(async key =>
{
using (var s3Client = new AmazonS3Client())
{
var response = await s3Client.GetObjectAsync("bucket-name", key, cancellationToken);
using (var reader = new StreamReader(response.ResponseStream))
{
concurrentBag.Add(reader.ReadToEnd());
}
}
},
// Select で Task にくるんで WaitAll と異なりこの辺の制御ができる。
new ExecutionDataflowBlockOptions
{
CancellationToken = cancellationToken,
MaxDegreeOfParallelism = Environment.ProcessorCount
});
foreach (var key in keys)
{
// actionBlock に key を渡して投げる
await actionBlock.SendAsync(key, cancellationToken);
}
// actionBlock にもう依頼は無い事を通知する。
actionBlock.Complete();
// actionBlock を await して全部終わるのを待つ。
await actionBlock.Completion;
return concurrentBag;
}
今回はシンプルな並列処理でしたが、 pipeline で粒度の細かい処理を並列化してスループットを上げたり network を組んで複雑なデータ処理フローを作成したりが楽にできそうですね。