Posted at

C#のRxライブラリのSubjectクラスを使ってみる


概要

C#のRxライブラリのSubjectクラスを使ってみる自己学習用サンプルです。


環境

Windiws 10

Visual Studio Express 2017 for Windows Desktop


Nuget

Reactive Extensions (Rx) for .NET ver.4.2.0


ObserverパターンとRx

混乱するので、名前や処理の流れをまとめます。

観察される側
観察者(購読者)

Observerパターン
Subject
Observer

Rx
IObservable
IObserver


Observerパターン



  • Subject


    • AddObserver(Ovserver)

    • NotifyObservers()




  • Observer


    • Update()





  1. SubjectObserverを登録しておく。(AddObserverの実行)


  2. Subject でイベントが発生した場合、Observerへ通知。(NotifyObserversの実行)


  3. ObserverUpdateを受けてイベント処理実行。


Rx



  • IObservable


    • Subscribe(IObserver)




  • IObserver


    • OnNext(T)

    • OnError(exception)

    • OnComplete()





  1. IObserveravleIObserverを登録しておく。(Subscribeの実行)

  2. イベントが発生した場合、IOnserverableIObserverへ通知(OnNextOnErrorOnComplete等を実行する)


  3. IObserverOnNextOnErrorOnCompleteでイベント処理。


Subjectについて

名前が紛らわしいですが、ObserverObservable を併せ持ったオブジェクトです。Observableのように Subscribe でき、かつObserverのように OnNext OnError OnComplete できます。これを使っていきたいと思います。


実装


C言語でのテストソース(Rxなし)

以下のようなテスト用ソースファイルがあります。やっていることはフレームを回して指定したフレーム数で指定した処理を1回だけ行いたいということです。


prog.c

#include <stdio.h>

#include <stdlib.h>

typedef _Bool bool;
typedef char int8;
typedef short nt16;
typedef long int32;
typedef unsigned char uint8;
typedef unsigned short uint16;
typedef unsigned int uint;
typedef unsigned long uint32;
#define true 1
#define false 0

// ワーク
typedef struct{
int mRNo;
int mFlag;
}SWork;

uint gFrameNow = 0;

// 現在のフレーム数取得
uint GetFrameNow(){ return(gFrameNow); }
// フレーム加算
void MoveFrame(){ gFrameNow++; }

// フレームチェック
bool CheckFrame(
int* _pFlag, // フラグ
int8 _FlagIndx, // フラグインデックス
uint _EndFrame // 指定フレーム
)
{
if( !((1 << _FlagIndx) & *_pFlag) & (_EndFrame <= GetFrameNow()) ){
*_pFlag |= (1 << _FlagIndx); // フラグを立てる
return(true);
}
return(false);
}

// フレーム処理
bool MoveProc(
SWork* _pWork
)
{
MoveFrame();

switch(_pWork->mRNo){
case 0:{
printf("Initialize\n");
_pWork->mRNo++;
}//break;
case 1:{
if( CheckFrame(&_pWork->mFlag, 0, 0) ){
printf("0frame proc.\n");
}
if( CheckFrame(&_pWork->mFlag, 1, 5) ){
printf("5frame proc.\n");
}
if( CheckFrame(&_pWork->mFlag, 2, 7)
||CheckFrame(&_pWork->mFlag, 3, 11)
){
printf("7 or 11frame proc.\n");
}

if( GetFrameNow() > 15){
_pWork->mRNo++;
}
}break;
case 2:{
printf("Finalize\n");
return(false);
}break;
}

return(true);
}

//---------------------------------
// メイン
int main(void)
{
SWork _Work;
_Work.mRNo = 0;
_Work.mFlag = 0;

while(1){
printf("Frame = %d, mFlag = 0x%02x\n", GetFrameNow(), _Work.mFlag);
if( !MoveProc(&_Work))break;
}

return EXIT_SUCCESS;
}


これを gcc prog.c -Wall -Wextra -std=gnu11

コンパイル&実行すると結果は

Frame = 0, mFlag = 0x00

