ts封装websocket,支持失败重连、心跳、事件订阅

标签: ts 封装 websocket | 发表时间:2023-11-22 11:26 | 作者:木灵鱼儿
出处:https://www.mulingyuer.com/

前言

一直很想体验下websocket,苦于一直没有机会,乘着这次优化,封装了一个原生的websocket处理类,本来是想用Socket.io的,但是它只能和它配到的server端使用,对于一些非封装的服务端,没法直接使用,于是只能自己封装了。

功能:

  1. 支持失败重连
  2. 支持心跳
  3. 支持重新初始化
  4. 事件订阅
  5. ts愉悦的类型推断,传入事件名和回调函数,能自动推断出结果类型
  6. 支持手动断卡,重新初始化即可重新连接

为什么需要重新初始化,因为我们的spa项目中,用户退出登录需要断开socket连接,用户登录后又需要重新连接,所以重新初始化承接的是切换用户重连的功能。

事件订阅是由于socket传递消息全靠原生的onmessage事件,在通过这个事件的event对象里面的一些属性去区分具体事件,所以我们需要一个事件分发机制,用于通知对应事件。

再者,有些事件是长期的,订阅一次后可能没法重新订阅,比如app.vue中的处理,所以我设计的事件订阅会一直持有所有订阅,除非你手动off移除,这样的话,哪怕重新初始化连接socket,相对于的事件通知也不会丢失。

源码

[hide]

首先是类型声明文件: types.ts

  /** socket实例化参数 */
export interface WebSocketOptions {
    /** 链接的url */
    url: string | URL;
    /** 协议字符串或一个协议字符串数组 */
    protocols?: string | string[];
}

/** 监听事件回调 */
export type EventCallback<D> = (data: D) => void;

/** socket事件对应的回调参数类型 */
export interface SocketEventMap {
    /** 默认事件:连接成功 */
    onopen: Event;
    /** 默认事件:连接关闭 */
    onclose: CloseEvent;
    /** 默认事件:message,只有在json解析数据发生错误时触发 */
    onmessage: MessageEvent;
    /** 心跳事件 */
    event_ping: {
        /** 与服务器时间差 */
        delay: number;
        /** 当前与服务器连接的用户id */
        id: number;
        /** 连接类型 */
        type: string;
    };
    /** 用户充值成功通知 */
    event_user_recharge: {
        id: number;
        user_id: number;
        /** 当前充值的金额 */
        pay_amount: string;
        amount: number;
        pay_type_id: number;
        create_time: number;
        update_time: number;
        transaction_id: string;
        ip: string;
        pay_day: number;
        pay_time: number;
        notify_data: string;
        status: number;
        order_id: string;
        pay_name: string;
        bank_code: string;
        pay_times: number;
        pay_email: string;
        pay_phone: number;
        pay_way: number;
        pay_ratio: string;
        bonus: string;
        recharge_times: number;
        gift: string;
        success_ymd: number;
        user_merge_id: number;
    };
    /** 连接成功 */
    event_connect: { client_id: string };
    /** 连接的用户 */
    event_bind: {};
    /** 登录用户的数据 */
    event_real_user: {};
}

/** 客户端socket发送消息参数 */
export interface SocketSendData {
    data: {
        /** 事件名 */
        event: keyof SocketEventMap;
        /** 数据 */
        data: Record<string, any> | Array<any> | string;
    };
}

SocketEventMap里存放着socket事件和对应的事件参数,如果有新的事件,直接填入对应数据即可。

主体文件: index.ts

  import type { SocketEventMap, EventCallback, WebSocketOptions, SocketSendData } from "./types";
import { store } from "@/store";

