0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

#0125(2025/05/06)Observableの解説

Posted at

Observableの解説

非同期処理やリアクティブプログラミングの文脈で登場する "Observable" は、Rx(Reactive Extensions)ライブラリの中核的な概念です。この記事では、PythonのRxPYをベースに、Observableの基本からHot/Coldの違い、イテレータとの比較、処理方法までを徹底的に解説します。


Observableとは?

Observable(オブザーバブル)は、時間とともに発生する値やイベントのストリームを表現するオブジェクトです。データが「発行」され、それを購読(subscribe)することで、非同期に受け取ることができます。

特徴:

  • データの流れ(ストリーム)を表現
  • subscribe() によってデータを受信
  • 時間軸に沿って複数の値を送る
  • map, filter, combine_latest などのオペレータで処理を組み立てられる

サンプルコード(基本)

import rx
from rx import operators as ops

# 1から5のストリーム
source = rx.from_iterable(range(1, 6))

# 2倍してフィルター
source.pipe(
    ops.map(lambda x: x * 2),
    ops.filter(lambda x: x > 5)
).subscribe(lambda x: print(f"Received: {x}"))

出力:

Received: 6
Received: 8
Received: 10

イテレータ、Hot Observable、Cold Observable の比較

イテレータ(同期・Pull型)

def number_generator():
    for i in range(3):
        yield i

for val in number_generator():
    print(f"A: {val}")

出力:

A: 0
A: 1
A: 2

Cold Observable(非同期・Push型、購読ごとに再実行)

import rx
from rx import operators as ops
import time

source = rx.from_iterable([0, 1, 2])

source.pipe(
    ops.do_action(lambda _: time.sleep(1))
).subscribe(lambda x: print(f"A: {x}"))

time.sleep(2)
source.pipe(
    ops.do_action(lambda _: time.sleep(1))
).subscribe(lambda x: print(f"B: {x}"))

出力:

A: 0
A: 1
A: 2
B: 0
B: 1
B: 2

Hot Observable(非同期・Push型、共有ストリーム)

import rx
from rx.subject import Subject
from threading import Thread
import time

subject = Subject()

def emit_values():
    for i in range(3):
        time.sleep(1)
        subject.on_next(i)

Thread(target=emit_values).start()

subject.subscribe(lambda x: print(f"A: {x}"))
time.sleep(2)
subject.subscribe(lambda x: print(f"B: {x}"))
time.sleep(2)

出力例:

A: 0
A: 1
A: 2
B: 2

ユースケースの違い:Cold vs Hot Observable

Cold Observable(遅延評価、再実行されるストリーム)

Cold Observable は以下のような「購読者ごとに毎回新しい処理をしたい」場面で有効です。

ユースケース例:

  • HTTPリクエスト(購読するたびにリクエストが実行される)
  • ファイルの読み込み(購読するごとに別々の読み取り)
  • データベースクエリの発行

特徴:

  • 各購読者がデータ生成の開始点から観測できる
  • 再現性のある処理に向く

Hot Observable(イベント共有、現在値だけを配信)

Hot Observable は、継続的に発生しているリアルタイムデータやイベントを「共有」したい場合に有効です。

ユースケース例:

  • センサーの値(温度、加速度など)
  • WebSocket や Kafka のようなストリーミングAPI
  • UIイベント(マウスの動き、クリックなど)
  • ログ監視ストリーム(複数モジュールでリアルタイム解析)

特徴:

  • すでに開始しているデータフローに「途中から参加」する
  • 複数の購読者でストリームを共有

Observableの処理方法

Observableの値は、基本的に subscribe() を使って受け取ります。

方法1: subscribe()

rx.from_iterable([1, 2, 3]).subscribe(lambda x: print(x))

方法2: .pipe() と演算子で加工

rx.from_iterable([1, 2, 3]).pipe(
    ops.map(lambda x: x * 10),
    ops.filter(lambda x: x > 10)
).subscribe(lambda x: print(x))

方法3: .to_list() + .subscribe() または .run()(RxPY v3)

rx.from_iterable([1, 2, 3]).pipe(
    ops.to_list()
).subscribe(lambda x: print(f"List: {x}"))

または:

result = rx.from_iterable([1, 2, 3]).pipe(ops.to_list()).run()
print(result)  # [1, 2, 3]

知っておくと良いこと

  • 遅延評価(Lazy Evaluation):Observableは subscribe() されるまで実行されません
  • Hot Observable の扱いには注意:途中から購読すると過去の値は見えません
  • Schedulers を使えば非同期・スレッド処理も制御できます(例:ThreadPoolScheduler
  • Rxは async/await よりも宣言的で、複雑なストリーム処理やUIイベント連携に向いています

まとめ

種類 タイプ データ発行のタイミング 複数購読者対応
イテレータ Pull 要求時 ×
Cold Observable Push 購読時に開始 △(再実行)
Hot Observable Push ストリーム開始時 ○(途中参加)

Observableは非同期処理やリアクティブアーキテクチャにおいて非常に強力なツールです。特に複数の非同期イベントをシンプルに結合・加工したい場面では、イテレータやコールバックよりもはるかに表現力があります。

より高度な例(combine_latest, debounce, switch_map)なども活用することで、RxPYの真価が見えてきます。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?