LoginSignup
67
56

More than 1 year has passed since last update.

【Laravel】 キュー・イベント・ブロードキャストに関する補足とフロントエンドへの導入

Last updated at Posted at 2019-01-15

【2021-11-29 追記】
フロントエンドで Redux-Saga が採用されるケースが限定的になってきているので,フロントエンド結合部分に関してはレガシーな内容だと思って軽く読み飛ばしてください。

はじめに

前回同様,基本はマニュアルを参照すべし。以下補足説明

キューのアーキテクチャ

名称 説明
キュー バックグラウンドジョブが突っ込まれる場所
ワーカー バックグラウンドジョブを取り出して消化するプロセス
コネクション  バックグラウンドジョブを突っ込むための入口
バス 複数のコネクションを束ねる,入口のための入口
  • コネクションは複数のキューを持つことができる。一般的に,ジョブの種類や優先度によってキューを分岐することが多い。
  • キューとワーカーは一対一で対応するとは限らず,「どのワーカーがどのキューを処理するか」ということに関して任意の組み合わせが可能である。

キュー

総称として,バスを除くこれらの仕組みをまとめて「キュー」と呼ぶこともある。

ジョブのアーキテクチャ

ジョブをディスパッチする方法

マニュアルには

// このジョブはデフォルトキューへ送られる
Job::dispatch();

// このジョブは"emails"キューへ送られる
Job::dispatch()->onQueue('emails');

のようなコードが書かれているが,これは Artisan コマンドで生成したジョブクラスに最初からミックスインされている Dispatchable トレイトに実装されているメソッドである。

<?php

namespace Illuminate\Foundation\Bus;

use Illuminate\Contracts\Bus\Dispatcher;

class PendingDispatch
{
    /**
     * The job.
     *
     * @var mixed
     */
    protected $job;

    /**
     * Create a new pending job dispatch.
     *
     * @param  mixed  $job
     * @return void
     */
    public function __construct($job)
    {
        $this->job = $job;
    }

    /* ... */

    /**
     * Handle the object's destruction.
     *
     * @return void
     */
    public function __destruct()
    {
        app(Dispatcher::class)->dispatch($this->job);
    }
}

メソッドチェーンを通じて設定を行い,チェーンが切れたタイミングでディスパッチされる。最終的にやっていることは

app(\Illuminate\Contracts\Bus\Dispatcher::class)->dispatch($job);

だ。ちなみに Illuminate\Contracts\Bus\Dispatcher に対応するファサード Illuminate\Support\Facades\Bus が存在しているので,ファサードを使って

Bus::dispatch($job);

と書くこともできる。

なお,ディスパッチされたジョブは,\Illuminate\Contracts\Queue\ShouldQueue を実装しているかどうかによって,直ちに実行するか,一旦キューに投入するかの分岐が行われる。必ずバックグラウンドジョブにしたい場合は実装しておこう。

「ジョブ」という言葉の定義

Laravel はジョブの仕組みをレイヤー化しているが,重複して同じ「ジョブ」という名称で呼んでいるものがあるため,フレームワークのコードを読むと混乱しやすい。ここではレイヤー別にジョブについての説明を行う。

それぞれ,インスタンスが生成されるタイミングに注意されたい。

名称 レイヤー 予約時のインスタンス生成 実行時のインスタンス生成
アプリケーションジョブ
(デシリアライズ)
ラッパージョブ
ドライバジョブ
A. キューに投入されるまでの大まかな流れ
  1. アプリケーションジョブのインスタンスを生成し,ディスパッチ処理を開始。
  2. アプリケーションジョブが serialize() 関数でシリアライズされる。
  3. 更にシリアルを固定のラッパージョブ名 CallQueuedHandler@callとともにJSONとしてラップし,これがペイロードとなってキューに投入される。

enqueue

B. キューから取り出されて実行されるされるまでの大まかな流れ
  1. キューからペイロードが取り出され, ドライバジョブのインスタンスが生成される。
  2. ドライバジョブfire()メソッドが実行される。
  3. ラッパージョブCallQueuedHandlerのインスタンスが生成され, アプリケーションジョブのシリアルを引数としてcall()メソッドが実行される。
  4. unserialize() 関数でアプリケーションジョブがデシリアライズされる。
  5. アプリケーションジョブhandle()メソッドが実行される。

dequeue

アプリケーションジョブ

