15
15

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

【Python】RxPY(3.0.1)で学ぶReactive Extensions【Rx】

Last updated at Posted at 2019-11-20

TL;DR

PythonでRx書きたいなぁって思ってRxPYを使ってみたら、なんか使い心地が思ってたのと違った。
どうやら、新しい仕様ではObservableのchainingはpipeという仕組みを使って書くらしい。
https://rxpy.readthedocs.io/en/latest/migration.html

従来↓

observable_object.map(lambda x:x*2) \
                 .filter(lambda x:x>3) \
                 .subscribe(print) \

現在↓

from rx import operators as ops
observable_object.pipe(
    ops.map(lambda x:x*2),
    ops.filter(lambda x:x>3)
).subscribe(print)

ただ、これを伝えたかった。

でも、あまりにもRxPYの日本語記事がでてこないので、ついでに初学者PythonistにRxを布教するためにRxPYについてまとめておく。

Reactive Extensionsとは

データをLinqっぽい処理ができるObservableなストリームで扱うことで、非同期処理をスマートに書くためのAPIです。
http://reactivex.io/
主要な言語だと大体使えます。
http://reactivex.io/languages.html
Pythonの場合RxPYが対応します。
https://rxpy.readthedocs.io/en/latest/index.html

概念に関する解説はすでにたくさんあるので、公式ページや他のQiita記事に任せるとしてここではしません。
代わりに、RxPYのプリミティブなコードを紹介していきます。

ストリームの作り方

Reactive Extensionsではデータをストリームで扱います。
逆に言えば、Reactive Extensionsで扱いたいデータはストリームに変換する必要があります。

import rx

# 0,1,2,3,4のストリームを生成する。
rx.range(0,5) 

# 'aaa','bbb','ccc'のストリームを生成する。
rx.of('aaa','bbb','ccc')

# リストをストリームに変換する。
l = [0,1,2,3,4]
rx.from_(l)

ストリームのデータを使う

流れてるデータをそれぞれ順番に使っていきます。
Reactive Extensionsではストリームのデータを使う際はsubscribeします。
コードを見た方が早いかもしれません。

import rx

# 0,1,2,3,4のストリーム
stream = rx.range(0,5)

# print関数が0,1,2,3,4を順に受け取る。
stream.subscribe(print) 
###出力###
# 0
# 1
# 2
# 3
# 4

# もちろん、自分で定義した式やラムダ式も扱うことができる。
stream.subscribe(lambda x:print('value = '+str(x)))
###出力###
# value = 0
# value = 1
# value = 2
# value = 3
# value = 4

# もっと厳密に、エラーが起きたときの処理や、最後の処理を書くこともできる。
stream.subscribe(
    on_next = lambda x:print('on_next : '+str(x)) # ストリームのデータを受け取る関数。
    ,on_error = lambda x:print('on_error : '+str(x)) # エラーが起きたときの処理。
    ,on_completed = lambda :print('on_completed !') # ストリームのデータが全て流れたときに実行される。
)
###出力###
# on_next : 0
# on_next : 1
# on_next : 2
# on_next : 3
# on_next : 4
# on_completed !

ストリームのデータ加工

普通のReactive Extensionsだとstreamからメソッドチェインするのですが、
RxPYはストリームのデータ加工をpipeとoperatorsを使って行います。

import rx
from rx import operators as ops

# 0,1,2,3,4のストリーム
stream = rx.range(0,5)

# map
stream.pipe(
    ops.map(lambda x:x*2) # データを二倍にする。
).subscribe(print)
###出力###
# 0
# 2
# 4
# 6
# 8

# filter
stream.pipe(
    ops.filter(lambda x:x>2) # 2以下のデータをフィルタする。
).subscribe(print)
###出力###
# 3
# 4

# zip
stream.pipe(
    ops.zip(rx.range(0,10,2)) # 2つのストリームのデータをそれぞれペアにしていく。
).subscribe(print)
###出力###
# (0, 0)
# (1, 2)
# (2, 4)
# (3, 6)
# (4, 8)

# buffer_with_count
stream.pipe(
    ops.buffer_with_count(2) # データを2つずつにまとめる。
).subscribe(print)
###出力###
# [0, 1]
# [2, 3]
# [4]

# to_list
stream.pipe(
    ops.to_list() # データをリストにする。
).subscribe(print)
###出力###
# [0, 1, 2, 3, 4]

# オペレータはチェインできる。
stream.pipe(
    ops.map(lambda x:x*2) # データを二倍にする。
    ,ops.filter(lambda x:x>2) # 2以下のデータをフィルタする。
    ,ops.map(lambda x:str(x)) # データを文字に変換する。
).subscribe(lambda x:print('value = '+x))
###出力###
# value = 4
# value = 6
# value = 8

# 処理の途中でエラーが起きたときはon_errorが実行され、以降のデータは処理されない。
stream.pipe(
    ops.map(lambda x:1/(x-2)) # 2が流れたときにゼロ除算でエラーが発生する。
).subscribe(
    on_next = print
    ,on_error = lambda x: print(x)
)
###出力###
# -0.5
# -1.0
# division by zero

