C# でリアクティブプログラミングを自作してみる:Select と Where オペレーター
こんにちは、@studio_meowtoon です。今回は、WSL の Ubuntu 環境の C# でリアクティブプログラミングの機能を自作する方法を紹介します。
C# には本来厳格なコーディング規則がありますが、この記事では可読性のために、一部規則に沿わない表記方法を使用しています。ご注意ください。
開発環境
- Windows 11 Home 22H2 を使用しています。
WSL の Ubuntu を操作していきますので macOS の方も参考にして頂けます。
WSL (Microsoft Store アプリ版) ※ こちらの関連記事からインストール方法をご確認いただけます
> wsl --version
WSL バージョン: 1.0.3.0
カーネル バージョン: 5.15.79.1
WSLg バージョン: 1.0.47
Ubuntu ※ こちらの関連記事からインストール方法をご確認いただけます
$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description: Ubuntu 22.04.1 LTS
Release: 22.04
.NET SDK ※ こちらの関連記事からインストール方法をご確認いただけます
$ dotnet --list-sdks
7.0.202 [/usr/share/dotnet/sdk]
$ dotnet --version
7.0.202
リアクティブプログラミングを自作
リアクティブプログラミングとは?
リアクティブプログラミングは、CPUのマルチコア化やクラウドの活用が進む中で、 非同期通信に基づいたアプリケーションの設計に適しているため、 これからのソフトウェア開発における重要な技術として注目されています。
リアクティブプログラミングはこれまでの手法と何が違うのか?
以下の疑似コードを見てください。
a = 1
b = 2 * a
a = 2
print b
これまでの命令型プログラミングの観点では以下の出力を得ます。
a = 1
b = 2 * a
a = 2
print b
出力結果: 2
対して、リアクティブプログラミングの観点では以下の出力を得ます。
a = 1
b = 2 * a
a = 2
print b
出力結果: 4
リアクティブプログラミングの観点とは?
リアクティブプログラミングの観点では |
---|
2行目の「b = 2 * a」というコードは「b を a の2倍として定義する」という意味で解釈されます。 |
つまり、a と b の関係性を定義した後は、「 a への代入」というイベントへのリアクションとして 「 b の再計算」を常にバックグラウンドで行います。 |
このようなパラダイムのことをリアクティブプログラミングと呼びます。 |
リアクティブプログラミングのメリット
これまでの命令型プログラミングでは、特に非同期の並行処理において様々な問題がありました。例えば次のような問題です。
命令型プログラミングの非同期の並行処理における問題点 |
---|
コードが複雑化する。 |
命令の実行順序を管理することができない。 |
幾重にもネストされたコールバック。(いわゆるコールバック地獄) |
リアクティブプログラミングを活用すると、今までとは異なるプログラミングパラダイムで上記の問題を解決するコードを記述することが可能になります。
とは言われるものの…🤔
一般的には先ほどのように非同期処理と絡めてリアクティブプログラミングの利点が紹介されるケースが多いですが、学習当初、個人的には全然実感がわきませんでした😭 そこで、この記事では自分が納得いく方法で説明を試みたいと思います😋
今までとは異なるプログラミングパラダイムって何だろう?🤔
これは個人的な見解で他のWEBサイトなどの受け売りではないのですが、Rx系のライブラリを利用すると
- for, foreach などのループ処理を書かないようになる
- if, else if などの分岐処理を書かないようになる
というコードに落ち着くかと思います。何故そうなるかを、これから自作のRx実装を試みながら説明していきたいと思います。
リアクティブプログラミングを実現する方法は?
一般的にリアクティブプログラミングの為に使用されるライブラリは、その根幹にオブザーバーパターンという考え方を持っています。
GoF のオブザーバーパターン (Observer pattern)
- オブザーバーパターンは 1995年に GoF によって提案された23のデザインパターンの一つであり、プログラムのインタフェースとしては古典的なものです。
- Subject(監視対象) と Observer(観察者) からなるパラダイムで、利用シーンとしては、GUIの設計においてデータベースの変更を検知して表示を自動更新したい場合などが代表的です。
オブザーバーパターンについては
- ここでは詳しく解説しません、別の一般的な資料を参照することをお勧めします。
- 理由は Rx では表向きオブザーバーパターンのインターフェイスを利用しますが、どうやら内部的にはそれに準拠しない実装で実現していると推測されるからです。(※その部分は後述します)
まず C# でシンプルなオブザーバーパターンを実装する
Ubuntu に C# 開発環境を構築する方法は、こちらの関連記事からご確認いただけます。
幸いなことに .NET では以下のインターフェイスが定義されています。
- IObserver<T> インターフェイス:
- 値を受け取る側が実装するインターフェイス
- IObservable<T> インターフェイス:
- 値を発行する側が実装するインターフェイス
最初にシンプルな IObserver 実装クラスを作成します。
public class HogeObserver : IObserver<string> {
public void OnCompleted() {
throw new NotImplementedException(); // ※実装省略
}
public void OnError(Exception error) {
throw new NotImplementedException(); // ※実装省略
}
// 変更通知を受け取るメソッド
public void OnNext(string value) {
Console.WriteLine("OnNext: " + value); // ここではコンソールに表示します
}
}
次にシンプルな IObservable 実装クラスを作成します。
public class HogeObservable : IObservable<string> {
// observer のリストを持ちます
List<IObserver<string>> _observers = new();
// リストに observer を追加します
public IDisposable Subscribe(IObserver<string> observer) {
_observers.Add(observer);
return null; // ※ この実装では IDisposable には対応しません
}
// 登録された observer に変更内容を通知する
public void Notify(string value) {
// ここではリストをループ処理しています
foreach (var observer in _observers) {
observer.OnNext(value);
}
}
}
Main メソッドから使用してみます。
class Program {
static void Main(string[] args) {
// Observable と Observer を使用
HogeObservable observable = new();
observable.Subscribe(new HogeObserver()); // 登録するだけです
observable.Notify("Hello"); // ※ Notify() は IObservable で定義されていません
observable.Notify("World"); // ※ Notify() は IObservable で定義されていません
}
}
コマンドラインで実行します。
$ dotnet run
OnNext: Hello
OnNext: World
ここまでの作業で、C# でオブザーバーパターンを実装することができました。
しかし、問題点があると思います。
実行するメソッド(※例えば Notify) がインターフェイスで定義されてない!
どうするか?🤔 ⇒ 仲介役を使用して実装します!😋
幸いなことに System.Reactive(Rx) パッケージで以下のインターフェイスが定義されています。
- ISubject<T>インターフェイス:
- 値を受け取る側 が実装するインターフェイス
- 値を発行する側 が実装するインターフェイス
どちらの操作も持つ ISubject インタフェースを使います。
System.Reactive パッケージを NuGet でインストールします。
$ dotnet add package System.Reactive
シンプルな ISubject 実装クラスを作成してみます。
class HogeSubject : ISubject<string> {
// observer のリストを持ちます
List<IObserver<string>> _observers = new();
public void OnCompleted() {
throw new NotImplementedException(); // ※実装省略
}
public void OnError(Exception error) {
throw new NotImplementedException(); // ※実装省略
}
// ここが呼ばれて初めて処理が走る
public void OnNext(string value) {
foreach (var observer in _observers) {
observer.OnNext(value);
}
}
// ここでは observer が登録されるだけ
public IDisposable Subscribe(IObserver<string> observer) {
_observers.Add(observer);
return null; // ※ IDisposable には対応してない
}
}
Main メソッドから使用してみます。
class Program {
static void Main(string[] args) {
// Subject と Observer を使用
HogeSubject subject = new();
subject.Subscribe(new HogeObserver());
subject.OnNext("Hello");
subject.OnNext("World");
}
}
コマンドラインで実行します。
$ dotnet run
OnNext: Hello
OnNext: World
ここまでの作業で、Subject オブジェクトの OnNext メソッドで処理が実行されるようになりました。
どういうこと?🤔
つまり Subject の OnNext() を呼ぶと Observer の OnNext() が呼ばれます。
public class HogeObserver : IObserver<string> {
// ※省略
public void OnNext(string value) {
Console.WriteLine("OnNext: " + value); // ここが呼ばれます
}
}
これは Subject から Observer にイベント通知されると表現されます。イベント通知という言葉で難しく説明されますが、単純に Subject が自分で Observer のリストを持っていて、それらをループ処理して Observer のメソッドを呼んでいます。
また、以下のクラス図のように HogeObserver クラスと HogeSubject クラスは直接依存しません。
ここまでのまとめ
- Subject が Observer を呼び出す。
- Pull 型で情報を通知します。
- Observer は Subject から呼ばれる。
- Push 型で情報が通知されます。
ここまでの作業で、Subject オブジェクトの OnNext() メソッドを呼べば、Observer オブジェクトの処理が走るところまで実装出来ました😋
ここから一気に Rx の実装に近づけます!😤
問題点:
IObserver インタフェースを実装するのめんどくさくないですか?😫
解決策:
それラムダ式で出来ます!😋
それでは C# のラムダ式を使用してみます。
class Program {
static void Main(string[] args) {
// Subject とラムダ式を使用
HogeSubject subject = new();
subject.Subscribe(x => Console.WriteLine("OnNext: " + x)); // ラムダ式
subject.OnNext("Hello");
subject.OnNext("World");
}
}
コマンドラインで実行します。
$ dotnet run
OnNext: Hello
OnNext: World
ISubject の実装だけでここまで出来ました😋
再びどういうこと?😭
Rx のライブラリで C# の拡張メソッドを使用した糖衣構文が用意されています。
public static class ObservableExtensions {
// ※省略
public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext);
// ※省略
}
- Action デリゲートにラムダ式を渡すことで実現しています。
- 実際には内部的に IObserver の実装インスタンスを作成して onNext() で Action デリゲートが実行されます。
※ラムダ式については、こちらの以前の関連記事を参照頂けます。
ここまでのまとめ
内容 |
---|
Rx 系でよく見る subject.Subscribe(x => x への処理)) は、C# では内部的に IObserver の実装インスタンスが作成されています。 |
subject.Subscribe() に渡すデリゲートは、IObserver の文脈で onNext() として呼ばれます。 |
Subscribe() にラムダ式を記述しただけでは処理は走りません、別途 onNext() されてはじめて処理が走ります。 |
ちょっとまって😟
まだ全然 Rx っぽくないじゃないか?😕
Rx(リアクティブエクステンション)っぽいコードとは?🤔
ここまでの内容は非常に長~い前振りで、リアクティブプログラミングとは、一般的に以下のような操作が行われるコードだと思います。
// 処理の定義
HogeSubject subject = new();
subject.Where(x => x.Contains("H")) // "H"が含まれていたら
.Select(x => x + "ですねん") // "ですねん" を追加する
// Subscribe(※これまでの説明により、ここでは登録されるだけ)
.Subscribe(x => Console.WriteLine("OnNext: " + x));
// 処理の実行
subject.OnNext("Hello");
subject.OnNext("World");
$ dotnet run
OnNext: Helloですねん
とてもシンプルな例ですが、上記の動作を実装してみます。
実装するオペレーター | 内容 |
---|---|
xWhere() | フィルタリング系オペレーター |
xSelect() | メッセージ変換系オペレーター |
xSubscribe() | ついでに独自実装してみます |
まずデリゲートを受ける Observer を実装します。
// デリゲートを処理出来る IObserver の機能を提供します
public class AnonymousObserver<T> : IObserver<T> {
Action<T> _onNext = null;
public AnonymousObserver(Action<T> onNext) { // 引数がデリゲートである
_onNext = onNext;
}
public void OnCompleted() {
throw new NotImplementedException(); // ※実装省略
}
public void OnError(Exception error) {
throw new NotImplementedException(); // ※実装省略
}
public void OnNext(T value) {
_onNext(value); // デリゲートが実行されます
}
}
次にデリゲートを登録できる Subscribe() を拡張メソッドとして実装します。
※上記 AnonymousObserver クラスを使用して独自実装します。
// Subscribe() をシンプルに実装します
public static IDisposable xSubscribe<T>(this IObservable<T> self, Action<T> onNext) {
// IObserver の実装を生成します
var disposable = self.Subscribe(new AnonymousObserver<T>(onNext));
return disposable;
}
Main メソッドから使用してみます。
class Program {
static void Main(string[] args) {
// Subject とラムダ式を使用
HogeSubject subject = new();
subject.xSubscribe(x => Console.WriteLine("OnNext: " + x)); // ラムダ式
subject.OnNext("Hello");
subject.OnNext("World");
}
}
$ dotnet run
OnNext: Hello
OnNext: World
Microsoft 謹製の Rx ライブラリと同じようにラムダ式で記述できました😋
次にデリゲートを受ける Observable を実装します。
// デリゲートを処理出来る IObservable の機能を提供します
public class AnonymousObservable<T> : IObservable<T> {
Func<IObserver<T>, IDisposable> _subscribe;
// 引数がデリゲートである
public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe) {
_subscribe = subscribe;
}
public IDisposable Subscribe(IObserver<T> observer) {
// デリゲートが実行されます ※ネストされた Observable を持ちます
var disposable = _subscribe(observer);
return disposable;
}
}
デリゲートを登録できる Where() オペレーターを実装します。
// Where() オペレーターをシンプルに実装します
public static IObservable<T> xWhere<T>(this IObservable<T> self, Func<T, bool> predicate) {
var observable = new AnonymousObservable<T>(observer => {
var disposable = self.Subscribe(new AnonymousObserver<T>(value => {
// ここが OnNext される内容
// 条件に一致したときだけ observer に値を流します
if (predicate(value)) {
observer.OnNext(value);
}
}));
return disposable;
});
return observable; // self ではなく別の observable を返す
}
Main メソッドから使用してみます。
class Program {
static void Main(string[] args) {
// xWhere を使用
HogeSubject subject = new();
subject.xWhere(x => x.Contains("H")) // "H" が含まれる場合のみ
.Subscribe(x => Console.WriteLine("OnNext: " + x));
subject.OnNext("Hello");
subject.OnNext("World");
}
}
$ dotnet run
OnNext: Hello
ここまでの作業で、Rx の Where オペレーター相当の機能を実装できました😋
さらにデリゲートを登録できる Select() オペレーターを実装します。
// Select() オペレーターをシンプルに実装します
public static IObservable<TResult> xSelect<T, TResult>(this IObservable<T> self, Func<T, TResult> selector) {
var observable = new AnonymousObservable<TResult>(observer => {
var disposable = self.Subscribe(new AnonymousObserver<T>(value => {
// ここが OnNext される内容
// 渡ってきた値を加工して observer にその値を流します
TResult result = selector(value);
observer.OnNext(result);
}));
return disposable;
});
return observable; // self ではなく別の observable を返す
}
Main メソッドから使用してみます。
class Program {
static void Main(string[] args) {
// xSelect を使用
HogeSubject subject = new();
subject.xSelect(x => x + "ですねん") // "ですねん" を追加する
.Subscribe(x => Console.WriteLine("OnNext: " + x));
subject.OnNext("Hello");
subject.OnNext("World");
}
}
$ dotnet run
OnNext: Helloですねん
OnNext: Worldですねん
ここまでの作業で、Rx の Select オペレーター相当の機能を実装できました😋
さらに Where と Select オペレーターを組み合わせて実行してみます。
class Program {
static void Main(string[] args) {
// xWhere と xSelect を使用
HogeSubject subject = new();
subject.xWhere(x => x.Contains("H")) // "H" が含まれる場合のみ
.xSelect(x => x + "ですねん") // "ですねん" を追加する
.xSubscribe(x => Console.WriteLine("OnNext: " + x));
subject.OnNext("Hello");
subject.OnNext("World");
}
}
$ dotnet run
OnNext: Helloですねん
ここまでの作業で、Rx の Where、Select オペレーター相当の機能を実装できました😋
簡易 Rx 実装の完成です🎉🎉
もちろんこの実装はいろいろ足りません😭😭
- しかしながら、Rx(リアクティブエクステンション)の実装が内部で何をしているのかが、何となくわかって頂けたのではないでしょうか?
- 今回は C# の実装を参考にしたので他の言語では異なる実装になると思われますが、エッセンスは共通だと思います。
ここまでのまとめ
さて、序盤で書いた今までとは異なるプログラミングパラダイムって?🤔
- for, foreach などのループ処理を書かない。
- そもそも Subject 内の実装でラップされる。
- if, else if などの分岐処理を書かない。
- Where などで代替えされる。
結果、上記のようになったと思いませんか?😙
今一度、命令型プログラミングのパラダイム
最後に命令型プログラミングのパラダイムで、上記と同様の処理を実装してみます。
class Program {
static void Main(string[] args) {
// foreach や if で実装
List<string> list = new();
list.Add("Hello");
list.Add("World");
foreach (var item in list) {
if (item.Contains("H")) { // "H" が含まれる場合のみ
var result = item + "ですねん"; // "ですねん" を追加する
Console.WriteLine("result: " + result);
}
}
}
}
$ dotnet run
result: Helloですねん
全く違いますよね。今感じたことがリアクティブプログラミングとはパラダイムが異なるという感覚だと思います。こちらは、何のインターフェイス、クラス、拡張メソッドも使用していない為、非常にシンプルです。しかし、再利用性は限りなく低い実装と言えるでしょう。
まとめ
- 命令型プログラミングとは異なるリアクティブプログラミングの実装を試すことができました。
- 今回の説明では Rx (リアクティブエクステンション) ≠ リアクティブプログラミングのエッセンスを紹介したので Subject.OnNext() を自分で呼びました。
- しかし、例えば実際の Unity & UniRx などを使用するゲームプログラミングでは、自分で OnNext() メソッドを呼び出すケースはほぼありません。
- OnNext() メソッドは、Rx ライブラリが提供する Observable ストリームを管理するライブラリ側で良きタイミングで呼び出されています。
- この OnNext() っていったい誰が呼び出してるんだろう? という点が Rx 系で一番混乱する部分だと思いました。
どうでしたか? Window 11 の WSL Ubuntu に、C# / .NET の開発環境を手軽に構築することができます。ぜひお試しください。今後も .NET の開発環境などを紹介していきますので、ぜひお楽しみにしてください。