Skip to content

Node.js: Omurga (NATS / Redis)

Gateway'in mesaj omurgası ile nasıl konuştuğu: NATS ve Redis örnekleri.


NATS: Publish & Subscribe

bash
npm install nats
javascript
const { connect } = require('nats');

const nc = await connect({ servers: process.env.NATS_URL || 'nats://localhost:4222' });

// Gateway'ten publish (client PUBLISH geldiğinde)
function publishToBackbone(topic, payload) {
  nc.publish(topic, Buffer.from(JSON.stringify(payload)));
}

// Gateway: tüm topic'leri dinle (wildcard)
const sub = nc.subscribe('>');
for await (const msg of sub) {
  const topic = msg.subject;
  const payload = JSON.parse(Buffer.from(msg.data).toString());
  broadcastToTopic(topic, payload); // Kendi broadcast fonksiyonunuz
}

NATS JetStream (Kalıcı + Replay)

Replay (RESUME) için mesaj geçmişi gerekiyorsa JetStream kullanılabilir:

javascript
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: 'MICRO', subjects: ['>'] });

const js = nc.jetstream();
await js.publish('prices.btc', Buffer.from(JSON.stringify({ seqNo: 1, data: {} })));

// Consumer ile belirli seq'den itibaren oku
const consumer = await js.consumers.get('MICRO', 'gateway');
const messages = await consumer.fetch({ max_messages: 100, expires: 5000 });
for await (const m of messages) {
  const topic = m.subject;
  const payload = JSON.parse(Buffer.from(m.data).toString());
  broadcastToTopic(topic, payload);
}

Redis: Dedup Store (Zaten Gateway Sayfasında)

javascript
const redis = require('redis');
const client = redis.createClient({ url: process.env.REDIS_URL });

await client.sAdd(`dedup:${userId}`, messageId);
await client.expire(`dedup:${userId}`, 3600);
const isNew = (await client.sAdd(`dedup:${userId}`, messageId)) === 1;

Redis Streams (Omurga Olarak)

Redis Streams ile topic başına stream; replay için XRANGE:

javascript
const streamKey = `stream:${topic}`;
await client.xAdd(streamKey, '*', { seqNo: String(seqNo), data: JSON.stringify(payload) });

// Replay: fromSeq ile son mesajları al
const messages = await client.xRange(streamKey, fromSeq, '+', 'COUNT', 100);

SeqNo: Redis INCR

javascript
const seqKey = `seq:${topic}`;
const seqNo = await client.incr(seqKey);

Özet Tablo

İhtiyaçNATSRedis
Pub/Subnc.publish / nc.subscribePub/Sub veya Streams
DedupYok (uygulama tarafında)Set + TTL
SeqNoUygulamaINCR
ReplayJetStream consumerStreams XRANGE

Tam entegrasyon: Gateway: ACL + Dedup, Gateway: Sub-Socket & Yayın.

Star the repo on GitHub if this documentation is useful — link in the navbar above.