Node.js: Gateway — Sub-Socket & Broadcast

This page describes the sub-socket (the set of subscribers per topic) and how the Node.js gateway broadcasts messages from the backbone to those subscribers.


Sub-Socket Data Structure

javascript
// topic -> Set<WebSocket>
const topicSubscribers = new Map();

function addSubscriber(ws, topic) {
  if (!topicSubscribers.has(topic)) {
    topicSubscribers.set(topic, new Set());
  }
  topicSubscribers.get(topic).add(ws);
}

function removeSubscriber(ws, topic) {
  const set = topicSubscribers.get(topic);
  if (set) {
    set.delete(ws);
    if (set.size === 0) topicSubscribers.delete(topic);
  }
}

function removeConnection(ws) {
  if (ws.topics) {
    ws.topics.forEach((t) => removeSubscriber(ws, t));
  }
}
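
To see how the map behaves as connections come and go, here is a minimal sketch that wraps the same topic -> Set model in a small factory so it can be exercised in isolation. The factory name `createSubSockets`, its `count` and `topics` helpers, and the plain objects standing in for WebSocket connections are assumptions made for illustration; they are not part of the gateway code above.

```javascript
// Sketch: the same topic -> Set<socket> model, wrapped so it can be tested on its own.
function createSubSockets() {
  const topicSubscribers = new Map(); // topic -> Set<socket>

  function add(ws, topic) {
    let set = topicSubscribers.get(topic);
    if (!set) {
      set = new Set();
      topicSubscribers.set(topic, set);
    }
    set.add(ws);
    if (!ws.topics) ws.topics = new Set(); // track topics per connection
    ws.topics.add(topic);
  }

  function remove(ws, topic) {
    const set = topicSubscribers.get(topic);
    if (set) {
      set.delete(ws);
      if (set.size === 0) topicSubscribers.delete(topic); // drop empty topics
    }
    if (ws.topics) ws.topics.delete(topic);
  }

  function removeAll(ws) {
    // Copy first so the per-connection set is not mutated while iterating.
    for (const topic of [...(ws.topics || [])]) remove(ws, topic);
  }

  return {
    add,
    remove,
    removeAll,
    count: (topic) => (topicSubscribers.get(topic) || new Set()).size,
    topics: () => [...topicSubscribers.keys()],
  };
}

// Usage with plain objects standing in for WebSocket connections.
const subs = createSubSockets();
const alice = { id: 'alice' };
const bob = { id: 'bob' };

subs.add(alice, 'orders');
subs.add(bob, 'orders');
subs.add(alice, 'prices');
subs.removeAll(alice); // e.g. alice's connection closed

console.log(subs.count('orders')); // 1
console.log(subs.topics());        // [ 'orders' ]
```

Deleting a topic's entry once its set is empty keeps the map from growing with topics that no longer have listeners.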

Listening to Backbone and Broadcasting to Subscribers

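The gateway adds each client to the topic's subscriber set when it receives SUBSCRIBE. On the backbone side, a single wildcard subscription (e.g. the NATS wildcard >) receives every message, and the gateway routes each one by its subject to the matching sub-socket. Routing is an exact match on the subject string, so clients must subscribe using the concrete topic name.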

javascript
const { connect } = require('nats');
let nc;

async function subscribeToBackbone() {
  nc = await connect({ servers: process.env.NATS_URL || 'nats://localhost:4222' });

  const sub = nc.subscribe('>'); // NATS wildcard: all subjects
  (async () => {
    for await (const msg of sub) {
      const topic = msg.subject;
      let payload;
      try {
        payload = JSON.parse(Buffer.from(msg.data).toString());
      } catch (err) {
        continue; // skip non-JSON messages instead of ending the loop
      }
      broadcastToTopic(topic, payload);
    }
  })();
}

function broadcastToTopic(topic, payload) {
  const set = topicSubscribers.get(topic);
  if (!set || set.size === 0) return;
  const data = JSON.stringify({
    type: 'message',
    topic,
    seqNo: payload.seqNo,
    messageId: payload.messageId,
    data: payload
  });
  set.forEach((ws) => {
    if (ws.readyState === 1) ws.send(data); // OPEN
  });
}
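
The routing step can be checked without a NATS server or real sockets. The sketch below is one way to do that, assuming a hypothetical `routeMessage` helper that takes the subscriber map as a parameter and a hypothetical `fakeSocket` that records what it is sent. The envelope fields mirror the ones used in `broadcastToTopic`; the subject names and payload values are made up for the example.

```javascript
const OPEN = 1; // WebSocket readyState value for an open connection

// Hypothetical helper: send one decoded backbone message to a topic's subscribers.
// Returns how many sockets the frame was written to.
function routeMessage(topicSubscribers, topic, payload) {
  const set = topicSubscribers.get(topic); // exact match on the subject string
  if (!set || set.size === 0) return 0;

  // Serialize once and reuse the same string for every subscriber.
  const frame = JSON.stringify({
    type: 'message',
    topic,
    seqNo: payload.seqNo,
    messageId: payload.messageId,
    data: payload,
  });

  let delivered = 0;
  for (const ws of set) {
    if (ws.readyState !== OPEN) continue; // skip connecting/closing/closed sockets
    ws.send(frame);
    delivered += 1;
  }
  return delivered;
}

// Stand-in socket that records every frame passed to send().
function fakeSocket(readyState) {
  const sent = [];
  return { readyState, sent, send: (data) => sent.push(data) };
}

const openSocket = fakeSocket(OPEN);
const closingSocket = fakeSocket(2); // CLOSING
const subscribers = new Map([
  ['orders.created', new Set([openSocket, closingSocket])],
]);

const delivered = routeMessage(subscribers, 'orders.created', {
  seqNo: 7,
  messageId: 'm-7',
  total: 42,
});
const missed = routeMessage(subscribers, 'orders.cancelled', {
  seqNo: 8,
  messageId: 'm-8',
});

console.log(delivered, missed);                    // 1 0
console.log(JSON.parse(openSocket.sent[0]).seqNo); // 7
```

Messages on subjects with no subscribers are dropped before any serialization happens, which matters when the wildcard subscription receives far more traffic than clients actually listen to.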

SUBSCRIBE Handler: Add to Sub-Socket and (Optional) Per-Topic Backbone Subscription

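If you prefer one NATS subscription per topic inside the gateway, create it lazily on the first SUBSCRIBE for that topic. Use this instead of the wildcard subscription above, not alongside it: with both active, each message would reach broadcastToTopic twice.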

javascript
const activeSubs = new Map(); // topic -> nats subscription

async function ensureBackboneSubscription(topic) {
  if (activeSubs.has(topic)) return;
  const sub = nc.subscribe(topic);
  activeSubs.set(topic, sub);
  (async () => {
    for await (const m of sub) {
      let payload;
      try {
        payload = JSON.parse(Buffer.from(m.data).toString());
      } catch (err) {
        continue; // skip non-JSON messages instead of ending the loop
      }
      broadcastToTopic(topic, payload);
    }
  })();
}

async function handleSubscribe(ws, msg) {
  const { topic } = msg;
  // ... ACL check ...
  addSubscriber(ws, topic);
  if (!ws.topics) ws.topics = new Set();
  ws.topics.add(topic);
  await ensureBackboneSubscription(topic);
  ws.send(JSON.stringify({ type: 'subscribed', topic }));
}
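
With per-topic subscriptions, nothing above releases the backbone subscription once the last client leaves a topic. A minimal sketch of the reverse path follows. `releaseBackboneSubscription`, `handleUnsubscribe`, and the `unsubscribed` reply type are hypothetical names chosen for this example; the only NATS call relied on is `unsubscribe()` on the subscription object. The usage part replaces the NATS subscription and the socket with small stand-ins so the sketch runs on its own.

```javascript
const topicSubscribers = new Map(); // topic -> Set<socket>
const activeSubs = new Map();       // topic -> backbone subscription

// Hypothetical counterpart to ensureBackboneSubscription:
// drop the backbone subscription once no client listens to the topic.
function releaseBackboneSubscription(topic) {
  const set = topicSubscribers.get(topic);
  if (set && set.size > 0) return false; // still in use
  const sub = activeSubs.get(topic);
  if (!sub) return false;
  sub.unsubscribe(); // stop receiving this subject from the backbone
  activeSubs.delete(topic);
  return true;
}

// Hypothetical UNSUBSCRIBE handler mirroring handleSubscribe.
function handleUnsubscribe(ws, msg) {
  const { topic } = msg;
  const set = topicSubscribers.get(topic);
  if (set) {
    set.delete(ws);
    if (set.size === 0) topicSubscribers.delete(topic);
  }
  if (ws.topics) ws.topics.delete(topic);
  releaseBackboneSubscription(topic);
  ws.send(JSON.stringify({ type: 'unsubscribed', topic })); // assumed reply shape
}

// Usage with stand-ins for the NATS subscription and the socket.
const fakeSub = {
  closed: false,
  unsubscribe() { this.closed = true; },
};
const client = {
  topics: new Set(['orders']),
  sent: [],
  send(data) { this.sent.push(data); },
};
topicSubscribers.set('orders', new Set([client]));
activeSubs.set('orders', fakeSub);

handleUnsubscribe(client, { topic: 'orders' });

console.log(fakeSub.closed);           // true
console.log(activeSubs.has('orders')); // false
```

The same release step would also need to run when a connection closes, since that path removes subscribers too.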

Cleanup on Connection Close

javascript
ws.on('close', () => {
  removeConnection(ws);
});

Flow Diagram

Full project: Full E2E Example. For an ElysiaJS implementation with the same model, see Elysia Example.
