21
18

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

redux-sagaでwebsocketを使う

Last updated at Posted at 2018-09-24

あるアイテムを選んだら、そのアイテムは選択済みということが他のユーザーに伝播するようなアプリを作りました。

websocket.gif

コードはこちら
https://github.com/pokotyan/next-web-socket

redux-sagaでwebSocketを使う場合、eventChannelというのを使うんですが、これが癖がある感じだったので、今回作ったコードを元に処理の流れを説明していきたいと思います。

eventChannel:https://github.com/redux-saga/redux-saga/blob/master/docs/advanced/Channels.md

どのように作ってるか

nextとredux-saga、websocketとredis、ログインの部分にfirebaseのAuthenticationを利用しています。

まず、reduxのstoreに以下の2種類のデータを用意しています。

##reservedBox
user_idがkeyで、valueに選択したボックスのidの配列が入るオブジェクト。
reservedBoxは、ユーザーがボックスを選択するたびに値が更新され、その値を他のユーザーにブロードキャストします。

{
  63LyqMoV5KcplLqEInz1qvdWQvV2: [1, 2, 3]
  VNaz42PXYZUx7nmdTe4ZCd1N1qO2: [5, 6]
}

selectedBox

自身が選択したボックスのidの配列。
reservedBoxの情報があれば、selectedBoxはいらないんでは?と思うかもしれませんが、websocketのブロードキャストは"自分以外"の全員にデータを送るため、自分が選んだアイテムを当の自分が知り得ない、という状態になりました。そのため、自身が選択したボックスのid情報を別で用意しています。

[5, 6]

redisの活用

storeの情報はブラウザがリロードされればなくなってしまいます。それを防ぐため、redisにreservedBoxとselectedBoxの情報を保存しておき、ユーザーがボックスの選択を行うたびに更新します。
ブラウザがリロードされた場合、初期処理としてredisから値を取ってきてstoreに情報を埋め込む、ということをやっています。

コード

一旦クライアント側、サーバー側のコード全体を載せます。処理の流れは後述します。

client側

import { put, take, all, fork, call, select } from 'redux-saga/effects';
import { eventChannel } from 'redux-saga';
import * as socketActions from '../actions/socket';
import io from 'socket.io-client';

function createSocketConnection() {
  const socket = io('http://localhost:8999');

  return new Promise(resolve => {
    socket.on('connect', () => {
      resolve(socket);
    });
  });
}

// この関数は、指定されたソケットからeventChannelを作成します。
// 受信した `initReserve:receive`, `initSelected:receive`, `updateSelected:receive`, `broadCastReserve:receive` イベントに対するsubscriptionをセットアップします
function subscribe(socket) {
  // `eventChannel`はsubscriber関数をとります。
  // subscriber関数はsocketから渡ってきたデータをchannelに入れるために `emit`引数をとります
  return eventChannel(emit => {
    // handlerの役割はデータをchannelに入れることです。
    // これにより、Sagaは返されたchannelからこのデータを取得できます。
    // ここではreduxのactionがデータとしてchannelに入ります。
    const initReserveHandler = async (reservedBox) => {
      emit(socketActions.reserveUpdate({ reservedBox }));
    }

    const initSelectedHandler = async (selectedBox) => {
      emit(socketActions.selectedUpdate({ selectedBox }));
    }

    const updateSelectedHandler = async (selectedBox) => {
      emit(socketActions.selectedUpdate({ selectedBox }));
    }

    const broadCastReserveHandler = async (reservedBox) => {
      emit(socketActions.reserveUpdate({ reservedBox }));
    }

    // subscription をセットアップ
    socket.on('initReserve:receive', initReserveHandler);
    socket.on('initSelected:receive', initSelectedHandler);
    socket.on('updateSelected:receive', updateSelectedHandler);
    socket.on('broadCastReserve:receive', broadCastReserveHandler);

    // subscriberはsubscribeを解除する関数を返さなければなりません。
    // これは、sagaが `channel.close`メソッドを呼び出すときに呼び出されます
    const unsubscribe = () => {
      socket.off('initReserve:receive', initReserveHandler);
      socket.off('initSelected:receive', initSelectedHandler);
      socket.off('updateSelected:receive', updateSelectedHandler);
      socket.off('broadCastReserve:receive', broadCastReserveHandler);
    }

    return unsubscribe;
  });
}

