rabbitmq消息去重及防丢失解决方案

标签: 后端笔记 rabbitmq | 发表时间:2019-03-22 11:38 | 作者:
出处:https://cloudintheking.github.io/

-


-

个人笔记,如有描述不当,欢迎留言指出~

前提

我们知道一个电商项目里时刻都有海量的消息通知,比如顾客注册通知、签到通知、下单通知等等,而我们公司的电商项目更加复杂,包含了客户端、门店端以及供应商端三端,各种各样的消息通知游走在各个服务模块间。如果每个模块都要实现一套消息通知的功能,那无疑是多余的。所以我把各模块的消息功能提取出来独立成一个服务模块,就像一个快递员,把各模块的消息准确投递至各端。
我采用了自己熟悉的rabbitmq来实现消息功能,
当模块开发完交差时,组长冷不丁来了句:消息去重以及防丢失机制实现了没?w(゚Д゚)w好吧,赶紧去实现。

消息去重

@消息大致流程
依我的经验来看,在消费端去重比较好。因为即使生产端保证投递到rabbitmq上的消息是不重复的,但rabbitmq服务器有可能由于系统或网络原因导致消息重复推送到消费端,所以生产端去重是不可靠的,应当在消费端去重。

怎么解决呢?我的方案是在生产端投递消息的同时,传入correlationId关联id,在消费端接收消息后,从message的messageProperties中拿到correlationId,再根据correlationId从db中查询是否有相关记录。如果有,则说明这条消息已被我们消费过,直接ack,不进行业务处理;没有,那就把消息内容和correlationId存入表中,然后ack。

这里说明一下,我把消息的接收和业务处理分开来了。消息监听器只负责监听队列消息,并将其存至db中。在另外的任务线程里,从db中取消息记录进去业务处理,如果业务处理中出现异常,结合elasticsearch实现异常报警(这部分还没做,目前还只是记录下错误信息及消息内容)。

why?为啥分开处理,其实一开始的设计中消息接收和处理是写在一起的,消息处理成功回复ack,处理异常回复nack。但会有一个严重的问题,但测试环境中,我们发现总有那么几条消息卡在队列里,就因为处理异常回复nack,消息一直在重入队,严重消耗rabbitmq服务器的性能!所以说,大部分异常的消息,都不能指望把消息重推到别的消费端就能处理成功了,所以消息接收和处理分开来是比较好的。

方案是有了,但具体代码怎么实现呢?
生产端关键代码:

1      
2
3
4
5
6
7
8
9
10
11
12
13
14
public void send(String routingKey, String msg) {      
RabbitTemplate rabbitTemplate = applicationContext.getBean("rabbitTemplate", RabbitTemplate.class);
rabbitTemplate.setReturnCallback(this);
log.info("消息发送内容 : " + msg);
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
throw new RuntimeException("send error " + cause);
} else {
log.info("send() 消息发送成功 ");
}
});
rabbitTemplate.convertAndSend("amq.topic", routingKey, msg, correlationId);
}

我们点开 rabbitTemplate.convertAndSend方法

1      
2
3
public void convertAndSend(String exchange, String routingKey, Object object, CorrelationData correlationData) throws AmqpException {      
this.send(exchange, routingKey, this.convertMessageIfNecessary(object), correlationData);
}

看到没,convertAndSend方法是以 Object来接收消息内容,它内部调用的send方法最终还是把Object类转成 Message
@Message.java
从上图可以看的出,Message包含了 ENCODING(编码方式)、 SERIALIZER_MESSAGE_CONVERTER(序列化消息转换器)、 messageProperties(消息属性)、 body(消息内容),队列里消息存放着这些东东。
我们再看看 MessageProperties里放着什么

