Node.js: Omurga (NATS / Redis)
Gateway'in mesaj omurgası ile nasıl konuştuğu: NATS ve Redis örnekleri.
NATS: Publish & Subscribe
bash
npm install natsjavascript
const { connect } = require('nats');
const nc = await connect({ servers: process.env.NATS_URL || 'nats://localhost:4222' });
// Gateway'ten publish (client PUBLISH geldiğinde)
function publishToBackbone(topic, payload) {
nc.publish(topic, Buffer.from(JSON.stringify(payload)));
}
// Gateway: tüm topic'leri dinle (wildcard)
const sub = nc.subscribe('>');
for await (const msg of sub) {
const topic = msg.subject;
const payload = JSON.parse(Buffer.from(msg.data).toString());
broadcastToTopic(topic, payload); // Kendi broadcast fonksiyonunuz
}NATS JetStream (Kalıcı + Replay)
Replay (RESUME) için mesaj geçmişi gerekiyorsa JetStream kullanılabilir:
javascript
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: 'MICRO', subjects: ['>'] });
const js = nc.jetstream();
await js.publish('prices.btc', Buffer.from(JSON.stringify({ seqNo: 1, data: {} })));
// Consumer ile belirli seq'den itibaren oku
const consumer = await js.consumers.get('MICRO', 'gateway');
const messages = await consumer.fetch({ max_messages: 100, expires: 5000 });
for await (const m of messages) {
const topic = m.subject;
const payload = JSON.parse(Buffer.from(m.data).toString());
broadcastToTopic(topic, payload);
}Redis: Dedup Store (Zaten Gateway Sayfasında)
javascript
const redis = require('redis');
const client = redis.createClient({ url: process.env.REDIS_URL });
await client.sAdd(`dedup:${userId}`, messageId);
await client.expire(`dedup:${userId}`, 3600);
const isNew = (await client.sAdd(`dedup:${userId}`, messageId)) === 1;Redis Streams (Omurga Olarak)
Redis Streams ile topic başına stream; replay için XRANGE:
javascript
const streamKey = `stream:${topic}`;
await client.xAdd(streamKey, '*', { seqNo: String(seqNo), data: JSON.stringify(payload) });
// Replay: fromSeq ile son mesajları al
const messages = await client.xRange(streamKey, fromSeq, '+', 'COUNT', 100);SeqNo: Redis INCR
javascript
const seqKey = `seq:${topic}`;
const seqNo = await client.incr(seqKey);Özet Tablo
| İhtiyaç | NATS | Redis |
|---|---|---|
| Pub/Sub | nc.publish / nc.subscribe | Pub/Sub veya Streams |
| Dedup | Yok (uygulama tarafında) | Set + TTL |
| SeqNo | Uygulama | INCR |
| Replay | JetStream consumer | Streams XRANGE |
Tam entegrasyon: Gateway: ACL + Dedup, Gateway: Sub-Socket & Yayın.