1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

RxJSのObservable作成方法まとめ(TypeScript対応)

Last updated at Posted at 2025-05-04

RxJSを学ぶ中で「Observableってどう作るの?」という場面に何度も遭遇したため、作成方法を網羅的にまとめてみました。

RxJSでは、カスタムObservableの作成や、イベント・配列・HTTPレスポンスなどから簡単にObservableを生成できるよう、さまざまな手段が提供されています。

本記事では、Observableの作成方法を網羅的にカテゴリ毎にまとめ、具体例とともに整理しました。

Observable作成手法の分類

以下は主な作成手法のカテゴリ別一覧です。

カテゴリ 主な手法 説明
カスタム作成 new Observable() 自由度が高いが記述量も多い。手動でクリーンアップが必要
作成演算子 of(), from(), fromEvent(), interval(), timer(), ajax(), fromFetch() よく使われるデータ・イベント・時間ベースの生成関数群
特殊な作成演算子 defer(), range(), generate(), iif() 制御的・ループ的な生成、条件による切り替えなど
特殊Observable EMPTY, NEVER, throwError() 完了・何もしない・エラー発行用
Subject系 Subject, BehaviorSubject 観測者としても送信者としても機能する特殊なObservable
コールバック変換 bindCallback(), bindNodeCallback() コールバックベースの関数をObservableに変換
WebSocket webSocket() WebSocket通信を双方向Observableとして扱う

カスタム作成

new Observable()

(公式: Observable)

最も基本的な方法は、Observableコンストラクタを直接使用することです。この方法はカスタムなObservableロジックを定義したい場合に最も柔軟です。明示的な next, error, complete 呼び出しによって細かな挙動制御が可能です。

import { Observable } from 'rxjs';

const observable$ = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
});

observable$.subscribe({
  next: value => console.log('値:', value),
  error: err => console.error('エラー:', err),
  complete: () => console.log('完了')
});

// (出力)
// 値: 1
// 値: 2
// 値: 3
// 値: 4
// 完了

new Observable() を使う場合は、明示的なリソース解放(クリーンアップ処理)を自分で記述する必要があります。

const obs$ = new Observable(subscriber ={
  const id = setInterval(() =subscriber.next(Date.now()), 1000);
  return () ={
    clearInterval(id); // 明示的なリソース解放
  };
});

一方、fromEvent()interval() などRxJSのビルトイン作成関数は、内部に適切なクリーンアップ処理を持ちます。

const click$ = fromEvent(document, 'click');
const timer$ = interval(1000);

これらは内部で addEventListenersetInterval を使用していて、unsubscribe() 時に RxJS が自動で removeEventListenerclearInterval() を呼ぶように設計されています。

なお、RxJSの内部でクリーンアップ処理が実装されていても、unsubscribe()を呼ばなければその処理は実行されないため注意が必要です。

 const subscription = observable$.subscribe({
 //省略...
 });
>
 subscription.unsubscribe(); // 👈 
  • どのObservableの作成方法でも、不要になったら必ずunsubscribe()する習慣を持ちましょう。
  • 購読解除を忘れると、イベントリスナーやタイマーが動き続けるため、メモリリークや予期せぬ副作用の原因になります。

作成演算子

より簡潔で用途に特化したObservable作成には、RxJSが提供する「作成操作子(creation operator)」が便利です。繰り返し使われるユースケースにはこれらを使うことでコードが簡素化されます。

「RxJS公式ではこれらはfunctionと分類されていますが、従来(RxJS 5.x ~ 6)はcreation operatorと呼ばれることが多く、現在でもその呼称はよく使われています。」

of()

(公式: of)

複数の値を1つずつ順に発行するもっともシンプルなObservable作成関数です。

import { of } from 'rxjs';

const values$ = of(1, 2, 3, 4, 5);

values$.subscribe({
  next: value => console.log('値:', value),
  error: err => console.error('エラー:', err),
  complete: () => console.log('完了')
});
// 出力: 値: 1, 値: 2, 値: 3, 値: 4, 値: 5, 完了