operatorはかなりの数があります。
目的に合ったものを探して使ってください。
https://rxpy.readthedocs.io/en/latest/reference_operators.html

ストリームにデータを流す

今までは既存のデータをストリームに変換していました。
ここでは任意のタイミングでストリームにデータを流す方法を解説します。

import rx
from rx.subject import Subject

# Subjectを用いて、任意のタイミングでデータを流すことができる特殊なストリームを作る。
stream = Subject()

# on_nextでデータを流すことができる。
# でも、このストリームはsubscribeされてないので何も起こらない。
stream.on_next(1)

# 一度subscribeしたら、データが流れるたびに受け取る。
d = stream.subscribe(print)
stream.on_next(1)
###出力###
# 1
stream.on_next(2)
###出力###
# 2

# subscribeをやめるときはdisposeする。
d.dispose()
stream.on_next(2)

# 複数subscribeすることも可能。ブロードキャストする感じ。
d1 = stream.subscribe(lambda x:print('subscriber 1 got '+str(x)))
d2 = stream.subscribe(lambda x:print('subscriber 2 got '+str(x)))
d3 = stream.subscribe(lambda x:print('subscriber 3 got '+str(x)))
stream.on_next(1)
###出力###
# subscriber 1 got 1
# subscriber 2 got 1
# subscriber 3 got 1

# 不要なsubscriberはdisposeしないと、永遠にsubscribeし続ける。
d1.dispose()
d2.dispose()
d3.dispose()

# ストリームを加工してsubscribeすることも可能
stream.pipe(
    ops.filter(lambda x:x%2==0) # 2の倍数でフィルタ
).subscribe(lambda x:print(str(x)+' is a multiple of 2'))
stream.pipe(
    ops.filter(lambda x:x%3==0) # 3の倍数でフィルタ
).subscribe(lambda x:print(str(x)+' is a multiple of 3'))
stream.on_next(2)
###出力###
# 2 is a multiple of 2
stream.on_next(3)
###出力###
# 3 is a multiple of 3
stream.on_next(6)
###出力###
# 6 is a multiple of 2
# 6 is a multiple of 3

# subjectをdisposeすると、リソースが開放される。
# subscribeしてるものも全てdisposeされる。
# disposeするとデータを流すことができなくなる。
stream.dispose()

subjectもいくつか種類があります。用途にあったものを使ってください。
https://rxpy.readthedocs.io/en/latest/reference_subject.html

スケジューリング

どんなタイミングで、どうやってsubscribeされるかをコントロールします。

import rx
from rx import operators as ops
import time
import random
from rx.subject import Subject
from rx.scheduler import NewThreadScheduler
from rx.scheduler import CurrentThreadScheduler

def f(s):
    time.sleep(1*random.random())
    print(s)

stream = Subject()

# 現在のスレッドで実行するスケジューラを設定。
# 同一スレッド内でsubscribeは一つずつ実行される。
stream_with_scheduler = stream.pipe(
    ops.observe_on(CurrentThreadScheduler()) # スケジューラの設定
)

stream_with_scheduler.subscribe(lambda x:f('1'))
stream_with_scheduler.subscribe(lambda x:f('2'))
stream_with_scheduler.subscribe(lambda x:f('3'))

stream.on_next(1)
# CurrentThreadSchedulerはデフォルトのスケジューラと同じなので動作は変わらない。
###出力###
# 1
# 2
# 3

stream.dispose()
stream = Subject()

# 新しいスレッド上で実行するスケジューラを設定
stream_with_scheduler = stream.pipe(
    ops.observe_on(NewThreadScheduler()) # スケジューラの設定
)

stream_with_scheduler.subscribe(lambda x:f('1'))
stream_with_scheduler.subscribe(lambda x:f('2'))
stream_with_scheduler.subscribe(lambda x:f('3'))

stream.on_next(1)
# 新しいスレッド上で実行されるので、全て同時に実行される。
###出力###
# 2
# 3
# 1

stream.dispose()

スケジューラもいくつかあります。用途に合ったものを使ってください。
https://rxpy.readthedocs.io/en/latest/reference_scheduler.html

非同期処理

ここまでの知識を使って非同期処理をいい感じに行う方法を解説します。

1. 待ち合わせ

HTTPリクエストや重い演算など、時間のかかる処理がある場合、順番に実行していくより並列で実行したほうがいいことがあります。
そこで問題になってくるのが処理の依存関係です。
ここでは、一つの解決方法として待ち合わせの方法を紹介します。

import rx
from rx import operators as ops
from rx.subject import Subject
import threading
import time
import random

stream = Subject()

# 時間がかかる処理
def f1():
    time.sleep(5*random.random())
    print('f1 done.')
    stream.on_next(1)
def f2():
    time.sleep(5*random.random())
    print('f2 done.')
    stream.on_next(1)
def f3():
    time.sleep(5*random.random())
    print('f3 done.')
    stream.on_next(1)
