あるアイテムを選んだら、そのアイテムは選択済みということが他のユーザーに伝播するようなアプリを作りました。
コードはこちら
https://github.com/pokotyan/next-web-socket
redux-sagaでwebSocketを使う場合、eventChannelというのを使うんですが、これが癖がある感じだったので、今回作ったコードを元に処理の流れを説明していきたいと思います。
eventChannel:https://github.com/redux-saga/redux-saga/blob/master/docs/advanced/Channels.md
どのように作ってるか
nextとredux-saga、websocketとredis、ログインの部分にfirebaseのAuthenticationを利用しています。
まず、reduxのstoreに以下の2種類のデータを用意しています。
##reservedBox
user_idがkeyで、valueに選択したボックスのidの配列が入るオブジェクト。
reservedBoxは、ユーザーがボックスを選択するたびに値が更新され、その値を他のユーザーにブロードキャストします。
{
63LyqMoV5KcplLqEInz1qvdWQvV2: [1, 2, 3]
VNaz42PXYZUx7nmdTe4ZCd1N1qO2: [5, 6]
}
selectedBox
自身が選択したボックスのidの配列。
reservedBoxの情報があれば、selectedBoxはいらないんでは?と思うかもしれませんが、websocketのブロードキャストは"自分以外"の全員にデータを送るため、自分が選んだアイテムを当の自分が知り得ない、という状態になりました。そのため、自身が選択したボックスのid情報を別で用意しています。
[5, 6]
redisの活用
storeの情報はブラウザがリロードされればなくなってしまいます。それを防ぐため、redisにreservedBoxとselectedBoxの情報を保存しておき、ユーザーがボックスの選択を行うたびに更新します。
ブラウザがリロードされた場合、初期処理としてredisから値を取ってきてstoreに情報を埋め込む、ということをやっています。
コード
一旦クライアント側、サーバー側のコード全体を載せます。処理の流れは後述します。
client側
import { put, take, all, fork, call, select } from 'redux-saga/effects';
import { eventChannel } from 'redux-saga';
import * as socketActions from '../actions/socket';
import io from 'socket.io-client';
function createSocketConnection() {
const socket = io('http://localhost:8999');
return new Promise(resolve => {
socket.on('connect', () => {
resolve(socket);
});
});
}
// この関数は、指定されたソケットからeventChannelを作成します。
// 受信した `initReserve:receive`, `initSelected:receive`, `updateSelected:receive`, `broadCastReserve:receive` イベントに対するsubscriptionをセットアップします
function subscribe(socket) {
// `eventChannel`はsubscriber関数をとります。
// subscriber関数はsocketから渡ってきたデータをchannelに入れるために `emit`引数をとります
return eventChannel(emit => {
// handlerの役割はデータをchannelに入れることです。
// これにより、Sagaは返されたchannelからこのデータを取得できます。
// ここではreduxのactionがデータとしてchannelに入ります。
const initReserveHandler = async (reservedBox) => {
emit(socketActions.reserveUpdate({ reservedBox }));
}
const initSelectedHandler = async (selectedBox) => {
emit(socketActions.selectedUpdate({ selectedBox }));
}
const updateSelectedHandler = async (selectedBox) => {
emit(socketActions.selectedUpdate({ selectedBox }));
}
const broadCastReserveHandler = async (reservedBox) => {
emit(socketActions.reserveUpdate({ reservedBox }));
}
// subscription をセットアップ
socket.on('initReserve:receive', initReserveHandler);
socket.on('initSelected:receive', initSelectedHandler);
socket.on('updateSelected:receive', updateSelectedHandler);
socket.on('broadCastReserve:receive', broadCastReserveHandler);
// subscriberはsubscribeを解除する関数を返さなければなりません。
// これは、sagaが `channel.close`メソッドを呼び出すときに呼び出されます
const unsubscribe = () => {
socket.off('initReserve:receive', initReserveHandler);
socket.off('initSelected:receive', initSelectedHandler);
socket.off('updateSelected:receive', updateSelectedHandler);
socket.off('broadCastReserve:receive', broadCastReserveHandler);
}
return unsubscribe;
});
}
function* initStatus({ socket, userId }) {
// ブラウザがリロードされたときのためにredisから値を取ってきて、reservedBoxのstore更新
yield socket.emit('initReserve');
// ブラウザがリロードされたときのためにredisから値を取ってきて、selectedBoxのstore更新
yield socket.emit('initSelected', { userId });
}
function* syncStatus(socket) {
while (true) {
const { payload: { boxId, userId } } = yield take(socketActions.SYNC_RESERVE);
yield all([
// reserveBoxのredisを更新、更新した値をbroadcastして、他ブラウザのreseveBoxのstore更新
call(socket.emit, ['broadCastReserve', { boxId, userId }]),
// selectedBoxのredisを更新、自身のブラウザのselectedBoxのstore更新
call(socket.emit, ['updateSelected', { boxId, userId }]),
])
}
}
function* writeStatus(socket) {
const channel = yield call(subscribe, socket);
while (true) {
// subscriber関数から渡ってきたデータ(reduxのaction)を取得します。
let action = yield take(channel);
yield put(action);
}
}
function* watchOnSocket() {
while (true) {
try {
const { payload: { userId } } = yield take(socketActions.INIT_SOCKET);
const socket = yield call(createSocketConnection)
// redisから現在の状態を取得し、storeに反映させる初期化のタスク。
yield fork(initStatus, { socket, userId });
// ボックスを選択した際、自身の選択ボックスの更新と、その選択したボックス情報を他ブラウザにブロードキャストするタスク。
yield fork(syncStatus, socket);
// webSocketのイベントを待ち受け、socketから受け取ったデータを元にactionをdispatchし、storeを更新するタスク。
// 上記のinitStatus、syncStatusのタスクがemitするwebsocketのイベントはサーバー側で受信される。
// 受信後、サーバー側ではredisからのデータ取得,更新の処理が行われ、 `**:receive` のwebsocketイベントが送信される。
// その`**:receive`のイベントを待ち受けるタスク。(websocketとredux-sagaの世界を繋ぐためにeventChannelを用いる)
yield fork(writeStatus, socket);
} catch (err) {
console.error('socket error:', err)
}
}
}
export default function* rootSaga() {
yield all([
fork(watchOnSocket)
]);
}
server側
const express = require('express');
const cors = require('cors');
const app = express();
const { redisSet, redisGet } = require('./utils/redis')
const PORT = process.env.PORT || 8999;
const server = app.listen(PORT, () => {
console.log(`Server started on port ${PORT} :)`);
});
const io = require('socket.io').listen(server);
io.origins('*:*');
io.on('connection', (socket) => {
socket.on('initReserve', async () => {
const reservedBox = await getReservedBox();
socket.emit('initReserve:receive', reservedBox);
});
socket.on('initSelected', async ({ userId }) => {
const selectedBox = await getSelectedBox(userId);
socket.emit('initSelected:receive', selectedBox);
});
socket.on('broadCastReserve', async ({ boxId, userId }) => {
const reservedBox = await updateReservedBox({ boxId, userId }) || [];
socket.broadcast.emit(`broadCastReserve:receive`, JSON.parse(reservedBox));
});
socket.on('updateSelected', async ({ boxId, userId }) => {
const selectedBox = await updateSelectedBox({ boxId, userId }) || [];
socket.emit('updateSelected:receive', JSON.parse(selectedBox));
});
});
async function updateReservedBox({ boxId, userId }) {
const result = await redisGet('next-web-socket:reservedBox');
const reservedBox = JSON.parse(result) || {};
if (reservedBox[userId]) {
reservedBox[userId].push(boxId);
} else {
reservedBox[userId] = [boxId];
}
await redisSet('next-web-socket:reservedBox', JSON.stringify(reservedBox));
return redisGet('next-web-socket:reservedBox');
}
async function updateSelectedBox({ boxId, userId }) {
const result = await redisGet(`next-web-socket:selectedBox:${userId}`);
const selectedBox = JSON.parse(result) || [];
selectedBox.push(boxId);
await redisSet(`next-web-socket:selectedBox:${userId}`, JSON.stringify(selectedBox));
return redisGet(`next-web-socket:selectedBox:${userId}`);
}
async function getReservedBox() {
const reservedBox = await redisGet('next-web-socket:reservedBox');
return JSON.parse(reservedBox) || {};
}
async function getSelectedBox(userId) {
const selectedBox = await redisGet(`next-web-socket:selectedBox:${userId}`);
return JSON.parse(selectedBox) || [];
}
処理の流れ
ルートのコンポーネントのcomponentDidMountで、initSocketのactionが発火します。
class App extends Component {
componentDidMount() {
const { auth, router, socketActions } = this.props;
if (!auth.isAutherized) {
router.push('/login');
} else {
socketActions.initSocket({
userId: auth.user.uid
});
}
}
initSocketのaction発火で以下の3つのsagaタスク(initStatus、syncStatus、writeStatus)が動きます。
function* watchOnSocket() {
while (true) {
try {
const { payload: { userId } } = yield take(socketActions.INIT_SOCKET);
const socket = yield call(createSocketConnection)
yield fork(initStatus, { socket, userId });
yield fork(syncStatus, socket);
yield fork(writeStatus, socket);
} catch (err) {
throw err;
}
}
}
initStatus
initStatusのタスクはredisの値を初期値としてstoreに保存することです。
initReserveとinitSelectedのwebsocketのeventをサーバー側に送信します。
function* initStatus({ socket, userId }) {
yield socket.emit('initReserve');
yield socket.emit('initSelected', { userId });
}
server側でinitReserveとinitSelectedのeventを受信します。
redisから初期値を取得し、*:receiveのeventをclient側に送信します。
socket.on('initReserve', async () => {
const reservedBox = await getReservedBox(); // redisからreservedBoxのデータを取ってくる関数
socket.emit('initReserve:receive', reservedBox);
});
socket.on('initSelected', async ({ userId }) => {
const selectedBox = await getSelectedBox(userId); // redisからselectedBoxのデータを取ってくる関数
socket.emit('initSelected:receive', selectedBox);
});
client側で*:receiveのeventを受信します。
サーバー側から受け取った値がpayloadに含まれたactionオブジェクトをemitします。
function subscribe(socket) {
return eventChannel(emit => {
const initReserveHandler = async (reservedBox) => {
emit(socketActions.reserveUpdate({ reservedBox }));
}
const initSelectedHandler = async (selectedBox) => {
emit(socketActions.selectedUpdate({ selectedBox }));
}
socket.on('initReserve:receive', initReserveHandler);
socket.on('initSelected:receive', initSelectedHandler);
eventChannelでemitされた値はsagaのtakeで待ち受けることができます。
takeで取得したactionオブジェクトをputに渡してactionをdispatchします。
function* writeStatus(socket) {
const channel = yield call(subscribe, socket);
while (true) {
const action = yield take(channel);
yield put(action);
}
}
reducerでstoreが更新されます。
ここまでがinitStatusのタスクの流れです。
import * as socketActions from '../actions/socket';
const initialState = {
reservedBox: {},
selectedBox: []
};
export default (state = initialState, action) => {
switch (action.type) {
case socketActions.RESERVE_UPDATE:
return Object.assign({}, state, {
reservedBox: action.payload.reservedBox
});
case socketActions.SELECTED_UPDATE:
return Object.assign({}, state, {
selectedBox: action.payload.selectedBox
});
default:
return state;
}
};
syncStatus
syncStatusのタスクはSYNC_RESERVEアクションの発火をトリガーに動きます。
function* syncStatus(socket) {
while (true) {
const { payload: { boxId, userId } } = yield take(socketActions.SYNC_RESERVE);
// ...
SYNC_RESERVEアクションはボックスが選択されるたびに発火します。
export default class Box extends Component {
handleClick = () => {
this.props.socketActions.syncReserve({
boxId: this.props.id,
userId: this.props.user.uid
});
}
syncStatusにはpayloadとして、選択したボックスid、ユーザーidが渡ってきます。
ボックスid,ユーザーidを引数にbroadCastReserveとupdateSelectedのイベントをサーバー側へ送信します。
function* syncStatus(socket) {
while (true) {
const { payload: { boxId, userId } } = yield take(socketActions.SYNC_RESERVE);
yield all([
// reserveBoxのredisを更新、更新した値をbroadcastして、他ブラウザのreseveBoxのstore更新
call(socket.emit, ['broadCastReserve', { boxId, userId }]),
// selectedBoxのredisを更新、自身のブラウザのselectedBoxのstore更新
call(socket.emit, ['updateSelected', { boxId, userId }]),
])
}
}
broadCastReserveとupdateSelectedのイベントをサーバー側で受信します。
reservedBoxとselectedBoxのredisの値を更新し、更新後の値をbroadCastReserve:receive、updateSelected:receiveのイベントに載せてクライアント側に送信します。
socket.on('broadCastReserve', async ({ boxId, userId }) => {
// updateReservedBoxは引数のboxIdと現在のreservedBoxのredisの値をマージして返す
const reservedBox = await updateReservedBox({ boxId, userId }) || [];
socket.broadcast.emit(`broadCastReserve:receive`, JSON.parse(reservedBox));
});
socket.on('updateSelected', async ({ boxId, userId }) => {
// updateSelectedBoxは引数のboxIdと現在のselectedBoxのredisの値をマージして返す
const selectedBox = await updateSelectedBox({ boxId, userId }) || [];
socket.emit('updateSelected:receive', JSON.parse(selectedBox));
});
クライアント側でbroadCastReserve:receive、updateSelected:receiveのイベントを受信します。
サーバー側から受け取った値をpayloadに含んだactionオブジェクトをemitします。
function subscribe(socket) {
return eventChannel(emit => {
const updateSelectedHandler = async (selectedBox) => {
emit(socketActions.selectedUpdate({ selectedBox }));
}
const broadCastReserveHandler = async (reservedBox) => {
emit(socketActions.reserveUpdate({ reservedBox }));
}
socket.on('updateSelected:receive', updateSelectedHandler);
socket.on('broadCastReserve:receive', broadCastReserveHandler);
eventChannelでemitされた値はsagaのtakeで待ち受けることができます。
takeで取得したactionオブジェクトをputに渡してactionをdispatchします。
function* writeStatus(socket) {
const channel = yield call(subscribe, socket);
while (true) {
const action = yield take(channel);
yield put(action);
}
}
reducerでstoreが更新されます。
ここまでがsyncStatusのタスクの流れです。
import * as socketActions from '../actions/socket';
const initialState = {
reservedBox: {},
selectedBox: []
};
export default (state = initialState, action) => {
switch (action.type) {
case socketActions.RESERVE_UPDATE:
return Object.assign({}, state, {
reservedBox: action.payload.reservedBox
});
case socketActions.SELECTED_UPDATE:
return Object.assign({}, state, {
selectedBox: action.payload.selectedBox
});
default:
return state;
}
};
writeStatus
writeStatusのタスクの役割は、eventChannelのsubscriber関数からemitで渡ってくるactionオブジェクトをtakeで待ち受け、dispatchすることです。
function subscribe(socket) {
return eventChannel(emit => {
const initReserveHandler = async (reservedBox) => {
emit(socketActions.reserveUpdate({ reservedBox })); // 1) emitの引数に渡したactionオブジェクトは。。。。
}
const initSelectedHandler = async (selectedBox) => {
emit(socketActions.selectedUpdate({ selectedBox }));
}
const updateSelectedHandler = async (selectedBox) => {
emit(socketActions.selectedUpdate({ selectedBox }));
}
const broadCastReserveHandler = async (reservedBox) => {
emit(socketActions.reserveUpdate({ reservedBox }));
}
socket.on('initReserve:receive', initReserveHandler);
socket.on('initSelected:receive', initSelectedHandler);
socket.on('updateSelected:receive', updateSelectedHandler);
socket.on('broadCastReserve:receive', broadCastReserveHandler);
const unsubscribe = () => {
socket.off('initReserve:receive', initReserveHandler);
socket.off('initSelected:receive', initSelectedHandler);
socket.off('updateSelected:receive', updateSelectedHandler);
socket.off('broadCastReserve:receive', broadCastReserveHandler);
}
return unsubscribe;
});
}
function* writeStatus(socket) {
const channel = yield call(subscribe, socket);
while (true) {
const action = yield take(channel); // 2) ここで取得できる。
yield put(action); // 3) actionをdispatchする。
}
}
最後に
redux-sagaとwebsocketを組み合わそうとしている誰かの助けになれば幸いです!