RabbitMQ:RabbitMQ之消息确认机制(事务+Confirm) - 简书

标签: | 发表时间:2019-07-23 10:36 | 作者:
出处:https://www.jianshu.com
9824247-b8c9f19a9e567080.jpg

1、概述

在Rabbitmq中我们可以通过持久化来解决因为服务器异常而导致丢失的问题, 除此之外我们还会遇到一个问题:生产者将消息发送出去之后,消息到底有没有正确到达 Rabbit服务器呢?如果不错得数处理,我们是不知道的,(即Rabbit服务器不会反馈任何消息给生产者),也就是默认的情况下是不知道消息有没有正确到达;

导致的问题:消息到达服务器之前丢失,那么持久化也不能解决此问题,因为消息根本就没有到达Rabbit服务器!

RabbitMQ为我们提供了两种方式:

  • 通过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案;
  • 通过将channel设置成confirm模式来实现;
事务机制

RabbitMQ中与事务机制有关的方法有三个: txSelect(), txCommit()以及 txRollback(), txSelect用于将当前channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务,在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。 关键代码:

      channel.txSelect(); 
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); 
channel.txCommit();

2、事务机制

事务机制类似于数据库的事务机制

2.1 生产者
      package com.hrabbit.rabbitmq.amqp.send;

import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Auther: hrabbit
 * @Date: 2018-07-02 下午3:05
 * @Description:
 */
public class Send {
    private static final String QUEUE_NAME = "QUEUE_simple";

    public static void main(String[] args) throws IOException, TimeoutException {
         /* 获取一个连接 */
        Connection connection = ConnectionUtils.getConnection();
        /* 从连接中创建通道 */
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String msg = "Hello  Simple QUEUE !";
        try {
            channel.txSelect();
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            int result = 1 / 0;
            channel.txCommit();
        } catch (Exception e) {
            channel.txRollback();
            System.out.println("----msg rollabck ");
        } finally {
            channel.close();
            connection.close();
        }
    }
}
2.1 消费者
      package com.hrabbit.rabbitmq.amqp.recover;

import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Auther: hrabbit
 * @Date: 2018-07-02 下午3:08
 * @Description:
 */
public class Recover {
    private static final String QUEUE_NAME = "QUEUE_simple";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            //获取到达的消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        //监听队列
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

此种方式虽然可以解决事物的问题,但是此种模式还是很耗时的,采用这种方式 降低了Rabbitmq的消息吞吐量。

2、Confirm模式

2.1概念

上面我们介绍了RabbitMQ可能会遇到的一个问题,即生成者不知道消息是否真正到达broker,随后通过AMQP协议层面为我们提供了事务机制解决了这个问题,但是采用事务机制实现会降低RabbitMQ的消息吞吐量,那么有没有更加高效的解决方式呢?答案是采用Confirm模式。

2.2 producer端confirm模式的实现原理

生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。 confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息。

2.3 开启confirm模式的方法

已经在transaction事务模式的channel是不能再设置成confirm模式的,即这两种模式是不能共存的。
生产者通过调用channel的confirmSelect方法将channel设置为confirm模式
核心代码:

      //生产者通过调用channel的confirmSelect方法将channel设置为confirm模式  
channel.confirmSelect();

编程模式

  1. 普通confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。实际上是一种串行confirm了。
  2. 批量confirm模式:每发送一批消息后,调用waitForConfirms()方法,等待服务器端confirm。
  3. 异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方法。
2.4 普通confirm模式
      package com.hrabbit.rabbitmq.confirm;

import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Auther: hrabbit
 * @Date: 2018-07-02 下午3:20
 * @Description:
 */
public class SendConfirm {
    private static final String QUEUE_NAME = "QUEUE_simple_confirm";

    @Test
    public void sendMsg() throws IOException, TimeoutException, InterruptedException {
        /* 获取一个连接 */
        Connection connection = ConnectionUtils.getConnection();
        /* 从连接中创建通道 */
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //生产者通过调用channel的confirmSelect方法将channel设置为confirm模式
        channel.confirmSelect();
        String msg = "Hello   QUEUE !";
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        if (!channel.waitForConfirms()) {
            System.out.println("send message 失败");
        } else {
            System.out.println(" send messgae ok ...");
        }
        channel.close();
        connection.close();
    }
}
2.4 批量confirm模式

批量confirm模式稍微复杂一点,客户端程序需要定期(每隔多少秒)或者定量(达到多少条)或者两则结合起来publish消息,然后等待服务器端confirm, 相比普通confirm模式,批量极大提升confirm效率,但是问题在于一旦出现confirm返回false或者超时的情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且,当消息经常丢失时,批量confirm性能应该是不升反降的。

