Node.js: Tam E2E Örnek
Tek bir proje yapısında: Gateway (ACL + Dedup + Sub-Socket + NATS) ve Client örneği.
Proje Yapısı
micro-socket-demo/
├── package.json
├── gateway.js # WebSocket gateway
├── client.js # Örnek istemci
├── .env # REDIS_URL, NATS_URL, JWT_SECRET
└── README.md
package.json
json
{
"name": "micro-socket-demo",
"type": "module",
"scripts": {
"gateway": "node gateway.js",
"client": "node client.js"
},
"dependencies": {
"ws": "^8.14.2",
"nats": "^2.16.0",
"redis": "^4.6.0",
"jsonwebtoken": "^9.0.2",
"dotenv": "^16.3.1"
}
}
gateway.js (Özet Yapı)
javascript
import 'dotenv/config';
import WebSocket, { WebSocketServer } from 'ws';
import { connect } from 'nats';
import { createClient } from 'redis';
import jwt from 'jsonwebtoken';
// Port the WebSocket gateway listens on (WS_PORT, falling back to 8080).
const PORT = process.env.WS_PORT || 8080;

// Redis backs the ACL sets, the dedup markers and the per-topic sequence counters.
const redis = createClient({ url: process.env.REDIS_URL || 'redis://localhost:6379' });
// Without an 'error' listener a Redis connection error is raised as an
// uncaught exception and takes the whole gateway down.
redis.on('error', (err) => console.error('Redis client error:', err));
await redis.connect();

// NATS connection used both to publish client messages and to receive fan-out traffic.
const nc = await connect({ servers: process.env.NATS_URL || 'nats://localhost:4222' });

// topic -> Set of sockets currently subscribed to that topic on this gateway instance.
const topicSubscribers = new Map();
/**
 * Registers a socket as a subscriber of a topic.
 *
 * Creates the topic's subscriber set on first use; adding the same socket
 * twice is a no-op because subscribers are stored in a Set.
 *
 * @param {object} ws - Client socket to register.
 * @param {string} topic - Topic name used as the registry key.
 */
function addSub(ws, topic) {
  let sockets = topicSubscribers.get(topic);
  if (sockets === undefined) {
    sockets = new Set();
    topicSubscribers.set(topic, sockets);
  }
  sockets.add(ws);
}
/**
 * Removes a socket from a topic's subscriber set.
 *
 * When the set becomes empty the topic entry itself is dropped from the
 * registry. Unknown topics are ignored.
 *
 * @param {object} ws - Client socket to unregister.
 * @param {string} topic - Topic name the socket was subscribed to.
 */
function removeSub(ws, topic) {
  const sockets = topicSubscribers.get(topic);
  if (!sockets) return;

  sockets.delete(ws);
  if (sockets.size > 0) return;

  topicSubscribers.delete(topic);
}
/**
 * Sends a payload to every open socket subscribed to `topic`.
 *
 * The frame is the payload's own fields plus the envelope fields
 * `type: 'message'` and `topic`. Topics without subscribers are ignored.
 *
 * @param {string} topic - Topic whose subscribers receive the frame.
 * @param {object} payload - Message fields to forward.
 */
function broadcast(topic, payload) {
  const sockets = topicSubscribers.get(topic);
  if (!sockets) return;

  // Envelope fields are written last so a payload that carries its own
  // `type` or `topic` key cannot override them and impersonate gateway
  // control frames such as 'ack' or 'error'.
  const data = JSON.stringify({ ...payload, type: 'message', topic });

  for (const ws of sockets) {
    // readyState 1 is OPEN; closing or closed sockets are skipped.
    if (ws.readyState === 1) ws.send(data);
  }
}
/**
 * Checks whether a user may perform an action on a topic.
 *
 * Permissions are read from the Redis set `acl:<userId>:<action>`. An entry
 * grants access when it equals the topic, or when it ends with `*` and the
 * topic starts with the text before that `*`.
 *
 * @param {string} userId - Authenticated user identifier.
 * @param {string} topic - Topic being accessed.
 * @param {string} action - Action name used in the ACL key (the gateway uses 'read' and 'write').
 * @returns {Promise<boolean>} True when at least one entry grants access.
 */
