Edited at

System.IO.Pipelinesを使う上での注意点


はじめに

System.IO.Pipelinesが世に出てから今までいじってきた結果として、気を付けるべき点を書く


ReadAsyncまたはTryReadの時にInvalidOperationExceptionが出る

PipeReaderの使い方は基本的には以下のようになる。

// PipeReader pr;

// 同期版はpr.TryRead(out var readResult)
var readResult = await pr.ReadAsync();
if(!readResult.Buffer.IsEmpty)
{
// 読出し
}
if(readResult.Buffer.IsEmpty && readResult.IsCompleted)
{
// 終了処理
}
// これをしないと次回のReadで例外
pr.AdvanceTo(readResult.Buffer.End);

この時、読出し処理のブロックの最後でpr.AdvanceTo(readResult.Buffer.End)しないと、ReadAsyncまたはTryReadの時にSystem.InvalidOperationException: Reading is already in progress.という例外が発生する。

データを次回に持ち越したい場合、、pr.AdvanceTo(readResult.Buffer.Slice([次回の開始点]).Start)を使用すればOK。


PipeReaderにデータが来ない

非同期でPipeReaderを受けてる時、いつまでたってもデータが来ない時がある。(await pr.ReadAsync()で待たされる)。

このような場合、PipeWriterの方でデータをフラッシュしていない場合がある。

pw.WriteAsync(data)を使用している場合は、内部的にFlushしているためこの問題は起こりえないが、pw.GetSpan()を駆使して同期的に処理している場合、ここに引っかかる可能性がある。なお、

対処としては、ある程度データが溜まったらpw.FlushAsync()すれば、実行時点で蓄積されたバッファがPipeReaderの方に流れる。

// async内ではSpanが使用できないため別関数にする

// 将来的にはこの制限が緩和される可能性あり
void WriteData(PipeWriter pw, ReadOnlySpan<byte> rsp)
{
var sp = pw.GetSpan(rsp.Length);
rsp.CopyTo(sp);
pw.Advance(rsp.Length);
}
// PipeWriter pw;
// byte[] data;
WriteData(ppw, data.AsSpan());
// 実行時点でPipeReaderにデータが流れるようになる
await pw.FlushAsync();

なお、Flushしない場合でも、Complete()した時点でバッファも一緒にFlushされるので、

データ長が有限であることがわかっている場合はあえて途中のFlushをしないという選択肢もあり得る。