Initialize
0frame proc.
Frame = 1, mFlag = 0x01
Frame = 2, mFlag = 0x01
Frame = 3, mFlag = 0x01
Frame = 4, mFlag = 0x01
5frame proc.
Frame = 5, mFlag = 0x03
Frame = 6, mFlag = 0x03
7 or 11frame proc.
Frame = 7, mFlag = 0x07
Frame = 8, mFlag = 0x07
Frame = 9, mFlag = 0x07
Frame = 10, mFlag = 0x07
7 or 11frame proc.
Frame = 11, mFlag = 0x0f
Frame = 12, mFlag = 0x0f
Frame = 13, mFlag = 0x0f
Frame = 14, mFlag = 0x0f
Frame = 15, mFlag = 0x0f
Frame = 16, mFlag = 0x0f
Finalize

となります。想定したフレームで処理されているようです。


考察

CheckFrame関数が使いにくそうです。フラグ用のビットインデックスを渡す実装なのも間違えそうです。

あとは行いたい処理を読むのが順次探す必要がありそうです。


C#のSubjectを使って書き直してみる

上記のテストソースをC#でSubjectを使い、書き直してみます。

指定したフレームでの処理をAction型で別にしています。

7と11でSubscribeを2回設定しているのが気になりますが、Where(f == 7)||(f == 11)にできないためこうなってしまいました。(テストソースではありえませんが、フレームが飛ぶ場合を考慮して)

Takeは処理を1回だけ通すためにいれてあります。


prog.cs

using System.Reactive.Subjects;

using System.Reactive.Linq;

namespace rxtest
{
public class RxProg
{
private Subject<int> subj = new Subject<int>();
private UInt32 frame = 0;
public UInt32 GetFrameNow() => frame;
public void MoveFrame() => frame++;

// コンストラクタ
public RxProg()
{
// Observerを登録
subj.DistinctUntilChanged().Where((f) => f == 0).Take(1).Subscribe(ProcInit);
subj.DistinctUntilChanged().Where((f) => f >= 0).Take(1).Subscribe(Proc0Frame);
subj.DistinctUntilChanged().Where((f) => f >= 5).Take(1).Subscribe(Proc5Frame);
subj.DistinctUntilChanged().Where((f) => f >= 7).Take(1).Subscribe(Proc7or11Frame);
subj.DistinctUntilChanged().Where((f) => f >= 11).Take(1).Subscribe(Proc7or11Frame);
subj.DistinctUntilChanged().Where((f) => f > 15).Take(1).Subscribe(ProcFinal, (ex) => { }, () => { this.subj.Dispose(); });

}

// ジェネリックデリゲートに処理を用意(直接ラムダ式でSubscribeに入れてもOK)
Action<int> ProcInit = (f) => { Console.WriteLine("Initialize"); };
Action<int> ProcFinal = (f) => { Console.WriteLine("Finalize"); };
Action<int> Proc0Frame = (f) => { Console.WriteLine("0frame proc."); };
Action<int> Proc5Frame = (f) => { Console.WriteLine("5frame proc."); };
Action<int> Proc7or11Frame = (f) => { Console.WriteLine("7 or 11frame proc."); };

public bool Move()
{
// フレーム更新
MoveFrame();

// 終了したか?
if ( subj.IsDisposed ) {
return (false);
}
else
{
// イベント通知
subj.OnNext((int)GetFrameNow());
}

return (true);
}
}

class Prog
{
static void Main(string[] args)
{
var _Inst = new RxProg();
while (true)
{
Console.WriteLine($"Frame = {_Inst.GetFrameNow()}");
if (!_Inst.Move()) break;
}
}
}
}


コンストラクタで処理を登録してMove()でイベント通知OnNextをしています。

実行結果は

Frame = 0

0frame proc.
Frame = 1
Frame = 2
Frame = 3
Frame = 4
5frame proc.
Frame = 5
Frame = 6
7 or 11frame proc.
Frame = 7
Frame = 8
Frame = 9
Frame = 10
7 or 11frame proc.
Frame = 11
Frame = 12
Frame = 13
Frame = 14
Frame = 15
Finalize
Frame = 16

となりました。FinalizeFrame = 16の表示順序が違いますが、想定したフレームで処理されているようです。:grinning:


考察

フレーム数と処理が分離されてるが、Subscribe登録が同じような処理で見にくい。

DistinctUntilChangedの効果がわかりにくい。


まとめ

Subjectクラスの動作がObserverパターンを元に考えると分かりにくい気がします。

Subjectクラス系にはAsyncSubjectクラスReplaySubjectクラスといった種類があるようです。これらもいずれ使っていきたいと思います。