function* initStatus({ socket, userId }) {
  // ブラウザがリロードされたときのためにredisから値を取ってきて、reservedBoxのstore更新
  yield socket.emit('initReserve');
  // ブラウザがリロードされたときのためにredisから値を取ってきて、selectedBoxのstore更新
  yield socket.emit('initSelected', { userId });
}

function* syncStatus(socket) {
  while (true) {
    const { payload: { boxId, userId } } = yield take(socketActions.SYNC_RESERVE);

    yield all([
      // reserveBoxのredisを更新、更新した値をbroadcastして、他ブラウザのreseveBoxのstore更新
      call(socket.emit, ['broadCastReserve', { boxId, userId }]),
      // selectedBoxのredisを更新、自身のブラウザのselectedBoxのstore更新
      call(socket.emit, ['updateSelected', { boxId, userId }]),
    ])
  }
}

function* writeStatus(socket) {
  const channel = yield call(subscribe, socket);

  while (true) {
    // subscriber関数から渡ってきたデータ(reduxのaction)を取得します。
    let action = yield take(channel);

    yield put(action);
  }
}

function* watchOnSocket() {
  while (true) {
    try {
      const { payload: { userId } } = yield take(socketActions.INIT_SOCKET);
      const socket = yield call(createSocketConnection)

      // redisから現在の状態を取得し、storeに反映させる初期化のタスク。
      yield fork(initStatus, { socket, userId });

      // ボックスを選択した際、自身の選択ボックスの更新と、その選択したボックス情報を他ブラウザにブロードキャストするタスク。
      yield fork(syncStatus, socket);

      // webSocketのイベントを待ち受け、socketから受け取ったデータを元にactionをdispatchし、storeを更新するタスク。
      // 上記のinitStatus、syncStatusのタスクがemitするwebsocketのイベントはサーバー側で受信される。
      // 受信後、サーバー側ではredisからのデータ取得,更新の処理が行われ、 `**:receive` のwebsocketイベントが送信される。
      // その`**:receive`のイベントを待ち受けるタスク。(websocketとredux-sagaの世界を繋ぐためにeventChannelを用いる)
      yield fork(writeStatus, socket);
    } catch (err) {
      console.error('socket error:', err)
    }
  }
}

export default function* rootSaga() {
  yield all([
    fork(watchOnSocket)
  ]);
}

server側

const express = require('express');
const cors = require('cors');
const app = express();
const { redisSet, redisGet } = require('./utils/redis')

const PORT = process.env.PORT || 8999;

const server = app.listen(PORT, () => {
  console.log(`Server started on port ${PORT} :)`);
});

const io = require('socket.io').listen(server);

io.origins('*:*');

io.on('connection', (socket) => {
  socket.on('initReserve', async () => {
    const reservedBox = await getReservedBox();

    socket.emit('initReserve:receive', reservedBox);
  });

  socket.on('initSelected', async ({ userId }) => {
    const selectedBox = await getSelectedBox(userId);

    socket.emit('initSelected:receive', selectedBox);
  });

  socket.on('broadCastReserve', async ({ boxId, userId }) => {
    const reservedBox = await updateReservedBox({ boxId, userId }) || [];

    socket.broadcast.emit(`broadCastReserve:receive`, JSON.parse(reservedBox));
  });

  socket.on('updateSelected', async ({ boxId, userId }) => {
    const selectedBox = await updateSelectedBox({ boxId, userId }) || [];

    socket.emit('updateSelected:receive', JSON.parse(selectedBox));
  });
});

async function updateReservedBox({ boxId, userId }) {
  const result = await redisGet('next-web-socket:reservedBox');
  const reservedBox = JSON.parse(result) || {};

  if (reservedBox[userId]) {
    reservedBox[userId].push(boxId);
  } else {
    reservedBox[userId] = [boxId];
  }

  await redisSet('next-web-socket:reservedBox', JSON.stringify(reservedBox));

  return redisGet('next-web-socket:reservedBox');  
}

async function updateSelectedBox({ boxId, userId }) {
  const result = await redisGet(`next-web-socket:selectedBox:${userId}`);
  const selectedBox = JSON.parse(result) || [];

  selectedBox.push(boxId);

  await redisSet(`next-web-socket:selectedBox:${userId}`, JSON.stringify(selectedBox));

  return redisGet(`next-web-socket:selectedBox:${userId}`);  
}

async function getReservedBox() {
  const reservedBox = await redisGet('next-web-socket:reservedBox');

  return JSON.parse(reservedBox) || {};
}