./artisan make:job で作る,通常 App\Jobs\ 名前空間に配置されるジョブのことである。マニュアルが「ジョブ」と言ったら基本的にこれのことを指している。

<?php

namespace App\Jobs;

/* ... */

class ProcessPodcast implements ShouldQueue
{
   use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

   protected $podcast;

   /**
    * 新しいジョブインスタンスの生成
    *
    * @param  Podcast  $podcast
    * @return void
    */
   public function __construct(Podcast $podcast)
   {
       $this->podcast = $podcast;
   }

   /**
    * ジョブの実行
    *
    * @param  AudioProcessor  $processor
    * @return void
    */
   public function handle(AudioProcessor $processor)
   {
       // アップロード済みポッドキャストの処理…
   }
}

参考程度に,トレイトの解説も行っておく。ちなみにこれらのトレイトはすべて任意使用であって,必須ではない。

トレイト 所属サービス 役割
Dispatchable Bus ディスパッチ処理をインスタンスメソッドから呼べるようにする
InteractsWithQueue Queue ドライバジョブにアプリケーションジョブからアクセスできるようにする
Queueable Bus 使用するコネクション・キューをデフォルトから変更できるようにする
SerializesModels Queue Eloquent モデルオブジェクト自体をシリアライズせず,主キーのみをシリアライズし,デシリアライズされるときに再度クエリを投げて取得する
(これが無いと,デシリアライズされたオブジェクトは読み取り専用になってしまう)

ラッパージョブ

基本的に Illuminate\Queue\CallQueuedHandler::call() で固定。以下のような処理を行うために,アプリケーションジョブをラップする形で使用される。

  • アプリケーションジョブのシリアライズ・デシリアライズ
  • InteractsWithQueueを使用しているアプリケーションジョブにドライバジョブをセットする
  • チェインされたアプリケーションジョブの継続処理
  • 完了または失敗したドライバジョブの削除処理
  • SerializesModels を実装したアプリケーションジョブのデシリアライズで ModelNotFoundException が発生したとき, 以下のデフォルトプロパティ宣言があれば,ジョブを失敗と見なさずに直ちに削除する
public $deleteWhenMissingModels = true;

ドライバジョブ

ドライバジョブは, Job 契約 を実装している。参考程度に,ドライバジョブとしてフレームワークによって定義されている例を挙げておく。

なおこれらは,基底抽象クラス Jobを利用して,契約を利用しつつもコードのコピペを避けるように工夫されている。

イベントのアーキテクチャ

イベントは,一見ジョブと似ているようにも思えるが,発生したときの処理を外部に移譲するという点で決定的に異なる。その処理を行うのはリスナーだ。どちらかといえば,リスナーのほうが素のジョブに近いだろう。

以下では,リスナーに ShouldQueue が実装されている場合の図解を示す。


A. リスナーがキューに投入されるまでの大まかな流れ

  1. イベントが発生する。
  2. 呼び出すリスナーの情報を引数として CallQueuedListener クラスのインスタンスが生成され,以降アプリケーションジョブとして処理される。 この際, "displayName" にリスナーのクラス名が自動的に設定され,デバッグ情報としてワーカーのログに表示されるようになる。

enqueue

B. キューから取り出されて実行されるされるまでの大まかな流れ
  1. アプリケーションジョブ CallQueuedListener のインスタンスが復元される。
  2. handle() メソッドが実行される。
  3. 元のリスナーのインスタンスが生成され,その handle() メソッドが実行される。

dequeue

ブロードキャストのアーキテクチャ

ブロードキャストなのにキューが必要な理由?

通常の ShouldBroadcast 契約をイベントに実装している場合,イベントをブロードキャストするためには

  1. キュードライバ
  2. ブロードキャストドライバ

の2つが必要だ。どうしてそうなっているのかを考えてみよう。

以下にそれぞれのドライバを列挙する。なお,処理が同期的に行われてキューの意味が無いものであったり,ログを取るだけもしくは何もしない,といった選択肢は除外している。


キュードライバ一覧

  • database
  • beanstalkd
  • sqs
  • redis
ブロードキャストドライバ一覧
  • pusher
  • redis

この中に1つ明らかに浮いているものがある。それは Pusher だ。 Pusher はAPI経由で外部サービスとして利用するしかないため,通信のオーバーヘッドがそれなりに発生する。SQS も AWS のサービスではあるが,AWS でインフラを構築している場合しか使用しないので, 外部サービスではないと考えられる。

