1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

C#のRxライブラリのSubjectクラスを使ってみる

Posted at

概要

C#のRxライブラリのSubjectクラスを使ってみる自己学習用サンプルです。

環境

Windiws 10
Visual Studio Express 2017 for Windows Desktop

Nuget

Reactive Extensions (Rx) for .NET ver.4.2.0

ObserverパターンとRx

混乱するので、名前や処理の流れをまとめます。

観察される側 観察者(購読者)
Observerパターン Subject Observer
Rx IObservable IObserver

Observerパターン

  • Subject

    • AddObserver(Ovserver)
    • NotifyObservers()
  • Observer

  • Update()

  1. SubjectObserverを登録しておく。(AddObserverの実行)
  2. Subject でイベントが発生した場合、Observerへ通知。(NotifyObserversの実行)
  3. ObserverUpdateを受けてイベント処理実行。

Rx

  • IObservable

    • Subscribe(IObserver)
  • IObserver

  • OnNext(T)

  • OnError(exception)

  • OnComplete()

  1. IObserveravleIObserverを登録しておく。(Subscribeの実行)
  2. イベントが発生した場合、IOnserverableIObserverへ通知(OnNextOnErrorOnComplete等を実行する)
  3. IObserverOnNextOnErrorOnCompleteでイベント処理。

Subjectについて

名前が紛らわしいですが、ObserverObservable を併せ持ったオブジェクトです。Observableのように Subscribe でき、かつObserverのように OnNext OnError OnComplete できます。これを使っていきたいと思います。

実装

C言語でのテストソース(Rxなし)

以下のようなテスト用ソースファイルがあります。やっていることはフレームを回して指定したフレーム数で指定した処理を1回だけ行いたいということです。

prog.c
# include <stdio.h>
# include <stdlib.h>

typedef _Bool bool;
typedef char int8;
typedef short nt16;
typedef long int32;
typedef unsigned char uint8;
typedef unsigned short uint16;
typedef unsigned int uint;
typedef unsigned long uint32;
# define true 1
# define false 0

// ワーク
typedef struct{
    int mRNo;
    int mFlag;
}SWork;

uint gFrameNow = 0;

// 現在のフレーム数取得
uint GetFrameNow(){ return(gFrameNow); }
// フレーム加算
void MoveFrame(){ gFrameNow++; }

// フレームチェック
bool CheckFrame(
    int*    _pFlag,     // フラグ
    int8    _FlagIndx,  // フラグインデックス
    uint    _EndFrame   // 指定フレーム
    )
{
    if( !((1 << _FlagIndx) & *_pFlag) & (_EndFrame <= GetFrameNow()) ){
        *_pFlag |= (1 << _FlagIndx);    // フラグを立てる
        return(true);
    }
    return(false);
}

// フレーム処理
bool MoveProc(
    SWork* _pWork
    )
{
    MoveFrame();
 
    switch(_pWork->mRNo){
    case 0:{
        printf("Initialize\n");
        _pWork->mRNo++;
        }//break;
    case 1:{
        if( CheckFrame(&_pWork->mFlag, 0, 0) ){
            printf("0frame proc.\n");
        }
        if( CheckFrame(&_pWork->mFlag, 1, 5) ){
            printf("5frame proc.\n");
        }       
        if( CheckFrame(&_pWork->mFlag, 2, 7)
            ||CheckFrame(&_pWork->mFlag, 3, 11)
        ){
            printf("7 or 11frame proc.\n");
        }
        
        if( GetFrameNow() > 15){
            _pWork->mRNo++;
        }
        }break;
    case 2:{
        printf("Finalize\n");
        return(false);
        }break;
    }

    return(true);
}

//---------------------------------
// メイン
int main(void)
{
    SWork _Work;
    _Work.mRNo = 0;
    _Work.mFlag = 0;
    
    while(1){
        printf("Frame = %d, mFlag = 0x%02x\n", GetFrameNow(), _Work.mFlag);
        if( !MoveProc(&_Work))break;
    }
    
    return EXIT_SUCCESS;
}

これを gcc prog.c -Wall -Wextra -std=gnu11
コンパイル&実行すると結果は

