Skip to content

Algoritma: Resume & Replay

I₆ invariant'ı: Bağlantı koptuğunda istemci kayıp yaşamadan kaldığı yerden devam eder (RESUME + gap replay).


Akış Şeması


Session ve lastSeq Saklama

  • Gateway: Her bağlantı için sessionId (UUID) üretilir; Redis/bellekte sessionId → { userId, topics[], lastSeqPerTopic } saklanır.
  • İstemci: Bağlantı kopunca yeniden bağlanır ve ilk mesajda RESUME(sessionId, lastSeqPerTopic) gönderir.

Pseudocode: Gateway Resume Handler

FUNCTION onResume(ws, sessionId, lastSeqPerTopic):
  session = sessionStore.get(sessionId)
  IF NOT session OR session.userId != ws.userId THEN
    sendError(ws, "SESSION_EXPIRED")
    RETURN
  END IF

  FOR EACH (topic, clientLastSeq) IN lastSeqPerTopic:
    serverLastSeq = getLastSeqDelivered(sessionId, topic)
    IF clientLastSeq < serverLastSeq THEN
      messages = getMessagesFromBackbone(topic, clientLastSeq + 1, serverLastSeq)
      FOR EACH m IN messages:
        sendToClient(ws, m)
    END IF
  END FOR

  // Artık canlı dinlemeye geç
  attachToSubSockets(ws, session.topics)

Replay Kaynağı

KaynakAçıklama
Redis/StreamOmurga Redis Streams ise XRANGE ile seq aralığı alınır.
KafkaPartition offset ile belirli aralık consume edilir.
Gateway bufferSon N mesajı gateway bellekte tutar; gap küçükse buradan replay.

Node.js: Session Store (Bellek Örneği)

javascript
const sessionStore = new Map(); // sessionId -> { userId, topics, lastSeqPerTopic }

function createSession(userId) {
  const sessionId = crypto.randomUUID();
  sessionStore.set(sessionId, {
    userId,
    topics: [],
    lastSeqPerTopic: {},
    createdAt: Date.now()
  });
  return sessionId;
}

function onResume(ws, sessionId, lastSeqPerTopic) {
  const session = sessionStore.get(sessionId);
  if (!session || session.userId !== ws.userId) {
    ws.send(JSON.stringify({ type: 'error', code: 'SESSION_EXPIRED' }));
    return;
  }
  // Gap varsa replay; sonra normal dinlemeye devam
  for (const [topic, clientLast] of Object.entries(lastSeqPerTopic)) {
    const serverLast = session.lastSeqPerTopic[topic] ?? 0;
    if (serverLast > clientLast) {
      const gap = await getMessagesFromBackbone(topic, clientLast + 1, serverLast);
      gap.forEach(m => ws.send(JSON.stringify(m)));
    }
  }
}

Tam örnek: İstemci SDK, Tam E2E Örnek.

Star the repo on GitHub if this documentation is useful — link in the navbar above.