もし Pusher をブロードキャストドライバに選択した場合,ブロードキャスト処理を行うだけでリクエストに対する応答が遅くなってしまう。これを克服するためにはブロードキャスト処理をバックグラウンドで行う必要があり,そのためのキュー投入処理があるのだ。

逆に言えば,ブロードキャストドライバに Redis を利用している場合,IO が非常に軽いため,キューに投入する処理は完全に無駄になってしまう。(もしキュードライバにも Redis を利用していたら, Redis と無駄に2回通信することになってしまう)

ブロードキャストドライバに Redis を採用した場合, ShouldBroadcast は一切使用せず,すべて ShouldBroadcastNow に統一すべきだ。但し,イベントの broadcastWith メソッドの処理が極端に重くなったりする場合はこの限りではない。

ShouldBroadcast を実装したイベント

ブロードキャストドライバに Pusher を使用する場合の図解を示す。

enqueue
dequeue

いったんキューで寝かせて,出番が来たら Pusher に丸投げ。

ShouldBroadcastNow を実装したイベント

ブロードキャストドライバに Redis + Laravel Echo Server を使用する場合の図解を示す。

download-4.png

Redis に直接 PUBLISH して, Laravel Echo Server が SUBSCRIBE する形になる。

特定イベントに関して toOthers() メソッドを常に呼ぶ

しかし、broadcast関数には、ブロードキャストの受取人から現在のユーザーを除外できる、toOthersメソッドが用意されています。

broadcast(new ShippingStatusUpdated($update))->toOthers();

特定のイベントに関して常に toOthers() を利用したい場合,イベントのコンストラクタで dontBroadcastToCurrentUser() をコールするとよい。 以下に例を示す。

イベントの実装例
<?php

declare(strict_types=1);

namespace App\Events;

use App\Post;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow;
use Illuminate\Queue\SerializesModels;

class BroadcastPostCreated implements ShouldBroadcastNow
{
    use SerializesModels, InteractsWithSockets;

    /**
     * @var Post
     */
    public $post;

    public function __construct(Post $post)
    {
        $this->dontBroadcastToCurrentUser();
        $this->post = $post;
    }

    public function broadcastWith(): array
    {
        return $this->post->toArray();
    }

    public function broadcastAs(): string
    {
        return 'post.created';
    }

    public function broadcastOn(): array
    {
        /* ... */
    }
}

toOthers() はこれを呼んでいる。

/**
 * Broadcast the event to everyone except the current user.
 *
 * @return $this
 */
public function toOthers()
{
    if (method_exists($this->event, 'dontBroadcastToCurrentUser')) {
        $this->event->dontBroadcastToCurrentUser();
    }

    return $this;
}

このメソッドは InteractsWithSockets トレイトによって提供される。

/**
 * Exclude the current user from receiving the broadcast.
 *
 * @return $this
 */
public function dontBroadcastToCurrentUser()
{
    $this->socket = Broadcast::socket();
    return $this;
}

Broadcast::socket() の実態は BroadcastManager に実装されている。

/**
 * Get the socket ID for the given request.
 *
 * @param  \Illuminate\Http\Request|null  $request
 * @return string|null
 */
public function socket($request = null)
{
    if (! $request && ! $this->app->bound('request')) {
        return;
    }

    $request = $request ?: $this->app['request'];

    return $request->header('X-Socket-ID');
}

Laravel Echo を用いたフロントエンドとのつなぎ込み

PHP側からある特定のイベントがブロードキャストされたとき,フロントエンド側で対応する Redux のアクションをディスパッチさせることを考える。

接続状態のコントロール

Echo は生の WebSocket 同様,インスタンスを生成した瞬間に接続試行されるため, Redux などで状態遷移を管理している場合には扱いにくい。ラップした上で, connect() メソッドを呼んで接続するようにすると扱いやすい。以下に Socket.IO を使用する場合の実装例を示す。

src/api/echo.js
import io from 'socket.io-client'
import Echo from 'laravel-echo'
/* ... */

class EchoContainer {

  connection = null

  connecting = () => {
    return !!this.connection
  }

  connect = () => {
    this.connection = new Echo({
      broadcaster: 'socket.io',
      client: io,
      host: 'https://example.com',
    })
    return new Promise((resolve, reject) => {
      if (!this.connecting()) {
        throw new Error('socket already disconnected')
      }
      this.connection.connector.socket.on('connect', resolve)
      this.connection.connector.socket.on('error', reject)
    })
  }