1      
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class MessageProperties implements Serializable {      
private static final long serialVersionUID = 1619000546531112290L;
public static final String CONTENT_TYPE_BYTES = "application/octet-stream";
public static final String CONTENT_TYPE_TEXT_PLAIN = "text/plain";
public static final String CONTENT_TYPE_SERIALIZED_OBJECT = "application/x-java-serialized-object";
public static final String CONTENT_TYPE_JSON = "application/json";
public static final String CONTENT_TYPE_JSON_ALT = "text/x-json";
public static final String CONTENT_TYPE_XML = "application/xml";
public static final String SPRING_BATCH_FORMAT = "springBatchFormat";
public static final String BATCH_FORMAT_LENGTH_HEADER4 = "lengthHeader4";
public static final String SPRING_AUTO_DECOMPRESS = "springAutoDecompress";
public static final String X_DELAY = "x-delay";
public static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";
public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE;
public static final Integer DEFAULT_PRIORITY;
private final Map<String, Object> headers = new HashMap();
private volatile Date timestamp;
private volatile String messageId;
private volatile String userId;
private volatile String appId;
private volatile String clusterId;
private volatile String type;
private volatile String correlationId;
private volatile String replyTo;
private volatile String contentType = "application/octet-stream";
private volatile String contentEncoding;
private volatile long contentLength;
private volatile boolean contentLengthSet;
private volatile MessageDeliveryMode deliveryMode;
private volatile String expiration;
private volatile Integer priority;
private volatile Boolean redelivered;
private volatile String receivedExchange;
private volatile String receivedRoutingKey;
private volatile String receivedUserId;
private volatile long deliveryTag;
private volatile boolean deliveryTagSet;
private volatile Integer messageCount;
private volatile String consumerTag;
private volatile String consumerQueue;
private volatile Integer receivedDelay;
private volatile MessageDeliveryMode receivedDeliveryMode;
private volatile boolean finalRetryForMessageWithNoId;
private transient volatile Type inferredArgumentType;
private transient volatile Method targetMethod;
private transient volatile Object targetBean;
}

果然 correlationId就在这里,然后看到这里我就没继续深入了,原以为rabbitTemplate.convertAndSend方法会自动将correlationId放入messageProperties中,结果表明我错了。在消费端拿到的correlationId为 null。也就是说,convertAndSend方法里correlationId根本就没有被放进去的,大家感兴趣的话可以看看源码,这里就不说了。

问题找出来就好办了

1      
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void send(String routingKey, String msg) {      
RabbitTemplate rabbitTemplate = applicationContext.getBean("rabbitTemplate", RabbitTemplate.class);
rabbitTemplate.setReturnCallback(this);
log.info("消息发送内容 : " + msg);
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
throw new RuntimeException("send error " + cause);
} else {
log.info("send() 消息发送成功 ");
}
});
Message message = MessageBuilder.withBody(msg.getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setCorrelationId(correlationId.toString())
.build();
rabbitTemplate.convertAndSend("amq.topic", routingKey, message, correlationId);
}

直接构建Message类,手动传入correlationId总行了吧。在消费端从Message里拿到correlationId,再从db查询就行了。好了,到这里去重机制就实现了

消息防丢失

rabbitmq是支持队列、消息的持久化的。即便rabbitmq突然挂了,那些尚在队列未能推送的消息在rabbitmq重启后也是能够继续推送的,所以丢失问题一般不出现在rabbitmq上。

rabbitmq将消息从队列推到消费端后,需要有一个回应告诉它队列里的这条消息的去留。主要有两种方式:

  • auto: 自动回应,消息在发送给消费端后立即确认
  • manual 手动回应,消息消费正常后由消费端返回ack;或消费异常返回nack,将消息重入队;或返回reject,丢弃该条消息

springboot的yaml配置

1      
2
3
4
5
6
7
8
9
10
11
12
rabbitmq:      
host: 127.0.0.1
port: 5672
username: admin
password: admin
publisher-confirms: true #开启confirmcallback
publisher-returns: true #开启returncallback
listener:
simple:
acknowledge-mode: manual
direct:
acknowledge-mode: MANUAL

另外,如果消息在消费的时候,消费端与rabbitmq的连接中断了,那这条消息会被重新放回队列进行推送,这个时候我们的去重机制就起作用了;如果消费的时候,消费端死机了,长时间不回应rabbitmq,这时候我们可以将该消息转至死信队列,防止原队列阻塞。死信队列,这里也不做介绍,有兴趣百度呗。
所以消费端出现消息丢失的可能性也不大,问题就可能出在生产端。看看下面这张图
@来源于网络
左边P代表生产端,中间是rabbitmq,右边是消费端,绿色的X是交换机,红色的是队列,用过rabbitmq的小伙伴肯定一目了然了。

