概要
C#のRxライブラリのSubjectクラスを使ってみる自己学習用サンプルです。
環境
Windiws 10
Visual Studio Express 2017 for Windows Desktop
Nuget
Reactive Extensions (Rx) for .NET ver.4.2.0
ObserverパターンとRx
混乱するので、名前や処理の流れをまとめます。
観察される側 | 観察者(購読者) | |
---|---|---|
Observerパターン | Subject | Observer |
Rx | IObservable | IObserver |
Observerパターン
-
Subject
- AddObserver(Ovserver)
- NotifyObservers()
-
Observer
-
Update()
-
Subject
にObserver
を登録しておく。(AddObserver
の実行) -
Subject
でイベントが発生した場合、Observer
へ通知。(NotifyObservers
の実行) -
Observer
でUpdate
を受けてイベント処理実行。
Rx
-
IObservable
- Subscribe(IObserver)
-
IObserver
-
OnNext(T)
-
OnError(exception)
-
OnComplete()
-
IObserveravle
にIObserver
を登録しておく。(Subscribe
の実行) - イベントが発生した場合、
IOnserverable
がIObserver
へ通知(OnNext
、OnError
、OnComplete
等を実行する) -
IObserver
がOnNext
、OnError
、OnComplete
でイベント処理。
Subjectについて
名前が紛らわしいですが、Observer
と Observable
を併せ持ったオブジェクトです。Observableのように Subscribe
でき、かつObserverのように OnNext
OnError
OnComplete
できます。これを使っていきたいと思います。
実装
C言語でのテストソース(Rxなし)
以下のようなテスト用ソースファイルがあります。やっていることはフレームを回して指定したフレーム数で指定した処理を1回だけ行いたいということです。
# include <stdio.h>
# include <stdlib.h>
typedef _Bool bool;
typedef char int8;
typedef short nt16;
typedef long int32;
typedef unsigned char uint8;
typedef unsigned short uint16;
typedef unsigned int uint;
typedef unsigned long uint32;
# define true 1
# define false 0
// ワーク
typedef struct{
int mRNo;
int mFlag;
}SWork;
uint gFrameNow = 0;
// 現在のフレーム数取得
uint GetFrameNow(){ return(gFrameNow); }
// フレーム加算
void MoveFrame(){ gFrameNow++; }
// フレームチェック
bool CheckFrame(
int* _pFlag, // フラグ
int8 _FlagIndx, // フラグインデックス
uint _EndFrame // 指定フレーム
)
{
if( !((1 << _FlagIndx) & *_pFlag) & (_EndFrame <= GetFrameNow()) ){
*_pFlag |= (1 << _FlagIndx); // フラグを立てる
return(true);
}
return(false);
}
// フレーム処理
bool MoveProc(
SWork* _pWork
)
{
MoveFrame();
switch(_pWork->mRNo){
case 0:{
printf("Initialize\n");
_pWork->mRNo++;
}//break;
case 1:{
if( CheckFrame(&_pWork->mFlag, 0, 0) ){
printf("0frame proc.\n");
}
if( CheckFrame(&_pWork->mFlag, 1, 5) ){
printf("5frame proc.\n");
}
if( CheckFrame(&_pWork->mFlag, 2, 7)
||CheckFrame(&_pWork->mFlag, 3, 11)
){
printf("7 or 11frame proc.\n");
}
if( GetFrameNow() > 15){
_pWork->mRNo++;
}
}break;
case 2:{
printf("Finalize\n");
return(false);
}break;
}
return(true);
}
//---------------------------------
// メイン
int main(void)
{
SWork _Work;
_Work.mRNo = 0;
_Work.mFlag = 0;
while(1){
printf("Frame = %d, mFlag = 0x%02x\n", GetFrameNow(), _Work.mFlag);
if( !MoveProc(&_Work))break;
}
return EXIT_SUCCESS;
}
これを gcc prog.c -Wall -Wextra -std=gnu11
で
コンパイル&実行すると結果は
Frame = 0, mFlag = 0x00
Initialize
0frame proc.
Frame = 1, mFlag = 0x01
Frame = 2, mFlag = 0x01
Frame = 3, mFlag = 0x01
Frame = 4, mFlag = 0x01
5frame proc.
Frame = 5, mFlag = 0x03
Frame = 6, mFlag = 0x03
7 or 11frame proc.
Frame = 7, mFlag = 0x07
Frame = 8, mFlag = 0x07
Frame = 9, mFlag = 0x07
Frame = 10, mFlag = 0x07
7 or 11frame proc.
Frame = 11, mFlag = 0x0f
Frame = 12, mFlag = 0x0f
Frame = 13, mFlag = 0x0f
Frame = 14, mFlag = 0x0f
Frame = 15, mFlag = 0x0f
Frame = 16, mFlag = 0x0f
Finalize
となります。想定したフレームで処理されているようです。
考察
CheckFrame
関数が使いにくそうです。フラグ用のビットインデックスを渡す実装なのも間違えそうです。
あとは行いたい処理を読むのが順次探す必要がありそうです。
C#のSubjectを使って書き直してみる
上記のテストソースをC#でSubjectを使い、書き直してみます。
指定したフレームでの処理をAction型で別にしています。
7と11でSubscribe
を2回設定しているのが気になりますが、Where
が (f == 7)||(f == 11)
にできないためこうなってしまいました。(テストソースではありえませんが、フレームが飛ぶ場合を考慮して)
Take
は処理を1回だけ通すためにいれてあります。
using System.Reactive.Subjects;
using System.Reactive.Linq;
namespace rxtest
{
public class RxProg
{
private Subject<int> subj = new Subject<int>();
private UInt32 frame = 0;
public UInt32 GetFrameNow() => frame;
public void MoveFrame() => frame++;
// コンストラクタ
public RxProg()
{
// Observerを登録
subj.DistinctUntilChanged().Where((f) => f == 0).Take(1).Subscribe(ProcInit);
subj.DistinctUntilChanged().Where((f) => f >= 0).Take(1).Subscribe(Proc0Frame);
subj.DistinctUntilChanged().Where((f) => f >= 5).Take(1).Subscribe(Proc5Frame);
subj.DistinctUntilChanged().Where((f) => f >= 7).Take(1).Subscribe(Proc7or11Frame);
subj.DistinctUntilChanged().Where((f) => f >= 11).Take(1).Subscribe(Proc7or11Frame);
subj.DistinctUntilChanged().Where((f) => f > 15).Take(1).Subscribe(ProcFinal, (ex) => { }, () => { this.subj.Dispose(); });
}
// ジェネリックデリゲートに処理を用意(直接ラムダ式でSubscribeに入れてもOK)
Action<int> ProcInit = (f) => { Console.WriteLine("Initialize"); };
Action<int> ProcFinal = (f) => { Console.WriteLine("Finalize"); };
Action<int> Proc0Frame = (f) => { Console.WriteLine("0frame proc."); };
Action<int> Proc5Frame = (f) => { Console.WriteLine("5frame proc."); };
Action<int> Proc7or11Frame = (f) => { Console.WriteLine("7 or 11frame proc."); };
public bool Move()
{
// フレーム更新
MoveFrame();
// 終了したか?
if ( subj.IsDisposed ) {
return (false);
}
else
{
// イベント通知
subj.OnNext((int)GetFrameNow());
}
return (true);
}
}
class Prog
{
static void Main(string[] args)
{
var _Inst = new RxProg();
while (true)
{
Console.WriteLine($"Frame = {_Inst.GetFrameNow()}");
if (!_Inst.Move()) break;
}
}
}
}
コンストラクタで処理を登録してMove()
でイベント通知OnNext
をしています。
実行結果は
Frame = 0
0frame proc.
Frame = 1
Frame = 2
Frame = 3
Frame = 4
5frame proc.
Frame = 5
Frame = 6
7 or 11frame proc.
Frame = 7
Frame = 8
Frame = 9
Frame = 10
7 or 11frame proc.
Frame = 11
Frame = 12
Frame = 13
Frame = 14
Frame = 15
Finalize
Frame = 16
となりました。Finalize
とFrame = 16
の表示順序が違いますが、想定したフレームで処理されているようです。
考察
フレーム数と処理が分離されてるが、Subscribe
登録が同じような処理で見にくい。
DistinctUntilChanged
の効果がわかりにくい。
まとめ
Subjectクラスの動作がObserverパターンを元に考えると分かりにくい気がします。
Subjectクラス
系にはAsyncSubjectクラス
やReplaySubjectクラス
といった種類があるようです。これらもいずれ使っていきたいと思います。