Skip to content

Node.js: Gateway (Overview)

The WebSocket gateway accepts client connections, handles SUBSCRIBE/PUBLISH, and integrates with a backbone (e.g. NATS or Redis). This page describes the core structure; ACL and dedup are detailed in separate pages.


Dependencies

bash
npm init -y
npm install ws nats redis jsonwebtoken
json
{
  "dependencies": {
    "ws": "^8.x",
    "nats": "^2.x",
    "redis": "^4.x",
    "jsonwebtoken": "^9.x"
  }
}

Minimal Gateway (WebSocket + NATS)

javascript
const WebSocket = require('ws');
const { connect } = require('nats');

// Port the WebSocket server listens on: WS_PORT from the environment when set, otherwise 8080.
const PORT = process.env.WS_PORT || 8080;
// NATS connection shared by the handlers below; assigned inside main() once connect() resolves.
let nc;

/**
 * Starts the gateway: connects to NATS, opens the WebSocket server on PORT,
 * and bridges client `subscribe` / `unsubscribe` / `publish` messages to the
 * backbone.
 *
 * Each client connection owns a topic -> NATS subscription map so that:
 *   - a repeated `subscribe` for the same topic does not create a second
 *     subscription (which would deliver every message twice), and
 *   - all of the connection's subscriptions are released as soon as the
 *     socket closes, instead of lingering until the next backbone message.
 *
 * @returns {Promise<void>} Resolves once the server has been created; rejects
 *   if the NATS connection cannot be established.
 */
async function main() {
  nc = await connect({ servers: process.env.NATS_URL || 'nats://localhost:4222' });
  const wss = new WebSocket.Server({ port: PORT });

  // Serialises and sends a frame, skipping sockets that are no longer open.
  const sendJson = (ws, payload) => {
    if (ws.readyState === WebSocket.OPEN) ws.send(JSON.stringify(payload));
  };

  // Rejects anything that cannot be used as a topic before it reaches NATS.
  const requireTopic = (topic) => {
    if (typeof topic !== 'string' || topic === '') {
      throw new Error('topic must be a non-empty string');
    }
    return topic;
  };

  // Forwards every backbone message of one subscription to one client.
  const forward = async (ws, topic, sub) => {
    for await (const m of sub) {
      if (ws.readyState !== WebSocket.OPEN) break;
      let data;
      try {
        data = JSON.parse(Buffer.from(m.data).toString());
      } catch (err) {
        // One malformed backbone message must not end the whole subscription.
        console.error('Dropping non-JSON message on', topic, err.message);
        continue;
      }
      sendJson(ws, { type: 'message', topic, data });
    }
  };

  wss.on('connection', (ws) => {
    ws.isAlive = true;
    ws.on('pong', () => { ws.isAlive = true; });

    // topic -> NATS subscription owned by this connection.
    const subscriptions = new Map();

    ws.on('close', () => {
      // Unsubscribing ends the matching forward() loop and frees the backbone side.
      subscriptions.forEach((sub) => sub.unsubscribe());
      subscriptions.clear();
    });

    ws.on('message', (data) => {
      try {
        const msg = JSON.parse(data.toString());

        if (msg.type === 'subscribe') {
          const topic = requireTopic(msg.topic);
          // Already subscribed: keep the existing subscription, do not stack another.
          if (subscriptions.has(topic)) return;
          const sub = nc.subscribe(topic);
          subscriptions.set(topic, sub);
          // Deliberately not awaited: the loop runs for the life of the subscription.
          forward(ws, topic, sub)
            .catch((err) => sendJson(ws, { type: 'error', message: err.message }))
            .finally(() => {
              // Only drop the entry if it still refers to this subscription.
              if (subscriptions.get(topic) === sub) subscriptions.delete(topic);
            });
        }

        if (msg.type === 'unsubscribe') {
          const sub = subscriptions.get(msg.topic);
          if (sub) {
            subscriptions.delete(msg.topic);
            sub.unsubscribe();
          }
        }

        if (msg.type === 'publish') {
          const topic = requireTopic(msg.topic);
          nc.publish(topic, Buffer.from(JSON.stringify(msg.payload || {})));
          sendJson(ws, { type: 'ack', messageId: msg.messageId, status: 'ok' });
        }
      } catch (err) {
        // Covers malformed client JSON, invalid topics and synchronous NATS errors.
        sendJson(ws, { type: 'error', message: err.message });
      }
    });
  });

  // Heartbeat: every 30 s, terminate sockets that did not answer the previous
  // ping, then ping the remaining ones.
  const interval = setInterval(() => {
    wss.clients.forEach((ws) => {
      if (!ws.isAlive) return ws.terminate();
      ws.isAlive = false;
      ws.ping();
    });
  }, 30000);
  wss.on('close', () => clearInterval(interval));

  console.log('Gateway listening on', PORT);
}
// Start the gateway; a rejection from main() is passed to console.error.
main().catch(console.error);

Protocol Message Types (Summary)

