3
2

More than 3 years have passed since last update.

【ReactiveExtensions】任意の時間間隔で値を発行する2つの方法

Posted at

ReactiveExtensionsには、等間隔で値を発行するObservable.Intervalファクトリメソッドはあるのですが、任意の間隔で値を発行するファクトリメソッドがありません。
いや、厳密にはあるのですが、使い方が複雑なのでここで紹介します。

任意の時間間隔で値を発行する2つの方法

方法1.Observable.Generateファクトリメソッドを使う

Observable.Generateファクトリメソッドの引数に、値の発行間隔を定義できるFunc<TSource, TimeSpan>型のtimeSelector引数があるので、任意の時間間隔で値を発行するIObservableを作り出すことができます。

Observable.Generateメソッドの定義と使い方

まずは、Observable.Generateメソッドの定義と基本的な使い方を説明します。

定義

一番簡単な定義は次のとおりです。

public static IObservable<TResult> Generate<TState, TResult> (
    TState initialState,                    //TStateの初期値を指定
    Func<TState, bool> condition,           //継続条件を指定
    Func<TState, TState> iterate,           //TStateの変化量を指定
    Func<TState, TResult> resultSelector)   //発光する値を指定

使用例

例えば、0〜4の値を発行するには次のようにします。

Observable.Generate(initialState: 0,            //初期値:0
                    condition: i => i < 5,      //継続条件:発行値が5より小さい
                    iterate: i => ++i,          //変化量:1ずつインクリメント
                    resultSelector: i => i)     //iをそのまま発行
          .Subscribe(i => Console.WriteLine(i),
                     () => Console.WriteLine("OnCompleted"));

まるでfor文のような引数の指定方法ですね。

実行結果

0
1
2
3
4
OnCompleted

Observable.Generateメソッドで値の発行間隔を指定する

本題はここからです。
Observable.Generateメソッドには、値の発行間隔を指定できるオーバーロードがあります。

値の発行間隔を指定できるオーバーロード定義

public static IObservable<TResult> Generate<TState, TResult> (
    TState initialState,                    //TStateの初期値を指定
    Func<TState, bool> condition,           //継続条件を指定
    Func<TState, TState> iterate,           //TStateの変化量を指定
    Func<TState, TResult> resultSelector,   //発光する値を指定
    Func<TState, TimeSpan> timeSelector)    //時間間隔を指定;

このtimeSelectorに発行間隔を定義したメソッドを指定できます。

使い方

どのように使うかですが、例えば発行する順番を定義したTimeSpanListを用意して、timeSelectorで値を順番に読んでいくように指定することで、Listに定義した間隔で値が発行されます。


class MainClass
{
    public static void Main(string[] args)
    {
        //発行間隔を定義
        List<TimeSpan> intervals = new List<TimeSpan>()
        {
            TimeSpan.FromSeconds(1),
            TimeSpan.FromSeconds(2),
            TimeSpan.FromSeconds(3),
            TimeSpan.FromSeconds(4),
            TimeSpan.FromSeconds(5)
        };

        Console.WriteLine($"{DateTime.Now} 値の発行を開始します");
        Observable.Generate(initialState: 0,
                            condition: n => n < intervals.Count,
                            iterate: n => ++n,
                            resultSelector: n => n,
                            timeSelector: n => intervals[n])
                  .Timestamp()
                  .Subscribe(val => Console.WriteLine($"{val.Timestamp.ToLocalTime().DateTime} 発行された値:{val.Value}"),
                              () => Console.WriteLine("値の発行が完了しました"));

        Console.Read();
    }
}

実行結果

2020/04/26 23:51:09 値の発行を開始します
2020/04/26 23:51:10 発行された値:1
2020/04/26 23:51:12 発行された値:2
2020/04/26 23:51:15 発行された値:3
2020/04/26 23:51:19 発行された値:4
2020/04/26 23:51:24 発行された値:5
値の発行が完了しました

1,2,3,4,5秒間隔で値が発行されています。

簡単に任意の時間間隔で値を発行するファクトリメソッドを作る

Observable.Generateファクトリメソッドは引数が多くて面倒なので、簡単に任意の時間間隔で値を発行できるファクトリメソッドを作ります。

