RabbitMQ实现即时通讯居然如此简单!连后端代码都省得写了?

标签: rabbitmq 即时通讯 后端 | 发表时间:2020-10-14 00:56 | 作者:MacroZheng
出处:https://juejin.im/backend

SpringBoot实战电商项目mall(40k+star)地址: github.com/macrozheng/…

摘要

有时候我们的项目中会用到 即时通讯功能,比如电商系统中的客服聊天功能,还有在支付过程中,当用户支付成功后,第三方支付服务会回调我们的回调接口,此时我们需要通知前端支付成功。最近发现RabbitMQ可以很方便的实现 即时通讯功能,如果你没有特殊的业务需求,甚至可以不写后端代码,今天给大家讲讲如何使用RabbitMQ来实现 即时通讯

MQTT协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的 轻量级通讯协议,该协议构建于 TCP/IP协议上。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。

MQTT相关概念

  • Publisher(发布者):消息的发出者,负责发送消息。
  • Subscriber(订阅者):消息的订阅者,负责接收并处理消息。
  • Broker(代理):消息代理,位于消息发布者和订阅者之间,各类支持MQTT协议的消息中间件都可以充当。
  • Topic(主题):可以理解为消息队列中的路由,订阅者订阅了主题之后,就可以收到发送到该主题的消息。
  • Payload(负载);可以理解为发送消息的内容。
  • QoS(消息质量):全称Quality of Service,即消息的发送质量,主要有 QoS 0QoS 1QoS 2三个等级,下面分别介绍下:
    • QoS 0(Almost Once):至多一次,只发送一次,会发生消息丢失或重复;
    • QoS 1(Atleast Once):至少一次,确保消息到达,但消息重复可能会发生;
    • QoS 2(Exactly Once):只有一次,确保消息只到达一次。

RabbitMQ启用MQTT功能

RabbitMQ启用MQTT功能,需要先安装然RabbitMQ然后再启用MQTT插件。

   rabbitmq-plugins enable rabbitmq_mqtt
复制代码
  • 开启成功后,查看管理控制台,我们可以发现MQTT服务运行在 1883端口上了。

MQTT客户端

我们可以使用MQTT客户端来测试MQTT的即时通讯功能,这里使用的是 MQTTBox这个客户端工具。

  • 点击 Create MQTT Client按钮来创建一个MQTT客户端;

  • 接下来对MQTT客户端进行配置,主要是配置好协议端口、连接用户名密码和QoS即可;

  • 再配置一个订阅者,订阅者订阅 testTopicA这个主题,我们会向这个主题发送消息;

  • 发布者向主题中发布消息,订阅者可以实时接收到。

前端直接实现即时通讯

既然 MQTTBox客户端可以直接通过RabbitMQ实现即时通讯,那我们是不是直接使用前端技术也可以实现即时通讯?答案是肯定的!下面我们将通过 html+javascript实现一个简单的聊天功能,真正不写一行后端代码实现即时通讯!

  • 由于RabbitMQ与Web端交互底层使用的是WebSocket,所以我们需要开启RabbitMQ的MQTT WEB支持,使用如下命令开启即可;
   rabbitmq-plugins enable rabbitmq_web_mqtt
复制代码
  • 开启成功后,查看管理控制台,我们可以发现MQTT的WEB服务运行在 15675端口上了;

  • 实现的功能非常简单,一个单聊功能,需要注意的是配置好MQTT服务的访问地址为:ws://localhost:15675/ws
   html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Titletitle>
head>
<body>
<div>
    <label>目标Topic:<input id="targetTopicInput" type="text">label><br>
    <label>发送消息:<input id="messageInput" type="text">label><br>
    <button onclick="sendMessage()">发送button>
    <button onclick="clearMessage()">清空button>
    <div id="messageDiv">div>
