Skip to content

Node.js: Tam E2E Örnek

Tek bir proje yapısında: Gateway (ACL + Dedup + Sub-Socket + NATS) ve Client örneği.


Proje Yapısı

micro-socket-demo/
├── package.json
├── gateway.js      # WebSocket gateway
├── client.js       # Örnek istemci
├── .env            # REDIS_URL, NATS_URL, JWT_SECRET
└── README.md

package.json

json
{
  "name": "micro-socket-demo",
  "type": "module",
  "scripts": {
    "gateway": "node gateway.js",
    "client": "node client.js"
  },
  "dependencies": {
    "ws": "^8.14.2",
    "nats": "^2.16.0",
    "redis": "^4.6.0",
    "jsonwebtoken": "^9.0.2",
    "dotenv": "^16.3.1"
  }
}

gateway.js (Özet Yapı)

javascript
import 'dotenv/config';
import WebSocket from 'ws';
import { connect } from 'nats';
import { createClient } from 'redis';
import jwt from 'jsonwebtoken';

const PORT = process.env.WS_PORT || 8080;
const redis = createClient({ url: process.env.REDIS_URL || 'redis://localhost:6379' });
await redis.connect();
const nc = await connect({ servers: process.env.NATS_URL || 'nats://localhost:4222' });

const topicSubscribers = new Map();
function addSub(ws, topic) {
  if (!topicSubscribers.has(topic)) topicSubscribers.set(topic, new Set());
  topicSubscribers.get(topic).add(ws);
}
function removeSub(ws, topic) {
  const s = topicSubscribers.get(topic);
  if (s) { s.delete(ws); if (s.size === 0) topicSubscribers.delete(topic); }
}
function broadcast(topic, payload) {
  const s = topicSubscribers.get(topic);
  if (!s) return;
  const data = JSON.stringify({ type: 'message', topic, ...payload });
  s.forEach(ws => ws.readyState === 1 && ws.send(data));
}

async function aclAllow(userId, topic, action) {
  const key = `acl:${userId}:${action}`;
  const allowed = await redis.sMembers(key);
  return allowed.includes(topic) || allowed.some(t => t.endsWith('*') && topic.startsWith(t.slice(0, -1)));
}
async function isDup(userId, messageId) {
  const key = `dedup:${userId}`;
  const added = await redis.sAdd(key, messageId);
  if (added === 0) return true;
  await redis.expire(key, 3600);
  return false;
}
async function nextSeq(topic) {
  return await redis.incr(`seq:${topic}`);
}

function getUserId(token) {
  if (!token) return null;
  try {
    const d = jwt.verify(token, process.env.JWT_SECRET || 'secret');
    return d.sub || d.userId;
  } catch { return null; }
}

const wss = new WebSocket.Server({ port: PORT });

wss.on('connection', (ws) => {
  ws.topics = new Set();
  ws.on('close', () => ws.topics.forEach(t => removeSub(ws, t)));

  ws.on('message', async (buf) => {
    const msg = JSON.parse(buf.toString());
    if (msg.type === 'setup') {
      ws.userId = getUserId(msg.token);
      if (!ws.userId) return ws.send(JSON.stringify({ type: 'error', code: 'AUTH_FAILED' }));
      return ws.send(JSON.stringify({ type: 'ready' }));
    }
    if (!ws.userId) return ws.send(JSON.stringify({ type: 'error', code: 'AUTH_REQUIRED' }));

    if (msg.type === 'subscribe') {
      if (!(await aclAllow(ws.userId, msg.topic, 'read')))
        return ws.send(JSON.stringify({ type: 'error', code: 'ACL_DENIED', topic: msg.topic }));
      addSub(ws, msg.topic);
      ws.topics.add(msg.topic);
      return ws.send(JSON.stringify({ type: 'subscribed', topic: msg.topic }));
    }

    if (msg.type === 'publish') {
      if (!(await aclAllow(ws.userId, msg.topic, 'write')))
        return ws.send(JSON.stringify({ type: 'error', code: 'ACL_DENIED', topic: msg.topic }));
      if (await isDup(ws.userId, msg.messageId))
        return ws.send(JSON.stringify({ type: 'ack', messageId: msg.messageId, status: 'duplicate' }));
      const seqNo = await nextSeq(msg.topic);
      const payload = { ...msg.payload, seqNo, messageId: msg.messageId };
      nc.publish(msg.topic, Buffer.from(JSON.stringify(payload)));
      ws.send(JSON.stringify({ type: 'ack', messageId: msg.messageId, status: 'ok' }));
    }
  });
});

// NATS'tan gelen mesajı abonelere yayınla
(async () => {
  const sub = nc.subscribe('>');
  for await (const m of sub) {
    const topic = m.subject;
    const payload = JSON.parse(Buffer.from(m.data).toString());
    broadcast(topic, payload);
  }
})();

console.log('Gateway on', PORT);

client.js (Örnek İstemci)

javascript
import WebSocket from 'ws';
import jwt from 'jsonwebtoken';

const token = jwt.sign({ sub: 'user1' }, process.env.JWT_SECRET || 'secret');
const ws = new WebSocket('ws://localhost:8080');

ws.on('open', () => {
  ws.send(JSON.stringify({ type: 'setup', token }));
});
ws.on('message', (data) => {
  const msg = JSON.parse(data.toString());
  if (msg.type === 'ready') {
    ws.send(JSON.stringify({ type: 'subscribe', topic: 'risk.alerts' }));
  }
  if (msg.type === 'subscribed') {
    ws.send(JSON.stringify({
      type: 'publish',
      topic: 'orders',
      messageId: 'ord-1',
      payload: { side: 'buy', amount: 1 }
    }));
  }
  if (msg.type === 'message') console.log('Message:', msg);
  if (msg.type === 'ack') console.log('Ack:', msg);
});

Test İçin ACL ve Redis

bash
# Redis'ta ACL (read/write)
redis-cli SADD acl:user1:read risk.alerts prices.*
redis-cli SADD acl:user1:write orders
bash
npm run gateway   # Bir terminal
npm run client    # Diğer terminal

Bu yapı, Metodoloji ve İnvariant'lar ile uyumlu minimal bir E2E örneğidir. Genişletmek için Gateway: ACL + Dedup ve İstemci SDK sayfalarına bakın.

Star the repo on GitHub if this documentation is useful — link in the navbar above.