async function getSelectedBox(userId) {
  const selectedBox = await redisGet(`next-web-socket:selectedBox:${userId}`);

  return JSON.parse(selectedBox) || [];
}

処理の流れ

ルートのコンポーネントのcomponentDidMountで、initSocketのactionが発火します。

class App extends Component {
  componentDidMount() {
    const { auth, router, socketActions } = this.props;

    if (!auth.isAutherized) {
      router.push('/login');
    } else {
      socketActions.initSocket({
        userId: auth.user.uid
      });
    }
  }

initSocketのaction発火で以下の3つのsagaタスク(initStatus、syncStatus、writeStatus)が動きます。

function* watchOnSocket() {
  while (true) {
    try {
      const { payload: { userId } } = yield take(socketActions.INIT_SOCKET);
      const socket = yield call(createSocketConnection)

      yield fork(initStatus, { socket, userId });
      yield fork(syncStatus, socket);
      yield fork(writeStatus, socket);
    } catch (err) {
      throw err;
    }
  }
}

initStatus

initStatusのタスクはredisの値を初期値としてstoreに保存することです。
initReserveとinitSelectedのwebsocketのeventをサーバー側に送信します。

function* initStatus({ socket, userId }) {
  yield socket.emit('initReserve');
  yield socket.emit('initSelected', { userId });
}

server側でinitReserveとinitSelectedのeventを受信します。
redisから初期値を取得し、*:receiveのeventをclient側に送信します。

  socket.on('initReserve', async () => {
    const reservedBox = await getReservedBox(); // redisからreservedBoxのデータを取ってくる関数

    socket.emit('initReserve:receive', reservedBox);
  });

