あるアイテムを選んだら、そのアイテムは選択済みということが他のユーザーに伝播するようなアプリを作りました。
コードはこちら
https://github.com/pokotyan/next-web-socket
redux-sagaでwebSocketを使う場合、eventChannelというのを使うんですが、これが癖がある感じだったので、今回作ったコードを元に処理の流れを説明していきたいと思います。
eventChannel:https://github.com/redux-saga/redux-saga/blob/master/docs/advanced/Channels.md
どのように作ってるか
nextとredux-saga、websocketとredis、ログインの部分にfirebaseのAuthenticationを利用しています。
まず、reduxのstoreに以下の2種類のデータを用意しています。
reservedBox
user_idがkeyで、valueに選択したボックスのidの配列が入るオブジェクト。
reservedBoxは、ユーザーがボックスを選択するたびに値が更新され、その値を他のユーザーにブロードキャストします。
{
  63LyqMoV5KcplLqEInz1qvdWQvV2: [1, 2, 3]
  VNaz42PXYZUx7nmdTe4ZCd1N1qO2: [5, 6]
}
selectedBox
自身が選択したボックスのidの配列。
reservedBoxの情報があれば、selectedBoxはいらないんでは?と思うかもしれませんが、websocketのブロードキャストは"自分以外"の全員にデータを送るため、自分が選んだアイテムを当の自分が知り得ない、という状態になりました。そのため、自身が選択したボックスのid情報を別で用意しています。
[5, 6]
redisの活用
storeの情報はブラウザがリロードされればなくなってしまいます。それを防ぐため、redisにreservedBoxとselectedBoxの情報を保存しておき、ユーザーがボックスの選択を行うたびに更新します。
ブラウザがリロードされた場合、初期処理としてredisから値を取ってきてstoreに情報を埋め込む、ということをやっています。
コード
一旦クライアント側、サーバー側のコード全体を載せます。処理の流れは後述します。
client側
import { put, take, all, fork, call, select } from 'redux-saga/effects';
import { eventChannel } from 'redux-saga';
import * as socketActions from '../actions/socket';
import io from 'socket.io-client';
function createSocketConnection() {
  const socket = io('http://localhost:8999');
  return new Promise(resolve => {
    socket.on('connect', () => {
      resolve(socket);
    });
  });
}
// この関数は、指定されたソケットからeventChannelを作成します。
// 受信した `initReserve:receive`, `initSelected:receive`, `updateSelected:receive`, `broadCastReserve:receive` イベントに対するsubscriptionをセットアップします
function subscribe(socket) {
  // `eventChannel`はsubscriber関数をとります。
  // subscriber関数はsocketから渡ってきたデータをchannelに入れるために `emit`引数をとります
  return eventChannel(emit => {
    // handlerの役割はデータをchannelに入れることです。
    // これにより、Sagaは返されたchannelからこのデータを取得できます。
    // ここではreduxのactionがデータとしてchannelに入ります。
    const initReserveHandler = async (reservedBox) => {
      emit(socketActions.reserveUpdate({ reservedBox }));
    }
    const initSelectedHandler = async (selectedBox) => {
      emit(socketActions.selectedUpdate({ selectedBox }));
    }
    const updateSelectedHandler = async (selectedBox) => {
      emit(socketActions.selectedUpdate({ selectedBox }));
    }
    const broadCastReserveHandler = async (reservedBox) => {
      emit(socketActions.reserveUpdate({ reservedBox }));
    }
    // subscription をセットアップ
    socket.on('initReserve:receive', initReserveHandler);
    socket.on('initSelected:receive', initSelectedHandler);
    socket.on('updateSelected:receive', updateSelectedHandler);
    socket.on('broadCastReserve:receive', broadCastReserveHandler);
    // subscriberはsubscribeを解除する関数を返さなければなりません。
    // これは、sagaが `channel.close`メソッドを呼び出すときに呼び出されます
    const unsubscribe = () => {
      socket.off('initReserve:receive', initReserveHandler);
      socket.off('initSelected:receive', initSelectedHandler);
      socket.off('updateSelected:receive', updateSelectedHandler);
      socket.off('broadCastReserve:receive', broadCastReserveHandler);
    }
    return unsubscribe;
  });
}
function* initStatus({ socket, userId }) {
  // ブラウザがリロードされたときのためにredisから値を取ってきて、reservedBoxのstore更新
  yield socket.emit('initReserve');
  // ブラウザがリロードされたときのためにredisから値を取ってきて、selectedBoxのstore更新
  yield socket.emit('initSelected', { userId });
}
function* syncStatus(socket) {
  while (true) {
    const { payload: { boxId, userId } } = yield take(socketActions.SYNC_RESERVE);
    yield all([
      // reserveBoxのredisを更新、更新した値をbroadcastして、他ブラウザのreseveBoxのstore更新
      call(socket.emit, ['broadCastReserve', { boxId, userId }]),
      // selectedBoxのredisを更新、自身のブラウザのselectedBoxのstore更新
      call(socket.emit, ['updateSelected', { boxId, userId }]),
    ])
  }
}
function* writeStatus(socket) {
  const channel = yield call(subscribe, socket);
  while (true) {
    // subscriber関数から渡ってきたデータ(reduxのaction)を取得します。
    let action = yield take(channel);
    yield put(action);
  }
}
function* watchOnSocket() {
  while (true) {
    try {
      const { payload: { userId } } = yield take(socketActions.INIT_SOCKET);
      const socket = yield call(createSocketConnection)
      // redisから現在の状態を取得し、storeに反映させる初期化のタスク。
      yield fork(initStatus, { socket, userId });
      // ボックスを選択した際、自身の選択ボックスの更新と、その選択したボックス情報を他ブラウザにブロードキャストするタスク。
      yield fork(syncStatus, socket);
      // webSocketのイベントを待ち受け、socketから受け取ったデータを元にactionをdispatchし、storeを更新するタスク。
      // 上記のinitStatus、syncStatusのタスクがemitするwebsocketのイベントはサーバー側で受信される。
      // 受信後、サーバー側ではredisからのデータ取得,更新の処理が行われ、 `**:receive` のwebsocketイベントが送信される。
      // その`**:receive`のイベントを待ち受けるタスク。(websocketとredux-sagaの世界を繋ぐためにeventChannelを用いる)
      yield fork(writeStatus, socket);
    } catch (err) {
      console.error('socket error:', err)
    }
  }
}
export default function* rootSaga() {
  yield all([
    fork(watchOnSocket)
  ]);
}
server側
const express = require('express');
const cors = require('cors');
const app = express();
const { redisSet, redisGet } = require('./utils/redis')
const PORT = process.env.PORT || 8999;
const server = app.listen(PORT, () => {
  console.log(`Server started on port ${PORT} :)`);
});
const io = require('socket.io').listen(server);
io.origins('*:*');
io.on('connection', (socket) => {
  socket.on('initReserve', async () => {
    const reservedBox = await getReservedBox();
    socket.emit('initReserve:receive', reservedBox);
  });
  socket.on('initSelected', async ({ userId }) => {
    const selectedBox = await getSelectedBox(userId);
    socket.emit('initSelected:receive', selectedBox);
  });
  socket.on('broadCastReserve', async ({ boxId, userId }) => {
    const reservedBox = await updateReservedBox({ boxId, userId }) || [];
    socket.broadcast.emit(`broadCastReserve:receive`, JSON.parse(reservedBox));
  });
  socket.on('updateSelected', async ({ boxId, userId }) => {
    const selectedBox = await updateSelectedBox({ boxId, userId }) || [];
    socket.emit('updateSelected:receive', JSON.parse(selectedBox));
  });
});
async function updateReservedBox({ boxId, userId }) {
  const result = await redisGet('next-web-socket:reservedBox');
  const reservedBox = JSON.parse(result) || {};
  if (reservedBox[userId]) {
    reservedBox[userId].push(boxId);
  } else {
    reservedBox[userId] = [boxId];
  }
  await redisSet('next-web-socket:reservedBox', JSON.stringify(reservedBox));
  return redisGet('next-web-socket:reservedBox');  
}
async function updateSelectedBox({ boxId, userId }) {
  const result = await redisGet(`next-web-socket:selectedBox:${userId}`);
  const selectedBox = JSON.parse(result) || [];
  selectedBox.push(boxId);
  await redisSet(`next-web-socket:selectedBox:${userId}`, JSON.stringify(selectedBox));
  return redisGet(`next-web-socket:selectedBox:${userId}`);  
}
async function getReservedBox() {
  const reservedBox = await redisGet('next-web-socket:reservedBox');
  return JSON.parse(reservedBox) || {};
}
async function getSelectedBox(userId) {
  const selectedBox = await redisGet(`next-web-socket:selectedBox:${userId}`);
  return JSON.parse(selectedBox) || [];
}
処理の流れ
ルートのコンポーネントのcomponentDidMountで、initSocketのactionが発火します。
class App extends Component {
  componentDidMount() {
    const { auth, router, socketActions } = this.props;
    if (!auth.isAutherized) {
      router.push('/login');
    } else {
      socketActions.initSocket({
        userId: auth.user.uid
      });
    }
  }
initSocketのaction発火で以下の3つのsagaタスク(initStatus、syncStatus、writeStatus)が動きます。
function* watchOnSocket() {
  while (true) {
    try {
      const { payload: { userId } } = yield take(socketActions.INIT_SOCKET);
      const socket = yield call(createSocketConnection)
      yield fork(initStatus, { socket, userId });
      yield fork(syncStatus, socket);
      yield fork(writeStatus, socket);
    } catch (err) {
      throw err;
    }
  }
}
initStatus
initStatusのタスクはredisの値を初期値としてstoreに保存することです。
initReserveとinitSelectedのwebsocketのeventをサーバー側に送信します。
function* initStatus({ socket, userId }) {
  yield socket.emit('initReserve');
  yield socket.emit('initSelected', { userId });
}
server側でinitReserveとinitSelectedのeventを受信します。
redisから初期値を取得し、*:receiveのeventをclient側に送信します。
  socket.on('initReserve', async () => {
    const reservedBox = await getReservedBox(); // redisからreservedBoxのデータを取ってくる関数
    socket.emit('initReserve:receive', reservedBox);
  });
  socket.on('initSelected', async ({ userId }) => {
    const selectedBox = await getSelectedBox(userId); // redisからselectedBoxのデータを取ってくる関数
    socket.emit('initSelected:receive', selectedBox);
  });
client側で*:receiveのeventを受信します。
サーバー側から受け取った値がpayloadに含まれたactionオブジェクトをemitします。
function subscribe(socket) {
  return eventChannel(emit => {
    const initReserveHandler = async (reservedBox) => {
      emit(socketActions.reserveUpdate({ reservedBox }));
    }
    const initSelectedHandler = async (selectedBox) => {
      emit(socketActions.selectedUpdate({ selectedBox }));
    }
    socket.on('initReserve:receive', initReserveHandler);
    socket.on('initSelected:receive', initSelectedHandler);
eventChannelでemitされた値はsagaのtakeで待ち受けることができます。
takeで取得したactionオブジェクトをputに渡してactionをdispatchします。
function* writeStatus(socket) {
  const channel = yield call(subscribe, socket);
  while (true) {
    const action = yield take(channel);
    yield put(action);
  }
}
reducerでstoreが更新されます。
ここまでがinitStatusのタスクの流れです。
import * as socketActions from '../actions/socket';
const initialState = {
  reservedBox: {},
  selectedBox: []
};
export default (state = initialState, action) => {
  switch (action.type) {
    case socketActions.RESERVE_UPDATE:
      return Object.assign({}, state, {
        reservedBox: action.payload.reservedBox
      });
    case socketActions.SELECTED_UPDATE:
      return Object.assign({}, state, {
        selectedBox: action.payload.selectedBox
      });
    default:
      return state;
  }
};
syncStatus
syncStatusのタスクはSYNC_RESERVEアクションの発火をトリガーに動きます。
function* syncStatus(socket) {
  while (true) {
    const { payload: { boxId, userId } } = yield take(socketActions.SYNC_RESERVE);
    // ...
SYNC_RESERVEアクションはボックスが選択されるたびに発火します。
export default class Box extends Component {
  handleClick = () => {
    this.props.socketActions.syncReserve({
      boxId: this.props.id,
      userId: this.props.user.uid
    });
  }
syncStatusにはpayloadとして、選択したボックスid、ユーザーidが渡ってきます。
ボックスid,ユーザーidを引数にbroadCastReserveとupdateSelectedのイベントをサーバー側へ送信します。
function* syncStatus(socket) {
  while (true) {
    const { payload: { boxId, userId } } = yield take(socketActions.SYNC_RESERVE);
    yield all([
      // reserveBoxのredisを更新、更新した値をbroadcastして、他ブラウザのreseveBoxのstore更新
      call(socket.emit, ['broadCastReserve', { boxId, userId }]),
      // selectedBoxのredisを更新、自身のブラウザのselectedBoxのstore更新
      call(socket.emit, ['updateSelected', { boxId, userId }]),
    ])
  }
}
broadCastReserveとupdateSelectedのイベントをサーバー側で受信します。
reservedBoxとselectedBoxのredisの値を更新し、更新後の値をbroadCastReserve:receive、updateSelected:receiveのイベントに載せてクライアント側に送信します。
  socket.on('broadCastReserve', async ({ boxId, userId }) => {
    // updateReservedBoxは引数のboxIdと現在のreservedBoxのredisの値をマージして返す
    const reservedBox = await updateReservedBox({ boxId, userId }) || [];
    socket.broadcast.emit(`broadCastReserve:receive`, JSON.parse(reservedBox));
  });
  socket.on('updateSelected', async ({ boxId, userId }) => {
    // updateSelectedBoxは引数のboxIdと現在のselectedBoxのredisの値をマージして返す
    const selectedBox = await updateSelectedBox({ boxId, userId }) || [];
    socket.emit('updateSelected:receive', JSON.parse(selectedBox));
  });
クライアント側でbroadCastReserve:receive、updateSelected:receiveのイベントを受信します。
サーバー側から受け取った値をpayloadに含んだactionオブジェクトをemitします。
function subscribe(socket) {
  return eventChannel(emit => {
    const updateSelectedHandler = async (selectedBox) => {
      emit(socketActions.selectedUpdate({ selectedBox }));
    }
    const broadCastReserveHandler = async (reservedBox) => {
      emit(socketActions.reserveUpdate({ reservedBox }));
    }
    socket.on('updateSelected:receive', updateSelectedHandler);
    socket.on('broadCastReserve:receive', broadCastReserveHandler);
eventChannelでemitされた値はsagaのtakeで待ち受けることができます。
takeで取得したactionオブジェクトをputに渡してactionをdispatchします。
function* writeStatus(socket) {
  const channel = yield call(subscribe, socket);
  while (true) {
    const action = yield take(channel);
    yield put(action);
  }
}
reducerでstoreが更新されます。
ここまでがsyncStatusのタスクの流れです。
import * as socketActions from '../actions/socket';
const initialState = {
  reservedBox: {},
  selectedBox: []
};
export default (state = initialState, action) => {
  switch (action.type) {
    case socketActions.RESERVE_UPDATE:
      return Object.assign({}, state, {
        reservedBox: action.payload.reservedBox
      });
    case socketActions.SELECTED_UPDATE:
      return Object.assign({}, state, {
        selectedBox: action.payload.selectedBox
      });
    default:
      return state;
  }
};
writeStatus
writeStatusのタスクの役割は、eventChannelのsubscriber関数からemitで渡ってくるactionオブジェクトをtakeで待ち受け、dispatchすることです。
function subscribe(socket) {
  return eventChannel(emit => {
    const initReserveHandler = async (reservedBox) => {
      emit(socketActions.reserveUpdate({ reservedBox })); // 1) emitの引数に渡したactionオブジェクトは。。。。
    }
    const initSelectedHandler = async (selectedBox) => {
      emit(socketActions.selectedUpdate({ selectedBox }));
    }
    const updateSelectedHandler = async (selectedBox) => {
      emit(socketActions.selectedUpdate({ selectedBox }));
    }
    const broadCastReserveHandler = async (reservedBox) => {
      emit(socketActions.reserveUpdate({ reservedBox }));
    }
    socket.on('initReserve:receive', initReserveHandler);
    socket.on('initSelected:receive', initSelectedHandler);
    socket.on('updateSelected:receive', updateSelectedHandler);
    socket.on('broadCastReserve:receive', broadCastReserveHandler);
    const unsubscribe = () => {
      socket.off('initReserve:receive', initReserveHandler);
      socket.off('initSelected:receive', initSelectedHandler);
      socket.off('updateSelected:receive', updateSelectedHandler);
      socket.off('broadCastReserve:receive', broadCastReserveHandler);
    }
    return unsubscribe;
  });
}
function* writeStatus(socket) {
  const channel = yield call(subscribe, socket);
  while (true) {
    const action = yield take(channel); // 2) ここで取得できる。
    yield put(action); // 3) actionをdispatchする。
  }
}
最後に
redux-sagaとwebsocketを組み合わそうとしている誰かの助けになれば幸いです!