public static IObservable<int> AnyInterval(IReadOnlyList<TimeSpan> intervals) =>
    Observable.Generate(initialState: 0,
                        condition: n => n < intervals.Count,
                        iterate: n => ++n,
                        resultSelector: n => n,
                        timeSelector: n => intervals[n]);

待機したい時間の順番を定義したTimeSpan型リストを渡すだけで使えます。
ほんとは引数はIEnumerable<TimeSpan>にしたかったのですが、内部でCountとインデクサを使っていたのでやめました。

実際に使ってみるとこんな感じです。

ObservableEx.AnyInterval(new List<TimeSpan>{TimeSpan.FromSeconds(1),
                                            TimeSpan.FromSeconds(2),
                                            TimeSpan.FromSeconds(3),
                                            TimeSpan.FromSeconds(4),
                                            TimeSpan.FromSeconds(5) })
            .Timestamp()
            .Subscribe(val => Console.WriteLine($"{val.Timestamp.ToLocalTime().DateTime} 発行された値:{val.Value}"),
                        () => Console.WriteLine("値の発行が完了しました"));

これだと値を5個発行して終わりですが、無限に発行したければRepeat()オペレータを挟めばループしてくれます。

方法2.Observable.FromAsyncファクトリメソッドを使う

Observable.FromAsyncファクトリメソッドはTaskを引数にとってTaskが終了したらその戻り値を流してくれます。
そのため、Task.Delay等で時間を調整してやれば任意の時間間隔で値を発行するIObservableを作り出すことが可能です。

例えば、以下のような感じですね。


Observable.FromAsync(() => Task.Run(async () =>
{
    await Task.Delay(TimeSpan.FromSeconds(1));
    return Unit.Default;
}));

これだと1秒待ってUnitを発行します。
Unitを発行してOnCompletedしてしまうので、Repeatで無限に発行するようにすれば、ひとまず1秒間隔で値を発行するIObservableができます。


Observable.FromAsync(() => Task.Run(async () =>
{
    await Task.Delay(TimeSpan.FromSeconds(1));
    return Unit.Default;
}))
.Repeat();

あとはどのようにTask.Delayの引数を変えるかですが、外部変数とかを使うしかないんですかね?
他になにかいい方法があれば教えて下さい。

List<TimeSpan> intervals = new List<TimeSpan>()
{
    TimeSpan.FromSeconds(1),
    TimeSpan.FromSeconds(2),
    TimeSpan.FromSeconds(3),
    TimeSpan.FromSeconds(4),
    TimeSpan.FromSeconds(5)
};
int n = 0;

Observable.FromAsync(() => Task.Run(async () =>
{
    await Task.Delay(intervals[n]);
    return n++;
}))
.Repeat(intervals.Count).Timestamp()
.Subscribe(val => Console.WriteLine($"{val.Timestamp.ToLocalTime().DateTime} 発行された値:{val.Value}"),
            () => Console.WriteLine("値の発行が完了しました"));

この方法だと、あまりきれいではありませんが、特定の条件下で発行する間隔を変えるなんてことも可能です。

List<TimeSpan> intervals = new List<TimeSpan>()
{
    TimeSpan.FromSeconds(1),
    TimeSpan.FromSeconds(2),
    TimeSpan.FromSeconds(3),
    TimeSpan.FromSeconds(4),
    TimeSpan.FromSeconds(5)
};
int n = 0;

Observable.FromAsync(() => Task.Run(async () =>
{
    await Task.Delay(intervals[n]);
    if(条件) await Task.Delay(~); //特定の条件下で発行間隔を増やす
    return n++;
}))
.Repeat(intervals.Count).Timestamp()
.Subscribe(val => Console.WriteLine($"{val.Timestamp.ToLocalTime().DateTime} 発行された値:{val.Value}"),
            () => Console.WriteLine("値の発行が完了しました"));

すごく汚いですし副作用も多そうなので積極的に使うべきではありませんが、どうしてもという場合は仮で使えそうです。

まとめ

  • 基本的にObservable.Generateを使う方法でやれば任意の時間間隔で値を発行できる
  • 柔軟に発行間隔を変えたい場合などはObservable.FromAsyncを使えば自由度は高い、ただし副作用も多そうで注意が必要

こんな感じですかね…。
最後雑な感じで終わってしまいましたがFromAsyncを使った方法は自分でも疑問なのでアドバイスなどあればコメントくださると嬉しいです。

3
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2