LoginSignup
26
24

【Unity】複数の非同期処理を直列に実行する機構を作る

Posted at

はじめに

非同期処理(async/await)を扱っていて、「非同期処理を扱いたいが並行で複数走らないように上手く制御したい」という場面があります。たとえば「リソースをダウンロードしたいが一気に通信せずに1つずつ順番にダウンロードしたい」みたいなシチュエーションです。

DEMO
(URLリストをダウンロードするんだけど、まとめてダウンロードせずに順番にダウンロードしたい、みたいな)

そのような「直列でタスクを実行する機構」を作るとしたときに、どんな作り方ができるのかを紹介します。

前提条件

  • UniTaskを使うことを想定しています

実装方法

Queueを使う

まずは一番最初に思いつくであろう、Queue<T>を使って実装する方法です。

using System;
using System.Collections.Generic;
using System.Threading;
using Cysharp.Threading.Tasks;
using UnityEngine;

namespace TORISOUP.SequentialTaskExecutors
{
    public sealed class QueueExecutor : IDisposable
    {
        private readonly Queue<Func<CancellationToken, UniTask>> _queue = new();
        private readonly CancellationTokenSource _cancellationTokenSource = new();

        private bool _isExecuting;
        private bool _isDisposed;

        private async UniTaskVoid ExecuteAsync(CancellationToken ct)
        {
            try
            {
                _isExecuting = true;
                while (_queue.Count > 0 && !ct.IsCancellationRequested)
                {
                    try
                    {
                        var task = _queue.Dequeue();
                        if (task != null) await task(ct);
                    }
                    catch (Exception e) when (e is not OperationCanceledException)
                    {
                        Debug.LogException(e);
                    }
                }
            }
            finally
            {
                _isExecuting = false;
            }
        }

        public void Register(Func<CancellationToken, UniTask> taskAction)
        {
            if (_isDisposed) throw new ObjectDisposedException(nameof(QueueExecutor));
            _queue.Enqueue(taskAction);
            if (!_isExecuting) ExecuteAsync(_cancellationTokenSource.Token).Forget();
        }

        public void Dispose()
        {
            if (_isDisposed) throw new ObjectDisposedException(nameof(QueueExecutor));
            _isDisposed = true;
            _cancellationTokenSource.Cancel();
            _cancellationTokenSource.Dispose();
            _queue.Clear();
        }
    }
}
public class Sample : MonoBehaviour
{
    readonly QueueExecutor _executor = new QueueExecutor();

    private void Start()
    {
        _executor.Register(async ct =>
        {
            await UniTask.Delay(TimeSpan.FromSeconds(1), cancellationToken: ct);
            Debug.Log("Do!");
        });

        _executor.Register(async ct =>
        {
            await UniTask.Delay(TimeSpan.FromSeconds(1), cancellationToken: ct);
            Debug.Log("Do!");
        });

        _executor.Register(async ct =>
        {
            await UniTask.Delay(TimeSpan.FromSeconds(1), cancellationToken: ct);
            Debug.Log("Do!");
        });

        destroyCancellationToken.Register(() => _executor.Dispose());
    }
}

Queueを用意して、それをwhileで一個ずつ取り出して処理しているだけでやってることはシンプルです。

この実装の利点としてはQueuePriorityQueueに変えれば優先度をつけて非同期処理を扱える点があげられます。他にも「直列でタスクを処理しつつ、同時にN個までは並行実行を許可したい」みたいな複雑な動作も実現可能です。

欠点としては「Queueの中身が尽きたとき」のケアをしてあげないといけないところです。Queueが空になるとループ処理が止まってしまうので、次にタスクを追加したタイミングで再度ループを発火させる必要があります。

IUniTaskAsyncEnumerableを使う

await可能なQueue<T>を用意できれば、「中身がある場合はそれを取り出し、ゼロ個の場合は新しく要素が追加されるのをawaitする」みたいな書き方できそうです。それを実現できるのがIUniTaskAsyncEnumerable<T>であり、その実装としてはChannelが使えます。