      package com.hrabbit.rabbitmq.confirm;

import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Auther: hrabbit
 * @Date: 2018-07-02 下午3:25
 * @Description:
 */
public class SendbatchConfirm {

    private static final String QUEUE_NAME = "QUEUE_simple_confirm";

    @Test
    public void sendMsg() throws IOException, TimeoutException, InterruptedException {
        /* 获取一个连接 */
        Connection connection = ConnectionUtils.getConnection();
        /* 从连接中创建通道 */
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //生产者通过调用channel的confirmSelect方法将channel设置为confirm模式
        channel.confirmSelect();

        //生产者通过调用channel的confirmSelect方法将channel设置为confirm模式
        channel.confirmSelect();
        String msg = "Hello   QUEUE !";
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("", QUEUE_NAME, null,msg.getBytes());
        }

        if (!channel.waitForConfirms()) {
            System.out.println("send message error");
        } else {
            System.out.println(" send messgae ok ...");
        }
        channel.close();
        connection.close();
    }
}
2.5 异步confirm模式

Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Chanel发出的消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。实际上,SDK中的waitForConfirms()方法也是通过SortedSet维护消息序号的。

      package com.hrabbit.rabbitmq.confirm;

import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;

/**
 * @Auther: hrabbit
 * @Date: 2018-07-02 下午3:28
 * @Description:
 */
public class SendAync {

    private static final String QUEUE_NAME = "QUEUE_simple_confirm_aync";
    public static void main(String[] args) throws IOException, TimeoutException {
        /* 获取一个连接 */
        Connection connection = ConnectionUtils.getConnection();
        /* 从连接中创建通道 */
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //生产者通过调用channel的confirmSelect方法将channel设置为confirm模式
        channel.confirmSelect();
        final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
        channel.addConfirmListener(new ConfirmListener() {
            //每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                if (multiple) {
                    System.out.println("--multiple--");
                    confirmSet.headSet(deliveryTag + 1).clear();
                    //用一个SortedSet, 返回此有序集合中小于end的所有元素。
                    } else {
                    System.out.println("--multiple false--");
                    confirmSet.remove(deliveryTag);
                }
            }
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
                if (multiple) {
                    confirmSet.headSet(deliveryTag + 1).clear();
                } else {
                    confirmSet.remove(deliveryTag);
                }
            }
        });
        String msg = "Hello   QUEUE !";
        while (true) {
            long nextSeqNo = channel.getNextPublishSeqNo();
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            confirmSet.add(nextSeqNo);
        }
    }
}

系列文章:

RabbitMQ:RabbitMQ-理论基础
RabbitMQ:RabbitMQ:快速入门hello word
RabbitMQ:RabbitMQ:work queues 工作队列(Round-robin/Fair dispatch)
RabbitMQ:RabbitMQ:消息应答与消息持久化
RabbitMQ:发布/订阅 Publish/Subscribe
RabbitMQ:路由Routing
RabbitMQ:Topic类型的exchange
RabbitMQ:spring整合RabbitMQ

相关 [rabbitmq rabbitmq 消息] 推荐:

【架构】关于RabbitMQ

- - 学习笔记
1      什么是RabbitMQ. RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗. 消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然:. 例如一个日志系统,很容易使用RabbitMQ简化工作量,一个Consumer可以进行消息的正常处理,另一个Consumer负责对消息进行日志记录,只要在程序中指定两个Consumer所监听的queue以相同的方式绑定到同一个exchange即可,剩下的消息分发工作由RabbitMQ完成.

消息中间件的技术选型心得-RabbitMQ、ActiveMQ和ZeroMQ

- - haohtml's blog
RabbitMQ、ActiveMQ和ZeroMQ都是极好的消息中间件,但是我们在项目中该选择哪个更适合呢. 下面我会对这三个消息中间件做一个比较,看了后你们就心中有数了. RabbitMQ是AMQP协议领先的一个实现,它实现了代理(Broker)架构,意味着消息在发送到客户端之前可以在中央节点上排队.