of()from() の違い

  • of([1, 2, 3]) → 1つの配列を発行します。
  • from([1, 2, 3]) → 個別の値 1, 2, 3 を順に発行します。
  • よく混同されるので注意が必要です。

from()

(公式: from)

配列・Promise・イテラブルなど、既存のデータ構造からObservableを生成します。

import { from } from 'rxjs';

// 配列から作成
const array$ = from([1, 2, 3]);
array$.subscribe({
  next: value => console.log('配列値:', value),
  error: err => console.error('エラー:', err),
  complete: () => console.log('完了')
});

// Promiseから作成
const promise$ = from(Promise.resolve('Promiseの結果'));
promise$.subscribe({
  next: value => console.log('Promise結果:', value),
  error: err => console.error('エラー:', err),
  complete: () => console.log('完了')
});

// イテラブルから作成
const iterable$ = from(new Set([1, 2, 3]));
iterable$.subscribe({
  next: value => console.log('イテラブル値:', value),
  error: err => console.error('エラー:', err),
  complete: () => console.log('完了')
});

// (出力)
// 配列値: 1
// 配列値: 2
// 配列値: 3
// 完了
// イテラブル値: 1
// イテラブル値: 2
// イテラブル値: 3
// 完了
// Promise結果: Promiseの結果
// 完了

fromEvent()

(公式: fromEvent)

DOMイベントなど、イベントソースをObservableとして扱うための関数です。

import { fromEvent } from 'rxjs';

const clicks$ = fromEvent(document, 'click');
clicks$.subscribe({
  next: event => console.log('クリックイベント:', event),
  error: err => console.error('エラー:', err),
  complete: () => console.log('完了')
});

// (出力)
// クリックイベント: PointerEvent {isTrusted: true, pointerId: 1, width: 1, height: 1, pressure: 0, …}

DOM以外では使えないことに注意

  • fromEvent() はブラウザ環境でのみ利用でき、Node.jsでは利用できません。
  • 複数回購読すると、複数のイベントリスナーが追加される可能性があります。

👉 より詳細なイベントストリームの活用例については、イベントのストリーム化 を参照してください。

interval(), timer()

(公式: interval), (公式: timer)

一定間隔で連続的に値を発行したいときや、時間制御が必要な場合に使われます。

import { interval, timer } from 'rxjs';

// 1秒ごとに値を発行
const interval$ = interval(1000);
interval$.subscribe({
  next: value => console.log('インターバル:', value),
  error: err => console.error('エラー:', err),
  complete: () => console.log('完了')
});

// 3秒後に開始し、その後1秒ごとに値を発行
const timer$ = timer(3000, 1000);
timer$.subscribe({
  next: value => console.log('タイマー:', value),
  error: err => console.error('エラー:', err),
  complete: () => console.log('完了')
});

// (出力)
// インターバル: 0
// インターバル: 1
// インターバル: 2
// タイマー: 0
// インターバル: 3
// タイマー: 1
// インターバル: 4
// タイマー: 2
// .
// .

interval()timer() は時間制御に関する処理で頻繁に使われ、特にアニメーション、ポーリング、非同期イベント遅延などに適しています。

Cold Observable である点に注意

  • interval()timer() は Cold Observable であり、購読のたびに独立して実行されます。
  • 必要に応じて share() などでHot化することも検討できます。

ajax()

(公式: ajax)

HTTP通信の結果をObservableとして非同期的に扱うための関数です。

import { ajax } from 'rxjs/ajax';

const api$ = ajax.getJSON('https://jsonplaceholder.typicode.com/todos/1');
api$.subscribe({
  next: response => console.log('APIレスポンス:', response),
  error: error => console.error('APIエラー:', error),
  complete: () => console.log('API完了')
});

// (出力)
// APIレスポンス: {userId: 1, id: 1, title: 'delectus aut autem', completed: false}
// API完了