async function aclAllow(userId, topic, action) {
  const patterns = await redis.sMembers(`acl:${userId}:${action}`);

  for (const pattern of patterns) {
    if (pattern === topic) return true;
    if (pattern.endsWith('*') && topic.startsWith(pattern.slice(0, -1))) return true;
  }
  return false;
}
/**
 * Reports whether this user already submitted this message id within the
 * last hour, and records the id if it is new.
 *
 * Each (user, message) pair gets its own Redis key written with a single
 * `SET ... NX EX 3600`. Compared with one shared set per user whose TTL is
 * reset on every publish, this makes check-and-mark atomic and lets every
 * id expire on its own, so an active user's dedup data cannot grow forever.
 *
 * @param {string} userId - Authenticated user identifier.
 * @param {string} messageId - Client-chosen id of the message being published.
 * @returns {Promise<boolean>} True when the id was already recorded (duplicate).
 * @throws {TypeError} When messageId is missing or empty.
 */
async function isDup(userId, messageId) {
  if (messageId === undefined || messageId === null || messageId === '') {
    throw new TypeError('messageId is required for deduplication');
  }

  // The user part is percent-encoded so it can never contain ':'; otherwise a
  // crafted messageId could produce the same key as another user's message.
  const key = `dedup:${encodeURIComponent(userId)}:${messageId}`;

  // With NX the reply is 'OK' when the key was created and null when it already existed.
  const reply = await redis.set(key, '1', { NX: true, EX: 3600 });
  return reply === null;
}
/**
 * Allocates the next sequence number for a topic.
 *
 * Increments the Redis counter stored under `seq:<topic>` and returns the
 * value Redis reports after the increment.
 *
 * @param {string} topic - Topic whose counter is advanced.
 * @returns {Promise<number>} The incremented counter value.
 */
async function nextSeq(topic) {
  const counterKey = `seq:${topic}`;
  const seqNo = await redis.incr(counterKey);
  return seqNo;
}
/**
 * Extracts the user id from a JWT.
 *
 * The token is verified with JWT_SECRET (falling back to the literal
 * 'secret'). The id is the `sub` claim when it is truthy, otherwise the
 * `userId` claim.
 *
 * NOTE(review): the 'secret' fallback lets anyone forge tokens when
 * JWT_SECRET is unset; it is kept because the demo client signs with the
 * same fallback. Do not ship it outside a demo.
 *
 * @param {string} token - Encoded JWT supplied by the client.
 * @returns {*} The user id; null when the token is missing or fails
 *   verification; undefined when neither claim is present.
 */
function getUserId(token) {
  if (!token) return null;

  const secret = process.env.JWT_SECRET || 'secret';
  try {
    const { sub, userId } = jwt.verify(token, secret);
    return sub ? sub : userId;
  } catch {
    // Any verification failure is reported to the caller as "no user".
    return null;
  }
}
/** Returns true for a string with at least one character. */
function isNonEmptyString(value) {
  return typeof value === 'string' && value.length > 0;
}

/** Serialises a frame and sends it if the socket is still open (readyState 1). */
function sendJson(ws, frame) {
  if (ws.readyState === 1) ws.send(JSON.stringify(frame));
}

/**
 * Decodes a raw WebSocket message into a frame object.
 * Returns null for invalid JSON and for JSON that is not an object.
 */
function parseFrame(buf) {
  try {
    const frame = JSON.parse(buf.toString());
    return frame !== null && typeof frame === 'object' ? frame : null;
  } catch {
    // Malformed client input is an expected condition, not a server error.
    return null;
  }
}

/**
 * Handles one decoded client frame: `setup`, `subscribe` or `publish`.
 * Frames of any other type are ignored once the socket is authenticated.
 */
async function handleFrame(ws, msg) {
  if (msg.type === 'setup') {
    ws.userId = getUserId(msg.token);
    return sendJson(ws, ws.userId ? { type: 'ready' } : { type: 'error', code: 'AUTH_FAILED' });
  }
  if (!ws.userId) return sendJson(ws, { type: 'error', code: 'AUTH_REQUIRED' });
  if (msg.type !== 'subscribe' && msg.type !== 'publish') return undefined;

  // Reject frames that would otherwise throw inside the ACL check or NATS publish.
  if (!isNonEmptyString(msg.topic)) {
    return sendJson(ws, { type: 'error', code: 'BAD_REQUEST' });
  }

  if (msg.type === 'subscribe') {
    if (!(await aclAllow(ws.userId, msg.topic, 'read'))) {
      return sendJson(ws, { type: 'error', code: 'ACL_DENIED', topic: msg.topic });
    }
    addSub(ws, msg.topic);
    ws.topics.add(msg.topic);
    return sendJson(ws, { type: 'subscribed', topic: msg.topic });
  }

  // msg.type === 'publish'
  if (!isNonEmptyString(msg.messageId)) {
    return sendJson(ws, { type: 'error', code: 'BAD_REQUEST', topic: msg.topic });
  }
  if (!(await aclAllow(ws.userId, msg.topic, 'write'))) {
    return sendJson(ws, { type: 'error', code: 'ACL_DENIED', topic: msg.topic });
  }
  if (await isDup(ws.userId, msg.messageId)) {
    return sendJson(ws, { type: 'ack', messageId: msg.messageId, status: 'duplicate' });
  }
  const seqNo = await nextSeq(msg.topic);
  const payload = { ...msg.payload, seqNo, messageId: msg.messageId };
  nc.publish(msg.topic, Buffer.from(JSON.stringify(payload)));
  return sendJson(ws, { type: 'ack', messageId: msg.messageId, status: 'ok' });
}