export class Socket {
    /** 实例 */
    private static instance: Socket;
    /** store */
    private store: typeof store;
    /** 事件map */
    private eventMap: Map<keyof SocketEventMap, Array<EventCallback<any>>> = new Map();
    /** 是否开发模式 */
    private isDev: boolean = import.meta.env.VITE_ENV === "development";
    /** 是否已经初始化 */
    private isInit: boolean = false;
    /** webSocket实例 */
    private socket?: WebSocket;
    /** webSocket实例options:用于失败重连 */
    private socketOptions: WebSocketOptions = {
        url: ""
    };
    /** WebSocket非正常关闭code码 */
    public static CLOSE_ABNORMAL = 1006;
    /** 当前失败重连次数 */
    private reconnectCount: number = 0;
    /** 最大运行重连次数 */
    private reconnectLimit: number = 3;
    /** 心跳定时器 */
    private heartCheckTimer: NodeJS.Timeout | null = null;
    /** 心跳延迟,后端定的30s,怕赶不上,调整20s */
    private heartCheckDelay: number = 20 * 1000;

    private constructor() {
        this.store = store;
    }

    /** 获取实例 */
    public static getInstance(): Socket {
        if (!Socket.instance) {
            Socket.instance = new Socket();
        }
        return Socket.instance;
    }

    /** 订阅事件 */
    public on<T extends keyof SocketEventMap>(event: T, callback: EventCallback<SocketEventMap[T]>): void {
        let eventList = this.eventMap.get(event);
        if (!eventList) {
            eventList = [];
            this.eventMap.set(event, eventList);
        }
        eventList.push(callback);
    }

    /** 取消订阅事件 */
    public off<T extends keyof SocketEventMap>(event: T, callback: EventCallback<SocketEventMap[T]>): void {
        const eventList = this.eventMap.get(event);
        if (!eventList) return;
        const index = eventList.findIndex((item) => item === callback);
        if (index > -1) {
            eventList.splice(index, 1);
        }
    }

    /** 订阅一次性事件 */
    public once<T extends keyof SocketEventMap>(event: T, callback: EventCallback<SocketEventMap[T]>): void {
        let eventList = this.eventMap.get(event);
        if (!eventList) {
            eventList = [];
            this.eventMap.set(event, eventList);
        }
        const onceCallback = (data: SocketEventMap[T]) => {
            callback(data);
            this.off(event, onceCallback);
        };
    }

    /** 触发订阅 */
    private emit<T extends keyof SocketEventMap>(event: T, data: SocketEventMap[T]): void {
        const eventList = this.eventMap.get(event);
        if (!eventList) return;
        eventList.forEach((callback) => callback(data));
    }

    /** 清空订阅 */
    public clear() {
        this.eventMap.clear();
    }

    /** 初始化 */
    public init() {
        // 未登录或者已经初始化过,不再初始化
        if (this.isInit || !this.store.getters["config/isLogin"]) return;
        this.isInit = true;
        // 创建socket实例
        this.socket = this.createWebSocket();
        // 首次初始化时订阅心跳事件
        this.on("event_ping", this.onPing);
    }

    /** 重新初始化 */
    public reInit() {
        // 未登录不进行初始化
        if (!this.store.getters["config/isLogin"]) return;
        if (!this.isInit) {
            this.init();
            return;
        }
        // 重新创建新的websocket实例
        this.reconnectCount = 0;
        this.socket = this.createWebSocket();
    }

    /** 获取socket连接地址 */
    private getSocketUrl(): string {
        if (this.isDev) {
            return import.meta.env.VITE_DEV_WS_URL;
        }
        return `wss://${location.host}`;
    }

    /** 获取token */
    private getToken(): string {
        const isLogin = this.store.getters["config/isLogin"];
        if (isLogin) {
            return this.store.state.config.real_token ?? "";
        }
        return "";
    }

    /** 创建websocket */
    private createWebSocket(options?: WebSocketOptions): WebSocket {
        const op: WebSocketOptions = {
            url: ""
        };
        if (options) {
            Object.assign(op, options);
        } else {
            const baseUrl = this.getSocketUrl();
            const token = this.getToken();
            op.url = `${baseUrl}?token=${token}`;
        }
        // 保存options,方便失败重连
        this.socketOptions = op;
        const socket = new WebSocket(op.url, op.protocols);
        // 监听事件
        socket.onopen = this.onOpen;
        socket.onmessage = this.onMessage;
        socket.onclose = this.onClose;

        return socket;
    }