  disconnect = () => {
    if (this.connecting()) {
      this.connection.disconnect()
      this.connection = null
    }
  }

  /* ... */
}

export default new EchoContainer()

this のバインドを省略できるよう,インスタンスごとに関数オブジェクトを作る書き方をしている

axios からのリクエストへの X-Socket-Id ヘッダの付与

既に紹介したように,Laravel のブロードキャストは,「現在のリクエストを送ってきた相手にはブロードキャストしない」という機能を持っているが,これに必要な X-Socket-Id として付与すべきIDは以下のように取り出せる。これもラップしたクラスに実装するとよい。以下に axios を利用している場合の実装例を示す。

src/api/echo.js
/* ... */

class EchoContainer {

  /* ... */

  interceptor = (config) => {
    if (this.connecting() && config.method !== 'get') {
      const socketId = this.connection.connector.socketId()
      if (socketId) {
        config.headers['X-Socket-Id'] = socketId
      }
    }
    return config
  }

  /* ... */
}

export default new EchoContainer()
src/api/app.js
import axios from 'axios'
import echo from './echo'

const app = axios.create()
app.interceptors.request.use(echo.interceptor)

/* ... */

export default app

基本的に GET メソッドのエンドポイントアクセスでブロードキャストが発生することは無いので,そのときのみ除外している。これは,APIサーバがクロスオリジンである場合,独自ヘッダーが使用されるとプリフライトリクエストが飛んでしまうのを避ける意図がある。GET アクセスで毎回通信2往復になるのは,サービス品質的に大きな問題があるだろう。

redux-saga とのインテグレーション例

Echo(および内包される Socket.IO)を素の Event Emitter として使うことも可能ではあるが,状態遷移を扱いやすくする場合は, redux-sagaEvent Channel などを利用することが合理的であると考えられる。

redux-saga

  • 1つの Connection の中で, 例えば users.<ユーザID> といった Echo の Channel を作成できる。認証を要求するものは Private Channel を利用する。
  • Echo は,1つのチャンネルに対して複数のイベントリスナーを設定できる。このイベントリスナーでイベントの種類ごとに Saga Event Channel を生成する。

以下にこのアーキテクチャを採用したコード例を示す。まずは EchoContainer 側から。

src/api/echo.js
/* ... */
import { eventChannel } from 'redux-saga'

class EchoContainer {

  /* ... */

  createEventChannel = (echoChannelName, echoEventName, methodName = 'channel') => {
    return eventChannel((emit) => {
      if (!this.connecting()) {
        return () => {}
      }

      // イベントリスナの登録
      const channel = this.connection[methodName](echoChannelName)
      channel.on(echoEventName, (value) => emit(value))

      // イベントに関するコールバック一覧,および最後に登録したコールバックを取得
      const callbacks = channel.events[echoEventName]
      const callback = callbacks[callbacks.length - 1]

      // ディスポーザーの定義
      return () => {
        if (!this.connecting()) {
          return
        }

        // 最後に登録したコールバックを削除する
        for (let i = callbacks.length - 1; i >= 0; --i) {
          if (callbacks[i] === callback) {
            channel.socket.removeListener(echoEventName, callback)
            callbacks.splice(i, 1)
          }
        }

        // コールバック一覧が空になったらチャンネルを破棄する
        if (callbacks.length === 0) {
          this.connection.leave(echoChannelName)
        }
      }
    })
  }

  /* ... */
}

export default new EchoContainer()

公式マニュアルを見ると listen() でイベントリスナーを設定するように説明されているが,これは暗黙的にチャンネル名を加工する効果がある。

src/channel/socketio-channel.ts
/**
 * Listen for an event on the channel instance.
 */
listen(event: string, callback: Function): SocketIoChannel {
    this.on(this.eventFormatter.format(event), callback);

    return this;
}
src/util/event-formatter.ts
/**
 * Format the given event name.
 */
format(event: string): string {
    if (event.charAt(0) === '.' || event.charAt(0) === `\\`) {
        return event.substr(1);
    } else if (this.namespace) {
        event = this.namespace + '.' + event;
    }

    return event.replace(/\./g, `\\`);
}

この処理は過去に繰り返し破壊的変更が行われており,バグの温床になっている。もしPHP側で broadcastAs() メソッドを定義して明示的にブロードキャストされる際のイベント名を指定している場合は,暗黙的変換に頼らない on() メソッドを直接利用するほうがよい。