def f4():
    time.sleep(5*random.random())
    print('f4 done.')
    stream.on_next(1)
def f5():
    time.sleep(5*random.random())
    print('f5 done.')
    stream.on_next(1)

stream.pipe(
    ops.buffer_with_count(5) # ストリームに5個データが流れるまでストックされる。
).subscribe(lambda x:print('All done.')) # ストリームに5個データが流れてから実行される。つまり、f1 ~ f5が全て終わってから実行される。

# 時間のかかる処理なので全部同時に実行。
for f in [f1,f2,f3,f4,f5]:
    threading.Thread(target=f).start()

###出力###
# f5 done.
# f4 done.
# f1 done.
# f3 done.
# f2 done.
# All done.

2. イベント処理

想定するケースはこちらです。

  • Enterキーのイベント処理をしたい。
    • on_press : 押したときに発動
    • on_release : 離したときに発動
    • on_double_press : ダブルクリックみたいに2連続で押したときに発動
  • ただし、使える関数はキーボードの状態を返すkeyboard.is_pressedのみ。

若干恣意的ですが簡単な例を作るためなのでお許しを・・・

まず、キーボードの状態をストリームにします。
具体的には、以下の無尽蔵に出力されるTrueとかFalseをストリームにします。

while True:
    print(keyboard.is_pressed('enter'))
###出力###
# True
# True
# True
# True
# False
# False
# ...

↓ストリームに流すように変更

import rx
from rx import operators as ops
from rx.subject import Subject

enter_state_stream = Subject()
while True:
    # enter_state_streamにEnterキーの状態を流す
    enter_state_stream.on_next(keyboard.is_pressed('enter'))

subscribeしてないので、このままでは何も起こりません。
手始めにon_pressを実装していきます。

import rx
from rx import operators as ops
from rx.subject import Subject

enter_state_stream = Subject()

# on_press
enter_state_stream.pipe(
    ops.buffer_with_count(2,1) # 2つのデータを取得する。
    ,ops.filter(lambda x: x==[False,True]) # 押した瞬間はFalse,Trueのデータが流れる。
).subscribe(lambda x: print('on_press'))

while True:
    enter_state_stream.on_next(keyboard.is_pressed('enter'))

###出力(エンターキーを押す度にon_pressが表示される)###
# on_press
# on_press
# on_press
# on_press

on_releaseは同じ方法で実装できるので一旦飛ばします。
次はon_double_pressを実装してみます。

import rx
from rx import operators as ops
from rx.subject import Subject
import keyboard
import datetime

enter_state_stream = Subject()
on_press_stream = enter_state_stream.pipe(
    ops.buffer_with_count(2,1) # 先頭から2つのデータを取得する。
    ,ops.filter(lambda x: x==[False,True]) # 押した瞬間はFalse,Trueのデータが流れる。
)

# on_double_press
on_press_stream.pipe(
    ops.timestamp() # on_pressにタイムスタンプを付ける
    ,ops.buffer_with_count(2,1) # on_pressを2つずつ見ていく
    ,ops.map(lambda x:x[1][1]-x[0][1]) # 2つのon_pressの時間間隔に変換
    ,ops.filter(lambda x:x<datetime.timedelta(seconds=0.2)) # on_pressの時間間隔が0.2秒以下でフィルタ
).subscribe(lambda x: print('on_double_press'))

while True:
    enter_state_stream.on_next(keyboard.is_pressed('enter'))
###出力(エンターキーを連続で押す度に表示される)###
# on_double_press
# on_double_press
# on_double_press
# on_double_press

これでon_double_pressは実装できました。
最後に非同期処理にしつつ、いい感じのクラスにまとめてみます。

import rx
from rx import operators as ops
from rx.subject import Subject
import keyboard
import datetime
import threading

class Enter:
    enter_state_stream = None
    on_press_stream = None
    on_release_stream = None
    on_double_press = None
    def __init__(self):
        self.enter_state_stream = Subject()
        self.on_press_stream = self.enter_state_stream.pipe(
            ops.buffer_with_count(2,1) 
            ,ops.filter(lambda x: x==[False,True]) 
        )
        self.on_release_stream = self.enter_state_stream.pipe(
            ops.buffer_with_count(2,1) 
            ,ops.filter(lambda x: x==[True,False]) 
        )
        self.on_double_press = self.on_press_stream.pipe(
            ops.timestamp() 
            ,ops.buffer_with_count(2,1)
            ,ops.map(lambda x:x[1][1]-x[0][1]) 
            ,ops.filter(lambda x:x<datetime.timedelta(seconds=0.2)) 
        )
        def f():
            while True:
                self.enter_state_stream.on_next(keyboard.is_pressed('enter'))
        threading.Thread(target=f).start()

def main():
    enter = Enter()
    # こんな感じでイベント処理が書ける!
    enter.on_double_press.subscribe(lambda x:print('on_double_press'))

さいごに

よければ、何でも書いてってください。コメントしてくれると嬉しいです。
なんか間違ってるとことかあったら教えてください。

15
15
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
15
15

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?