Node.js: İstemci SDK
Micro Socket protokolüne uyumlu bir istemci örneği: CONNECT/setup, SUBSCRIBE, PUBLISH, ACK, seqNo takibi ve RESUME.
Bağımlılık
bash
npm install wsTemel Bağlantı ve Setup
javascript
const WebSocket = require('ws');
class MicroSocketClient {
constructor(url, options = {}) {
this.url = url;
this.token = options.token;
this.ws = null;
this.lastSeqPerTopic = new Map();
this.sessionId = null;
this.listeners = new Map(); // topic -> callback[]
}
connect() {
return new Promise((resolve, reject) => {
this.ws = new WebSocket(this.url);
this.ws.on('open', () => {
this.ws.send(JSON.stringify({ type: 'setup', token: this.token }));
});
this.ws.on('message', (data) => {
this.handleMessage(JSON.parse(data.toString()));
});
this.ws.on('message', (data) => {
const msg = JSON.parse(data.toString());
if (msg.type === 'ready') {
this.sessionId = msg.sessionId;
resolve();
}
});
this.ws.on('error', reject);
});
}
handleMessage(msg) {
if (msg.type === 'message') {
this.handleIncomingMessage(msg);
return;
}
if (msg.type === 'error') {
console.error('Server error:', msg);
return;
}
}
handleIncomingMessage(msg) {
const { topic, seqNo, data, messageId } = msg;
const last = this.lastSeqPerTopic.get(topic) ?? 0;
if (seqNo === last + 1) {
this.lastSeqPerTopic.set(topic, seqNo);
this.emit(topic, data);
this.sendAck(messageId);
return;
}
if (seqNo > last + 1) {
this.requestReplay(topic, last + 1);
return;
}
this.sendAck(messageId);
}
emit(topic, data) {
const cbs = this.listeners.get(topic);
if (cbs) cbs.forEach((cb) => cb(data));
}
sendAck(messageId) {
if (this.ws && this.ws.readyState === 1) {
this.ws.send(JSON.stringify({ type: 'ack', messageId }));
}
}
requestReplay(topic, fromSeq) {
if (this.ws && this.ws.readyState === 1) {
this.ws.send(JSON.stringify({ type: 'replay', topic, fromSeq }));
}
}
subscribe(topic, callback) {
if (!this.listeners.has(topic)) this.listeners.set(topic, []);
this.listeners.get(topic).push(callback);
this.ws.send(JSON.stringify({ type: 'subscribe', topic }));
}
publish(topic, payload, messageId) {
const id = messageId || `msg-${Date.now()}-${Math.random().toString(36).slice(2)}`;
this.ws.send(JSON.stringify({ type: 'publish', topic, messageId: id, payload }));
return id;
}
resume() {
const lastSeqPerTopic = {};
this.lastSeqPerTopic.forEach((seq, topic) => {
lastSeqPerTopic[topic] = seq;
});
this.ws.send(JSON.stringify({
type: 'resume',
sessionId: this.sessionId,
lastSeqPerTopic
}));
}
close() {
if (this.ws) this.ws.close();
}
}Kullanım Örneği
javascript
const client = new MicroSocketClient('ws://localhost:8080', {
token: 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...'
});
await client.connect();
client.subscribe('risk.alerts', (data) => {
console.log('Alert:', data);
});
client.publish('orders', { side: 'buy', amount: 1 }, 'order-123');RESUME ile Yeniden Bağlanma
Bağlantı koptuğunda aynı URL'e tekrar bağlanıp resume() çağrılır:
javascript
client.ws.on('close', () => {
const reconnect = () => {
const ws = new WebSocket(client.url);
ws.on('open', () => {
ws.send(JSON.stringify({ type: 'setup', token: client.token }));
});
ws.on('message', (data) => {
const msg = JSON.parse(data.toString());
if (msg.type === 'ready') {
client.ws = ws;
client.resume();
client.reattachMessageHandler(ws);
}
});
};
setTimeout(reconnect, 1000);
});SeqNo Akışı (İstemci Tarafı)
Daha fazla algoritma detayı: Sıralama & SeqNo.