また衝撃的なことに, Laravel Echo にはイベントリスナー1つだけを指定して破棄するインタフェースが用意されていない。このためリスナー解除時の関数として, Array#splice を使用して,少々トリッキーな実装を行っている。

続いて, Saga 側を見ていこう。細かいアクションの定義は省略するが,大まかには以下のような実装になる。

src/redux/echo.js
import { call, put, fork, take, race } from 'redux-saga/effects'
import echo from '../api/echo'

export function* runLoop(channel, actionCreator, UNLISTEN = null) {
  while (true) {
    let entity, unlisten
    if (UNLISTEN) {
      ({ payload, unlisten } = yield race({ payload: take(channel), unlisten: take(UNLISTEN) }))
      if (unlisten) break
    } else {
      entity = yield take(channel)
    }
    yield put(actionCreator(payload))
  }
  channel.close()
}

// 認証無しチャンネル
export function* forkForChannel(echoChannelName, echoEventName, actionCreator, UNLISTEN = null, methodName = 'channel') {
  const channel = yield call(echo.createEventChannel, echoChannelName, echoEventName, methodName)
  yield fork(runLoop, channel, actionCreator, UNLISTEN)
}

// 認証ありチャンネル
export function* forkForPrivateChannel(echoChannelName, echoEventName, actionCreator, UNLISTEN = null) {
  yield* forkForChannel(echoEventName, echoEventName, actionCreator, UNLISTEN, 'private')
}

/* ... */
使用例
export function *listenChannels(userId) {
  yield* forkForChannel('global', 'announcement.received', broadcastAnnouncementReceived)
  yield* forkForPrivateChannel(`users.${userId}`, 'notifications.created', broadcastNotificationCreated, UNLISTEN_USER_CHANNEL)
  yield* forkForPrivateChannel(`users.${userId}`, 'notifications.updated', broadcastNotificationUpdated, UNLISTEN_USER_CHANNEL)
  yield* forkForPrivateChannel(`users.${userId}`, 'notifications.deleted', broadcastNotificationDeleted, UNLISTEN_USER_CHANNEL)
}

引数は以下のようになる。

引数名 必須 説明
echoChannelName Echo Channel のチャンネル名
echoEventName Echo Channel の各リスナーに対応するイベント名
actionCreator イベントを受信したときにディスパッチしたい Redux アクションのアクションクリエータ
UNLISTEN リスナーを解除するアクション(指定しない場合はリッスンし続ける)

以下に UNLISTEN が指定された場合の runLoop によるループ処理の流れを説明する。

  • UNLISTEN の発生と Event Channel からのイベントの発生を同時に監視する。
    • チャンネルからのイベントが発生した場合, PHP側の broadcastWith() で指定されたペイロードがJSON変換経由で payload にセットされる。これをアクションクリエータのペイロードとしてそのまま渡し,アクションをディスパッチする。
    • UNLISTEN として指定されたイベントが発生した場合,ループを脱出して Event Channel を破棄する。

これらの一連のコードにより,PHP側からイベントがブロードキャストされたとき,指定された Redux のアクションをディスパッチすることができる。

(発展) Presence Channel への対応

アーキテクチャ的に噛み合わない部分があったので Presence Channel への対応は見送ろうと思ったが,SocketIoPresenceChannel の実装を見ると, Private Channel に少し毛が生えた程度なので以下のように少し自分で書くだけで対応できそう。

複数のチャットルームに同時に入室できるみたいな実装だと,どのチャンネルを出たのかの判定が必要なので,その場合はもうちょっと runLoop 関数が複雑になるので注意。(ここでは割愛)

export function *joinChatroom(roomId) {
  yield* forkForPrivateChannel(
    `chatrooms.${roomId}`,
    'presence:subscribed',
    (members) => initChatroomMembers(members.map(m => m.user_info)),
     LEAVE_CHATROOM,
  )
  yield* forkForPrivateChannel(
    `chatrooms.${roomId}`,
    'presence:joining',
    (member) => chatroomMemberJoining(member.user_info),
     LEAVE_CHATROOM,
  )
  yield* forkForPrivateChannel(
    `chatrooms.${roomId}`,
    'presence:leaving',
    (member) => chatroomMemberLeaving(member.user_info),
     LEAVE_CHATROOM,
  )
}
67
56
3

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
67
56