Happy Elements Advent Calendar 2020 25日目の記事です。
#概要
24日目の記事でRedis Streamsを使ったリアルタイム通信基盤を使うに至った経緯を書きました。
今回はハンズオン形式で実際にRedis StreamsをPub/Subとして使う部分を書いていこうと思います。
この記事を元にRedis Streamsを使ったリアルタイム通信基盤を多くの企業で作ってもらえたらいいなと思っています。
#環境
masOS BigSur 11.0.1
nodebrew 1.0.1
npm 6.14.9
node 14.0.0
redis-server 6.0.9
#WebSocket使えるようにする
まずはnodeの初期化とwsのインストールを行います。
npm init --yes
npm install ws
WebSocket通信ができるようにコードを書きます。
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });
wss.on('connection', function connection(ws) {
ws.on('message', function incoming(message) {
console.log('received: %s', message);
});
ws.send('something');
});
サーバーを起動させます。
node server.js
WebSocketがちゃんと動いているのを確認します。
wscat -c ws://localhost:8080
#Redisに接続する
redisを起動します。
redis-server
ioredisをインストールします。
npm install ioredis
ioredisを使えるようにします。
const WebSocket = require('ws');
const Redis = require("ioredis");
const redis = new Redis();
const wss = new WebSocket.Server({ port: 8080 });
wss.on('connection', function connection(ws) {
ws.on('message', function incoming(message) {
console.log('received: %s', message);
});
ws.send('something');
});
#Redis Streamsを使ってSubscribeする
ioredisのissueを参考にRedis Streamsを使ってSubscribeします。
const WebSocket = require('ws');
const Redis = require("ioredis");
const redis = new Redis();
const wss = new WebSocket.Server({ port: 8080 });
async function subscribeStream(stream, listener) {
let lastID = '$'
while (true) {
// Implement your own `try/catch` logic,
// (For example, logging the errors and continue to the next loop)
const reply = await redis.xread('BLOCK', '5000', 'COUNT', 100, 'STREAMS', stream, lastID)
if (!reply) {
continue
}
const results = reply[0][1]
const {length} = results
if (!results.length) {
continue
}
listener(results)
lastID = results[length - 1][0]
}
}
subscribeStream('mystream', console.log)
wss.on('connection', function connection(ws) {
ws.on('message', function incoming(message) {
console.log('received: %s', message);
});
ws.send('something');
});
サーバーを起動します。
node server.js
Redisに接続し、XADDすることで動作確認します。
redis-cli
127.0.0.1:6379> XADD mystream * aaa 1234
#Redis Streamsを使ってPublishする
Publish用のRedisを作り、メッセージをPublishします。
const WebSocket = require('ws');
const Redis = require('ioredis');
const subscriber = new Redis();
const publisher = new Redis();
const wss = new WebSocket.Server({ port: 8080 });
async function subscribeStream(stream, listener) {
let lastID = '$'
while (true) {
// Implement your own `try/catch` logic,
// (For example, logging the errors and continue to the next loop)
const reply = await subscriber.xread('BLOCK', '5000', 'COUNT', 100, 'STREAMS', stream, lastID);
if (!reply) {
continue;
}
const results = reply[0][1];
const { length } = results;
if (!results.length) {
continue;
}
listener(results);
lastID = results[length - 1][0];
}
}
async function publishStream(stream, message) {
await publisher.xadd(stream, '*', 'message', message);
}
subscribeStream('mystream', console.log)
wss.on('connection', function connection(ws) {
ws.on('message', async function incoming(message) {
await publishStream('mystream', message);
console.log('publish: ' + message);
});
});
#Redis Streamsを介して受け取ったメッセージをサーバー内の全員に送信する
Subscribeしてメッセージを受け取った時に、サーバー内の全員に送信します。
const WebSocket = require('ws');
const Redis = require('ioredis');
const subscriber = new Redis();
const publisher = new Redis();
const wss = new WebSocket.Server({ port: 8080 });
async function subscribeStream(stream, listener) {
let lastID = '$'
while (true) {
// Implement your own `try/catch` logic,
// (For example, logging the errors and continue to the next loop)
const reply = await subscriber.xread('BLOCK', '5000', 'COUNT', 100, 'STREAMS', stream, lastID);
if (!reply) {
continue;
}
const results = reply[0][1];
const { length } = results;
if (!results.length) {
continue;
}
listener(results);
lastID = results[length - 1][0];
}
}
async function publishStream(stream, message) {
await publisher.xadd(stream, '*', 'message', message);
}
subscribeStream('mystream', function broadcast(results) {
results.forEach(result => {
wss.clients.forEach(function each(client) {
if (client.readyState === WebSocket.OPEN) {
client.send(result[1][1]);
}
});
});
});
wss.on('connection', function connection(ws) {
ws.on('message', async function incoming(message) {
await publishStream('mystream', message);
});
});
wscatなどを用いてメッセージが複数コネクションでやりとりできることを確認します。
wscat -c ws://localhost:8080
Connected (press CTRL+C to quit)
> aaa
< aaa
< aaa
< bbb
> ccc
< ccc
#Redis Streamsを使ったリアルタイム通信基盤の完成!
あとはルームごとにSubscriberを分けたり、Redisクラスタに接続できるようにしたり、ALBを用意してWebSocketサーバーの負荷分散をしたりすれば完成です!
この辺りはRedis Pub/Subでも同じなので挑戦してみてください。
#まとめ
ハンズオン形式でRedis Streamsを使ったリアルタイム通信基盤の作り方を紹介しました。
スケールする圧倒的に簡単なリアルタイム通信基盤なので、手軽に使いたいニーズのある方にかなりオススメだと思っています。
(もっとこうした方がいいなどあれば教えていただけるととても喜びます)
この記事を見て、同じように最強のリアルタイム通信基盤を作ってもらえると嬉しいです。
最後までご覧いただきありがとうございました!
メンバー募集
Happy Elements株式会社 カカリアスタジオでは、
いっしょに【熱狂的に愛されるコンテンツ】をつくっていただけるメンバーを大募集中です!
もし弊社にご興味持っていただけましたら、是非一度
下記採用サイトをご覧ください。
Happy Elements株式会社 採用特設サイト