// In an ES module the default export of `ws` is the client class and has no
// `.Server` property, so the server must be created from the named
// `WebSocketServer` export.
const wss = new WebSocketServer({ port: PORT });

wss.on('connection', (ws) => {
  // Topics this socket subscribed to, so they can be released on close.
  ws.topics = new Set();

  ws.on('close', () => ws.topics.forEach((t) => removeSub(ws, t)));

  // An 'error' event with no listener is thrown as an uncaught exception.
  ws.on('error', (err) => console.error('Socket error:', err));

  ws.on('message', async (buf) => {
    // Everything is wrapped because a rejection escaping an async listener
    // is an unhandled rejection, which terminates the process by default.
    try {
      const msg = parseFrame(buf);
      if (!msg) return sendJson(ws, { type: 'error', code: 'BAD_REQUEST' });
      await handleFrame(ws, msg);
    } catch (err) {
      console.error('Failed to handle client message:', err);
      sendJson(ws, { type: 'error', code: 'INTERNAL_ERROR' });
    }
    return undefined;
  });
});
// Fan out every message arriving from NATS to the local subscribers of its subject.
(async () => {
  // '>' is the NATS full wildcard, so this receives traffic on every subject.
  const sub = nc.subscribe('>');
  for await (const m of sub) {
    const topic = m.subject;
    // Skip decoding when no local socket is subscribed to this subject.
    if (!topicSubscribers.has(topic)) continue;

    let payload;
    try {
      payload = JSON.parse(Buffer.from(m.data).toString());
    } catch (err) {
      // A non-JSON message from any other publisher must not end the loop
      // for every topic.
      console.warn(`Ignoring non-JSON NATS message on "${topic}":`, err.message);
      continue;
    }
    broadcast(topic, payload);
  }
})().catch((err) => {
  // The loop runs detached from any caller, so failures are reported here
  // instead of surfacing as an unhandled rejection.
  console.error('NATS fan-out loop stopped:', err);
});
console.log('Gateway on', PORT);
client.js (Örnek İstemci)
javascript
import WebSocket from 'ws';
import jwt from 'jsonwebtoken';
// Load .env the same way the gateway does; otherwise the token below is
// signed with the fallback secret while the gateway verifies with the real
// JWT_SECRET, and setup fails with AUTH_FAILED.
import 'dotenv/config';

// Demo identity: a token for user1 signed with the shared secret.
const token = jwt.sign({ sub: 'user1' }, process.env.JWT_SECRET || 'secret');

// Follow the gateway's WS_PORT setting instead of assuming 8080.
const ws = new WebSocket(`ws://localhost:${process.env.WS_PORT || 8080}`);

// Report connection problems (for example gateway not running) instead of
// crashing on an unhandled 'error' event.
ws.on('error', (err) => console.error('Socket error:', err.message));

// Authenticate as soon as the connection is established.
ws.on('open', () => {
  ws.send(JSON.stringify({ type: 'setup', token }));
});
ws.on('message', (data) => {
const msg = JSON.parse(data.toString());
if (msg.type === 'ready') {
ws.send(JSON.stringify({ type: 'subscribe', topic: 'risk.alerts' }));
}
if (msg.type === 'subscribed') {
ws.send(JSON.stringify({
type: 'publish',
topic: 'orders',
messageId: 'ord-1',
payload: { side: 'buy', amount: 1 }
}));
}
if (msg.type === 'message') console.log('Message:', msg);
if (msg.type === 'ack') console.log('Ack:', msg);
});Test İçin ACL ve Redis
bash
# Redis'ta ACL (read/write)
redis-cli SADD acl:user1:read risk.alerts prices.*
redis-cli SADD acl:user1:write orders
bash
npm run gateway # Bir terminal
npm run client # Diğer terminal
Bu yapı, Metodoloji ve İnvariant'lar ile uyumlu minimal bir E2E örneğidir. Genişletmek için Gateway: ACL + Dedup ve İstemci SDK sayfalarına bakın.