RabbitMQ:RabbitMQ之消息确认机制(事务+Confirm) - 简书
1、概述
在Rabbitmq中我们可以通过持久化来解决因为服务器异常而导致丢失的问题, 除此之外我们还会遇到一个问题:生产者将消息发送出去之后,消息到底有没有正确到达 Rabbit服务器呢?如果不错得数处理,我们是不知道的,(即Rabbit服务器不会反馈任何消息给生产者),也就是默认的情况下是不知道消息有没有正确到达;
导致的问题:消息到达服务器之前丢失,那么持久化也不能解决此问题,因为消息根本就没有到达Rabbit服务器!
RabbitMQ
为我们提供了两种方式:
- 通过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案;
- 通过将channel设置成confirm模式来实现;
事务机制
RabbitMQ中与事务机制有关的方法有三个: txSelect()
, txCommit()
以及 txRollback()
, txSelect用于将当前channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务,在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。 关键代码:
channel.txSelect();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.txCommit();
2、事务机制
事务机制类似于数据库的事务机制
2.1 生产者
package com.hrabbit.rabbitmq.amqp.send;
import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Auther: hrabbit
* @Date: 2018-07-02 下午3:05
* @Description:
*/
public class Send {
private static final String QUEUE_NAME = "QUEUE_simple";
public static void main(String[] args) throws IOException, TimeoutException {
/* 获取一个连接 */
Connection connection = ConnectionUtils.getConnection();
/* 从连接中创建通道 */
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "Hello Simple QUEUE !";
try {
channel.txSelect();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
int result = 1 / 0;
channel.txCommit();
} catch (Exception e) {
channel.txRollback();
System.out.println("----msg rollabck ");
} finally {
channel.close();
connection.close();
}
}
}
2.1 消费者
package com.hrabbit.rabbitmq.amqp.recover;
import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Auther: hrabbit
* @Date: 2018-07-02 下午3:08
* @Description:
*/
public class Recover {
private static final String QUEUE_NAME = "QUEUE_simple";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
//获取到达的消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
//监听队列
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
此种方式虽然可以解决事物的问题,但是此种模式还是很耗时的,采用这种方式 降低了Rabbitmq的消息吞吐量。
2、Confirm模式
2.1概念
上面我们介绍了RabbitMQ可能会遇到的一个问题,即生成者不知道消息是否真正到达broker,随后通过AMQP协议层面为我们提供了事务机制解决了这个问题,但是采用事务机制实现会降低RabbitMQ的消息吞吐量,那么有没有更加高效的解决方式呢?答案是采用Confirm模式。
2.2 producer端confirm模式的实现原理
生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。 confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息。
2.3 开启confirm模式的方法
已经在transaction事务模式的channel是不能再设置成confirm模式的,即这两种模式是不能共存的。
生产者通过调用channel的confirmSelect方法将channel设置为confirm模式
核心代码:
//生产者通过调用channel的confirmSelect方法将channel设置为confirm模式
channel.confirmSelect();
编程模式
- 普通confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。实际上是一种串行confirm了。
- 批量confirm模式:每发送一批消息后,调用waitForConfirms()方法,等待服务器端confirm。
- 异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方法。
2.4 普通confirm模式
package com.hrabbit.rabbitmq.confirm;
import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Auther: hrabbit
* @Date: 2018-07-02 下午3:20
* @Description:
*/
public class SendConfirm {
private static final String QUEUE_NAME = "QUEUE_simple_confirm";
@Test
public void sendMsg() throws IOException, TimeoutException, InterruptedException {
/* 获取一个连接 */
Connection connection = ConnectionUtils.getConnection();
/* 从连接中创建通道 */
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//生产者通过调用channel的confirmSelect方法将channel设置为confirm模式
channel.confirmSelect();
String msg = "Hello QUEUE !";
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
if (!channel.waitForConfirms()) {
System.out.println("send message 失败");
} else {
System.out.println(" send messgae ok ...");
}
channel.close();
connection.close();
}
}
2.4 批量confirm模式
批量confirm模式稍微复杂一点,客户端程序需要定期(每隔多少秒)或者定量(达到多少条)或者两则结合起来publish消息,然后等待服务器端confirm, 相比普通confirm模式,批量极大提升confirm效率,但是问题在于一旦出现confirm返回false或者超时的情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且,当消息经常丢失时,批量confirm性能应该是不升反降的。
package com.hrabbit.rabbitmq.confirm;
import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Auther: hrabbit
* @Date: 2018-07-02 下午3:25
* @Description:
*/
public class SendbatchConfirm {
private static final String QUEUE_NAME = "QUEUE_simple_confirm";
@Test
public void sendMsg() throws IOException, TimeoutException, InterruptedException {
/* 获取一个连接 */
Connection connection = ConnectionUtils.getConnection();
/* 从连接中创建通道 */
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//生产者通过调用channel的confirmSelect方法将channel设置为confirm模式
channel.confirmSelect();
//生产者通过调用channel的confirmSelect方法将channel设置为confirm模式
channel.confirmSelect();
String msg = "Hello QUEUE !";
for (int i = 0; i < 10; i++) {
channel.basicPublish("", QUEUE_NAME, null,msg.getBytes());
}
if (!channel.waitForConfirms()) {
System.out.println("send message error");
} else {
System.out.println(" send messgae ok ...");
}
channel.close();
connection.close();
}
}
2.5 异步confirm模式
Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Chanel发出的消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。实际上,SDK中的waitForConfirms()方法也是通过SortedSet维护消息序号的。
package com.hrabbit.rabbitmq.confirm;
import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;
/**
* @Auther: hrabbit
* @Date: 2018-07-02 下午3:28
* @Description:
*/
public class SendAync {
private static final String QUEUE_NAME = "QUEUE_simple_confirm_aync";
public static void main(String[] args) throws IOException, TimeoutException {
/* 获取一个连接 */
Connection connection = ConnectionUtils.getConnection();
/* 从连接中创建通道 */
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//生产者通过调用channel的confirmSelect方法将channel设置为confirm模式
channel.confirmSelect();
final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.addConfirmListener(new ConfirmListener() {
//每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
System.out.println("--multiple--");
confirmSet.headSet(deliveryTag + 1).clear();
//用一个SortedSet, 返回此有序集合中小于end的所有元素。
} else {
System.out.println("--multiple false--");
confirmSet.remove(deliveryTag);
}
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
});
String msg = "Hello QUEUE !";
while (true) {
long nextSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
confirmSet.add(nextSeqNo);
}
}
}
系列文章:
RabbitMQ:RabbitMQ-理论基础
RabbitMQ:RabbitMQ:快速入门hello word
RabbitMQ:RabbitMQ:work queues 工作队列(Round-robin/Fair dispatch)
RabbitMQ:RabbitMQ:消息应答与消息持久化
RabbitMQ:发布/订阅 Publish/Subscribe
RabbitMQ:路由Routing
RabbitMQ:Topic类型的exchange
RabbitMQ:spring整合RabbitMQ