RxJSのajaxは、内部的にはXMLHttpRequestを使用しています。一方、RxJSにはfromFetchというオペレーターもあり、これはFetch APIを利用してHTTPリクエストを行います。

fromFetch()

📘 RxJS公式: fromFetch

fromFetch() は Fetch API をラップして、HTTP リクエストを Observable として扱うことができる関数です。
ajax() と似ていますが、こちらはよりモダンで軽量です。

import { fromFetch } from 'rxjs/fetch';
import { switchMap } from 'rxjs';

const api$ = fromFetch('https://jsonplaceholder.typicode.com/todos/1');

api$.pipe(
  switchMap(response => response.json())
).subscribe({
  next: data => console.log('データ:', data),
  error: err => console.error('エラー:', err),
  complete: () => console.log('完了')
});

// 出力:
// データ: Objectcompleted: falseid: 1title: "delectus aut autem"userId: 1[[Prototype]]: Object
// main.ts:11 完了

fromFetch() は Fetch API を使用しているため、ajax() と異なりリクエスト設定の初期化やレスポンスの .json() 変換は手動で行う必要があります。
エラーハンドリングや HTTP ステータスのチェックなども適切に行う必要があります。

特殊な作成演算子

defer()

(公式: defer)

Observableの生成を購読時まで遅延させたいときに使用されます。

import { defer, of } from 'rxjs';

const deferred$ = defer(() => {
  const randomValue = Math.random();
  return randomValue > 0.5 ? 
    of('50%より大きい値:', randomValue) : 
    of('50%以下の値:', randomValue);
});

// 購読するごとに新しいObservableが作成される
deferred$.subscribe(value => console.log(value));
deferred$.subscribe(value => console.log(value));

// (出力)
// 50%以下の値:
// 0.08011364416212319
// 50%以下の値:
// 0.3141403962502316

defer() は副作用のある処理をObservable作成時ではなく購読時に遅延させたい場合に有効です。ランダム生成や現在時刻の取得などの用途に適しています。

of()との違い

  • of() は作成時点で値が確定します。
  • defer() は購読時に初めて処理されるため、購読するたびに値が変わるような処理に適しています。

range()

(公式: range)

指定された範囲内の一連の数値を発行するObservableを作成します。

import { range } from 'rxjs';

const range$ = range(5, 3); // 5から3つ → 5, 6, 7
range$.subscribe({
  next: val => console.log('range:', val),
  complete: () => console.log('完了')
});

// (出力)
// range: 5
// range: 6
// range: 7
// 完了

generate()

(公式: generate)

初期値・条件・更新式を指定して、ループのように数値や状態を生成するための関数です。

import { generate } from 'rxjs';

const generate$ = generate({
  initialState: 0,
  condition: x => x < 5,
  iterate: x => x + 1
});

generate$.subscribe({
  next: val => console.log('generate:', val),
  complete: () => console.log('完了')
});

// (出力)
// generate: 0
// generate: 1
// generate: 2
// generate: 3
// generate: 4
// 完了

iif()

(公式: iif)

条件に応じて、実行するObservableを動的に切り替える ための関数です。

import { iif, of, EMPTY } from 'rxjs';

const condition = true;
const iif$ = iif(() => condition, of('条件はtrue'), EMPTY);

iif$.subscribe({
  next: val => console.log('iif:', val),
  complete: () => console.log('完了')
});

// (出力)
// iif: 条件はtrue
// 完了

iif() は条件によって返すObservableを動的に切り替えることができます。フロー制御に便利です。

特殊Observable

EMPTY, NEVER, throwError()

(公式: EMPTY), (公式: NEVER), (公式: throwError)

実行制御や例外処理、学習用として役立つ特殊なObservableもRxJSには用意されています。

import { EMPTY, throwError, NEVER } from 'rxjs';

// 即座に完了するObservable
const empty$ = EMPTY;
empty$.subscribe({
  next: () => console.log('これは表示されない'),
  complete: () => console.log('即座に完了')
});