div>
body>
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js">script>
<script>
    //RabbitMQ的web-mqtt连接地址
    const url = 'ws://localhost:15675/ws';
    //获取订阅的topic
    const topic = getQueryString("topic");
    //连接到消息队列
    let client = mqtt.connect(url);
    client.on('connect', function () {
        //连接成功后订阅topic
        client.subscribe(topic, function (err) {
            if (!err) {
                showMessage("订阅topic:" + topic + "成功!");
            }
        });
    });
    //获取订阅topic中的消息
    client.on('message', function (topic, message) {
        showMessage("收到消息:" + message.toString());
    });

    //发送消息
    function sendMessage() {
        let targetTopic = document.getElementById("targetTopicInput").value;
        let message = document.getElementById("messageInput").value;
        //向目标topic中发送消息
        client.publish(targetTopic, message);
        showMessage("发送消息给" + targetTopic + "的消息:" + message);
    }

    //从URL中获取参数
    function getQueryString(name) {
        let reg = new RegExp("(^|&)" + name + "=([^&]*)(&|$)", "i");
        let r = window.location.search.substr(1).match(reg);
        if (r != null) {
            return decodeURIComponent(r[2]);
        }
        return null;
    }

    //在消息列表中展示消息
    function showMessage(message) {
        let messageDiv = document.getElementById("messageDiv");
        let messageEle = document.createElement("div");
        messageEle.innerText = message;
        messageDiv.appendChild(messageEle);
    }

    //清空消息列表
    function clearMessage() {
        let messageDiv = document.getElementById("messageDiv");
        messageDiv.innerHTML = "";
    }
script>
html>
复制代码
  • 接下来我们订阅不同的主题开启两个页面测试下功能(页面放在了SpringBoot应用的resource目录下了,需要先启动应用再访问):

    • 第一个订阅主题 testTopicA,访问地址:http://localhost:8088/page/index?topic=testTopicA
    • 第二个订阅主题 testTopicB,访问地址:http://localhost:8088/page/index?topic=testTopicB
  • 之后互相发送消息,让我们来看看效果吧!

在SpringBoot中使用

没有特殊业务需求的时候,前端可以直接和RabbitMQ对接实现即时通讯。但是有时候我们需要通过服务端去通知前端,此时就需要在应用中集成MQTT了,接下来我们来讲讲如何在SpringBoot应用中使用MQTT。

  • 首先我们需要在 pom.xml中添加MQTT相关依赖;
   
<dependency>
    <groupId>org.springframework.integrationgroupId>
    <artifactId>spring-integration-mqttartifactId>
dependency>
复制代码
  • application.yml中添加MQTT相关配置,主要是访问地址、用户名密码、默认主题信息;
   rabbitmq:
  mqtt:
    url: tcp://localhost:1883
    username: guest
    password: guest
    defaultTopic: testTopic
复制代码
  • 编写一个Java配置类从配置文件中读取配置便于使用;
   /**
 * MQTT相关配置
 * Created by macro on 2020/9/15.
 */
@Data
@EqualsAndHashCode(callSuper = false)
@Component
@ConfigurationProperties(prefix = "rabbitmq.mqtt")
public class MqttConfig {
    /**
     * RabbitMQ连接用户名
     */
    private String username;
    /**
     * RabbitMQ连接密码
     */
    private String password;
    /**
     * RabbitMQ的MQTT默认topic
     */
    private String defaultTopic;
    /**
     * RabbitMQ的MQTT连接地址
     */
    private String url;
}
复制代码
  • 添加MQTT消息订阅者相关配置,使用 @ServiceActivator注解声明一个服务激活器,通过 MessageHandler来处理订阅消息;
   /**
 * MQTT消息订阅者相关配置
 * Created by macro on 2020/9/15.
 */
