Node.js: Gateway — Sub-Socket & Yayın
Bu sayfa sub-socket (topic başına abone listesi) ve omurgadan gelen mesajın abonelere yayınlanmasını Node.js ile anlatır.
Sub-Socket Veri Yapısı
javascript
// topic -> Set<WebSocket>
const topicSubscribers = new Map();
function addSubscriber(ws, topic) {
if (!topicSubscribers.has(topic)) {
topicSubscribers.set(topic, new Set());
}
topicSubscribers.get(topic).add(ws);
}
function removeSubscriber(ws, topic) {
const set = topicSubscribers.get(topic);
if (set) {
set.delete(ws);
if (set.size === 0) topicSubscribers.delete(topic);
}
}
// Bağlantı kapandığında tüm topic'lerden çıkar
function removeConnection(ws) {
if (ws.topics) {
ws.topics.forEach((t) => removeSubscriber(ws, t));
}
}NATS'tan Dinleme ve Abonelere Yayın
Gateway, her SUBSCRIBE'ta sadece o istemciyi ilgili topic set'ine ekler. Omurga (NATS) tarafında ise tek bir NATS aboneliği ile topic'leri dinleyip, gelen mesajı topicSubscribers.get(topic) ile ilgili WebSocket'lere yollarız.
javascript
const { connect } = require('nats');
let nc;
async function subscribeToBackbone() {
nc = await connect({ servers: process.env.NATS_URL || 'nats://localhost:4222' });
// Tüm topic'leri dinlemek için wildcard veya bilinen topic listesi
const sub = nc.subscribe('>'); // NATS wildcard: tüm subject'ler
(async () => {
for await (const msg of sub) {
const topic = msg.subject;
const payload = JSON.parse(Buffer.from(msg.data).toString());
broadcastToTopic(topic, payload);
}
})();
}
function broadcastToTopic(topic, payload) {
const set = topicSubscribers.get(topic);
if (!set || set.size === 0) return;
const data = JSON.stringify({
type: 'message',
topic,
seqNo: payload.seqNo,
messageId: payload.messageId,
data: payload
});
set.forEach((ws) => {
if (ws.readyState === 1) ws.send(data); // OPEN
});
}SUBSCRIBE Handler: Sub-Socket'a Ekleme + NATS Aboneliği (Alternatif)
Eğer NATS'ta her topic için ayrı abonelik açmak isterseniz (gateway tarafında):
javascript
const activeSubs = new Map(); // topic -> nats subscription
async function ensureBackboneSubscription(topic) {
if (activeSubs.has(topic)) return;
const sub = nc.subscribe(topic);
activeSubs.set(topic, sub);
(async () => {
for await (const m of sub) {
const payload = JSON.parse(Buffer.from(m.data).toString());
broadcastToTopic(topic, payload);
}
})();
}
async function handleSubscribe(ws, msg) {
const { topic } = msg;
// ... ACL ...
addSubscriber(ws, topic);
if (!ws.topics) ws.topics = new Set();
ws.topics.add(topic);
await ensureBackboneSubscription(topic);
ws.send(JSON.stringify({ type: 'subscribed', topic }));
}Bağlantı Kapanınca Temizlik
javascript
ws.on('close', () => {
removeConnection(ws);
});Akış Şeması
Tam proje: Tam E2E Örnek.