Original
Source Code
import cluster from 'cluster';
import net from 'net';
import engineIo from 'engine.io';
export const setupMaster = (
server: net.Server,
loadBalancingMethod: 'random' | 'round-robin' | 'least-connection',
): void => {
if (!cluster.isMaster) {
throw new Error(`This node is not master.`);
}
const sessionIdToWorker = new Map<string, string>();
const workerIdToClientCounts = new Map<string, number>();
let currentIndex = 0;
const computeWorkerId = (data: string): string => {
const match = /sid=([\w-]{20})/.exec(data);
if (match) {
const sid = match[1];
const workerId = sessionIdToWorker.get(sid);
if (workerId && cluster.workers[workerId]) {
return workerId;
}
}
switch (loadBalancingMethod) {
case 'random': {
const workerIds = Object.keys(cluster.workers);
return workerIds[Math.floor(Math.random() * workerIds.length)];
}
case 'round-robin': {
const workerIds = Object.keys(cluster.workers);
currentIndex++;
if (currentIndex >= workerIds.length) {
currentIndex = 0;
}
return workerIds[currentIndex];
}
case 'least-connection': {
let leastActiveWorkerId;
for (const id in cluster.workers) {
if (leastActiveWorkerId === undefined) {
leastActiveWorkerId = id;
} else {
const c1 = workerIdToClientCounts.get(id) ?? 0;
const c2 = workerIdToClientCounts.get(leastActiveWorkerId) ?? 0;
if (c1 < c2) {
leastActiveWorkerId = id;
}
}
}
return leastActiveWorkerId ?? '0';
}
}
};
server.on('connection', (socket) => {
socket.on('data', (buffer) => {
socket.pause();
const data = buffer.toString();
const workerId = computeWorkerId(data);
cluster.workers[workerId]?.send(
{ type: 'sticky:connection', data },
socket,
(err) => err && socket.destroy(),
);
});
});
cluster.on('message', (worker, { type, socketId }) => {
if (type && socketId) {
console.log(`${type as string}: ${socketId as string}`);
switch (type) {
case 'sticky:connection':
sessionIdToWorker.set(socketId, worker.id.toString());
if (loadBalancingMethod === 'least-connection') {
workerIdToClientCounts.set(
worker.id.toString(),
(workerIdToClientCounts.get(worker.id.toString()) ?? 0) + 1,
);
}
break;
case 'sticky:disconnection':
sessionIdToWorker.delete(socketId);
if (
loadBalancingMethod === 'least-connection' &&
workerIdToClientCounts.has(worker.id.toString())
) {
workerIdToClientCounts.set(
worker.id.toString(),
Math.max(
0,
(workerIdToClientCounts.get(worker.id.toString()) ?? 0) - 1,
),
);
}
break;
}
}
});
};
export const setupWorker = (
server: net.Server,
engine: engineIo.Server,
): void => {
if (!cluster.isWorker) {
throw new Error(`This node is not worker.`);
}
process.on('message', ({ type, data }, socket: net.Socket) => {
switch (type) {
case 'sticky:connection':
server.emit('connection', socket);
socket.emit('data', Buffer.from(data));
socket.resume();
break;
}
});
engine.on('connection', (socket) => {
process.send &&
process.send({ type: 'sticky:connection', socketId: socket.id });
socket.once('close', () => {
process.send &&
process.send({ type: 'sticky:disconnection', socketId: socket.id });
});
});
};