【2021-11-29 追記】
フロントエンドで Redux-Saga が採用されるケースが限定的になってきているので,フロントエンド結合部分に関してはレガシーな内容だと思って軽く読み飛ばしてください。
はじめに
前回同様,基本はマニュアルを参照すべし。以下補足説明
キューのアーキテクチャ
名称 | 説明 |
---|---|
キュー | バックグラウンドジョブが突っ込まれる場所 |
ワーカー | バックグラウンドジョブを取り出して消化するプロセス |
コネクション | バックグラウンドジョブを突っ込むための入口 |
バス | 複数のコネクションを束ねる,入口のための入口 |
- コネクションは複数のキューを持つことができる。一般的に,ジョブの種類や優先度によってキューを分岐することが多い。
- キューとワーカーは一対一で対応するとは限らず,「どのワーカーがどのキューを処理するか」ということに関して任意の組み合わせが可能である。
総称として,バスを除くこれらの仕組みをまとめて「キュー」と呼ぶこともある。
ジョブのアーキテクチャ
ジョブをディスパッチする方法
マニュアルには
// このジョブはデフォルトキューへ送られる Job::dispatch(); // このジョブは"emails"キューへ送られる Job::dispatch()->onQueue('emails');
のようなコードが書かれているが,これは Artisan コマンドで生成したジョブクラスに最初からミックスインされている Dispatchable
トレイトに実装されているメソッドである。
<?php
namespace Illuminate\Foundation\Bus;
use Illuminate\Contracts\Bus\Dispatcher;
class PendingDispatch
{
/**
* The job.
*
* @var mixed
*/
protected $job;
/**
* Create a new pending job dispatch.
*
* @param mixed $job
* @return void
*/
public function __construct($job)
{
$this->job = $job;
}
/* ... */
/**
* Handle the object's destruction.
*
* @return void
*/
public function __destruct()
{
app(Dispatcher::class)->dispatch($this->job);
}
}
メソッドチェーンを通じて設定を行い,チェーンが切れたタイミングでディスパッチされる。最終的にやっていることは
app(\Illuminate\Contracts\Bus\Dispatcher::class)->dispatch($job);
だ。ちなみに Illuminate\Contracts\Bus\Dispatcher
に対応するファサード Illuminate\Support\Facades\Bus
が存在しているので,ファサードを使って
Bus::dispatch($job);
と書くこともできる。
なお,ディスパッチされたジョブは,\Illuminate\Contracts\Queue\ShouldQueue
を実装しているかどうかによって,直ちに実行するか,一旦キューに投入するかの分岐が行われる。必ずバックグラウンドジョブにしたい場合は実装しておこう。
「ジョブ」という言葉の定義
Laravel はジョブの仕組みをレイヤー化しているが,重複して同じ「ジョブ」という名称で呼んでいるものがあるため,フレームワークのコードを読むと混乱しやすい。ここではレイヤー別にジョブについての説明を行う。
それぞれ,インスタンスが生成されるタイミングに注意されたい。
名称 | レイヤー | 予約時のインスタンス生成 | 実行時のインスタンス生成 |
---|---|---|---|
アプリケーションジョブ | 高 | ✔ | ✔ (デシリアライズ) |
ラッパージョブ | 中 | ✔ | |
ドライバジョブ | 低 | ✔ |
A. キューに投入されるまでの大まかな流れ
- アプリケーションジョブのインスタンスを生成し,ディスパッチ処理を開始。
- アプリケーションジョブが
serialize()
関数でシリアライズされる。 - 更にシリアルを固定の**ラッパージョブ名
CallQueuedHandler@call
**とともにJSONとしてラップし,これがペイロードとなってキューに投入される。
B. キューから取り出されて実行されるされるまでの大まかな流れ
- キューからペイロードが取り出され, ドライバジョブのインスタンスが生成される。
-
ドライバジョブの**
fire()
**メソッドが実行される。 - **ラッパージョブ
CallQueuedHandler
のインスタンスが生成され, アプリケーションジョブのシリアルを引数としてcall()
**メソッドが実行される。 -
unserialize()
関数でアプリケーションジョブがデシリアライズされる。 -
アプリケーションジョブの**
handle()
**メソッドが実行される。
アプリケーションジョブ
./artisan make:job
で作る,通常 App\Jobs\
名前空間に配置されるジョブのことである。マニュアルが「ジョブ」と言ったら基本的にこれのことを指している。
<?php namespace App\Jobs; /* ... */ class ProcessPodcast implements ShouldQueue { use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; protected $podcast; /** * 新しいジョブインスタンスの生成 * * @param Podcast $podcast * @return void */ public function __construct(Podcast $podcast) { $this->podcast = $podcast; } /** * ジョブの実行 * * @param AudioProcessor $processor * @return void */ public function handle(AudioProcessor $processor) { // アップロード済みポッドキャストの処理… } }
参考程度に,トレイトの解説も行っておく。ちなみにこれらのトレイトはすべて任意使用であって,必須ではない。
トレイト | 所属サービス | 役割 |
---|---|---|
Dispatchable |
Bus | ディスパッチ処理をインスタンスメソッドから呼べるようにする |
InteractsWithQueue |
Queue | ドライバジョブにアプリケーションジョブからアクセスできるようにする |
Queueable |
Bus | 使用するコネクション・キューをデフォルトから変更できるようにする |
SerializesModels |
Queue | Eloquent モデルオブジェクト自体をシリアライズせず,主キーのみをシリアライズし,デシリアライズされるときに再度クエリを投げて取得する (これが無いと,デシリアライズされたオブジェクトは読み取り専用になってしまう) |
ラッパージョブ
基本的に Illuminate\Queue\CallQueuedHandler::call()
で固定。以下のような処理を行うために,アプリケーションジョブをラップする形で使用される。
- アプリケーションジョブのシリアライズ・デシリアライズ
-
InteractsWithQueue
を使用しているアプリケーションジョブにドライバジョブをセットする - チェインされたアプリケーションジョブの継続処理
- 完了または失敗したドライバジョブの削除処理
-
SerializesModels
を実装したアプリケーションジョブのデシリアライズでModelNotFoundException
が発生したとき, 以下のデフォルトプロパティ宣言があれば,ジョブを失敗と見なさずに直ちに削除する
public $deleteWhenMissingModels = true;
ドライバジョブ
ドライバジョブは, Job
契約 を実装している。参考程度に,ドライバジョブとしてフレームワークによって定義されている例を挙げておく。
なおこれらは,基底抽象クラス Job
を利用して,契約を利用しつつもコードのコピペを避けるように工夫されている。
イベントのアーキテクチャ
イベントは,一見ジョブと似ているようにも思えるが,発生したときの処理を外部に移譲するという点で決定的に異なる。その処理を行うのはリスナーだ。どちらかといえば,リスナーのほうが素のジョブに近いだろう。
以下では,リスナーに ShouldQueue
が実装されている場合の図解を示す。
A. リスナーがキューに投入されるまでの大まかな流れ
- イベントが発生する。
- 呼び出すリスナーの情報を引数として
CallQueuedListener
クラスのインスタンスが生成され,以降アプリケーションジョブとして処理される。 この際,"displayName"
にリスナーのクラス名が自動的に設定され,デバッグ情報としてワーカーのログに表示されるようになる。
B. キューから取り出されて実行されるされるまでの大まかな流れ
- アプリケーションジョブ
CallQueuedListener
のインスタンスが復元される。 -
handle()
メソッドが実行される。 - 元のリスナーのインスタンスが生成され,その
handle()
メソッドが実行される。
ブロードキャストのアーキテクチャ
ブロードキャストなのにキューが必要な理由?
通常の ShouldBroadcast
契約をイベントに実装している場合,イベントをブロードキャストするためには
- キュードライバ
- ブロードキャストドライバ
の2つが必要だ。どうしてそうなっているのかを考えてみよう。
以下にそれぞれのドライバを列挙する。なお,処理が同期的に行われてキューの意味が無いものであったり,ログを取るだけもしくは何もしない,といった選択肢は除外している。
キュードライバ一覧
- database
- beanstalkd
- sqs
- redis
ブロードキャストドライバ一覧
- pusher
- redis
この中に1つ明らかに浮いているものがある。それは Pusher だ。 **Pusher はAPI経由で外部サービスとして利用するしかないため,通信のオーバーヘッドがそれなりに発生する。**SQS も AWS のサービスではあるが,AWS でインフラを構築している場合しか使用しないので, 外部サービスではないと考えられる。
もし Pusher をブロードキャストドライバに選択した場合,ブロードキャスト処理を行うだけでリクエストに対する応答が遅くなってしまう。これを克服するためにはブロードキャスト処理をバックグラウンドで行う必要があり,そのためのキュー投入処理があるのだ。
逆に言えば,ブロードキャストドライバに Redis を利用している場合,IO が非常に軽いため,キューに投入する処理は完全に無駄になってしまう。(もしキュードライバにも Redis を利用していたら, Redis と無駄に2回通信することになってしまう)
**ブロードキャストドライバに Redis を採用した場合, ShouldBroadcast
は一切使用せず,すべて ShouldBroadcastNow
に統一すべきだ。**但し,イベントの broadcastWith
メソッドの処理が極端に重くなったりする場合はこの限りではない。
ShouldBroadcast
を実装したイベント
ブロードキャストドライバに Pusher を使用する場合の図解を示す。
いったんキューで寝かせて,出番が来たら Pusher に丸投げ。
ShouldBroadcastNow
を実装したイベント
ブロードキャストドライバに Redis + Laravel Echo Server を使用する場合の図解を示す。
Redis に直接 PUBLISH
して, Laravel Echo Server が SUBSCRIBE
する形になる。
特定イベントに関して toOthers()
メソッドを常に呼ぶ
しかし、
broadcast
関数には、ブロードキャストの受取人から現在のユーザーを除外できる、toOthers
メソッドが用意されています。broadcast(new ShippingStatusUpdated($update))->toOthers();
特定のイベントに関して常に toOthers()
を利用したい場合,イベントのコンストラクタで dontBroadcastToCurrentUser()
をコールするとよい。 以下に例を示す。
<?php
declare(strict_types=1);
namespace App\Events;
use App\Post;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow;
use Illuminate\Queue\SerializesModels;
class BroadcastPostCreated implements ShouldBroadcastNow
{
use SerializesModels, InteractsWithSockets;
/**
* @var Post
*/
public $post;
public function __construct(Post $post)
{
$this->dontBroadcastToCurrentUser();
$this->post = $post;
}
public function broadcastWith(): array
{
return $this->post->toArray();
}
public function broadcastAs(): string
{
return 'post.created';
}
public function broadcastOn(): array
{
/* ... */
}
}
toOthers()
はこれを呼んでいる。
/**
* Broadcast the event to everyone except the current user.
*
* @return $this
*/
public function toOthers()
{
if (method_exists($this->event, 'dontBroadcastToCurrentUser')) {
$this->event->dontBroadcastToCurrentUser();
}
return $this;
}
このメソッドは InteractsWithSockets
トレイトによって提供される。
/**
* Exclude the current user from receiving the broadcast.
*
* @return $this
*/
public function dontBroadcastToCurrentUser()
{
$this->socket = Broadcast::socket();
return $this;
}
Broadcast::socket()
の実態は BroadcastManager
に実装されている。
/**
* Get the socket ID for the given request.
*
* @param \Illuminate\Http\Request|null $request
* @return string|null
*/
public function socket($request = null)
{
if (! $request && ! $this->app->bound('request')) {
return;
}
$request = $request ?: $this->app['request'];
return $request->header('X-Socket-ID');
}
Laravel Echo を用いたフロントエンドとのつなぎ込み
PHP側からある特定のイベントがブロードキャストされたとき,フロントエンド側で対応する Redux のアクションをディスパッチさせることを考える。
接続状態のコントロール
Echo
は生の WebSocket
同様,インスタンスを生成した瞬間に接続試行されるため, Redux などで状態遷移を管理している場合には扱いにくい。ラップした上で, connect()
メソッドを呼んで接続するようにすると扱いやすい。以下に Socket.IO を使用する場合の実装例を示す。
import io from 'socket.io-client'
import Echo from 'laravel-echo'
/* ... */
class EchoContainer {
connection = null
connecting = () => {
return !!this.connection
}
connect = () => {
this.connection = new Echo({
broadcaster: 'socket.io',
client: io,
host: 'https://example.com',
})
return new Promise((resolve, reject) => {
if (!this.connecting()) {
throw new Error('socket already disconnected')
}
this.connection.connector.socket.on('connect', resolve)
this.connection.connector.socket.on('error', reject)
})
}
disconnect = () => {
if (this.connecting()) {
this.connection.disconnect()
this.connection = null
}
}
/* ... */
}
export default new EchoContainer()
※ this
のバインドを省略できるよう,インスタンスごとに関数オブジェクトを作る書き方をしている
axios
からのリクエストへの X-Socket-Id
ヘッダの付与
既に紹介したように,Laravel のブロードキャストは,「現在のリクエストを送ってきた相手にはブロードキャストしない」という機能を持っているが,これに必要な X-Socket-Id
として付与すべきIDは以下のように取り出せる。これもラップしたクラスに実装するとよい。以下に axios
を利用している場合の実装例を示す。
/* ... */
class EchoContainer {
/* ... */
interceptor = (config) => {
if (this.connecting() && config.method !== 'get') {
const socketId = this.connection.connector.socketId()
if (socketId) {
config.headers['X-Socket-Id'] = socketId
}
}
return config
}
/* ... */
}
export default new EchoContainer()
import axios from 'axios'
import echo from './echo'
const app = axios.create()
app.interceptors.request.use(echo.interceptor)
/* ... */
export default app
基本的に GET メソッドのエンドポイントアクセスでブロードキャストが発生することは無いので,そのときのみ除外している。これは,APIサーバがクロスオリジンである場合,独自ヘッダーが使用されるとプリフライトリクエストが飛んでしまうのを避ける意図がある。GET アクセスで毎回通信2往復になるのは,サービス品質的に大きな問題があるだろう。
redux-saga
とのインテグレーション例
Echo(および内包される Socket.IO)を素の Event Emitter として使うことも可能ではあるが,状態遷移を扱いやすくする場合は, redux-saga
の Event Channel などを利用することが合理的であると考えられる。
- 1つの Connection の中で, 例えば
users.<ユーザID>
といった Echo の Channel を作成できる。認証を要求するものは Private Channel を利用する。 - Echo は,1つのチャンネルに対して複数のイベントリスナーを設定できる。このイベントリスナーでイベントの種類ごとに Saga Event Channel を生成する。
以下にこのアーキテクチャを採用したコード例を示す。まずは EchoContainer
側から。
/* ... */
import { eventChannel } from 'redux-saga'
class EchoContainer {
/* ... */
createEventChannel = (echoChannelName, echoEventName, methodName = 'channel') => {
return eventChannel((emit) => {
if (!this.connecting()) {
return () => {}
}
// イベントリスナの登録
const channel = this.connection[methodName](echoChannelName)
channel.on(echoEventName, (value) => emit(value))
// イベントに関するコールバック一覧,および最後に登録したコールバックを取得
const callbacks = channel.events[echoEventName]
const callback = callbacks[callbacks.length - 1]
// ディスポーザーの定義
return () => {
if (!this.connecting()) {
return
}
// 最後に登録したコールバックを削除する
for (let i = callbacks.length - 1; i >= 0; --i) {
if (callbacks[i] === callback) {
channel.socket.removeListener(echoEventName, callback)
callbacks.splice(i, 1)
}
}
// コールバック一覧が空になったらチャンネルを破棄する
if (callbacks.length === 0) {
this.connection.leave(echoChannelName)
}
}
})
}
/* ... */
}
export default new EchoContainer()
公式マニュアルを見ると listen()
でイベントリスナーを設定するように説明されているが,これは暗黙的にチャンネル名を加工する効果がある。
/**
* Listen for an event on the channel instance.
*/
listen(event: string, callback: Function): SocketIoChannel {
this.on(this.eventFormatter.format(event), callback);
return this;
}
/**
* Format the given event name.
*/
format(event: string): string {
if (event.charAt(0) === '.' || event.charAt(0) === `\\`) {
return event.substr(1);
} else if (this.namespace) {
event = this.namespace + '.' + event;
}
return event.replace(/\./g, `\\`);
}
この処理は**過去に繰り返し破壊的変更が行われており,バグの温床になっている。**もしPHP側で broadcastAs()
メソッドを定義して明示的にブロードキャストされる際のイベント名を指定している場合は,暗黙的変換に頼らない on()
メソッドを直接利用するほうがよい。
また衝撃的なことに, Laravel Echo にはイベントリスナー1つだけを指定して破棄するインタフェースが用意されていない。このためリスナー解除時の関数として, Array#splice
を使用して,少々トリッキーな実装を行っている。
続いて, Saga 側を見ていこう。細かいアクションの定義は省略するが,大まかには以下のような実装になる。
import { call, put, fork, take, race } from 'redux-saga/effects'
import echo from '../api/echo'
export function* runLoop(channel, actionCreator, UNLISTEN = null) {
while (true) {
let entity, unlisten
if (UNLISTEN) {
({ payload, unlisten } = yield race({ payload: take(channel), unlisten: take(UNLISTEN) }))
if (unlisten) break
} else {
entity = yield take(channel)
}
yield put(actionCreator(payload))
}
channel.close()
}
// 認証無しチャンネル
export function* forkForChannel(echoChannelName, echoEventName, actionCreator, UNLISTEN = null, methodName = 'channel') {
const channel = yield call(echo.createEventChannel, echoChannelName, echoEventName, methodName)
yield fork(runLoop, channel, actionCreator, UNLISTEN)
}
// 認証ありチャンネル
export function* forkForPrivateChannel(echoChannelName, echoEventName, actionCreator, UNLISTEN = null) {
yield* forkForChannel(echoEventName, echoEventName, actionCreator, UNLISTEN, 'private')
}
/* ... */
export function *listenChannels(userId) {
yield* forkForChannel('global', 'announcement.received', broadcastAnnouncementReceived)
yield* forkForPrivateChannel(`users.${userId}`, 'notifications.created', broadcastNotificationCreated, UNLISTEN_USER_CHANNEL)
yield* forkForPrivateChannel(`users.${userId}`, 'notifications.updated', broadcastNotificationUpdated, UNLISTEN_USER_CHANNEL)
yield* forkForPrivateChannel(`users.${userId}`, 'notifications.deleted', broadcastNotificationDeleted, UNLISTEN_USER_CHANNEL)
}
引数は以下のようになる。
引数名 | 必須 | 説明 |
---|---|---|
echoChannelName |
✔ | Echo Channel のチャンネル名 |
echoEventName |
✔ | Echo Channel の各リスナーに対応するイベント名 |
actionCreator |
✔ | イベントを受信したときにディスパッチしたい Redux アクションのアクションクリエータ |
UNLISTEN |
リスナーを解除するアクション(指定しない場合はリッスンし続ける) |
以下に UNLISTEN
が指定された場合の runLoop
によるループ処理の流れを説明する。
-
UNLISTEN
の発生と Event Channel からのイベントの発生を同時に監視する。- チャンネルからのイベントが発生した場合, PHP側の
broadcastWith()
で指定されたペイロードがJSON変換経由でpayload
にセットされる。これをアクションクリエータのペイロードとしてそのまま渡し,アクションをディスパッチする。 -
UNLISTEN
として指定されたイベントが発生した場合,ループを脱出して Event Channel を破棄する。
- チャンネルからのイベントが発生した場合, PHP側の
これらの一連のコードにより,PHP側からイベントがブロードキャストされたとき,指定された Redux のアクションをディスパッチすることができる。
(発展) Presence Channel への対応
アーキテクチャ的に噛み合わない部分があったので Presence Channel への対応は見送ろうと思ったが,SocketIoPresenceChannel
の実装を見ると, Private Channel に少し毛が生えた程度なので以下のように少し自分で書くだけで対応できそう。
複数のチャットルームに同時に入室できるみたいな実装だと,どのチャンネルを出たのかの判定が必要なので,その場合はもうちょっと runLoop
関数が複雑になるので注意。(ここでは割愛)
export function *joinChatroom(roomId) {
yield* forkForPrivateChannel(
`chatrooms.${roomId}`,
'presence:subscribed',
(members) => initChatroomMembers(members.map(m => m.user_info)),
LEAVE_CHATROOM,
)
yield* forkForPrivateChannel(
`chatrooms.${roomId}`,
'presence:joining',
(member) => chatroomMemberJoining(member.user_info),
LEAVE_CHATROOM,
)
yield* forkForPrivateChannel(
`chatrooms.${roomId}`,
'presence:leaving',
(member) => chatroomMemberLeaving(member.user_info),
LEAVE_CHATROOM,
)
}