Skip to content

Node.js: İstemci SDK

Micro Socket protokolüne uyumlu bir istemci örneği: CONNECT/setup, SUBSCRIBE, PUBLISH, ACK, seqNo takibi ve RESUME.


Bağımlılık

bash
npm install ws

Temel Bağlantı ve Setup

javascript
const WebSocket = require('ws');

class MicroSocketClient {
  constructor(url, options = {}) {
    this.url = url;
    this.token = options.token;
    this.ws = null;
    this.lastSeqPerTopic = new Map();
    this.sessionId = null;
    this.listeners = new Map(); // topic -> callback[]
  }

  connect() {
    return new Promise((resolve, reject) => {
      this.ws = new WebSocket(this.url);
      this.ws.on('open', () => {
        this.ws.send(JSON.stringify({ type: 'setup', token: this.token }));
      });
      this.ws.on('message', (data) => {
        this.handleMessage(JSON.parse(data.toString()));
      });
      this.ws.on('message', (data) => {
        const msg = JSON.parse(data.toString());
        if (msg.type === 'ready') {
          this.sessionId = msg.sessionId;
          resolve();
        }
      });
      this.ws.on('error', reject);
    });
  }

  handleMessage(msg) {
    if (msg.type === 'message') {
      this.handleIncomingMessage(msg);
      return;
    }
    if (msg.type === 'error') {
      console.error('Server error:', msg);
      return;
    }
  }

  handleIncomingMessage(msg) {
    const { topic, seqNo, data, messageId } = msg;
    const last = this.lastSeqPerTopic.get(topic) ?? 0;

    if (seqNo === last + 1) {
      this.lastSeqPerTopic.set(topic, seqNo);
      this.emit(topic, data);
      this.sendAck(messageId);
      return;
    }
    if (seqNo > last + 1) {
      this.requestReplay(topic, last + 1);
      return;
    }
    this.sendAck(messageId);
  }

  emit(topic, data) {
    const cbs = this.listeners.get(topic);
    if (cbs) cbs.forEach((cb) => cb(data));
  }

  sendAck(messageId) {
    if (this.ws && this.ws.readyState === 1) {
      this.ws.send(JSON.stringify({ type: 'ack', messageId }));
    }
  }

  requestReplay(topic, fromSeq) {
    if (this.ws && this.ws.readyState === 1) {
      this.ws.send(JSON.stringify({ type: 'replay', topic, fromSeq }));
    }
  }

  subscribe(topic, callback) {
    if (!this.listeners.has(topic)) this.listeners.set(topic, []);
    this.listeners.get(topic).push(callback);
    this.ws.send(JSON.stringify({ type: 'subscribe', topic }));
  }

  publish(topic, payload, messageId) {
    const id = messageId || `msg-${Date.now()}-${Math.random().toString(36).slice(2)}`;
    this.ws.send(JSON.stringify({ type: 'publish', topic, messageId: id, payload }));
    return id;
  }

  resume() {
    const lastSeqPerTopic = {};
    this.lastSeqPerTopic.forEach((seq, topic) => {
      lastSeqPerTopic[topic] = seq;
    });
    this.ws.send(JSON.stringify({
      type: 'resume',
      sessionId: this.sessionId,
      lastSeqPerTopic
    }));
  }

  close() {
    if (this.ws) this.ws.close();
  }
}

Kullanım Örneği

javascript
const client = new MicroSocketClient('ws://localhost:8080', {
  token: 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...'
});

await client.connect();

client.subscribe('risk.alerts', (data) => {
  console.log('Alert:', data);
});

client.publish('orders', { side: 'buy', amount: 1 }, 'order-123');

RESUME ile Yeniden Bağlanma

Bağlantı koptuğunda aynı URL'e tekrar bağlanıp resume() çağrılır:

javascript
client.ws.on('close', () => {
  const reconnect = () => {
    const ws = new WebSocket(client.url);
    ws.on('open', () => {
      ws.send(JSON.stringify({ type: 'setup', token: client.token }));
    });
    ws.on('message', (data) => {
      const msg = JSON.parse(data.toString());
      if (msg.type === 'ready') {
        client.ws = ws;
        client.resume();
        client.reattachMessageHandler(ws);
      }
    });
  };
  setTimeout(reconnect, 1000);
});

SeqNo Akışı (İstemci Tarafı)

Daha fazla algoritma detayı: Sıralama & SeqNo.

Star the repo on GitHub if this documentation is useful — link in the navbar above.