rabbitmq 整个消息投递的路径为:
producer-> rabbitmq broker cluster-> exchange-> queue-> consumer

生产端投递消息到rabbitmq里,rabbitmq将消息发到交换机中,交换机再根据路由键将消息最终送到队列中,队列取出消息推送到消费端。只有最终抵达队列的消息才是可靠的,不会丢失。所以我们要实现的就是保证生产端的消息务必推送到rabbitmq的队列中。

那么生产端是怎么知道自己的消费准确投递到了队列中呢?rabbitmq返回了两个回调给生产端。

  • message 从 producer 到 rabbitmq broker cluster 则会返回一个 confirmCallback
  • message 从 exchange->queue 投递失败则会返回一个 returnCallback 。我们将利用这两个 callback 控制消息的最终一致性和部分纠错能力。

解决方案

生产端在投递消息前,先将消息内容、投递状态、重试次数记录在db中,然后在两个回调中修改记录状态。另外再开一个任务线程去取db中记录的失败消息,进行重新投递。

代码实现

失败消息记录实体类:

1      
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**      
* @time: 2019/2/18 9:14
* @author: hl
* @descripe: 失败消息记录
* @version: 1.0
*/
Entity
@Table(name = "t_failure_mq_record")
@Data
@NoArgsConstructor
@EntityListeners(AuditingEntityListener.class)
public class FailureMqRecord extends Uuid {
/**
* 失败消息内容
*/
private String message;
/**
* 重试次数
*/
@Column(name = "retry_time")
private Integer retryTime;

/**
* 消息状态 1:投递成功 2:投递失败
*/
private Integer status;

/**
* 关联id
*/
private String correlationId;
/**
* 创建时间
*/
@CreatedDate
@Column(name = "create_time", updatable = false)
private Date createTime;

public FailureMqRecord(String message) {
this.message = message;
}
}

rabbitmq发送器:

