Node.js: Gateway — Sub-Socket & Broadcast
This page describes the gateway's sub-socket — the set of WebSocket subscribers kept for each topic — and how messages arriving from the backbone are broadcast to those subscribers in Node.js.
Sub-Socket Data Structure
javascript
// Sub-socket registry: topic -> Set<WebSocket> of the clients subscribed to that topic.
// Entries are created by addSubscriber() and dropped by removeSubscriber() once a set is empty.
const topicSubscribers = new Map();
/**
 * Registers a socket as a subscriber of a topic.
 *
 * The topic's subscriber set is created the first time the topic is seen;
 * adding the same socket twice is a no-op because the container is a Set.
 *
 * @param {object} ws - Client socket to register.
 * @param {*} topic - Key under which the socket is stored in topicSubscribers.
 * @returns {void}
 */
function addSubscriber(ws, topic) {
  const isNewTopic = !topicSubscribers.has(topic);
  if (isNewTopic) {
    topicSubscribers.set(topic, new Set());
  }

  const sockets = topicSubscribers.get(topic);
  sockets.add(ws);
}
/**
 * Unregisters a socket from a topic.
 *
 * Does nothing when the topic has no subscriber set. When the last socket
 * leaves, the topic entry itself is removed so the registry does not keep
 * empty sets around.
 *
 * @param {object} ws - Client socket to unregister.
 * @param {*} topic - Key the socket was stored under in topicSubscribers.
 * @returns {void}
 */
function removeSubscriber(ws, topic) {
  const sockets = topicSubscribers.get(topic);
  if (!sockets) return;

  sockets.delete(ws);
  if (sockets.size !== 0) return;

  topicSubscribers.delete(topic);
}
/**
 * Detaches a socket from every topic it is subscribed to.
 *
 * Reads the socket's own `topics` collection and calls removeSubscriber()
 * once per entry. A socket without a `topics` collection is left untouched.
 *
 * @param {object} ws - Client socket being torn down.
 * @returns {void}
 */
function removeConnection(ws) {
  if (!ws.topics) return;

  ws.topics.forEach(function detachFromTopic(topic) {
    removeSubscriber(ws, topic);
  });
}
// Listening to Backbone and Broadcasting to Subscribers
The gateway adds each client to the topic set on SUBSCRIBE. On the backbone side, a single subscription (e.g. NATS wildcard) receives messages; the gateway routes by topic to the correct sub-socket.
javascript
const { connect } = require('nats');
// Shared backbone (NATS) connection. Stays undefined until subscribeToBackbone()
// has awaited connect(); code that calls nc.subscribe() relies on that having happened.
let nc;
/**
 * Connects to the backbone and starts routing every message to its topic's
 * subscribers.
 *
 * Opens the shared connection `nc` (server taken from NATS_URL, falling back
 * to the local default), subscribes to all subjects, and starts a background
 * loop that decodes each message as JSON and hands it to broadcastToTopic().
 * The returned promise resolves once the subscription exists; the loop keeps
 * running afterwards.
 *
 * @returns {Promise<void>}
 */
async function subscribeToBackbone() {
  nc = await connect({ servers: process.env.NATS_URL || 'nats://localhost:4222' });
  const sub = nc.subscribe('>'); // NATS wildcard: all subjects

  const pumpMessages = async () => {
    for await (const msg of sub) {
      // Handle each message in isolation: one malformed payload must not end
      // the loop and silently stop delivery for every topic.
      try {
        const payload = JSON.parse(Buffer.from(msg.data).toString());
        broadcastToTopic(msg.subject, payload);
      } catch (err) {
        console.error(`Dropping backbone message on "${msg.subject}":`, err);
      }
    }
  };

  // The loop is intentionally not awaited; attach a handler so a failing
  // subscription is logged instead of becoming an unhandled rejection.
  pumpMessages().catch((err) => {
    console.error('Backbone subscription loop stopped:', err);
  });
}
/**
 * Sends one backbone message to every open socket subscribed to `topic`.
 *
 * The message is serialized once as
 * `{ type: 'message', topic, seqNo, messageId, data }`, where `seqNo` and
 * `messageId` are copied from the payload and `data` is the payload itself.
 * Topics without subscribers are ignored.
 *
 * @param {*} topic - Key to look up in topicSubscribers.
 * @param {*} payload - Decoded message body; may be any JSON value.
 * @returns {void}
 */