@Slf4j
@Configuration
public class MqttInboundConfig {
    @Autowired
    private MqttConfig mqttConfig;

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient",
                        mqttConfig.getDefaultTopic());
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        //设置消息质量:0->至多一次;1->至少一次;2->只有一次
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message message) throws MessagingException {
                //处理订阅消息
                log.info("handleMessage : {}",message.getPayload());
            }

        };
    }
}
复制代码
  • 添加MQTT消息发布者相关配置;
   /**
 * MQTT消息发布者相关配置
 * Created by macro on 2020/9/15.
 */
@Configuration
public class MqttOutboundConfig {

    @Autowired
    private MqttConfig mqttConfig;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { mqttConfig.getUrl()});
        options.setUserName(mqttConfig.getUsername());
        options.setPassword(mqttConfig.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("publisherClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}
复制代码
  • 添加MQTT网关,用于向主题中发送消息;
   /**
 * MQTT网关,通过接口将数据传递到集成流
 * Created by macro on 2020/9/15.
 */
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
    /**
     * 发送消息到默认topic
     */
    void sendToMqtt(String payload);

    /**
     * 发送消息到指定topic
     */
    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);

    /**
     * 发送消息到指定topic并设置QOS
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
复制代码
  • 添加MQTT测试接口,使用MQTT网关向特定主题中发送消息;
   /**
 * MQTT测试接口
 * Created by macro on 2020/9/15.
 */
@Api(tags = "MqttController", description = "MQTT测试接口")
@RestController
@RequestMapping("/mqtt")
public class MqttController {

    @Autowired
    private MqttGateway mqttGateway;

    @PostMapping("/sendToDefaultTopic")
    @ApiOperation("向默认主题发送消息")
    public CommonResult sendToDefaultTopic(String payload) {
        mqttGateway.sendToMqtt(payload);
        return CommonResult.success(null);
    }

    @PostMapping("/sendToTopic")
    @ApiOperation("向指定主题发送消息")
    public CommonResult sendToTopic(String payload, String topic) {
        mqttGateway.sendToMqtt(payload, topic);
        return CommonResult.success(null);
    }
}
复制代码
  • 调用接口向主题中发送消息进行测试;

  • 后台成功接收到消息并进行打印。
   2020-09-17 14:29:01.689  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 来自网页上的消息
2020-09-17 14:29:06.101  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 来自网页上的消息
2020-09-17 14:29:07.384  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 来自网页上的消息
复制代码

总结

消息中间件应用越来越广泛,不仅可以实现可靠的异步通信,还可以实现即时通讯,掌握一个消息中间件还是很有必要的。如果没有特殊业务需求,客户端或者前端直接使用MQTT对接消息中间件即可实现即时通讯,有特殊需求的时候也可以使用SpringBoot集成MQTT的方式来实现,总之消息中间件是实现即时通讯的一个好选择!

项目源码地址

github.com/macrozheng/…

本文 GitHub github.com/macrozheng/… 已经收录,欢迎大家Star!

相关 [rabbitmq 即时通讯 后端] 推荐:

RabbitMQ实现即时通讯居然如此简单!连后端代码都省得写了?

- - 掘金后端
SpringBoot实战电商项目mall(40k+star)地址:. 有时候我们的项目中会用到 即时通讯功能,比如电商系统中的客服聊天功能,还有在支付过程中,当用户支付成功后,第三方支付服务会回调我们的回调接口,此时我们需要通知前端支付成功. 最近发现RabbitMQ可以很方便的实现 即时通讯功能,如果你没有特殊的业务需求,甚至可以不写后端代码,今天给大家讲讲如何使用RabbitMQ来实现 即时通讯.

史上最透彻的 RabbitMQ 可靠消息传输实战 - 后端 - 掘金

- -
缓存架构之史上讲的最明白的RabbitMQ可靠消息传输实战演练. 一、背景介绍:消息可靠传递的重要性. 比如:某个广告主(如:天猫)想在我们的平台(如:今日头条)投放广告,当通过我们的广告系统新建广告的时候,该消息在同步给redis缓存(es)的时候丢失了,而我们又没有发现,造成该广告无法正常显示出来,那这损失就打了,如果1天都没有该广告的投放记录,那就有可能是上百万的损失了,所以消息的可靠传输多我们的广告系统也是很重要的.

【架构】关于RabbitMQ

- - 学习笔记
1      什么是RabbitMQ. RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗. 消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然:. 例如一个日志系统,很容易使用RabbitMQ简化工作量,一个Consumer可以进行消息的正常处理,另一个Consumer负责对消息进行日志记录,只要在程序中指定两个Consumer所监听的queue以相同的方式绑定到同一个exchange即可,剩下的消息分发工作由RabbitMQ完成.

RabbitMQ (三) 发布/订阅

- - CSDN博客架构设计推荐文章
转发请标明出处: http://blog.csdn.net/lmj623565791/article/details/37657225. 本系列教程主要来自于官网入门教程的翻译,然后自己进行了部分的修改与实验,内容仅供参考. 上一篇博客中,我们实现了工作队列,并且我们的工作队列中的一个任务只会发给一个工作者,除非某个工作者未完成任务意外被杀死,会转发给另外的工作者,如果你还不了解: RabbitMQ (二)工作队列.

rabbitmq java client api详解

- - 五四陈科学院
以下内容由 [五四陈科学院]提供. AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现. 每个rabbitmq-server叫做一个Broker,等着tcp连接进入. 在rabbitmq-server进程内有Exchange,定义了这个消息的发送类型. Queue是进程内的逻辑队列,有多个,有名字.

RabbitMQ:镜像队列Mirrored queue

- - 飞翔的荷兰人
        在上一节 《RabbitMQ集群类型一:在单节点上构建built-in内置集群》中我们已经学习过:在集群环境中,队列只有元数据会在集群的所有节点同步,但队列中的数据只会存在于一个节点,数据没有冗余且容易丢,甚至在durable的情况下,如果所在的服务器节点宕机,就要等待节点恢复才能继续提供消息服务.

企业即时通讯市场研究

- chris - 月光博客
  企业即时通信,简称EIM(Enterprise Instant Messaging),它是一种面向企业终端使用者的网络沟通工具服务,使用者可以通过安装了即时通信的终端机进行两人或多人之间的实时沟通. 交流内容包括文字、界面、语音、视频及文件互发等.   根据国外调研机构eMarketer数据显示,截止到2010年底,企业即时通讯市场规模将达到6.88亿美元.

即时通讯解决方案参考

- - CSDN博客Web前端推荐文章
方案1、使用GCM服务(Google Cloud Messaging). 简介:Google推出的云消息服务,即第二代的C2DM. 优点:Google提供的服务、原生、简单,无需实现和部署服务端. 缺点:Android版本限制(必须大于2.2版本),该服务在国内不够稳定、需要用户绑定Google帐号,受限于Google.

即时通讯协议之Qunar

- - 标点符
Qunar 由于业务上对 IM 系统的需求,以及对 IM 需要支持的功能和扩展,结合市面上已有的 IM 的实现,实现了自己的一套完善的办公 IM 和客服 IM 系统. 具备了以下几个重要特点:实时性,可靠性,一致性,安全性,扩展性,高并发. Startalk是去哪儿开源的一款通用的,高性能的企业级im套件.

抽取rabbitmq网络层做的echo server

- 2sin18 - codedump
传说rabbitmq网络层实现的优雅高效,于是我就尝试着将其中的网络层抽取出来,模拟着做了一个echo服务器,代码放在这里.. rabbitmq的做法是内置状态机,通过切换callback的形式处理不同的业务,这样只有一个子进程处理一个链接,性能提高不少.. 测试这个echo服务器的客户端我使用的是telnet,telnet输入的数据会自动在后面加上”\r\n”发送到对端,于是代码中以这个来判断是否接收了一条消息,抽取出来回复给对端..