| type | Direction | Description |
| --- | --- | --- |
| connect / setup | C→S | Identify with token; gateway opens session. |
| subscribe | C→S | Subscribe to topic; after ACL, add to sub-socket. |
| unsubscribe | C→S | Unsubscribe from topic. |
| publish | C→S | topic, messageId, payload; dedup then backbone. |
| ack | C→S | messageId; delivery confirmation for QoS≥1. |
| resume | C→S | sessionId, lastSeqPerTopic; after reconnect. |
| message | S→C | topic, seqNo, data, messageId. |
| ack | S→C | messageId, status (e.g. ok). |
| error | S→C | code, message. |

Auth: userId from Token (Example)

javascript
const jwt = require('jsonwebtoken');
// Key handed to jwt.verify below: JWT_SECRET from the environment, else the literal 'your-secret'.
// NOTE(review): the hard-coded fallback is a placeholder; anyone who knows it could sign tokens
// that verify against it. Confirm JWT_SECRET is always set outside local experiments.
const SECRET = process.env.JWT_SECRET || 'your-secret';

/**
 * Resolves the user id carried by a signed JWT.
 *
 * @param {string} token - Encoded JWT supplied by the client.
 * @returns {*} The first truthy claim among `sub`, `userId` and `id`, or
 *   `null` when the token is missing, fails verification, or carries none of
 *   those claims.
 */
function getUserIdFromToken(token) {
  if (!token) return null;
  try {
    const decoded = jwt.verify(token, SECRET);
    // Normalise "verified but no id claim" to null so every failure path
    // returns the same value instead of leaking undefined to the caller.
    return decoded.sub || decoded.userId || decoded.id || null;
  } catch {
    // Verification errors (bad signature, expiry, malformed token) mean "no user".
    return null;
  }
}

// On CONNECT/setup:
// Authenticates the connection from the token in a `setup` / `connect` message
// and answers with `ready`, or with an `error` frame when authentication fails.
ws.on('message', async (data) => {
  let msg;
  try {
    msg = JSON.parse(data.toString());
  } catch (err) {
    msg = null;
  }
  // This handler is async, so an exception here would surface as an unhandled
  // rejection instead of an error frame; reject unusable frames explicitly.
  if (!msg || typeof msg !== 'object') {
    ws.send(JSON.stringify({ type: 'error', code: 'BAD_MESSAGE', message: 'Invalid JSON message' }));
    return;
  }
  if (msg.type === 'setup' || msg.type === 'connect') {
    ws.userId = getUserIdFromToken(msg.token);
    if (!ws.userId) {
      ws.send(JSON.stringify({ type: 'error', code: 'AUTH_FAILED' }));
      return;
    }
    // NOTE(review): ws.sessionId is not assigned in this snippet; presumably it is
    // set where the connection is accepted. Confirm, otherwise `ready` carries no sessionId.
    ws.send(JSON.stringify({ type: 'ready', sessionId: ws.sessionId }));
    return;
  }
  // ... other message types
});

Sub-Socket Structure (Topic → Set of Connections)

javascript
// Registry used by the functions below: maps each topic to the Set of connections subscribed to it.
const topicSubscribers = new Map();

/**
 * Registers a connection as a subscriber of a topic, creating the topic's
 * subscriber set on first use.
 *
 * @param {object} ws - Connection to register.
 * @param {string} topic - Topic the connection subscribes to.
 */
function addSubscriber(ws, topic) {
  let sockets = topicSubscribers.get(topic);
  if (sockets === undefined) {
    sockets = new Set();
    topicSubscribers.set(topic, sockets);
  }
  sockets.add(ws);
}

/**
 * Removes a connection from a topic's subscriber set and drops the topic
 * entry once its last subscriber is gone. Unknown topics are ignored.
 *
 * @param {object} ws - Connection to remove.
 * @param {string} topic - Topic the connection unsubscribes from.
 */
function removeSubscriber(ws, topic) {
  const sockets = topicSubscribers.get(topic);
  if (!sockets) return;

  sockets.delete(ws);
  if (sockets.size > 0) return;

  // Last subscriber left: remove the now-empty set from the registry.
  topicSubscribers.delete(topic);
}

/**
 * Sends a payload to every subscriber of a topic whose socket is open.
 * The payload is serialised once and the same string is sent to each socket.
 *
 * @param {string} topic - Topic whose subscribers receive the payload.
 * @param {*} payload - Value to serialise with JSON.stringify and send.
 */
function broadcastToTopic(topic, payload) {
  const sockets = topicSubscribers.get(topic);
  if (!sockets) return;

  const frame = JSON.stringify(payload);
  for (const socket of sockets) {
    // Skip sockets that are connecting, closing or closed.
    if (socket.readyState !== WebSocket.OPEN) continue;
    socket.send(frame);
  }
}

Messages from the backbone are delivered to subscribers via broadcastToTopic. For the full integration, see the page "Gateway: Sub-Socket & Broadcast".


Next Steps

Star the repo on GitHub if this documentation is useful — link in the navbar above.