rabbitmq消息去重及防丢失解决方案

- - fatboa
个人笔记,如有描述不当,欢迎留言指出~. 我们知道一个电商项目里时刻都有海量的消息通知,比如顾客注册通知、签到通知、下单通知等等,而我们公司的电商项目更加复杂,包含了客户端、门店端以及供应商端三端,各种各样的消息通知游走在各个服务模块间. 如果每个模块都要实现一套消息通知的功能,那无疑是多余的. 所以我把各模块的消息功能提取出来独立成一个服务模块,就像一个快递员,把各模块的消息准确投递至各端.

RabbitMQ高级之如何保证消息可靠性?

- - SegmentFault 最新的文章
人生终将是场单人旅途,孤独之前是迷茫,孤独过后是成长. 本篇是消息队列 RabbitMQ的第四弹. RabbitMQ我已经写了三篇了,基础的收发消息和基础的概念我都已经写了,学任何东西都是这样,先基础的上手能用,然后遇到问题再去解决,无法理解就去深入源码,随着时间的积累对这一门技术的理解也会随之提高.

rabbitmq消息去重及防丢失解决方案

- - cloudintheking
个人笔记,如有描述不当,欢迎留言指出~. 我们知道一个电商项目里时刻都有海量的消息通知,比如顾客注册通知、签到通知、下单通知等等,而我们公司的电商项目更加复杂,包含了客户端、门店端以及供应商端三端,各种各样的消息通知游走在各个服务模块间. 如果每个模块都要实现一套消息通知的功能,那无疑是多余的. 所以我把各模块的消息功能提取出来独立成一个服务模块,就像一个快递员,把各模块的消息准确投递至各端.

消息中间件选型分析——从Kafka与RabbitMQ的对比来看全局

- - 程序猿DD
有很多网友留言:公司要做消息中间件选型,该如何选. 消息选型的确是一个大论题,实则说来话长的事情又如何长话短说. 对此笔者专门撰稿一篇内功心法: 如何看待消息中间件的选型,不过这篇只表其意未表其行,为了弥补这种缺陷,笔者最近特意重新撰稿一篇,以供参考. 温馨提示:本文一万多字,建议先马(关注)后看.

史上最透彻的 RabbitMQ 可靠消息传输实战 - 后端 - 掘金

- -
缓存架构之史上讲的最明白的RabbitMQ可靠消息传输实战演练. 一、背景介绍:消息可靠传递的重要性. 比如:某个广告主(如:天猫)想在我们的平台(如:今日头条)投放广告,当通过我们的广告系统新建广告的时候,该消息在同步给redis缓存(es)的时候丢失了,而我们又没有发现,造成该广告无法正常显示出来,那这损失就打了,如果1天都没有该广告的投放记录,那就有可能是上百万的损失了,所以消息的可靠传输多我们的广告系统也是很重要的.

RabbitMQ (三) 发布/订阅

- - CSDN博客架构设计推荐文章
转发请标明出处: http://blog.csdn.net/lmj623565791/article/details/37657225. 本系列教程主要来自于官网入门教程的翻译,然后自己进行了部分的修改与实验,内容仅供参考. 上一篇博客中,我们实现了工作队列,并且我们的工作队列中的一个任务只会发给一个工作者,除非某个工作者未完成任务意外被杀死,会转发给另外的工作者,如果你还不了解: RabbitMQ (二)工作队列.

rabbitmq java client api详解

- - 五四陈科学院
以下内容由 [五四陈科学院]提供. AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现. 每个rabbitmq-server叫做一个Broker,等着tcp连接进入. 在rabbitmq-server进程内有Exchange,定义了这个消息的发送类型. Queue是进程内的逻辑队列,有多个,有名字.

RabbitMQ:镜像队列Mirrored queue

- - 飞翔的荷兰人
        在上一节 《RabbitMQ集群类型一:在单节点上构建built-in内置集群》中我们已经学习过:在集群环境中,队列只有元数据会在集群的所有节点同步,但队列中的数据只会存在于一个节点,数据没有冗余且容易丢,甚至在durable的情况下,如果所在的服务器节点宕机,就要等待节点恢复才能继续提供消息服务.