Algoritma: Resume & Replay
I₆ invariant'ı: Bağlantı koptuğunda istemci kayıp yaşamadan kaldığı yerden devam eder (RESUME + gap replay).
Akış Şeması
Session ve lastSeq Saklama
- Gateway: Her bağlantı için
sessionId(UUID) üretilir; Redis/bellektesessionId → { userId, topics[], lastSeqPerTopic }saklanır. - İstemci: Bağlantı kopunca yeniden bağlanır ve ilk mesajda
RESUME(sessionId, lastSeqPerTopic)gönderir.
Pseudocode: Gateway Resume Handler
FUNCTION onResume(ws, sessionId, lastSeqPerTopic):
session = sessionStore.get(sessionId)
IF NOT session OR session.userId != ws.userId THEN
sendError(ws, "SESSION_EXPIRED")
RETURN
END IF
FOR EACH (topic, clientLastSeq) IN lastSeqPerTopic:
serverLastSeq = getLastSeqDelivered(sessionId, topic)
IF clientLastSeq < serverLastSeq THEN
messages = getMessagesFromBackbone(topic, clientLastSeq + 1, serverLastSeq)
FOR EACH m IN messages:
sendToClient(ws, m)
END IF
END FOR
// Artık canlı dinlemeye geç
attachToSubSockets(ws, session.topics)Replay Kaynağı
| Kaynak | Açıklama |
|---|---|
| Redis/Stream | Omurga Redis Streams ise XRANGE ile seq aralığı alınır. |
| Kafka | Partition offset ile belirli aralık consume edilir. |
| Gateway buffer | Son N mesajı gateway bellekte tutar; gap küçükse buradan replay. |
Node.js: Session Store (Bellek Örneği)
javascript
const sessionStore = new Map(); // sessionId -> { userId, topics, lastSeqPerTopic }
function createSession(userId) {
const sessionId = crypto.randomUUID();
sessionStore.set(sessionId, {
userId,
topics: [],
lastSeqPerTopic: {},
createdAt: Date.now()
});
return sessionId;
}
function onResume(ws, sessionId, lastSeqPerTopic) {
const session = sessionStore.get(sessionId);
if (!session || session.userId !== ws.userId) {
ws.send(JSON.stringify({ type: 'error', code: 'SESSION_EXPIRED' }));
return;
}
// Gap varsa replay; sonra normal dinlemeye devam
for (const [topic, clientLast] of Object.entries(lastSeqPerTopic)) {
const serverLast = session.lastSeqPerTopic[topic] ?? 0;
if (serverLast > clientLast) {
const gap = await getMessagesFromBackbone(topic, clientLast + 1, serverLast);
gap.forEach(m => ws.send(JSON.stringify(m)));
}
}
}Tam örnek: İstemci SDK, Tam E2E Örnek.