ts封装websocket,支持失败重连、心跳、事件订阅
- - 木灵鱼儿一直很想体验下websocket,苦于一直没有机会,乘着这次优化,封装了一个原生的websocket处理类,本来是想用Socket.io的,但是它只能和它配到的server端使用,对于一些非封装的服务端,没法直接使用,于是只能自己封装了. ts愉悦的类型推断,传入事件名和回调函数,能自动推断出结果类型.
一直很想体验下websocket,苦于一直没有机会,乘着这次优化,封装了一个原生的websocket处理类,本来是想用Socket.io的,但是它只能和它配到的server端使用,对于一些非封装的服务端,没法直接使用,于是只能自己封装了。
功能:
为什么需要重新初始化,因为我们的spa项目中,用户退出登录需要断开socket连接,用户登录后又需要重新连接,所以重新初始化承接的是切换用户重连的功能。
事件订阅是由于socket传递消息全靠原生的onmessage事件,在通过这个事件的event对象里面的一些属性去区分具体事件,所以我们需要一个事件分发机制,用于通知对应事件。
再者,有些事件是长期的,订阅一次后可能没法重新订阅,比如app.vue中的处理,所以我设计的事件订阅会一直持有所有订阅,除非你手动off移除,这样的话,哪怕重新初始化连接socket,相对于的事件通知也不会丢失。
[hide]
首先是类型声明文件: types.ts
/** socket实例化参数 */
export interface WebSocketOptions {
/** 链接的url */
url: string | URL;
/** 协议字符串或一个协议字符串数组 */
protocols?: string | string[];
}
/** 监听事件回调 */
export type EventCallback<D> = (data: D) => void;
/** socket事件对应的回调参数类型 */
export interface SocketEventMap {
/** 默认事件:连接成功 */
onopen: Event;
/** 默认事件:连接关闭 */
onclose: CloseEvent;
/** 默认事件:message,只有在json解析数据发生错误时触发 */
onmessage: MessageEvent;
/** 心跳事件 */
event_ping: {
/** 与服务器时间差 */
delay: number;
/** 当前与服务器连接的用户id */
id: number;
/** 连接类型 */
type: string;
};
/** 用户充值成功通知 */
event_user_recharge: {
id: number;
user_id: number;
/** 当前充值的金额 */
pay_amount: string;
amount: number;
pay_type_id: number;
create_time: number;
update_time: number;
transaction_id: string;
ip: string;
pay_day: number;
pay_time: number;
notify_data: string;
status: number;
order_id: string;
pay_name: string;
bank_code: string;
pay_times: number;
pay_email: string;
pay_phone: number;
pay_way: number;
pay_ratio: string;
bonus: string;
recharge_times: number;
gift: string;
success_ymd: number;
user_merge_id: number;
};
/** 连接成功 */
event_connect: { client_id: string };
/** 连接的用户 */
event_bind: {};
/** 登录用户的数据 */
event_real_user: {};
}
/** 客户端socket发送消息参数 */
export interface SocketSendData {
data: {
/** 事件名 */
event: keyof SocketEventMap;
/** 数据 */
data: Record<string, any> | Array<any> | string;
};
}
SocketEventMap
里存放着socket事件和对应的事件参数,如果有新的事件,直接填入对应数据即可。
主体文件: index.ts
import type { SocketEventMap, EventCallback, WebSocketOptions, SocketSendData } from "./types";
import { store } from "@/store";
export class Socket {
/** 实例 */
private static instance: Socket;
/** store */
private store: typeof store;
/** 事件map */
private eventMap: Map<keyof SocketEventMap, Array<EventCallback<any>>> = new Map();
/** 是否开发模式 */
private isDev: boolean = import.meta.env.VITE_ENV === "development";
/** 是否已经初始化 */
private isInit: boolean = false;
/** webSocket实例 */
private socket?: WebSocket;
/** webSocket实例options:用于失败重连 */
private socketOptions: WebSocketOptions = {
url: ""
};
/** WebSocket非正常关闭code码 */
public static CLOSE_ABNORMAL = 1006;
/** 当前失败重连次数 */
private reconnectCount: number = 0;
/** 最大运行重连次数 */
private reconnectLimit: number = 3;
/** 心跳定时器 */
private heartCheckTimer: NodeJS.Timeout | null = null;
/** 心跳延迟,后端定的30s,怕赶不上,调整20s */
private heartCheckDelay: number = 20 * 1000;
private constructor() {
this.store = store;
}
/** 获取实例 */
public static getInstance(): Socket {
if (!Socket.instance) {
Socket.instance = new Socket();
}
return Socket.instance;
}
/** 订阅事件 */
public on<T extends keyof SocketEventMap>(event: T, callback: EventCallback<SocketEventMap[T]>): void {
let eventList = this.eventMap.get(event);
if (!eventList) {
eventList = [];
this.eventMap.set(event, eventList);
}
eventList.push(callback);
}
/** 取消订阅事件 */
public off<T extends keyof SocketEventMap>(event: T, callback: EventCallback<SocketEventMap[T]>): void {
const eventList = this.eventMap.get(event);
if (!eventList) return;
const index = eventList.findIndex((item) => item === callback);
if (index > -1) {
eventList.splice(index, 1);
}
}
/** 订阅一次性事件 */
public once<T extends keyof SocketEventMap>(event: T, callback: EventCallback<SocketEventMap[T]>): void {
let eventList = this.eventMap.get(event);
if (!eventList) {
eventList = [];
this.eventMap.set(event, eventList);
}
const onceCallback = (data: SocketEventMap[T]) => {
callback(data);
this.off(event, onceCallback);
};
}
/** 触发订阅 */
private emit<T extends keyof SocketEventMap>(event: T, data: SocketEventMap[T]): void {
const eventList = this.eventMap.get(event);
if (!eventList) return;
eventList.forEach((callback) => callback(data));
}
/** 清空订阅 */
public clear() {
this.eventMap.clear();
}
/** 初始化 */
public init() {
// 未登录或者已经初始化过,不再初始化
if (this.isInit || !this.store.getters["config/isLogin"]) return;
this.isInit = true;
// 创建socket实例
this.socket = this.createWebSocket();
// 首次初始化时订阅心跳事件
this.on("event_ping", this.onPing);
}
/** 重新初始化 */
public reInit() {
// 未登录不进行初始化
if (!this.store.getters["config/isLogin"]) return;
if (!this.isInit) {
this.init();
return;
}
// 重新创建新的websocket实例
this.reconnectCount = 0;
this.socket = this.createWebSocket();
}
/** 获取socket连接地址 */
private getSocketUrl(): string {
if (this.isDev) {
return import.meta.env.VITE_DEV_WS_URL;
}
return `wss://${location.host}`;
}
/** 获取token */
private getToken(): string {
const isLogin = this.store.getters["config/isLogin"];
if (isLogin) {
return this.store.state.config.real_token ?? "";
}
return "";
}
/** 创建websocket */
private createWebSocket(options?: WebSocketOptions): WebSocket {
const op: WebSocketOptions = {
url: ""
};
if (options) {
Object.assign(op, options);
} else {
const baseUrl = this.getSocketUrl();
const token = this.getToken();
op.url = `${baseUrl}?token=${token}`;
}
// 保存options,方便失败重连
this.socketOptions = op;
const socket = new WebSocket(op.url, op.protocols);
// 监听事件
socket.onopen = this.onOpen;
socket.onmessage = this.onMessage;
socket.onclose = this.onClose;
return socket;
}
/** websocket onOpen */
private onOpen = (event: Event) => {
if (this.isDev) {
console.log("socket connection successful");
}
// 创建心跳
this.heartCheck();
// 触发订阅
this.emit("onopen", event);
};
/** websocket onMessage */
private onMessage = (event: MessageEvent) => {
const { data } = event;
try {
const parseData = JSON.parse(data as string);
this.emit(parseData.event as keyof SocketEventMap, parseData.data as SocketEventMap[keyof SocketEventMap]);
} catch (error) {
this.emit("onmessage", data);
}
};
/** websocket onClose */
private onClose = (event: CloseEvent) => {
// 结束心跳,重连时会重新创建
this.clearHeartCheck();
// 如果WebSocket是非正常关闭 则进行重连
if (event.code === Socket.CLOSE_ABNORMAL) {
if (this.reconnectCount < this.reconnectLimit) {
// 重连
this.reconnectCount++;
this.reconnect();
} else {
// 触发订阅
this.emit("onclose", event);
}
} else {
// 触发订阅
this.emit("onclose", event);
}
};
/** websocket onPing */
private onPing = (_data: SocketEventMap["event_ping"]) => {
if (this.isDev) console.log("socket heart check success");
};
/** websocket 重连 */
private reconnect() {
this.socket = this.createWebSocket(this.socketOptions);
}
/** 发送消息 */
public sendMessage(data: SocketSendData) {
this.socket?.send(JSON.stringify(data));
}
/** 创建心跳 */
private heartCheck() {
this.clearHeartCheck();
this.heartCheckTimer = setInterval(() => {
if (this.socket?.readyState === WebSocket.OPEN) {
this.sendMessage({
data: {
event: "event_ping",
data: { time: Date.now() }
}
});
}
}, this.heartCheckDelay);
}
/** 结束心跳 */
private clearHeartCheck() {
if (this.heartCheckTimer !== null) {
clearInterval(this.heartCheckTimer);
this.heartCheckTimer = null;
}
}
/** 结束websocket连接 */
public close() {
this.socket?.close();
}
}
由于我的项目目前需要判断下用户是否登录,所以引入的vuex,大家使用可以根据自己的项目情况调整,就几个判断逻辑。
import { Socket } from "@/socket";
// 初始化socket
Socket.getInstance().init();
一般来说是在main.ts中进行初始化,注意我使用了vuex,所以初始化要放在use(vuex)后,以防出现问题。
当用户退出登录,我们就可以结束连接:
import { Socket } from "@/socket";
Socket.getInstance().close();
用户重新登录后,我们就重新连接:
import { Socket } from "@/socket";
Socket.getInstance().reInit();
用户登录连接,后端是要求传入token的,通过链接参数传递,具体在 createWebSocket
处理了,有需要可以自行调整。
[/hide]