import WebSocket from "ws"; import Logs from "./logs.js"; /** * 解析推送过来的消息 * @param {*} data * @returns {object} */ const parseMessage = (data) => { let message; try { message = JSON.parse(data); } catch(e) { message = { text: data.toString() }; } return message; } /** * 自定义Websocket类 * 添加自动发送ping指令 */ class WsClient { #websocket; #pingTimer; constructor(wsserver, options) { this.#websocket = new WebSocket(wsserver, options); } get readyState() { return this.#websocket.readyState; } on(type, callback) { switch(type) { case 'open': this.#websocket.on('open', event => { this.#pingTimer = setInterval(() => { if (this.readyState === WebSocket.OPEN) { this.#websocket.send('PING'); } }, 10_000); callback(event); }); break; case 'message': this.#websocket.on('message', message => { const data = parseMessage(message); callback(data); }); break; case 'error': this.#websocket.on('error', error => { clearInterval(this.#pingTimer); callback(error); }); break; case 'close': this.#websocket.on('close', code => { clearInterval(this.#pingTimer); callback(code); }); break; default: console.log(type, callback); } return this; } send(message) { this.#websocket.send(JSON.stringify(message)); } close(code) { this.#websocket.close(code); } } /** * 建立长连接 */ class WebSocketClient { #url; #wsClient = null; #connecting = 0; #retryTimer = null; #retryCount = 0; #options = {}; #callback = {}; constructor(url, options) { this.#url = url; this.#options = options; } get wsClient() { return this.#wsClient; } get connecting() { return this.#connecting; } #setCallback(type, callback) { if (['open', 'message', 'error', 'close'].includes(type)) { this.#callback[type] = callback; } else { throw new Error(`Invalid callback type: ${type}`); } } #runCallback(type, event) { if (this.#callback[type]) { this.#callback[type](event); } } /** * 建立连接(若已在连接中/已连接,会先关闭旧连接再重建) * @returns {WsClient} */ connect() { if (this.#retryTimer) { clearTimeout(this.#retryTimer); this.#retryTimer = null; } // const { url, onmessage } = this.#connectInfo; const wsClient = new WsClient(this.#url, this.#options); this.#wsClient = wsClient; this.#connecting = 1; wsClient.on('open', event => { this.#retryCount = 0; this.#connecting = 0; if (this.#retryTimer) { clearTimeout(this.#retryTimer); this.#retryTimer = null; } this.#runCallback('open', event); // Logs.outDev('connect success'); }); wsClient.on('message', data => { const message = data; if (message.toUpperCase?.() == 'PONG' || message?.text?.toUpperCase?.() == 'PONG') { return; } this.#runCallback('message', data); // Logs.outDev('receive message', message); }); wsClient.on('error', event => { Logs.outDev('connect error', event); this.#runCallback('error', event); }); wsClient.on('close', event => { this.#runCallback('close', event); if (event.code == 1000) { // 手动断开,直接结束 this.#wsClient = null; this.#connecting = 0; if (this.#retryTimer) { clearTimeout(this.#retryTimer); this.#retryTimer = null; } Logs.outDev('close connection autonomously'); return; } // 异常断开,重新尝试连接 this.#retryCount += 1; if (this.#retryCount > 180) { this.#wsClient = null; this.#connecting = 0; Logs.outDev('ws connect retry reached maximum limit'); return; } let timeout = 5; if (this.#retryCount > 24) { timeout = 30; } else if (this.#retryCount > 40) { timeout = 60; } this.#connecting = 1; this.#retryTimer = setTimeout(() => { this.connect(); }, 1000 * timeout); Logs.outDev('connect closed, try again after %d seconds', timeout); }); return wsClient; } /** * 手动关闭连接(默认 code=1000) */ close(code = 1000) { if (this.#retryTimer) { clearTimeout(this.#retryTimer); this.#retryTimer = null; } this.#retryCount = 0; this.#connecting = 0; this.#wsClient?.close(code); } /** * 发送消息 * @param {*} message */ send(message) { this.#wsClient?.send(message); } /** * 绑定事件 * @param {*} type * @param {*} callback * @returns */ on(type, callback) { this.#setCallback(type, callback); return this; } } export default WebSocketClient;