Reactive ExtensionsなしでThrottleだけほしい

  • 4
    いいね
  • 0
    コメント
この記事は最終更新日から1年以上が経過しています。

ライセンス

Apache License 2.0で。

シンプル版

こんなクラスを書いてあげれば、イベントベース+async/awaitスタイルの中で、「一定時間の間に発行されたイベントから最後のイベントのみを取り出す」的なことができると思う。

Throttle.cs
using System;
using System.Threading;
using System.Threading.Tasks;

namespace WhoToFollowApp
{
    public sealed class Throttle<T>
    {
        private readonly int millisec;
        private readonly Action<T> action;
        private long signalSequence;

        public Throttle(int millisec, Action<T> action)
        {
            this.millisec = millisec;
            this.action = action;
        }

        public async void Signal(T input)
        {
            var id = Interlocked.Increment(ref signalSequence);
            await Task.Delay(millisec);
            var current = Interlocked.Read(ref signalSequence);
            if (current == id)
            {
                action(input);
            }
        }
    }
}

Throttle<T>はたとえばこんな感じで使う。

using System;
using System.Windows.Forms;

namespace WhoToFollowApp
{
    public partial class Form1 : Form
    {
        private Throttle<string> throttle;

        public Form1()
        {
            InitializeComponent(); // textBox1とlistBox1を初期化するコード(省略)
            throttle = new Throttle<string>(1000, this.TextChangedThrottled);
        }

        private void textBox1_TextChanged(object sender, EventArgs e)
        {
            throttle.Signal(textBox1.Text);
        }

        private void TextChangedThrottled(string text)
        {
            listBox1.Items.Add(text);
        }
    }
}

TextBox.TextChangedイベントのイベントハンドラでThrottle<string>.Signal(string input)を呼んでいる。

イベント発火から一定期間(この例では1秒)待っている間に後続のイベントがなければ、最後のイベントによってコールバック(この例ではTextChangedThrottled(string text))が呼び出される。後続があれば期間がスライドする。

複雑版

Signal()を呼ぶたびにTaskが起きるのが嫌なら、面倒だけどこんなコードで。

Throttle.cs
using System;
using System.Threading;
using System.Threading.Tasks;

namespace WhoToFollowApp
{
    public sealed class Throttle<T> : IDisposable
    {
        private readonly AutoResetEvent producer = new AutoResetEvent(true);
        private readonly AutoResetEvent consumer = new AutoResetEvent(false);
        private Tuple<T, SynchronizationContext> value;

        private readonly int millisec;
        private readonly Action<T> action;
        private volatile bool disposed = false;
        private int producers = 0;

        public Throttle(int millisec, Action<T> action)
        {
            this.millisec = millisec;
            this.action = action;
            Task.Run(() => WaitAndFire());
        }

        private void WaitAndFire()
        {
            try
            {
                while (true)
                {
                    producer.Set();
                    consumer.WaitOne();
                    if (disposed) return;

                    while (true)
                    {
                        producer.Set();
                        var timedOut = !consumer.WaitOne(millisec);
                        if (disposed) return;
                        if (timedOut) break;
                    }
                    Fire(action, value.Item1, value.Item2);
                    value = null;
                }
            }
            catch (ObjectDisposedException)
            {
                // ignore and exit
            }
        }

        private static void Fire(Action<T> action, T input, SynchronizationContext context)
        {
            if (context != null)
            {
                context.Post(state => action((T)state), input);
            }
            else
            {
                Task.Run(() => action(input));
            }
        }

        public void Signal(T input)
        {
            if (disposed) return;
            try
            {
                Interlocked.Increment(ref producers);
                producer.WaitOne();
                if (disposed) return;
                value = Tuple.Create(input, SynchronizationContext.Current);
                consumer.Set();
            }
            catch (ObjectDisposedException)
            {
                // ignore and exit
            }
            finally
            {
                Interlocked.Decrement(ref producers);
            }
        }

        public void Dispose()
        {
            if (!disposed)
            {
                disposed = true;
                while (Interlocked.CompareExchange(ref producers, 0, 0) > 0)
                {
                    producer.Set();
                }
                consumer.Set();
                producer.Dispose();
                consumer.Dispose();
            }
        }
    }
}

Throttle<T>IDisposableを実装するようになったので、適宜破棄してやること。

using System;
using System.Windows.Forms;

namespace WhoToFollowApp
{
    public partial class Form1 : Form
    {
        private Throttle<string> throttle;

        public Form1()
        {
            InitializeComponent(); // textBox1とlistBox1を初期化するコード(省略)
            this.Load += (s, e) => throttle = new Throttle<string>(1000, this.TextChangedThrottled);
            this.FormClosed += (s, e) => throttle.Dispose();
        }

        private void textBox1_TextChanged(object sender, EventArgs e)
        {
            throttle.Signal(textBox1.Text);
        }

        private void TextChangedThrottled(string text)
        {
            listBox1.Items.Add(text);
        }
    }
}