-
-
个人笔记,如有描述不当,欢迎留言指出~
前提
我们知道一个电商项目里时刻都有海量的消息通知,比如顾客注册通知、签到通知、下单通知等等,而我们公司的电商项目更加复杂,包含了客户端、门店端以及供应商端三端,各种各样的消息通知游走在各个服务模块间。如果每个模块都要实现一套消息通知的功能,那无疑是多余的。所以我把各模块的消息功能提取出来独立成一个服务模块,就像一个快递员,把各模块的消息准确投递至各端。
我采用了自己熟悉的rabbitmq来实现消息功能,
当模块开发完交差时,组长冷不丁来了句:消息去重以及防丢失机制实现了没?w(゚Д゚)w好吧,赶紧去实现。
消息去重
依我的经验来看,在消费端去重比较好。因为即使生产端保证投递到rabbitmq上的消息是不重复的,但rabbitmq服务器有可能由于系统或网络原因导致消息重复推送到消费端,所以生产端去重是不可靠的,应当在消费端去重。
怎么解决呢?我的方案是在生产端投递消息的同时,传入correlationId关联id,在消费端接收消息后,从message的messageProperties中拿到correlationId,再根据correlationId从db中查询是否有相关记录。如果有,则说明这条消息已被我们消费过,直接ack,不进行业务处理;没有,那就把消息内容和correlationId存入表中,然后ack。
这里说明一下,我把消息的接收和业务处理分开来了。消息监听器只负责监听队列消息,并将其存至db中。在另外的任务线程里,从db中取消息记录进去业务处理,如果业务处理中出现异常,结合elasticsearch实现异常报警(这部分还没做,目前还只是记录下错误信息及消息内容)。
why?为啥分开处理,其实一开始的设计中消息接收和处理是写在一起的,消息处理成功回复ack,处理异常回复nack。但会有一个严重的问题,但测试环境中,我们发现总有那么几条消息卡在队列里,就因为处理异常回复nack,消息一直在重入队,严重消耗rabbitmq服务器的性能!所以说,大部分异常的消息,都不能指望把消息重推到别的消费端就能处理成功了,所以消息接收和处理分开来是比较好的。
方案是有了,但具体代码怎么实现呢?
生产端关键代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public void send(String routingKey, String msg) { RabbitTemplate rabbitTemplate = applicationContext.getBean("rabbitTemplate", RabbitTemplate.class); rabbitTemplate.setReturnCallback(this); log.info("消息发送内容 : " + msg); CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (!ack) { throw new RuntimeException("send error " + cause); } else { log.info("send() 消息发送成功 "); } }); rabbitTemplate.convertAndSend("amq.topic", routingKey, msg, correlationId); }
|
我们点开 rabbitTemplate.convertAndSend
方法
1 2 3
| public void convertAndSend(String exchange, String routingKey, Object object, CorrelationData correlationData) throws AmqpException { this.send(exchange, routingKey, this.convertMessageIfNecessary(object), correlationData); }
|
看到没,convertAndSend方法是以 Object
来接收消息内容,它内部调用的send方法最终还是把Object类转成 Message
类
从上图可以看的出,Message包含了 ENCODING
(编码方式)、 SERIALIZER_MESSAGE_CONVERTER
(序列化消息转换器)、 messageProperties
(消息属性)、 body
(消息内容),队列里消息存放着这些东东。
我们再看看 MessageProperties里放着什么
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| public class MessageProperties implements Serializable { private static final long serialVersionUID = 1619000546531112290L; public static final String CONTENT_TYPE_BYTES = "application/octet-stream"; public static final String CONTENT_TYPE_TEXT_PLAIN = "text/plain"; public static final String CONTENT_TYPE_SERIALIZED_OBJECT = "application/x-java-serialized-object"; public static final String CONTENT_TYPE_JSON = "application/json"; public static final String CONTENT_TYPE_JSON_ALT = "text/x-json"; public static final String CONTENT_TYPE_XML = "application/xml"; public static final String SPRING_BATCH_FORMAT = "springBatchFormat"; public static final String BATCH_FORMAT_LENGTH_HEADER4 = "lengthHeader4"; public static final String SPRING_AUTO_DECOMPRESS = "springAutoDecompress"; public static final String X_DELAY = "x-delay"; public static final String DEFAULT_CONTENT_TYPE = "application/octet-stream"; public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE; public static final Integer DEFAULT_PRIORITY; private final Map<String, Object> headers = new HashMap(); private volatile Date timestamp; private volatile String messageId; private volatile String userId; private volatile String appId; private volatile String clusterId; private volatile String type; private volatile String correlationId; private volatile String replyTo; private volatile String contentType = "application/octet-stream"; private volatile String contentEncoding; private volatile long contentLength; private volatile boolean contentLengthSet; private volatile MessageDeliveryMode deliveryMode; private volatile String expiration; private volatile Integer priority; private volatile Boolean redelivered; private volatile String receivedExchange; private volatile String receivedRoutingKey; private volatile String receivedUserId; private volatile long deliveryTag; private volatile boolean deliveryTagSet; private volatile Integer messageCount; private volatile String consumerTag; private volatile String consumerQueue; private volatile Integer receivedDelay; private volatile MessageDeliveryMode receivedDeliveryMode; private volatile boolean finalRetryForMessageWithNoId; private transient volatile Type inferredArgumentType; private transient volatile Method targetMethod; private transient volatile Object targetBean; }
|
果然 correlationId
就在这里,然后看到这里我就没继续深入了,原以为rabbitTemplate.convertAndSend方法会自动将correlationId放入messageProperties中,结果表明我错了。在消费端拿到的correlationId为 null
。也就是说,convertAndSend方法里correlationId根本就没有被放进去的,大家感兴趣的话可以看看源码,这里就不说了。
问题找出来就好办了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public void send(String routingKey, String msg) { RabbitTemplate rabbitTemplate = applicationContext.getBean("rabbitTemplate", RabbitTemplate.class); rabbitTemplate.setReturnCallback(this); log.info("消息发送内容 : " + msg); CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (!ack) { throw new RuntimeException("send error " + cause); } else { log.info("send() 消息发送成功 "); } }); Message message = MessageBuilder.withBody(msg.getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN) .setCorrelationId(correlationId.toString()) .build(); rabbitTemplate.convertAndSend("amq.topic", routingKey, message, correlationId); }
|
直接构建Message类,手动传入correlationId总行了吧。在消费端从Message里拿到correlationId,再从db查询就行了。好了,到这里去重机制就实现了
消息防丢失
rabbitmq是支持队列、消息的持久化的。即便rabbitmq突然挂了,那些尚在队列未能推送的消息在rabbitmq重启后也是能够继续推送的,所以丢失问题一般不出现在rabbitmq上。
rabbitmq将消息从队列推到消费端后,需要有一个回应告诉它队列里的这条消息的去留。主要有两种方式:
- auto: 自动回应,消息在发送给消费端后立即确认
- manual 手动回应,消息消费正常后由消费端返回ack;或消费异常返回nack,将消息重入队;或返回reject,丢弃该条消息
springboot的yaml配置
1 2 3 4 5 6 7 8 9 10 11 12
| rabbitmq: host: 127.0.0.1 port: 5672 username: admin password: admin publisher-confirms: true #开启confirmcallback publisher-returns: true #开启returncallback listener: simple: acknowledge-mode: manual direct: acknowledge-mode: MANUAL
|
另外,如果消息在消费的时候,消费端与rabbitmq的连接中断了,那这条消息会被重新放回队列进行推送,这个时候我们的去重机制就起作用了;如果消费的时候,消费端死机了,长时间不回应rabbitmq,这时候我们可以将该消息转至死信队列,防止原队列阻塞。死信队列,这里也不做介绍,有兴趣百度呗。
所以消费端出现消息丢失的可能性也不大,问题就可能出在生产端。看看下面这张图
左边P代表生产端,中间是rabbitmq,右边是消费端,绿色的X是交换机,红色的是队列,用过rabbitmq的小伙伴肯定一目了然了。
rabbitmq 整个消息投递的路径为:
producer
-> rabbitmq broker cluster
-> exchange
-> queue
-> consumer
生产端投递消息到rabbitmq里,rabbitmq将消息发到交换机中,交换机再根据路由键将消息最终送到队列中,队列取出消息推送到消费端。只有最终抵达队列的消息才是可靠的,不会丢失。所以我们要实现的就是保证生产端的消息务必推送到rabbitmq的队列中。
那么生产端是怎么知道自己的消费准确投递到了队列中呢?rabbitmq返回了两个回调给生产端。
- message 从 producer 到 rabbitmq broker cluster 则会返回一个
confirmCallback
- message 从 exchange->queue 投递失败则会返回一个
returnCallback
。我们将利用这两个 callback 控制消息的最终一致性和部分纠错能力。
解决方案
生产端在投递消息前,先将消息内容、投递状态、重试次数记录在db中,然后在两个回调中修改记录状态。另外再开一个任务线程去取db中记录的失败消息,进行重新投递。
代码实现
失败消息记录实体类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| /** * @time: 2019/2/18 9:14 * @author: hl * @descripe: 失败消息记录 * @version: 1.0 */ Entity @Table(name = "t_failure_mq_record") @Data @NoArgsConstructor @EntityListeners(AuditingEntityListener.class) public class FailureMqRecord extends Uuid { /** * 失败消息内容 */ private String message; /** * 重试次数 */ @Column(name = "retry_time") private Integer retryTime; /** * 消息状态 1:投递成功 2:投递失败 */ private Integer status; /** * 关联id */ private String correlationId; /** * 创建时间 */ @CreatedDate @Column(name = "create_time", updatable = false) private Date createTime; public FailureMqRecord(String message) { this.message = message; } }
|
rabbitmq发送器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| /** * @time: 2019/2/13 9:47 * @author: hl * @descripe: * @version: 1.0 */ @Component @Slf4j public class RabbitMqSender implements RabbitTemplate.ReturnCallback { @Autowired private ApplicationContext applicationContext; @Autowired private FailureMqRecordRepository failureMqRecordRepository; public void send(String routingKey, FailureMqRecord failureMqRecord) { RabbitTemplate rabbitTemplate = applicationContext.getBean("rabbitTemplate", RabbitTemplate.class); //设置当前实例为rabbitmqtemplate的returncallback rabbitTemplate.setReturnCallback(this); rabbitTemplate.setConfirmCallback(((correlationData1, ack, cause) -> { if (!ack) { //投递至broker失败 failureMqRecord.setStatus(2);//设为投递失败 failureMqRecordRepository.save(failureMqRecord); } })); Message message = MessageBuilder.withBody(failureMqRecord.getMessage().getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN) .setCorrelationId(failureMqRecord.getCorrelationId()) .build(); rabbitTemplate.convertAndSend("amq.topic", routingKey, message, new CorrelationData(failureMqRecord.getCorrelationId())); failureMqRecord.setStatus(1);//设为投递成功 failureMqRecord.setRetryTime(failureMqRecord.getRetryTime() + 1);//重试次数+1 failureMqRecordRepository.save(failureMqRecord); } /** * 消息由exchang未能正确投递到queue时触发回调 * * @param message * @param replyCode * @param replyText * @param exchange * @param routingKey */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) throws BusinessException { FailureMqRecord mq = failureMqRecordRepository.findByCorrelationId(message.getMessageProperties().getCorrelationId()); mq.setStatus(2); failureMqRecordRepository.save(mq); log.error("审批消息:{} 投递至路由:{}失败", message.getBody(), routingKey); } }
|
任务线程,实现消息重试机制:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| /** * @time: 2019/2/18 10:10 * @author: huanglong * @descripe: 消息重发定时器 * @version: 1.0 */ @Component @Slf4j public class MqScheduler { @Autowired private RabbitMqSender rabbitMqSender; @Autowired private ApprovalMqRepository failureMqRecordRepository; @Autowired private RedisDistributedLock redisDistributedLock; /** * 每3分钟执行一次 * 将投递失败消息重新投递到rabbitmq */ @Scheduled(cron = "* */3 * * * ?") void push() { List<FailureMqRecord> failureMqRecords = failureMqRecordRepository.findAll() .stream() .filter(failureMqRecord -> { if (failureMqRecord.getRetryTime() == 3) { log.error("警告!该消息已重投3次失败,请人工处理,消息记录uuid:{}", failureMqRecord.getUuid()); } //过滤出重试次数不超过3次、状态为2的消息记录 if (failureMqRecord.getRetryTime() < 3 && failureMqRecord.getStatus() == 0) { return true; } return false; }) .collect(Collectors.toList()); Iterator<FailureMqRecord> mqIterator = failureMqRecords.iterator(); while (mqIterator.hasNext()) { FailureMqRecord failureMqRecord = mqIterator.next(); //获取,过期时间5秒,不重复获取 if (redisDistributedLock.lock(failureMqRecord.getUuid(), 5000L, 0, 1000L)) { // 因为有可能上一个线程刚释放该记录的锁,就被当前先线程获取到该记录的锁,导致记录已被 FailureMqRecord failurelatest = failureMqRecordRepository.findById(failureMqRecord.getUuid()).orElse(null); ApprovalPushMessage pushMessage = JSON.parseObject(failurelatest.getMessage(), ApprovalPushMessage.class); //当前时间距离该记录最近一次修改时间的间隔,防止上个线程重试过后,当前线程又重试一次 long lastUpdatePeriod = System.currentTimeMillis() - failurelatest.getUpdateTime().getTime(); //距离上次更新间隔 //重判断记录是否符合条件,重试次数小于3、状态为投递失败、距离上次重试不能少于2分钟 if (failurelatest.getRetryTime() < 3 && failurelatest.getStatus() == 2 && lastUpdatePeriod > 2 * 60 * 1000) { rabbitMqSender.send("approval.create", failureMqRecord); } //释放 redisDistributedLock.releaseLock(failureMqRecord.getUuid()); } } } }
|
聪明的小伙伴看到这里,会发现任务线程里还用到了分布式。为啥还要加分布式锁,因为是分布式架构啊,会有多个相同定时器从db里取记录处理,如果不加分布式锁,那真的要乱套了。因为redis用的多,就用redis来实现分布式锁了,zookeeper啥的,有空再研究了。redis分布式锁的代码实现,网上有很多资源,我这里就不贴了,嘿嘿
好了,到这里rabbitmq的去重以及防丢失方案已经实现了,如果你有更好的解决方案或者指出我方案的不足,欢迎留言讨论