Skip to content

Node.js: Gateway — Sub-Socket & Yayın

Bu sayfa sub-socket (topic başına abone listesi) ve omurgadan gelen mesajın abonelere yayınlanmasını Node.js ile anlatır.


Sub-Socket Veri Yapısı

javascript
// topic -> Set<WebSocket>
const topicSubscribers = new Map();

function addSubscriber(ws, topic) {
  if (!topicSubscribers.has(topic)) {
    topicSubscribers.set(topic, new Set());
  }
  topicSubscribers.get(topic).add(ws);
}

function removeSubscriber(ws, topic) {
  const set = topicSubscribers.get(topic);
  if (set) {
    set.delete(ws);
    if (set.size === 0) topicSubscribers.delete(topic);
  }
}

// Bağlantı kapandığında tüm topic'lerden çıkar
function removeConnection(ws) {
  if (ws.topics) {
    ws.topics.forEach((t) => removeSubscriber(ws, t));
  }
}

NATS'tan Dinleme ve Abonelere Yayın

Gateway, her SUBSCRIBE'ta sadece o istemciyi ilgili topic set'ine ekler. Omurga (NATS) tarafında ise tek bir NATS aboneliği ile topic'leri dinleyip, gelen mesajı topicSubscribers.get(topic) ile ilgili WebSocket'lere yollarız.

javascript
const { connect } = require('nats');
let nc;

async function subscribeToBackbone() {
  nc = await connect({ servers: process.env.NATS_URL || 'nats://localhost:4222' });

  // Tüm topic'leri dinlemek için wildcard veya bilinen topic listesi
  const sub = nc.subscribe('>'); // NATS wildcard: tüm subject'ler
  (async () => {
    for await (const msg of sub) {
      const topic = msg.subject;
      const payload = JSON.parse(Buffer.from(msg.data).toString());
      broadcastToTopic(topic, payload);
    }
  })();
}

function broadcastToTopic(topic, payload) {
  const set = topicSubscribers.get(topic);
  if (!set || set.size === 0) return;
  const data = JSON.stringify({
    type: 'message',
    topic,
    seqNo: payload.seqNo,
    messageId: payload.messageId,
    data: payload
  });
  set.forEach((ws) => {
    if (ws.readyState === 1) ws.send(data); // OPEN
  });
}

SUBSCRIBE Handler: Sub-Socket'a Ekleme + NATS Aboneliği (Alternatif)

Eğer NATS'ta her topic için ayrı abonelik açmak isterseniz (gateway tarafında):

javascript
const activeSubs = new Map(); // topic -> nats subscription

async function ensureBackboneSubscription(topic) {
  if (activeSubs.has(topic)) return;
  const sub = nc.subscribe(topic);
  activeSubs.set(topic, sub);
  (async () => {
    for await (const m of sub) {
      const payload = JSON.parse(Buffer.from(m.data).toString());
      broadcastToTopic(topic, payload);
    }
  })();
}

async function handleSubscribe(ws, msg) {
  const { topic } = msg;
  // ... ACL ...
  addSubscriber(ws, topic);
  if (!ws.topics) ws.topics = new Set();
  ws.topics.add(topic);
  await ensureBackboneSubscription(topic);
  ws.send(JSON.stringify({ type: 'subscribed', topic }));
}

Bağlantı Kapanınca Temizlik

javascript
ws.on('close', () => {
  removeConnection(ws);
});

Akış Şeması

Tam proje: Tam E2E Örnek.

Star the repo on GitHub if this documentation is useful — link in the navbar above.