はじめに
非同期処理(async/await
)を扱っていて、「非同期処理を扱いたいが並行で複数走らないように上手く制御したい」という場面があります。たとえば「リソースをダウンロードしたいが一気に通信せずに1つずつ順番にダウンロードしたい」みたいなシチュエーションです。
(URLリストをダウンロードするんだけど、まとめてダウンロードせずに順番にダウンロードしたい、みたいな)
そのような「直列でタスクを実行する機構」を作るとしたときに、どんな作り方ができるのかを紹介します。
前提条件
- UniTaskを使うことを想定しています
実装方法
Queueを使う
まずは一番最初に思いつくであろう、Queue<T>
を使って実装する方法です。
using System;
using System.Collections.Generic;
using System.Threading;
using Cysharp.Threading.Tasks;
using UnityEngine;
namespace TORISOUP.SequentialTaskExecutors
{
public sealed class QueueExecutor : IDisposable
{
private readonly Queue<Func<CancellationToken, UniTask>> _queue = new();
private readonly CancellationTokenSource _cancellationTokenSource = new();
private bool _isExecuting;
private bool _isDisposed;
private async UniTaskVoid ExecuteAsync(CancellationToken ct)
{
try
{
_isExecuting = true;
while (_queue.Count > 0 && !ct.IsCancellationRequested)
{
try
{
var task = _queue.Dequeue();
if (task != null) await task(ct);
}
catch (Exception e) when (e is not OperationCanceledException)
{
Debug.LogException(e);
}
}
}
finally
{
_isExecuting = false;
}
}
public void Register(Func<CancellationToken, UniTask> taskAction)
{
if (_isDisposed) throw new ObjectDisposedException(nameof(QueueExecutor));
_queue.Enqueue(taskAction);
if (!_isExecuting) ExecuteAsync(_cancellationTokenSource.Token).Forget();
}
public void Dispose()
{
if (_isDisposed) throw new ObjectDisposedException(nameof(QueueExecutor));
_isDisposed = true;
_cancellationTokenSource.Cancel();
_cancellationTokenSource.Dispose();
_queue.Clear();
}
}
}
public class Sample : MonoBehaviour
{
readonly QueueExecutor _executor = new QueueExecutor();
private void Start()
{
_executor.Register(async ct =>
{
await UniTask.Delay(TimeSpan.FromSeconds(1), cancellationToken: ct);
Debug.Log("Do!");
});
_executor.Register(async ct =>
{
await UniTask.Delay(TimeSpan.FromSeconds(1), cancellationToken: ct);
Debug.Log("Do!");
});
_executor.Register(async ct =>
{
await UniTask.Delay(TimeSpan.FromSeconds(1), cancellationToken: ct);
Debug.Log("Do!");
});
destroyCancellationToken.Register(() => _executor.Dispose());
}
}
Queue
を用意して、それをwhile
で一個ずつ取り出して処理しているだけでやってることはシンプルです。
この実装の利点としてはQueue
をPriorityQueue
に変えれば優先度をつけて非同期処理を扱える点があげられます。他にも「直列でタスクを処理しつつ、同時にN個までは並行実行を許可したい」みたいな複雑な動作も実現可能です。
欠点としては「Queue
の中身が尽きたとき」のケアをしてあげないといけないところです。Queue
が空になるとループ処理が止まってしまうので、次にタスクを追加したタイミングで再度ループを発火させる必要があります。
IUniTaskAsyncEnumerableを使う
await
可能なQueue<T>
を用意できれば、「中身がある場合はそれを取り出し、ゼロ個の場合は新しく要素が追加されるのをawait
する」みたいな書き方できそうです。それを実現できるのがIUniTaskAsyncEnumerable<T>
であり、その実装としてはChannel
が使えます。
public sealed class ChannelExecutor : IDisposable
{
private readonly Channel<Func<CancellationToken, UniTask>> _channel =
Channel.CreateSingleConsumerUnbounded<Func<CancellationToken, UniTask>>();
private readonly ChannelWriter<Func<CancellationToken, UniTask>> _writer;
private readonly CancellationTokenSource _cancellationTokenSource = new();
private bool _isDisposed;
public ChannelExecutor()
{
_writer = _channel.Writer;
ExecuteAsync(_cancellationTokenSource.Token).Forget();
}
private async UniTaskVoid ExecuteAsync(CancellationToken ct)
{
// Channelは非同期的なQueueとしての性質を持つ
// 要素がゼロ個の場合は新しく追加するまでawaitしてくれる
await foreach (var f in _channel.Reader.ReadAllAsync(ct))
{
try
{
await f(ct);
}
catch (Exception e) when (e is not OperationCanceledException)
{
Debug.LogException(e);
}
}
}
public void Register(Func<CancellationToken, UniTask> taskAction)
{
if (_isDisposed) throw new ObjectDisposedException(nameof(ChannelExecutor));
_writer.TryWrite(taskAction);
}
public void Dispose()
{
if (_isDisposed) throw new ObjectDisposedException(nameof(ChannelExecutor));
_isDisposed = true;
_cancellationTokenSource.Cancel();
_cancellationTokenSource.Dispose();
}
}
public class Sample : MonoBehaviour
{
readonly ChannelExecutor _executor = new ChannelExecutor();
private void Start()
{
_executor.Register(async ct =>
{
await UniTask.Delay(TimeSpan.FromSeconds(1), cancellationToken: ct);
Debug.Log("Do!");
});
_executor.Register(async ct =>
{
await UniTask.Delay(TimeSpan.FromSeconds(1), cancellationToken: ct);
Debug.Log("Do!");
});
_executor.Register(async ct =>
{
await UniTask.Delay(TimeSpan.FromSeconds(1), cancellationToken: ct);
Debug.Log("Do!");
});
destroyCancellationToken.Register(() => _executor.Dispose());
}
}
利点はQueue
を使った実装よりもシンプルであること。欠点はただ積まれた順番にしか処理ができないのでPriority
の指定などができないことです。一応LINQを使って挙動をカスタマイズも可能ではありますが、この用途での拡張性はそこまで期待できないかも。
まとめ
-
Queue<T>
を使って実装するとちょっと管理が面倒くさいけど拡張性がある -
Channel<T>
を使うとIUniTaskAsyncEnumerable
の利点を活かせるが、拡張性はそんなにないかも
一長一短なので好きな方法で実装すればよいかと思います。
おまけ:SequentialTaskExecutor
今回の紹介で出てきたChannelExecutor
を調子にのってどんどん機能を拡張していったらできあがったものです。せっかくなのでライブラリとして公開しておきます。
using System;
using System.Threading;
using Cysharp.Threading.Tasks;
namespace TORISOUP.SequentialTaskExecutors
{
public sealed class SequentialTaskExecutor<T> : IDisposable
{
private readonly Channel<AsyncAction<T>> _channel;
private readonly ChannelWriter<AsyncAction<T>> _writer;
private readonly CancellationTokenSource _cancellationTokenSource = new();
private readonly object _gate = new();
private bool _isDisposed;
private bool _isExecuting;
public SequentialTaskExecutor()
{
_channel = Channel.CreateSingleConsumerUnbounded<AsyncAction<T>>();
_writer = _channel.Writer;
}
public void Execute()
{
lock (_gate)
{
if (_isDisposed) throw new ObjectDisposedException(nameof(SequentialTaskExecutor<T>));
if (_isExecuting) return;
_isExecuting = true;
}
ExecuteLoopAsync(_cancellationTokenSource.Token).Forget();
}
private async UniTaskVoid ExecuteLoopAsync(CancellationToken ct)
{
await foreach (var action in _channel.Reader.ReadAllAsync())
{
if (ct.IsCancellationRequested)
{
action.AutoResetUniTaskCompletionSource.TrySetCanceled(ct);
continue;
}
try
{
if (action.CancellationToken.IsCancellationRequested) continue;
var token = ct;
if (action.CancellationToken != CancellationToken.None)
{
token = CancellationTokenSource.CreateLinkedTokenSource(action.CancellationToken, ct).Token;
}
var result = await action.Action(token);
action.AutoResetUniTaskCompletionSource.TrySetResult(result);
}
catch (OperationCanceledException ex)
{
action.AutoResetUniTaskCompletionSource.TrySetCanceled(ex.CancellationToken);
}
catch (Exception e)
{
action.AutoResetUniTaskCompletionSource.TrySetException(e);
}
}
}
public UniTask<T> RegisterAsync(Func<CancellationToken, UniTask<T>> func,
CancellationToken cancellationToken = default)
{
lock (_gate)
{
if (_isDisposed) throw new ObjectDisposedException(nameof(SequentialTaskExecutor<T>));
}
var autoResetUniTaskCompletionSource = AutoResetUniTaskCompletionSource<T>.Create();
cancellationToken.Register(() => autoResetUniTaskCompletionSource.TrySetCanceled(cancellationToken));
_writer.TryWrite(new AsyncAction<T>(func, autoResetUniTaskCompletionSource, cancellationToken));
return autoResetUniTaskCompletionSource.Task;
}
public void Dispose()
{
lock (_gate)
{
if (_isDisposed) return;
try
{
_writer.TryComplete();
_cancellationTokenSource.Cancel();
_cancellationTokenSource.Dispose();
}
finally
{
_isDisposed = true;
}
}
}
private readonly struct AsyncAction<TU>
{
public Func<CancellationToken, UniTask<TU>> Action { get; }
public AutoResetUniTaskCompletionSource<TU> AutoResetUniTaskCompletionSource { get; }
public CancellationToken CancellationToken { get; }
public AsyncAction(Func<CancellationToken, UniTask<TU>> action,
AutoResetUniTaskCompletionSource<TU> autoResetUniTaskCompletionSource,
CancellationToken cancellationToken)
{
Action = action;
AutoResetUniTaskCompletionSource = autoResetUniTaskCompletionSource;
CancellationToken = cancellationToken;
}
}
}
}
using Cysharp.Threading.Tasks;
using UnityEngine;
public class Sample : MonoBehaviour
{
void Start()
{
// Executorの作成
var executor = new TORISOUP.SequentialTaskExecutors.SequentialTaskExecutor<int>();
// Queueに登録
executor.RegisterAsync(async _ =>
{
await UniTask.Delay(1000);
return 1;
});
// CancellationTokenを指定する場合
// この引数の「ct」はRegisterAsyncで渡したもの及びExecutorのDisposeの両方にリンクしている
executor.RegisterAsync(async ct =>
{
await UniTask.Delay(1000, cancellationToken: ct);
return 2;
}, destroyCancellationToken);
UniTask.Void(async () =>
{
// await可能
// この場合はRegisterAsyncで登録した非同期処理自体の完了を待機することになる
var result = await executor.RegisterAsync(async ct =>
{
await UniTask.Delay(1000, cancellationToken: ct);
return 3;
}, destroyCancellationToken);
// "3"が出力される
Debug.Log(result);
});
// Execute()を呼び出すことで実行開始
// 実行開始後にもRegisterAsync()で追加登録可能
executor.Execute();
destroyCancellationToken.Register(() =>
{
// Dispose()を呼び出すことで直列実行をすべて中止
// RegisterAsync()をawaitしている場合はそれもキャンセルされる
executor.Dispose();
});
}
}