Frame = 0, mFlag = 0x00
Initialize
0frame proc.
Frame = 1, mFlag = 0x01
Frame = 2, mFlag = 0x01
Frame = 3, mFlag = 0x01
Frame = 4, mFlag = 0x01
5frame proc.
Frame = 5, mFlag = 0x03
Frame = 6, mFlag = 0x03
7 or 11frame proc.
Frame = 7, mFlag = 0x07
Frame = 8, mFlag = 0x07
Frame = 9, mFlag = 0x07
Frame = 10, mFlag = 0x07
7 or 11frame proc.
Frame = 11, mFlag = 0x0f
Frame = 12, mFlag = 0x0f
Frame = 13, mFlag = 0x0f
Frame = 14, mFlag = 0x0f
Frame = 15, mFlag = 0x0f
Frame = 16, mFlag = 0x0f
Finalize

となります。想定したフレームで処理されているようです。

考察

CheckFrame関数が使いにくそうです。フラグ用のビットインデックスを渡す実装なのも間違えそうです。
あとは行いたい処理を読むのが順次探す必要がありそうです。

C#のSubjectを使って書き直してみる

上記のテストソースをC#でSubjectを使い、書き直してみます。

指定したフレームでの処理をAction型で別にしています。
7と11でSubscribeを2回設定しているのが気になりますが、Where(f == 7)||(f == 11)にできないためこうなってしまいました。(テストソースではありえませんが、フレームが飛ぶ場合を考慮して)
Takeは処理を1回だけ通すためにいれてあります。

prog.cs
using System.Reactive.Subjects;
using System.Reactive.Linq;

namespace rxtest
{
    public class RxProg
    {
        private Subject<int> subj = new Subject<int>();
        private UInt32 frame = 0;
        public UInt32 GetFrameNow() => frame;
        public void MoveFrame() => frame++;

        // コンストラクタ
        public RxProg()
        {
            // Observerを登録
            subj.DistinctUntilChanged().Where((f) => f == 0).Take(1).Subscribe(ProcInit);
            subj.DistinctUntilChanged().Where((f) => f >= 0).Take(1).Subscribe(Proc0Frame);
            subj.DistinctUntilChanged().Where((f) => f >= 5).Take(1).Subscribe(Proc5Frame);
            subj.DistinctUntilChanged().Where((f) => f >= 7).Take(1).Subscribe(Proc7or11Frame);
            subj.DistinctUntilChanged().Where((f) => f >= 11).Take(1).Subscribe(Proc7or11Frame);
            subj.DistinctUntilChanged().Where((f) => f > 15).Take(1).Subscribe(ProcFinal, (ex) => { }, () => { this.subj.Dispose(); });

        }

        // ジェネリックデリゲートに処理を用意(直接ラムダ式でSubscribeに入れてもOK)
        Action<int> ProcInit = (f) => { Console.WriteLine("Initialize"); };
        Action<int> ProcFinal = (f) => { Console.WriteLine("Finalize"); };
        Action<int> Proc0Frame = (f) => { Console.WriteLine("0frame proc."); };
        Action<int> Proc5Frame = (f) => { Console.WriteLine("5frame proc."); };
        Action<int> Proc7or11Frame = (f) => { Console.WriteLine("7 or 11frame proc."); };

        public bool Move()
        {
            // フレーム更新
            MoveFrame();

            // 終了したか?
            if ( subj.IsDisposed ) {
                return (false);
            }
            else
            {
                // イベント通知
                subj.OnNext((int)GetFrameNow());
            }

            return (true);
        }
    }

    class Prog
    {
        static void Main(string[] args)
        {
            var _Inst = new RxProg();
            while (true)
            {
                Console.WriteLine($"Frame = {_Inst.GetFrameNow()}");
                if (!_Inst.Move()) break;
            }
        }
    }
}

コンストラクタで処理を登録してMove()でイベント通知OnNextをしています。
実行結果は

Frame = 0
0frame proc.
Frame = 1
Frame = 2
Frame = 3
Frame = 4
5frame proc.
Frame = 5
Frame = 6
7 or 11frame proc.
Frame = 7
Frame = 8
Frame = 9
Frame = 10
7 or 11frame proc.
Frame = 11
Frame = 12
Frame = 13
Frame = 14
Frame = 15
Finalize
Frame = 16

となりました。FinalizeFrame = 16の表示順序が違いますが、想定したフレームで処理されているようです。:grinning:

考察

フレーム数と処理が分離されてるが、Subscribe登録が同じような処理で見にくい。
DistinctUntilChangedの効果がわかりにくい。

まとめ

Subjectクラスの動作がObserverパターンを元に考えると分かりにくい気がします。
Subjectクラス系にはAsyncSubjectクラスReplaySubjectクラスといった種類があるようです。これらもいずれ使っていきたいと思います。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?