IUniTaskAsyncEnumerableを使った実装
public sealed class ChannelExecutor : IDisposable
{
    private readonly Channel<Func<CancellationToken, UniTask>> _channel =
        Channel.CreateSingleConsumerUnbounded<Func<CancellationToken, UniTask>>();

    private readonly ChannelWriter<Func<CancellationToken, UniTask>> _writer;
    private readonly CancellationTokenSource _cancellationTokenSource = new();

    private bool _isDisposed;

    public ChannelExecutor()
    {
        _writer = _channel.Writer;
        ExecuteAsync(_cancellationTokenSource.Token).Forget();
    }

    private async UniTaskVoid ExecuteAsync(CancellationToken ct)
    {
        // Channelは非同期的なQueueとしての性質を持つ
        // 要素がゼロ個の場合は新しく追加するまでawaitしてくれる
        await foreach (var f in _channel.Reader.ReadAllAsync(ct))
        {
            try
            {
                await f(ct);
            }
            catch (Exception e) when (e is not OperationCanceledException)
            {
                Debug.LogException(e);
            }
        }
    }

    public void Register(Func<CancellationToken, UniTask> taskAction)
    {
        if (_isDisposed) throw new ObjectDisposedException(nameof(ChannelExecutor));
        _writer.TryWrite(taskAction);
    }

    public void Dispose()
    {
        if (_isDisposed) throw new ObjectDisposedException(nameof(ChannelExecutor));
        _isDisposed = true;
        _cancellationTokenSource.Cancel();
        _cancellationTokenSource.Dispose();
    }
}
public class Sample : MonoBehaviour
{
    readonly ChannelExecutor _executor = new ChannelExecutor();

    private void Start()
    {
        _executor.Register(async ct =>
        {
            await UniTask.Delay(TimeSpan.FromSeconds(1), cancellationToken: ct);
            Debug.Log("Do!");
        });

        _executor.Register(async ct =>
        {
            await UniTask.Delay(TimeSpan.FromSeconds(1), cancellationToken: ct);
            Debug.Log("Do!");
        });

        _executor.Register(async ct =>
        {
            await UniTask.Delay(TimeSpan.FromSeconds(1), cancellationToken: ct);
            Debug.Log("Do!");
        });


        destroyCancellationToken.Register(() => _executor.Dispose());
    }
}

利点はQueueを使った実装よりもシンプルであること。欠点はただ積まれた順番にしか処理ができないのでPriorityの指定などができないことです。一応LINQを使って挙動をカスタマイズも可能ではありますが、この用途での拡張性はそこまで期待できないかも。

まとめ

  • Queue<T>を使って実装するとちょっと管理が面倒くさいけど拡張性がある
  • Channel<T>を使うとIUniTaskAsyncEnumerableの利点を活かせるが、拡張性はそんなにないかも

一長一短なので好きな方法で実装すればよいかと思います。

おまけ:SequentialTaskExecutor

今回の紹介で出てきたChannelExecutorを調子にのってどんどん機能を拡張していったらできあがったものです。せっかくなのでライブラリとして公開しておきます。

SequentialTaskExecutor

using System;
using System.Threading;
using Cysharp.Threading.Tasks;

namespace TORISOUP.SequentialTaskExecutors
{
    public sealed class SequentialTaskExecutor<T> : IDisposable
    {
        private readonly Channel<AsyncAction<T>> _channel;
        private readonly ChannelWriter<AsyncAction<T>> _writer;
        private readonly CancellationTokenSource _cancellationTokenSource = new();
        private readonly object _gate = new();

        private bool _isDisposed;
        private bool _isExecuting;

        public SequentialTaskExecutor()
        {
            _channel = Channel.CreateSingleConsumerUnbounded<AsyncAction<T>>();
            _writer = _channel.Writer;
        }

        public void Execute()
        {
            lock (_gate)
            {
                if (_isDisposed) throw new ObjectDisposedException(nameof(SequentialTaskExecutor<T>));
                if (_isExecuting) return;
                _isExecuting = true;
            }

            ExecuteLoopAsync(_cancellationTokenSource.Token).Forget();
        }