function broadcastToTopic(topic, payload) {
  const subscribers = topicSubscribers.get(topic);
  if (!subscribers || subscribers.size === 0) return;

  // A decoded JSON body can be null; only read envelope fields from objects
  // so a null payload is forwarded instead of throwing.
  const envelopeSource = payload !== null && typeof payload === 'object' ? payload : {};
  const data = JSON.stringify({
    type: 'message',
    topic,
    seqNo: envelopeSource.seqNo,
    messageId: envelopeSource.messageId,
    data: payload,
  });

  const OPEN = 1; // readyState value of an open socket
  for (const ws of subscribers) {
    if (ws.readyState !== OPEN) continue;
    try {
      ws.send(data);
    } catch (err) {
      // One failing socket must not prevent delivery to the others.
      console.error(`Failed to send message on "${topic}" to a subscriber:`, err);
    }
  }
}
// SUBSCRIBE Handler: Add to Sub-Socket and (Optional) Per-Topic Backbone Subscription
If you prefer one NATS subscription per topic inside the gateway:
javascript
// Per-topic backbone subscriptions: topic -> NATS subscription object.
// Used to make sure the gateway opens at most one backbone subscription per topic.
const activeSubs = new Map();
/**
 * Makes sure the gateway has a backbone subscription for `topic`.
 *
 * If none is recorded in activeSubs, subscribes on the shared connection
 * `nc`, records the subscription, and starts a background loop that decodes
 * each message as JSON and passes it to broadcastToTopic(). Calling it again
 * for a topic that is already active does nothing.
 *
 * @param {*} topic - Subject to subscribe to; also the activeSubs key.
 * @returns {Promise<void>} Resolves once the subscription is registered.
 */
async function ensureBackboneSubscription(topic) {
  if (activeSubs.has(topic)) return;

  const sub = nc.subscribe(topic);
  activeSubs.set(topic, sub);

  const pumpMessages = async () => {
    try {
      for await (const m of sub) {
        // Handle each message in isolation so one malformed payload does not
        // end delivery for the whole topic.
        try {
          const payload = JSON.parse(Buffer.from(m.data).toString());
          broadcastToTopic(topic, payload);
        } catch (err) {
          console.error(`Dropping backbone message on "${topic}":`, err);
        }
      }
    } finally {
      // Once the iterator has ended the subscription is dead; forget it so a
      // later call can subscribe again instead of returning early forever.
      if (activeSubs.get(topic) === sub) activeSubs.delete(topic);
    }
  };

  // The loop is intentionally not awaited; attach a handler so a failing
  // subscription is logged instead of becoming an unhandled rejection.
  pumpMessages().catch((err) => {
    console.error(`Backbone subscription for "${topic}" stopped:`, err);
  });
}
/**
 * Handles a client's SUBSCRIBE request.
 *
 * Registers the socket in the topic's sub-socket, records the topic on the
 * socket (`ws.topics`, which removeConnection() reads on teardown), makes
 * sure a backbone subscription exists, and then acknowledges with
 * `{ type: 'subscribed', topic }`.
 *
 * If the backbone subscription cannot be established, the registration made
 * by this call is undone and the error is rethrown, so a client that never
 * received the acknowledgement is not left subscribed.
 *
 * @param {object} ws - Client socket issuing the request.
 * @param {object} msg - Parsed client message; its `topic` field is used.
 * @returns {Promise<void>}
 */
async function handleSubscribe(ws, msg) {
  const { topic } = msg;
  // ... ACL check ...

  // Remember whether the socket already held this topic, so a failure below
  // only rolls back what this call added.
  const wasSubscribed = Boolean(ws.topics && ws.topics.has(topic));

  addSubscriber(ws, topic);
  if (!ws.topics) ws.topics = new Set();
  ws.topics.add(topic);

  try {
    await ensureBackboneSubscription(topic);
  } catch (err) {
    if (!wasSubscribed) {
      removeSubscriber(ws, topic);
      ws.topics.delete(topic);
    }
    throw err;
  }

  ws.send(JSON.stringify({ type: 'subscribed', topic }));
}
// Cleanup on Connection Close
javascript
ws.on('close', () => {
removeConnection(ws);
});Flow Diagram
Full project: Full E2E Example. For an ElysiaJS implementation with the same model, see Elysia Example.