5
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Happy ElementsAdvent Calendar 2020

Day 25

僕の考えた最強のリアルタイム通信基盤(実践編)〜みんなでライブの場合〜

Last updated at Posted at 2020-12-24

Happy Elements Advent Calendar 2020 25日目の記事です。

#概要
24日目の記事でRedis Streamsを使ったリアルタイム通信基盤を使うに至った経緯を書きました。
今回はハンズオン形式で実際にRedis StreamsをPub/Subとして使う部分を書いていこうと思います。
この記事を元にRedis Streamsを使ったリアルタイム通信基盤を多くの企業で作ってもらえたらいいなと思っています。

#環境

masOS BigSur 11.0.1
nodebrew 1.0.1
npm 6.14.9
node 14.0.0
redis-server 6.0.9

#WebSocket使えるようにする

まずはnodeの初期化とwsのインストールを行います。

npm init --yes
npm install ws

WebSocket通信ができるようにコードを書きます。

server.js
const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', function connection(ws) {
  ws.on('message', function incoming(message) {
    console.log('received: %s', message);
  });

  ws.send('something');
});

サーバーを起動させます。

node server.js

WebSocketがちゃんと動いているのを確認します。

wscat -c ws://localhost:8080

#Redisに接続する

redisを起動します。

redis-server

ioredisをインストールします。

npm install ioredis

ioredisを使えるようにします。

server.js
const WebSocket = require('ws');
const Redis = require("ioredis");
const redis = new Redis();

const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', function connection(ws) {
  ws.on('message', function incoming(message) {
    console.log('received: %s', message);
  });

  ws.send('something');
});

#Redis Streamsを使ってSubscribeする

ioredisのissueを参考にRedis Streamsを使ってSubscribeします。

server.js

const WebSocket = require('ws');
const Redis = require("ioredis");
const redis = new Redis();

const wss = new WebSocket.Server({ port: 8080 });

async function subscribeStream(stream, listener) {
  let lastID = '$'

  while (true) {
    // Implement your own `try/catch` logic,
    // (For example, logging the errors and continue to the next loop)
    const reply = await redis.xread('BLOCK', '5000', 'COUNT', 100, 'STREAMS', stream, lastID)
    if (!reply) {
      continue
    }
    const results = reply[0][1]
    const {length} = results
    if (!results.length) {
      continue
    }
    listener(results)
    lastID = results[length - 1][0]
  }
}

subscribeStream('mystream', console.log)

wss.on('connection', function connection(ws) {
  ws.on('message', function incoming(message) {
    console.log('received: %s', message);
  });

  ws.send('something');
});

サーバーを起動します。

node server.js

Redisに接続し、XADDすることで動作確認します。

redis-cli
127.0.0.1:6379> XADD mystream * aaa 1234

#Redis Streamsを使ってPublishする

Publish用のRedisを作り、メッセージをPublishします。

server.js
const WebSocket = require('ws');
const Redis = require('ioredis');
const subscriber = new Redis();
const publisher = new Redis();

const wss = new WebSocket.Server({ port: 8080 });

async function subscribeStream(stream, listener) {
  let lastID = '$'

  while (true) {
    // Implement your own `try/catch` logic,
    // (For example, logging the errors and continue to the next loop)
    const reply = await subscriber.xread('BLOCK', '5000', 'COUNT', 100, 'STREAMS', stream, lastID);
    if (!reply) {
      continue;
    }
    const results = reply[0][1];
    const { length } = results;
    if (!results.length) {
      continue;
    }
    listener(results);
    lastID = results[length - 1][0];
  }
}

async function publishStream(stream, message) {
  await publisher.xadd(stream, '*', 'message', message);
}

subscribeStream('mystream', console.log)

wss.on('connection', function connection(ws) {
  ws.on('message', async function incoming(message) {
    await publishStream('mystream', message);
    console.log('publish: ' + message);
  });
});

#Redis Streamsを介して受け取ったメッセージをサーバー内の全員に送信する

Subscribeしてメッセージを受け取った時に、サーバー内の全員に送信します。

server.js
const WebSocket = require('ws');
const Redis = require('ioredis');
const subscriber = new Redis();
const publisher = new Redis();

const wss = new WebSocket.Server({ port: 8080 });

async function subscribeStream(stream, listener) {
  let lastID = '$'

  while (true) {
    // Implement your own `try/catch` logic,
    // (For example, logging the errors and continue to the next loop)
    const reply = await subscriber.xread('BLOCK', '5000', 'COUNT', 100, 'STREAMS', stream, lastID);
    if (!reply) {
      continue;
    }
    const results = reply[0][1];
    const { length } = results;
    if (!results.length) {
      continue;
    }
    listener(results);
    lastID = results[length - 1][0];
  }
}

async function publishStream(stream, message) {
  await publisher.xadd(stream, '*', 'message', message);
}

subscribeStream('mystream', function broadcast(results) {
  results.forEach(result => {
    wss.clients.forEach(function each(client) {
      if (client.readyState === WebSocket.OPEN) {
        client.send(result[1][1]);
      }
    });
  });
});

wss.on('connection', function connection(ws) {
  ws.on('message', async function incoming(message) {
    await publishStream('mystream', message);
  });
});

wscatなどを用いてメッセージが複数コネクションでやりとりできることを確認します。

wscat -c ws://localhost:8080
Connected (press CTRL+C to quit)
> aaa
< aaa
< aaa
< bbb
> ccc
< ccc

#Redis Streamsを使ったリアルタイム通信基盤の完成!

あとはルームごとにSubscriberを分けたり、Redisクラスタに接続できるようにしたり、ALBを用意してWebSocketサーバーの負荷分散をしたりすれば完成です!
この辺りはRedis Pub/Subでも同じなので挑戦してみてください。

#まとめ

ハンズオン形式でRedis Streamsを使ったリアルタイム通信基盤の作り方を紹介しました。
スケールする圧倒的に簡単なリアルタイム通信基盤なので、手軽に使いたいニーズのある方にかなりオススメだと思っています。
(もっとこうした方がいいなどあれば教えていただけるととても喜びます)

この記事を見て、同じように最強のリアルタイム通信基盤を作ってもらえると嬉しいです。
最後までご覧いただきありがとうございました!

メンバー募集

Happy Elements株式会社 カカリアスタジオでは、
いっしょに【熱狂的に愛されるコンテンツ】をつくっていただけるメンバーを大募集中です!
もし弊社にご興味持っていただけましたら、是非一度
下記採用サイトをご覧ください。
Happy Elements株式会社 採用特設サイト

5
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?