    /** websocket onOpen */
    private onOpen = (event: Event) => {
        if (this.isDev) {
            console.log("socket connection successful");
        }
        // 创建心跳
        this.heartCheck();
        // 触发订阅
        this.emit("onopen", event);
    };

    /** websocket onMessage */
    private onMessage = (event: MessageEvent) => {
        const { data } = event;
        try {
            const parseData = JSON.parse(data as string);
            this.emit(parseData.event as keyof SocketEventMap, parseData.data as SocketEventMap[keyof SocketEventMap]);
        } catch (error) {
            this.emit("onmessage", data);
        }
    };

    /** websocket onClose */
    private onClose = (event: CloseEvent) => {
        // 结束心跳,重连时会重新创建
        this.clearHeartCheck();
        // 如果WebSocket是非正常关闭 则进行重连
        if (event.code === Socket.CLOSE_ABNORMAL) {
            if (this.reconnectCount < this.reconnectLimit) {
                // 重连
                this.reconnectCount++;
                this.reconnect();
            } else {
                // 触发订阅
                this.emit("onclose", event);
            }
        } else {
            // 触发订阅
            this.emit("onclose", event);
        }
    };

    /** websocket onPing */
    private onPing = (_data: SocketEventMap["event_ping"]) => {
        if (this.isDev) console.log("socket heart check success");
    };

    /** websocket 重连 */
    private reconnect() {
        this.socket = this.createWebSocket(this.socketOptions);
    }

    /** 发送消息 */
    public sendMessage(data: SocketSendData) {
        this.socket?.send(JSON.stringify(data));
    }

    /** 创建心跳 */
    private heartCheck() {
        this.clearHeartCheck();
        this.heartCheckTimer = setInterval(() => {
            if (this.socket?.readyState === WebSocket.OPEN) {
                this.sendMessage({
                    data: {
                        event: "event_ping",
                        data: { time: Date.now() }
                    }
                });
            }
        }, this.heartCheckDelay);
    }

    /** 结束心跳 */
    private clearHeartCheck() {
        if (this.heartCheckTimer !== null) {
            clearInterval(this.heartCheckTimer);
            this.heartCheckTimer = null;
        }
    }

    /** 结束websocket连接 */
    public close() {
        this.socket?.close();
    }
}

由于我的项目目前需要判断下用户是否登录,所以引入的vuex,大家使用可以根据自己的项目情况调整,就几个判断逻辑。

使用

  import { Socket } from "@/socket";

// 初始化socket
Socket.getInstance().init();

一般来说是在main.ts中进行初始化,注意我使用了vuex,所以初始化要放在use(vuex)后,以防出现问题。

当用户退出登录,我们就可以结束连接:

  import { Socket } from "@/socket";

Socket.getInstance().close();

用户重新登录后,我们就重新连接:

  import { Socket } from "@/socket";

Socket.getInstance().reInit();

用户登录连接,后端是要求传入token的,通过链接参数传递,具体在 createWebSocket处理了,有需要可以自行调整。

[/hide]

相关 [ts 封装 websocket] 推荐:

ts封装websocket,支持失败重连、心跳、事件订阅

- - 木灵鱼儿
一直很想体验下websocket,苦于一直没有机会,乘着这次优化,封装了一个原生的websocket处理类,本来是想用Socket.io的,但是它只能和它配到的server端使用,对于一些非封装的服务端,没法直接使用,于是只能自己封装了. ts愉悦的类型推断,传入事件名和回调函数,能自动推断出结果类型.

WebSocket实战