1      
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**      
* @time: 2019/2/13 9:47
* @author: hl
* @descripe:
* @version: 1.0
*/
@Component
@Slf4j
public class RabbitMqSender implements RabbitTemplate.ReturnCallback {
@Autowired
private ApplicationContext applicationContext;
@Autowired
private FailureMqRecordRepository failureMqRecordRepository;

public void send(String routingKey, FailureMqRecord failureMqRecord) {
RabbitTemplate rabbitTemplate = applicationContext.getBean("rabbitTemplate", RabbitTemplate.class);
//设置当前实例为rabbitmqtemplate的returncallback
rabbitTemplate.setReturnCallback(this);
rabbitTemplate.setConfirmCallback(((correlationData1, ack, cause) -> {
if (!ack) { //投递至broker失败
failureMqRecord.setStatus(2);//设为投递失败
failureMqRecordRepository.save(failureMqRecord);
}
}));
Message message = MessageBuilder.withBody(failureMqRecord.getMessage().getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setCorrelationId(failureMqRecord.getCorrelationId())
.build();
rabbitTemplate.convertAndSend("amq.topic", routingKey, message, new CorrelationData(failureMqRecord.getCorrelationId()));
failureMqRecord.setStatus(1);//设为投递成功
failureMqRecord.setRetryTime(failureMqRecord.getRetryTime() + 1);//重试次数+1
failureMqRecordRepository.save(failureMqRecord);
}

/**
* 消息由exchang未能正确投递到queue时触发回调
*
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) throws BusinessException {
FailureMqRecord mq = failureMqRecordRepository.findByCorrelationId(message.getMessageProperties().getCorrelationId());
mq.setStatus(2);
failureMqRecordRepository.save(mq);
log.error("审批消息:{} 投递至路由:{}失败", message.getBody(), routingKey);
}
}

任务线程,实现消息重试机制:

1      
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**      
* @time: 2019/2/18 10:10
* @author: huanglong
* @descripe: 消息重发定时器
* @version: 1.0
*/
@Component
@Slf4j
public class MqScheduler {
@Autowired
private RabbitMqSender rabbitMqSender;
@Autowired
private ApprovalMqRepository failureMqRecordRepository;
@Autowired
private RedisDistributedLock redisDistributedLock;

/**
* 每3分钟执行一次
* 将投递失败消息重新投递到rabbitmq
*/
@Scheduled(cron = "* */3 * * * ?")
void push() {
List<FailureMqRecord> failureMqRecords = failureMqRecordRepository.findAll()
.stream()
.filter(failureMqRecord -> {
if (failureMqRecord.getRetryTime() == 3) {
log.error("警告!该消息已重投3次失败,请人工处理,消息记录uuid:{}", failureMqRecord.getUuid());
}
//过滤出重试次数不超过3次、状态为2的消息记录
if (failureMqRecord.getRetryTime() < 3 && failureMqRecord.getStatus() == 0) {
return true;
}
return false;
})
.collect(Collectors.toList());

Iterator<FailureMqRecord> mqIterator = failureMqRecords.iterator();
while (mqIterator.hasNext()) {
FailureMqRecord failureMqRecord = mqIterator.next();
//获取,过期时间5秒,不重复获取
if (redisDistributedLock.lock(failureMqRecord.getUuid(), 5000L, 0, 1000L)) {
// 因为有可能上一个线程刚释放该记录的锁,就被当前先线程获取到该记录的锁,导致记录已被
FailureMqRecord failurelatest = failureMqRecordRepository.findById(failureMqRecord.getUuid()).orElse(null);
ApprovalPushMessage pushMessage = JSON.parseObject(failurelatest.getMessage(), ApprovalPushMessage.class);
//当前时间距离该记录最近一次修改时间的间隔,防止上个线程重试过后,当前线程又重试一次
long lastUpdatePeriod = System.currentTimeMillis() - failurelatest.getUpdateTime().getTime(); //距离上次更新间隔
//重判断记录是否符合条件,重试次数小于3、状态为投递失败、距离上次重试不能少于2分钟
if (failurelatest.getRetryTime() < 3 && failurelatest.getStatus() == 2 && lastUpdatePeriod > 2 * 60 * 1000) {
rabbitMqSender.send("approval.create", failureMqRecord);
}
//释放
redisDistributedLock.releaseLock(failureMqRecord.getUuid());
}
}
}
}

聪明的小伙伴看到这里,会发现任务线程里还用到了分布式。为啥还要加分布式锁,因为是分布式架构啊,会有多个相同定时器从db里取记录处理,如果不加分布式锁,那真的要乱套了。因为redis用的多,就用redis来实现分布式锁了,zookeeper啥的,有空再研究了。redis分布式锁的代码实现,网上有很多资源,我这里就不贴了,嘿嘿

好了,到这里rabbitmq的去重以及防丢失方案已经实现了,如果你有更好的解决方案或者指出我方案的不足,欢迎留言讨论

相关 [rabbitmq 消息] 推荐:

消息中间件的技术选型心得-RabbitMQ、ActiveMQ和ZeroMQ

- - haohtml's blog
RabbitMQ、ActiveMQ和ZeroMQ都是极好的消息中间件,但是我们在项目中该选择哪个更适合呢. 下面我会对这三个消息中间件做一个比较,看了后你们就心中有数了. RabbitMQ是AMQP协议领先的一个实现,它实现了代理(Broker)架构,意味着消息在发送到客户端之前可以在中央节点上排队.

rabbitmq消息去重及防丢失解决方案

- - fatboa
个人笔记,如有描述不当,欢迎留言指出~. 我们知道一个电商项目里时刻都有海量的消息通知,比如顾客注册通知、签到通知、下单通知等等,而我们公司的电商项目更加复杂,包含了客户端、门店端以及供应商端三端,各种各样的消息通知游走在各个服务模块间. 如果每个模块都要实现一套消息通知的功能,那无疑是多余的. 所以我把各模块的消息功能提取出来独立成一个服务模块,就像一个快递员,把各模块的消息准确投递至各端.

RabbitMQ高级之如何保证消息可靠性?

- - SegmentFault 最新的文章
人生终将是场单人旅途,孤独之前是迷茫,孤独过后是成长. 本篇是消息队列 RabbitMQ的第四弹. RabbitMQ我已经写了三篇了,基础的收发消息和基础的概念我都已经写了,学任何东西都是这样,先基础的上手能用,然后遇到问题再去解决,无法理解就去深入源码,随着时间的积累对这一门技术的理解也会随之提高.

rabbitmq消息去重及防丢失解决方案

- - cloudintheking
个人笔记,如有描述不当,欢迎留言指出~. 我们知道一个电商项目里时刻都有海量的消息通知,比如顾客注册通知、签到通知、下单通知等等,而我们公司的电商项目更加复杂,包含了客户端、门店端以及供应商端三端,各种各样的消息通知游走在各个服务模块间. 如果每个模块都要实现一套消息通知的功能,那无疑是多余的. 所以我把各模块的消息功能提取出来独立成一个服务模块,就像一个快递员,把各模块的消息准确投递至各端.

消息中间件选型分析——从Kafka与RabbitMQ的对比来看全局

- - 程序猿DD
有很多网友留言:公司要做消息中间件选型,该如何选. 消息选型的确是一个大论题,实则说来话长的事情又如何长话短说. 对此笔者专门撰稿一篇内功心法: 如何看待消息中间件的选型,不过这篇只表其意未表其行,为了弥补这种缺陷,笔者最近特意重新撰稿一篇,以供参考. 温馨提示:本文一万多字,建议先马(关注)后看.

史上最透彻的 RabbitMQ 可靠消息传输实战 - 后端 - 掘金

- -
缓存架构之史上讲的最明白的RabbitMQ可靠消息传输实战演练. 一、背景介绍:消息可靠传递的重要性. 比如:某个广告主(如:天猫)想在我们的平台(如:今日头条)投放广告,当通过我们的广告系统新建广告的时候,该消息在同步给redis缓存(es)的时候丢失了,而我们又没有发现,造成该广告无法正常显示出来,那这损失就打了,如果1天都没有该广告的投放记录,那就有可能是上百万的损失了,所以消息的可靠传输多我们的广告系统也是很重要的.

【架构】关于RabbitMQ

- - 学习笔记
1      什么是RabbitMQ. RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗. 消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然:. 例如一个日志系统,很容易使用RabbitMQ简化工作量,一个Consumer可以进行消息的正常处理,另一个Consumer负责对消息进行日志记录,只要在程序中指定两个Consumer所监听的queue以相同的方式绑定到同一个exchange即可,剩下的消息分发工作由RabbitMQ完成.

RabbitMQ (三) 发布/订阅

- - CSDN博客架构设计推荐文章
转发请标明出处: http://blog.csdn.net/lmj623565791/article/details/37657225. 本系列教程主要来自于官网入门教程的翻译,然后自己进行了部分的修改与实验,内容仅供参考. 上一篇博客中,我们实现了工作队列,并且我们的工作队列中的一个任务只会发给一个工作者,除非某个工作者未完成任务意外被杀死,会转发给另外的工作者,如果你还不了解: RabbitMQ (二)工作队列.

rabbitmq java client api详解

- - 五四陈科学院
以下内容由 [五四陈科学院]提供. AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现. 每个rabbitmq-server叫做一个Broker,等着tcp连接进入. 在rabbitmq-server进程内有Exchange,定义了这个消息的发送类型. Queue是进程内的逻辑队列,有多个,有名字.

RabbitMQ:镜像队列Mirrored queue

- - 飞翔的荷兰人
        在上一节 《RabbitMQ集群类型一:在单节点上构建built-in内置集群》中我们已经学习过:在集群环境中,队列只有元数据会在集群的所有节点同步,但队列中的数据只会存在于一个节点,数据没有冗余且容易丢,甚至在durable的情况下,如果所在的服务器节点宕机,就要等待节点恢复才能继续提供消息服务.