// エラーを発行するObservable
const error$ = throwError(() => new Error('エラー発生'));
error$.subscribe({
  next: () => console.log('これは表示されない'),
  error: err => console.error('エラー:', err.message),
  complete: () => console.log('完了')
});

// 何も発行せず、完了もしないObservable
const never$ = NEVER;
never$.subscribe({
  next: () => console.log('これは表示されない'),
  complete: () => console.log('これも表示されない')
});

// (出力)
// 即座に完了
// エラー: エラー発生

主に制御・検証・学習用途

  • EMPTY, NEVER, throwError() は、通常のデータストリームではなく、フロー制御や例外ハンドリングの検証、または学習用途で活用されます。

Subject系

Subject, BehaviorSubject など

(公式: Subject), (公式: BehaviorSubject)

自ら値を発行できるObservableで、マルチキャストや状態共有に向いています。

import { Subject } from 'rxjs';

const subject$ = new Subject<number>();

// Observerとして使用
subject$.subscribe(value => console.log('Observer 1:', value));
subject$.subscribe(value => console.log('Observer 2:', value));

// Observableとして使用
subject$.next(1);
subject$.next(2);
subject$.next(3);
subject$.complete();

// (出力)
// Observer 1: 1
// Observer 2: 1
// Observer 1: 2
// Observer 2: 2
// Observer 1: 3
// Observer 2: 3

Hot Observableであることに注意

  • Subject は購読者に「同時に」通知されるため、from()of() などの Cold Observable とは異なり、購読タイミングによって値を受け取れないことがあります

詳しくは、「Subjectとは」を参照してください。

コールバック変換

RxJSには、コールバックベースの非同期関数をObservableに変換するための関数として bindCallback() および bindNodeCallback() が用意されています。

bindCallback()

(公式: bindCallback)

import { bindCallback } from 'rxjs';

function asyncFn(input: string, callback: (result: string) => void) {
  setTimeout(() => callback(`Hello, ${input}`), 1000);
}

const observableFn = bindCallback(asyncFn);
const result$ = observableFn('RxJS');

result$.subscribe({
  next: val => console.log(val), // Hello, RxJS
  complete: () => console.log('完了')
});

bindNodeCallback()

(公式: bindNodeCallback)

import { bindNodeCallback } from 'rxjs';
import { readFile } from 'fs';

const readFile$ = bindNodeCallback(readFile);
readFile__('./some.txt', 'utf8').subscribe({
  next: data => console.log('内容:', data),
  error: err => console.error('エラー:', err)
});

bindNodeCallback() は Node.js の (err, result) 型の非同期関数に対応しています。

使い分け

  • コールバックの第1引数が「エラーかどうか」なら bindNodeCallback()
  • 単純に「値だけ返す」コールバックなら bindCallback()

WebSocket()

(公式: webSocket)

RxJSの rxjs/webSocket モジュールには、WebSocketをObservable/Observerとして扱える webSocket() 関数が用意されています。

import { webSocket } from 'rxjs/webSocket';

const socket$ = webSocket('wss://echo.websocket.org');

socket$.subscribe({
  next: msg => console.log('受信:', msg),
  error: err => console.error('エラー:', err),
  complete: () => console.log('完了')
});

// メッセージ送信(Observerとしての機能)
socket$.next('Hello WebSocket!');

webSocket() は双方向通信が可能な Observable/Observer ハイブリッドです。
WebSocketの接続・送信・受信を簡単にObservableとして扱えるため、リアルタイム通信に便利です。

まとめ

RxJSのストリームは、従来のJavaScriptのイベント処理やAJAX通信などを統一的なインターフェイスで扱えるようにします。特に時間的に変化するデータを扱う場合や、複数のイベントソースを組み合わせる場合に威力を発揮します。

最後に

📘 本記事は以下の学習用サイト「RxJS with TypeScript」をもとに構成されています👇

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?