- - 新浪UED
互联网发展到现在,早已超越了原始的初衷,人类从来没有像现在这样依赖过他;也正是这种依赖,促进了互联网技术的飞速发展. 而终端设备的创新与发展,更加速了互联网的进化;. WebSocket的前世今生. 为什么使用WebSocket. 搭建WebSocket服务器. 以上六点分为两大块,前3点侧重理论,主要让大家明白WebSocket是什么,而后3点则结合代码实战,加深对WebSocket的认知.

tomcat7之websocket

- - ITeye博客
从tomcat7.0.26之后开始支持websocket,建议大家使用tomcat7.0.30,期间版本的接口有一些改动. chrome默认支持websocket. 其他浏览器可能由于安全原因,默认是关闭的. // 与7.0.27不同的,Tomcat改变了createWebSocketInbound方法的定义,增加了一个HttpServletRequest参数.

基于Tomcat的WebSocket

- - ITeye博客
之前大概的看过WebSocket,当时Tomcat还不支持WebSocket,所以当时写了一篇基于Jetty的WebSocket实现,地址如下:. 现在Tomcat7.0.27发布了,从这个版本开始Tomcat就支持WebSocket了. Tomcat的WebSocket和Jetty的大致上差不多,大同小异,这里就简单的贴两个类吧:.

【转载】认识HTML5的WebSocket

- - HTML5研究小组
在HTML5规范中,我最喜欢的Web技术就是正迅速变得流行的WebSocket API. WebSocket提供了一个受欢迎的技术,以替代我们过去几年一直在用的Ajax技术. 这个新的API提供了一个方法,从客户端使用简单的语法 有效地推动消息到服务器. 让我们看一看HTML5的WebSocket API:它可用于客户端、服务器端.

QNAP TS-419P II:擁有 2.0GHz 處理器的進階 NAS

- Felix - T客邦
挑選NAS首先要比較的規格是處理器時脈與記憶體大小,只要這2項就能決定 NAS 的處理能力,其次考慮的是操作介面以及附加功能. QNAP 的 TS-419P 系列依硬體規格差異,有許多種版本. 目前這款最新版的 TS-419P II,記憶體為512MB,處理器時脈一口氣拉到2GHz,並擁有2個GbE網路埠.

利用ffmepg把ts文件转m3u8并切片

- - 三棵杏软件工作室
网上很多垃圾文章推荐segmenter工具,但用的时候,3.5G的ts文件丢了一半的数据,于是想到了ffmpeg转. 在国外网站找到命令,一句话搞定,没报半句错: ffmpeg -i 12生肖.ts -c copy -map 0 -f segment -segment_list playlist.m3u8 -segment_time 10 output%03d.ts 顺便共享给各位国内的同仁,免得深受其苦.

反向Ajax,第2部分:WebSocket

- KnightE - 译言-电脑/网络/数码科技
来源Reverse Ajax, Part 2: WebSockets. 时至今日,用户期待的是可通过web访问快速、动态的应用. 这一文章系列展示了如何使用反向Ajax(Reverse Ajax)技术来开发事件驱动的web应用. 系列的第1部分介绍了反向Ajax、轮询(polling)、流(streaming)、Comet和长轮询(long polling).

htm5-websocket实现数据查询应用

- - 博客园_首页
在之前的文章讲述了使用Websocket调用远程方式的功能,在这基础我们可以简单地使用WebSocket进行数据处理方面的应用;只需要在方法执行相关的数据库操作返回即可,结合jeasyui库所提供丰富的控件进行数据应用处理变得非常简单的事情.下面使用jeasyui和WebSocket实现一个查询Northwind数据订单的应用案例..

七种WebSocket框架的性能比较

- - 鸟窝
前一篇文章 使用四种框架分别实现百万websocket常连接的服务器介绍了四种websocket框架的测试方法和基本数据. 最近我又使用几个框架实现了websocket push服务器的原型,并专门对这七种实现做了测试. 本文记录了测试结果和一些对结果的分析. 使用三台C3.4xlarge AWS服务器做测试.