        private async UniTaskVoid ExecuteLoopAsync(CancellationToken ct)
        {
            await foreach (var action in _channel.Reader.ReadAllAsync())
            {
                if (ct.IsCancellationRequested)
                {
                    action.AutoResetUniTaskCompletionSource.TrySetCanceled(ct);
                    continue;
                }

                try
                {
                    if (action.CancellationToken.IsCancellationRequested) continue;

                    var token = ct;
                    if (action.CancellationToken != CancellationToken.None)
                    {
                        token = CancellationTokenSource.CreateLinkedTokenSource(action.CancellationToken, ct).Token;
                    }

                    var result = await action.Action(token);
                    action.AutoResetUniTaskCompletionSource.TrySetResult(result);
                }
                catch (OperationCanceledException ex)
                {
                    action.AutoResetUniTaskCompletionSource.TrySetCanceled(ex.CancellationToken);
                }
                catch (Exception e)
                {
                    action.AutoResetUniTaskCompletionSource.TrySetException(e);
                }
            }
        }

        public UniTask<T> RegisterAsync(Func<CancellationToken, UniTask<T>> func,
            CancellationToken cancellationToken = default)
        {
            lock (_gate)
            {
                if (_isDisposed) throw new ObjectDisposedException(nameof(SequentialTaskExecutor<T>));
            }

            var autoResetUniTaskCompletionSource = AutoResetUniTaskCompletionSource<T>.Create();
            cancellationToken.Register(() => autoResetUniTaskCompletionSource.TrySetCanceled(cancellationToken));
            _writer.TryWrite(new AsyncAction<T>(func, autoResetUniTaskCompletionSource, cancellationToken));
            return autoResetUniTaskCompletionSource.Task;
        }

        public void Dispose()
        {
            lock (_gate)
            {
                if (_isDisposed) return;

                try
                {
                    _writer.TryComplete();
                    _cancellationTokenSource.Cancel();
                    _cancellationTokenSource.Dispose();
                }
                finally
                {
                    _isDisposed = true;
                }
            }
        }


        private readonly struct AsyncAction<TU>
        {
            public Func<CancellationToken, UniTask<TU>> Action { get; }
            public AutoResetUniTaskCompletionSource<TU> AutoResetUniTaskCompletionSource { get; }
            public CancellationToken CancellationToken { get; }

            public AsyncAction(Func<CancellationToken, UniTask<TU>> action,
                AutoResetUniTaskCompletionSource<TU> autoResetUniTaskCompletionSource,
                CancellationToken cancellationToken)
            {
                Action = action;
                AutoResetUniTaskCompletionSource = autoResetUniTaskCompletionSource;
                CancellationToken = cancellationToken;
            }
        }
    }
}
使用例
using Cysharp.Threading.Tasks;
using UnityEngine;

public class Sample : MonoBehaviour
{
    void Start()
    {
        // Executorの作成
        var executor = new TORISOUP.SequentialTaskExecutors.SequentialTaskExecutor<int>();

        // Queueに登録
        executor.RegisterAsync(async _ =>
        {
            await UniTask.Delay(1000);
            return 1;
        });

        // CancellationTokenを指定する場合
        // この引数の「ct」はRegisterAsyncで渡したもの及びExecutorのDisposeの両方にリンクしている
        executor.RegisterAsync(async ct =>
        {
            await UniTask.Delay(1000, cancellationToken: ct);
            return 2;
        }, destroyCancellationToken);

        UniTask.Void(async () =>
        {
            // await可能
            // この場合はRegisterAsyncで登録した非同期処理自体の完了を待機することになる
            var result = await executor.RegisterAsync(async ct =>
            {
                await UniTask.Delay(1000, cancellationToken: ct);
                return 3;
            }, destroyCancellationToken);
            
            // "3"が出力される
            Debug.Log(result);
        });
    
        // Execute()を呼び出すことで実行開始
        // 実行開始後にもRegisterAsync()で追加登録可能
        executor.Execute();

        destroyCancellationToken.Register(() =>
        {
            // Dispose()を呼び出すことで直列実行をすべて中止
            // RegisterAsync()をawaitしている場合はそれもキャンセルされる
            executor.Dispose();
        });
    }
}
26
24
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
26
24