  socket.on('initSelected', async ({ userId }) => {
    const selectedBox = await getSelectedBox(userId); // redisからselectedBoxのデータを取ってくる関数

    socket.emit('initSelected:receive', selectedBox);
  });

client側で*:receiveのeventを受信します。
サーバー側から受け取った値がpayloadに含まれたactionオブジェクトをemitします。

function subscribe(socket) {
  return eventChannel(emit => {
    const initReserveHandler = async (reservedBox) => {
      emit(socketActions.reserveUpdate({ reservedBox }));
    }

    const initSelectedHandler = async (selectedBox) => {
      emit(socketActions.selectedUpdate({ selectedBox }));
    }

    socket.on('initReserve:receive', initReserveHandler);
    socket.on('initSelected:receive', initSelectedHandler);

eventChannelでemitされた値はsagaのtakeで待ち受けることができます。
takeで取得したactionオブジェクトをputに渡してactionをdispatchします。


function* writeStatus(socket) {
  const channel = yield call(subscribe, socket);

  while (true) {
    const action = yield take(channel);

    yield put(action);
  }
}

reducerでstoreが更新されます。
ここまでがinitStatusのタスクの流れです。

import * as socketActions from '../actions/socket';

const initialState = {
  reservedBox: {},
  selectedBox: []
};

export default (state = initialState, action) => {
  switch (action.type) {
    case socketActions.RESERVE_UPDATE:
      return Object.assign({}, state, {
        reservedBox: action.payload.reservedBox
      });
    case socketActions.SELECTED_UPDATE:
      return Object.assign({}, state, {
        selectedBox: action.payload.selectedBox
      });
    default:
      return state;
  }
};

syncStatus

syncStatusのタスクはSYNC_RESERVEアクションの発火をトリガーに動きます。

function* syncStatus(socket) {
  while (true) {
    const { payload: { boxId, userId } } = yield take(socketActions.SYNC_RESERVE);
    // ...

SYNC_RESERVEアクションはボックスが選択されるたびに発火します。

export default class Box extends Component {
  handleClick = () => {
    this.props.socketActions.syncReserve({
      boxId: this.props.id,
      userId: this.props.user.uid
    });
  }

syncStatusにはpayloadとして、選択したボックスid、ユーザーidが渡ってきます。
ボックスid,ユーザーidを引数にbroadCastReserveとupdateSelectedのイベントをサーバー側へ送信します。

function* syncStatus(socket) {
  while (true) {
    const { payload: { boxId, userId } } = yield take(socketActions.SYNC_RESERVE);

    yield all([
      // reserveBoxのredisを更新、更新した値をbroadcastして、他ブラウザのreseveBoxのstore更新
      call(socket.emit, ['broadCastReserve', { boxId, userId }]),
      // selectedBoxのredisを更新、自身のブラウザのselectedBoxのstore更新
      call(socket.emit, ['updateSelected', { boxId, userId }]),
    ])
  }
}

broadCastReserveとupdateSelectedのイベントをサーバー側で受信します。
reservedBoxとselectedBoxのredisの値を更新し、更新後の値をbroadCastReserve:receive、updateSelected:receiveのイベントに載せてクライアント側に送信します。

  socket.on('broadCastReserve', async ({ boxId, userId }) => {
    // updateReservedBoxは引数のboxIdと現在のreservedBoxのredisの値をマージして返す
    const reservedBox = await updateReservedBox({ boxId, userId }) || [];

    socket.broadcast.emit(`broadCastReserve:receive`, JSON.parse(reservedBox));
  });

  socket.on('updateSelected', async ({ boxId, userId }) => {
    // updateSelectedBoxは引数のboxIdと現在のselectedBoxのredisの値をマージして返す
    const selectedBox = await updateSelectedBox({ boxId, userId }) || [];

    socket.emit('updateSelected:receive', JSON.parse(selectedBox));
  });

クライアント側でbroadCastReserve:receive、updateSelected:receiveのイベントを受信します。
サーバー側から受け取った値をpayloadに含んだactionオブジェクトをemitします。

function subscribe(socket) {
  return eventChannel(emit => {
    const updateSelectedHandler = async (selectedBox) => {
      emit(socketActions.selectedUpdate({ selectedBox }));
    }

    const broadCastReserveHandler = async (reservedBox) => {
      emit(socketActions.reserveUpdate({ reservedBox }));
    }

    socket.on('updateSelected:receive', updateSelectedHandler);
    socket.on('broadCastReserve:receive', broadCastReserveHandler);

eventChannelでemitされた値はsagaのtakeで待ち受けることができます。
takeで取得したactionオブジェクトをputに渡してactionをdispatchします。

function* writeStatus(socket) {
  const channel = yield call(subscribe, socket);

  while (true) {
    const action = yield take(channel);

    yield put(action);
  }
}

reducerでstoreが更新されます。
ここまでがsyncStatusのタスクの流れです。

import * as socketActions from '../actions/socket';

const initialState = {
  reservedBox: {},
  selectedBox: []
};

export default (state = initialState, action) => {
  switch (action.type) {
    case socketActions.RESERVE_UPDATE:
      return Object.assign({}, state, {
        reservedBox: action.payload.reservedBox
      });
    case socketActions.SELECTED_UPDATE:
      return Object.assign({}, state, {
        selectedBox: action.payload.selectedBox
      });
    default:
      return state;
  }
};

writeStatus

writeStatusのタスクの役割は、eventChannelのsubscriber関数からemitで渡ってくるactionオブジェクトをtakeで待ち受け、dispatchすることです。

function subscribe(socket) {
  return eventChannel(emit => {
    const initReserveHandler = async (reservedBox) => {
      emit(socketActions.reserveUpdate({ reservedBox })); // 1) emitの引数に渡したactionオブジェクトは。。。。
    }

    const initSelectedHandler = async (selectedBox) => {
      emit(socketActions.selectedUpdate({ selectedBox }));
    }

    const updateSelectedHandler = async (selectedBox) => {
      emit(socketActions.selectedUpdate({ selectedBox }));
    }

    const broadCastReserveHandler = async (reservedBox) => {
      emit(socketActions.reserveUpdate({ reservedBox }));
    }

    socket.on('initReserve:receive', initReserveHandler);
    socket.on('initSelected:receive', initSelectedHandler);
    socket.on('updateSelected:receive', updateSelectedHandler);
    socket.on('broadCastReserve:receive', broadCastReserveHandler);

    const unsubscribe = () => {
      socket.off('initReserve:receive', initReserveHandler);
      socket.off('initSelected:receive', initSelectedHandler);
      socket.off('updateSelected:receive', updateSelectedHandler);
      socket.off('broadCastReserve:receive', broadCastReserveHandler);
    }

    return unsubscribe;
  });
}


function* writeStatus(socket) {
  const channel = yield call(subscribe, socket);

  while (true) {
    const action = yield take(channel); // 2) ここで取得できる。

    yield put(action); // 3) actionをdispatchする。
  }
}


最後に

redux-sagaとwebsocketを組み合わそうとしている誰かの助けになれば幸いです!

21
18
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
21
18

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?