次のブロック図の計算をReactiveExtensionsで試します(この計算自体に意味はありません)。
準備
今回はPythonで試します。まずはPython版のReactiveExtensions をインストールします。
端末
pip install Rx
ブロック図をJSONで表現
JSONでブロック図の配置モジュールと配線を記述します。
test.json
{
"modules": [
{"name":"add1", "module_type":"add", "inputs":["sin1", "const1"]},
{"name":"add2", "module_type":"add", "inputs":["mul1", "div1"]},
{"name":"mul1", "module_type":"mul", "inputs":["sub1", "add1"]},
{"name":"sub1", "module_type":"sub", "inputs":["sin1", "cos1"]},
{"name":"div1", "module_type":"div", "inputs":["cos1", "const2"]},
{"name":"sin1", "module_type":"sin", "inputs":["const1"]},
{"name":"cos1", "module_type":"cos", "inputs":["add1"]},
{"name":"out1", "module_type":"out", "inputs":["add2"]},
{"name":"const1", "module_type":"const", "value":1.0},
{"name":"const2", "module_type":"const", "value":2.0}
]
}
実装
モジュール作成、モジュール配線、定数値の設定、計算結果の表示までまとめて実行します。
test.py
# -*- coding: utf-8 -*-
from rx.subjects import Subject
import json, operator, math
if __name__ == '__main__':
# JSON読み込み
with open('test.json') as f:
j = json.load(f)
# モジュール作成
modules = { m['name']:Subject() for m in j['modules'] }
# モジュール配線
for m in filter(lambda m: m['module_type'] != 'const', j['modules']):
module_type = m['module_type'];
self_name = m['name']
input_names = m['inputs']
if module_type == 'add':
modules[ input_names[0] ].zip(modules[ input_names[1] ], operator.add) \
.subscribe(modules[ self_name ].on_next)
elif module_type == 'sub':
modules[ input_names[0] ].zip(modules[ input_names[1] ], operator.sub) \
.subscribe(modules[ self_name ].on_next)
elif module_type == 'mul':
modules[ input_names[0] ].zip(modules[ input_names[1] ], operator.mul) \
.subscribe(modules[ self_name ].on_next)
elif module_type == 'div':
modules[ input_names[0] ].zip(modules[ input_names[1] ], operator.truediv) \
.subscribe(modules[ self_name ].on_next)
elif module_type == 'sin':
modules[ input_names[0] ].select(math.sin).subscribe(modules[ self_name ].on_next)
elif module_type == 'cos':
modules[ input_names[0] ].select(math.cos).subscribe(modules[ self_name ].on_next)
elif module_type == 'tan':
modules[ input_names[0] ].select(math.tan).subscribe(modules[ self_name ].on_next)
elif module_type == 'out':
modules[ input_names[0] ].subscribe(print) # 計算結果の表示
# 定数値を設定
for m in filter(lambda m: m['module_type'] == 'const', j['modules']):
self_name = m['name']
value = m['value']
modules[ self_name ].on_next(value)
結果
1.9082290502110406
ReactiveExtensions
今回使用したクラス、拡張メソッドについて説明します。
Subject
SubjectはObserver(観察する方)とObservable(観察される方)の両方を兼ね備えたクラスです。あとで配線で使うためnameをキーにしてDictionaryにしておきます。
抜粋
# モジュール作成
modules = { m['name']:Subject() for m in j['modules'] }
Select
単項演算はselectの計算値を受信(subscribe)し次のモジュールに送信(on_next)します。
抜粋
modules[ input_names[0] ].select(math.sin).subscribe(modules[ self_name ].on_next)
Zip
二項演算は2つの入力値が揃ってから計算値を受信(subscribe)し次のモジュールに送信(on_next)します。
抜粋
modules[ input_names[0] ].zip(modules[ input_names[1] ], operator.add) \
.subscribe(modules[ self_name ].on_next)
確認
念のため検算してみます。
test_check.py
# -*- coding: utf-8 -*-
import math
if __name__ == '__main__':
sin1 = math.sin(1.0)
add1 = sin1+1.0
cos1 = math.cos(add1)
sub1 = sin1-cos1
mul1 = sub1*add1
div1 = cos1/2.0
add2 = mul1+div1
out = add2
print(out)
結果
1.9082290502110406
やったぜ