Node.js: Gateway (Genel)
WebSocket gateway, istemcileri kabul eder; SUBSCRIBE/PUBLISH işler ve omurga (NATS/Redis) ile konuşur. Bu sayfa temel yapıyı verir; ACL ve dedup ayrı sayfalarda detaylanır.
Bağımlılıklar
bash
npm init -y
npm install ws nats redis jsonwebtokenjson
{
"dependencies": {
"ws": "^8.x",
"nats": "^2.x",
"redis": "^4.x",
"jsonwebtoken": "^9.x"
}
}Minimal Gateway (WebSocket + NATS)
javascript
const WebSocket = require('ws');
const { connect } = require('nats');
const PORT = process.env.WS_PORT || 8080;
let nc;
async function main() {
nc = await connect({ servers: process.env.NATS_URL || 'nats://localhost:4222' });
const wss = new WebSocket.Server({ port: PORT });
wss.on('connection', (ws, req) => {
ws.isAlive = true;
ws.on('pong', () => { ws.isAlive = true; });
ws.on('message', async (data) => {
try {
const msg = JSON.parse(data.toString());
if (msg.type === 'subscribe') {
const sub = nc.subscribe(msg.topic);
for await (const m of sub) {
if (ws.readyState !== WebSocket.OPEN) break;
ws.send(JSON.stringify({
type: 'message',
topic: msg.topic,
data: JSON.parse(Buffer.from(m.data).toString())
}));
}
}
if (msg.type === 'publish') {
nc.publish(msg.topic, Buffer.from(JSON.stringify(msg.payload || {})));
ws.send(JSON.stringify({ type: 'ack', messageId: msg.messageId, status: 'ok' }));
}
} catch (err) {
ws.send(JSON.stringify({ type: 'error', message: err.message }));
}
});
});
const interval = setInterval(() => {
wss.clients.forEach((ws) => {
if (!ws.isAlive) return ws.terminate();
ws.isAlive = false;
ws.ping();
});
}, 30000);
wss.on('close', () => clearInterval(interval));
console.log('Gateway listening on', PORT);
}
main().catch(console.error);Protokol Mesaj Tipleri (Özet)
| type | Yön | Açıklama |
|---|---|---|
connect / setup | C→S | Token ile kimlik; gateway session açar. |
subscribe | C→S | topic; ACL sonrası sub-socket'e eklenir. |
unsubscribe | C→S | topic. |
publish | C→S | topic, messageId, payload; dedup + omurga. |
ack | C→S | messageId; QoS≥1 için teslim onayı. |
resume | C→S | sessionId, lastSeqPerTopic; kesinti sonrası. |
message | S→C | topic, seqNo, data, messageId. |
ack | S→C | messageId, status (ok |
error | S→C | code, message. |
Auth: Token'dan userId (Örnek)
javascript
const jwt = require('jsonwebtoken');
const SECRET = process.env.JWT_SECRET || 'your-secret';
function getUserIdFromToken(token) {
if (!token) return null;
try {
const decoded = jwt.verify(token, SECRET);
return decoded.sub || decoded.userId || decoded.id;
} catch {
return null;
}
}
// CONNECT/setup mesajında:
ws.on('message', async (data) => {
const msg = JSON.parse(data.toString());
if (msg.type === 'setup' || msg.type === 'connect') {
ws.userId = getUserIdFromToken(msg.token);
if (!ws.userId) {
ws.send(JSON.stringify({ type: 'error', code: 'AUTH_FAILED' }));
return;
}
ws.send(JSON.stringify({ type: 'ready', sessionId: ws.sessionId }));
return;
}
// ... diğer mesaj tipleri
});Sub-Socket Yapısı (Topic → İstemci Seti)
javascript
// Gateway genelinde tek bir Map: topic -> Set<WebSocket>
const topicSubscribers = new Map();
function addSubscriber(ws, topic) {
if (!topicSubscribers.has(topic)) {
topicSubscribers.set(topic, new Set());
}
topicSubscribers.get(topic).add(ws);
}
function removeSubscriber(ws, topic) {
const set = topicSubscribers.get(topic);
if (set) {
set.delete(ws);
if (set.size === 0) topicSubscribers.delete(topic);
}
}
function broadcastToTopic(topic, payload) {
const set = topicSubscribers.get(topic);
if (!set) return;
const data = JSON.stringify(payload);
set.forEach((ws) => {
if (ws.readyState === WebSocket.OPEN) ws.send(data);
});
}Omurga'dan gelen mesajı bu broadcastToTopic ile abonelere iletebilirsiniz. Tam entegrasyon: Gateway: Sub-Socket & Yayın.
Sonraki Adımlar
- Gateway: ACL + Dedup — Yetki ve çift işlem engelleme
- Gateway: Sub-Socket & Yayın — Topic abonelik ve broadcast
- İstemci SDK — Client tarafı örnek