Observableの解説
非同期処理やリアクティブプログラミングの文脈で登場する "Observable" は、Rx(Reactive Extensions)ライブラリの中核的な概念です。この記事では、PythonのRxPYをベースに、Observableの基本からHot/Coldの違い、イテレータとの比較、処理方法までを徹底的に解説します。
Observableとは?
Observable(オブザーバブル)は、時間とともに発生する値やイベントのストリームを表現するオブジェクトです。データが「発行」され、それを購読(subscribe)することで、非同期に受け取ることができます。
特徴:
- データの流れ(ストリーム)を表現
-
subscribe()
によってデータを受信 - 時間軸に沿って複数の値を送る
-
map
,filter
,combine_latest
などのオペレータで処理を組み立てられる
サンプルコード(基本)
import rx
from rx import operators as ops
# 1から5のストリーム
source = rx.from_iterable(range(1, 6))
# 2倍してフィルター
source.pipe(
ops.map(lambda x: x * 2),
ops.filter(lambda x: x > 5)
).subscribe(lambda x: print(f"Received: {x}"))
出力:
Received: 6
Received: 8
Received: 10
イテレータ、Hot Observable、Cold Observable の比較
イテレータ(同期・Pull型)
def number_generator():
for i in range(3):
yield i
for val in number_generator():
print(f"A: {val}")
出力:
A: 0
A: 1
A: 2
Cold Observable(非同期・Push型、購読ごとに再実行)
import rx
from rx import operators as ops
import time
source = rx.from_iterable([0, 1, 2])
source.pipe(
ops.do_action(lambda _: time.sleep(1))
).subscribe(lambda x: print(f"A: {x}"))
time.sleep(2)
source.pipe(
ops.do_action(lambda _: time.sleep(1))
).subscribe(lambda x: print(f"B: {x}"))
出力:
A: 0
A: 1
A: 2
B: 0
B: 1
B: 2
Hot Observable(非同期・Push型、共有ストリーム)
import rx
from rx.subject import Subject
from threading import Thread
import time
subject = Subject()
def emit_values():
for i in range(3):
time.sleep(1)
subject.on_next(i)
Thread(target=emit_values).start()
subject.subscribe(lambda x: print(f"A: {x}"))
time.sleep(2)
subject.subscribe(lambda x: print(f"B: {x}"))
time.sleep(2)
出力例:
A: 0
A: 1
A: 2
B: 2
ユースケースの違い:Cold vs Hot Observable
Cold Observable(遅延評価、再実行されるストリーム)
Cold Observable は以下のような「購読者ごとに毎回新しい処理をしたい」場面で有効です。
ユースケース例:
- HTTPリクエスト(購読するたびにリクエストが実行される)
- ファイルの読み込み(購読するごとに別々の読み取り)
- データベースクエリの発行
特徴:
- 各購読者がデータ生成の開始点から観測できる
- 再現性のある処理に向く
Hot Observable(イベント共有、現在値だけを配信)
Hot Observable は、継続的に発生しているリアルタイムデータやイベントを「共有」したい場合に有効です。
ユースケース例:
- センサーの値(温度、加速度など)
- WebSocket や Kafka のようなストリーミングAPI
- UIイベント(マウスの動き、クリックなど)
- ログ監視ストリーム(複数モジュールでリアルタイム解析)
特徴:
- すでに開始しているデータフローに「途中から参加」する
- 複数の購読者でストリームを共有
Observableの処理方法
Observableの値は、基本的に subscribe()
を使って受け取ります。
方法1: subscribe()
rx.from_iterable([1, 2, 3]).subscribe(lambda x: print(x))
方法2: .pipe()
と演算子で加工
rx.from_iterable([1, 2, 3]).pipe(
ops.map(lambda x: x * 10),
ops.filter(lambda x: x > 10)
).subscribe(lambda x: print(x))
方法3: .to_list()
+ .subscribe()
または .run()
(RxPY v3)
rx.from_iterable([1, 2, 3]).pipe(
ops.to_list()
).subscribe(lambda x: print(f"List: {x}"))
または:
result = rx.from_iterable([1, 2, 3]).pipe(ops.to_list()).run()
print(result) # [1, 2, 3]
知っておくと良いこと
-
遅延評価(Lazy Evaluation):Observableは
subscribe()
されるまで実行されません - Hot Observable の扱いには注意:途中から購読すると過去の値は見えません
-
Schedulers を使えば非同期・スレッド処理も制御できます(例:
ThreadPoolScheduler
) - Rxは
async/await
よりも宣言的で、複雑なストリーム処理やUIイベント連携に向いています
まとめ
種類 | タイプ | データ発行のタイミング | 複数購読者対応 |
---|---|---|---|
イテレータ | Pull | 要求時 | × |
Cold Observable | Push | 購読時に開始 | △(再実行) |
Hot Observable | Push | ストリーム開始時 | ○(途中参加) |
Observableは非同期処理やリアクティブアーキテクチャにおいて非常に強力なツールです。特に複数の非同期イベントをシンプルに結合・加工したい場面では、イテレータやコールバックよりもはるかに表現力があります。
より高度な例(combine_latest, debounce, switch_map)なども活用することで、RxPYの真価が見えてきます。