-  
    
  - 
   个人笔记,如有描述不当,欢迎留言指出~
   前提
 我们知道一个电商项目里时刻都有海量的消息通知,比如顾客注册通知、签到通知、下单通知等等,而我们公司的电商项目更加复杂,包含了客户端、门店端以及供应商端三端,各种各样的消息通知游走在各个服务模块间。如果每个模块都要实现一套消息通知的功能,那无疑是多余的。所以我把各模块的消息功能提取出来独立成一个服务模块,就像一个快递员,把各模块的消息准确投递至各端。  
我采用了自己熟悉的rabbitmq来实现消息功能,  
当模块开发完交差时,组长冷不丁来了句:消息去重以及防丢失机制实现了没?w(゚Д゚)w好吧,赶紧去实现。
   消息去重
   
  
依我的经验来看,在消费端去重比较好。因为即使生产端保证投递到rabbitmq上的消息是不重复的,但rabbitmq服务器有可能由于系统或网络原因导致消息重复推送到消费端,所以生产端去重是不可靠的,应当在消费端去重。
 怎么解决呢?我的方案是在生产端投递消息的同时,传入correlationId关联id,在消费端接收消息后,从message的messageProperties中拿到correlationId,再根据correlationId从db中查询是否有相关记录。如果有,则说明这条消息已被我们消费过,直接ack,不进行业务处理;没有,那就把消息内容和correlationId存入表中,然后ack。
   这里说明一下,我把消息的接收和业务处理分开来了。消息监听器只负责监听队列消息,并将其存至db中。在另外的任务线程里,从db中取消息记录进去业务处理,如果业务处理中出现异常,结合elasticsearch实现异常报警(这部分还没做,目前还只是记录下错误信息及消息内容)。
   why?为啥分开处理,其实一开始的设计中消息接收和处理是写在一起的,消息处理成功回复ack,处理异常回复nack。但会有一个严重的问题,但测试环境中,我们发现总有那么几条消息卡在队列里,就因为处理异常回复nack,消息一直在重入队,严重消耗rabbitmq服务器的性能!所以说,大部分异常的消息,都不能指望把消息重推到别的消费端就能处理成功了,所以消息接收和处理分开来是比较好的。
 方案是有了,但具体代码怎么实现呢?  
生产端关键代码:  
  
            1       2       3       4       5       6       7       8       9       10       11       12       13       14      
   |          public void send(String routingKey, String msg) {             RabbitTemplate rabbitTemplate = applicationContext.getBean("rabbitTemplate", RabbitTemplate.class);             rabbitTemplate.setReturnCallback(this);             log.info("消息发送内容 : " + msg);             CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());             rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {                 if (!ack) {                     throw new RuntimeException("send error " + cause);                 } else {                     log.info("send() 消息发送成功 ");                 }             });             rabbitTemplate.convertAndSend("amq.topic", routingKey, msg, correlationId);         }      
  | 
 我们点开  rabbitTemplate.convertAndSend方法  
  
            1       2       3      
   |          public void convertAndSend(String exchange, String routingKey, Object object, CorrelationData correlationData) throws AmqpException {             this.send(exchange, routingKey, this.convertMessageIfNecessary(object), correlationData);         }      
  | 
 看到没,convertAndSend方法是以  Object来接收消息内容,它内部调用的send方法最终还是把Object类转成  Message类  
  
  
从上图可以看的出,Message包含了  ENCODING(编码方式)、  SERIALIZER_MESSAGE_CONVERTER(序列化消息转换器)、  messageProperties(消息属性)、  body(消息内容),队列里消息存放着这些东东。  
我们再看看  MessageProperties里放着什么  
  
            1       2       3       4       5       6       7       8       9       10       11       12       13       14       15       16       17       18       19       20       21       22       23       24       25       26       27       28       29       30       31       32       33       34       35       36       37       38       39       40       41       42       43       44       45       46       47      
   |          public class MessageProperties implements Serializable {           private static final long serialVersionUID = 1619000546531112290L;           public static final String CONTENT_TYPE_BYTES = "application/octet-stream";           public static final String CONTENT_TYPE_TEXT_PLAIN = "text/plain";           public static final String CONTENT_TYPE_SERIALIZED_OBJECT = "application/x-java-serialized-object";           public static final String CONTENT_TYPE_JSON = "application/json";           public static final String CONTENT_TYPE_JSON_ALT = "text/x-json";           public static final String CONTENT_TYPE_XML = "application/xml";           public static final String SPRING_BATCH_FORMAT = "springBatchFormat";           public static final String BATCH_FORMAT_LENGTH_HEADER4 = "lengthHeader4";           public static final String SPRING_AUTO_DECOMPRESS = "springAutoDecompress";           public static final String X_DELAY = "x-delay";           public static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";           public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE;           public static final Integer DEFAULT_PRIORITY;           private final Map<String, Object> headers = new HashMap();           private volatile Date timestamp;           private volatile String messageId;           private volatile String userId;           private volatile String appId;           private volatile String clusterId;           private volatile String type;           private volatile String correlationId;           private volatile String replyTo;           private volatile String contentType = "application/octet-stream";           private volatile String contentEncoding;           private volatile long contentLength;           private volatile boolean contentLengthSet;           private volatile MessageDeliveryMode deliveryMode;           private volatile String expiration;           private volatile Integer priority;           private volatile Boolean redelivered;           private volatile String receivedExchange;           private volatile String receivedRoutingKey;           private volatile String receivedUserId;           private volatile long deliveryTag;           private volatile boolean deliveryTagSet;           private volatile Integer messageCount;           private volatile String consumerTag;           private volatile String consumerQueue;           private volatile Integer receivedDelay;           private volatile MessageDeliveryMode receivedDeliveryMode;           private volatile boolean finalRetryForMessageWithNoId;           private transient volatile Type inferredArgumentType;           private transient volatile Method targetMethod;           private transient volatile Object targetBean;       }      
  | 
 果然  correlationId就在这里,然后看到这里我就没继续深入了,原以为rabbitTemplate.convertAndSend方法会自动将correlationId放入messageProperties中,结果表明我错了。在消费端拿到的correlationId为  null。也就是说,convertAndSend方法里correlationId根本就没有被放进去的,大家感兴趣的话可以看看源码,这里就不说了。
 问题找出来就好办了  
  
            1       2       3       4       5       6       7       8       9       10       11       12       13       14       15       16       17       18      
   |          public void send(String routingKey, String msg) {             RabbitTemplate rabbitTemplate = applicationContext.getBean("rabbitTemplate", RabbitTemplate.class);             rabbitTemplate.setReturnCallback(this);             log.info("消息发送内容 : " + msg);             CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());             rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {                 if (!ack) {                     throw new RuntimeException("send error " + cause);                 } else {                     log.info("send() 消息发送成功 ");                 }             });             Message message = MessageBuilder.withBody(msg.getBytes())                     .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)                     .setCorrelationId(correlationId.toString())                     .build();             rabbitTemplate.convertAndSend("amq.topic", routingKey, message, correlationId);         }      
  | 
 直接构建Message类,手动传入correlationId总行了吧。在消费端从Message里拿到correlationId,再从db查询就行了。好了,到这里去重机制就实现了
   消息防丢失
 rabbitmq是支持队列、消息的持久化的。即便rabbitmq突然挂了,那些尚在队列未能推送的消息在rabbitmq重启后也是能够继续推送的,所以丢失问题一般不出现在rabbitmq上。
 rabbitmq将消息从队列推到消费端后,需要有一个回应告诉它队列里的这条消息的去留。主要有两种方式:
   - auto: 自动回应,消息在发送给消费端后立即确认
   - manual 手动回应,消息消费正常后由消费端返回ack;或消费异常返回nack,将消息重入队;或返回reject,丢弃该条消息
 
      springboot的yaml配置  
  
            1       2       3       4       5       6       7       8       9       10       11       12      
   |          rabbitmq:          host: 127.0.0.1          port: 5672          username: admin          password: admin          publisher-confirms: true #开启confirmcallback          publisher-returns: true #开启returncallback          listener:            simple:              acknowledge-mode: manual            direct:              acknowledge-mode: MANUAL      
   | 
 另外,如果消息在消费的时候,消费端与rabbitmq的连接中断了,那这条消息会被重新放回队列进行推送,这个时候我们的去重机制就起作用了;如果消费的时候,消费端死机了,长时间不回应rabbitmq,这时候我们可以将该消息转至死信队列,防止原队列阻塞。死信队列,这里也不做介绍,有兴趣百度呗。  
所以消费端出现消息丢失的可能性也不大,问题就可能出在生产端。看看下面这张图  
  
  
左边P代表生产端,中间是rabbitmq,右边是消费端,绿色的X是交换机,红色的是队列,用过rabbitmq的小伙伴肯定一目了然了。
 rabbitmq 整个消息投递的路径为:  
  producer->  rabbitmq broker cluster->  exchange->  queue->  consumer
 生产端投递消息到rabbitmq里,rabbitmq将消息发到交换机中,交换机再根据路由键将消息最终送到队列中,队列取出消息推送到消费端。只有最终抵达队列的消息才是可靠的,不会丢失。所以我们要实现的就是保证生产端的消息务必推送到rabbitmq的队列中。
 那么生产端是怎么知道自己的消费准确投递到了队列中呢?rabbitmq返回了两个回调给生产端。
   - message 从 producer 到 rabbitmq broker cluster 则会返回一个    
confirmCallback    - message 从 exchange->queue 投递失败则会返回一个    
returnCallback 。我们将利用这两个 callback 控制消息的最终一致性和部分纠错能力。 
   解决方案
 生产端在投递消息前,先将消息内容、投递状态、重试次数记录在db中,然后在两个回调中修改记录状态。另外再开一个任务线程去取db中记录的失败消息,进行重新投递。
   代码实现
      失败消息记录实体类:  
  
            1       2       3       4       5       6       7       8       9       10       11       12       13       14       15       16       17       18       19       20       21       22       23       24       25       26       27       28       29       30       31       32       33       34       35       36       37       38       39       40       41       42      
   |          /**        * @time: 2019/2/18 9:14        * @author: hl        * @descripe: 失败消息记录        * @version: 1.0        */       Entity       @Table(name = "t_failure_mq_record")       @Data       @NoArgsConstructor       @EntityListeners(AuditingEntityListener.class)       public class FailureMqRecord extends Uuid {           /**            * 失败消息内容            */           private String message;           /**            * 重试次数            */           @Column(name = "retry_time")           private Integer retryTime;                  /**            * 消息状态 1:投递成功 2:投递失败            */           private Integer status;                  /**            * 关联id            */           private String correlationId;           /**            * 创建时间            */           @CreatedDate           @Column(name = "create_time", updatable = false)           private Date createTime;                  public FailureMqRecord(String message) {               this.message = message;           }       }      
   | 
      rabbitmq发送器:  
  
            1       2       3       4       5       6       7       8       9       10       11       12       13       14       15       16       17       18       19       20       21       22       23       24       25       26       27       28       29       30       31       32       33       34       35       36       37       38       39       40       41       42       43       44       45       46       47       48       49       50       51      
   |          /**        * @time: 2019/2/13 9:47        * @author: hl        * @descripe:        * @version: 1.0        */       @Component       @Slf4j       public class RabbitMqSender implements RabbitTemplate.ReturnCallback {           @Autowired           private ApplicationContext applicationContext;           @Autowired           private FailureMqRecordRepository failureMqRecordRepository;                  public void send(String routingKey, FailureMqRecord failureMqRecord) {               RabbitTemplate rabbitTemplate = applicationContext.getBean("rabbitTemplate", RabbitTemplate.class);               //设置当前实例为rabbitmqtemplate的returncallback               rabbitTemplate.setReturnCallback(this);               rabbitTemplate.setConfirmCallback(((correlationData1, ack, cause) -> {                   if (!ack) { //投递至broker失败                       failureMqRecord.setStatus(2);//设为投递失败                       failureMqRecordRepository.save(failureMqRecord);                   }               }));               Message message = MessageBuilder.withBody(failureMqRecord.getMessage().getBytes())                       .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)                       .setCorrelationId(failureMqRecord.getCorrelationId())                       .build();               rabbitTemplate.convertAndSend("amq.topic", routingKey, message, new CorrelationData(failureMqRecord.getCorrelationId()));               failureMqRecord.setStatus(1);//设为投递成功               failureMqRecord.setRetryTime(failureMqRecord.getRetryTime() + 1);//重试次数+1               failureMqRecordRepository.save(failureMqRecord);           }                  /**            * 消息由exchang未能正确投递到queue时触发回调            *            * @param message            * @param replyCode            * @param replyText            * @param exchange            * @param routingKey            */           @Override           public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) throws BusinessException {               FailureMqRecord mq = failureMqRecordRepository.findByCorrelationId(message.getMessageProperties().getCorrelationId());               mq.setStatus(2);               failureMqRecordRepository.save(mq);               log.error("审批消息:{} 投递至路由:{}失败", message.getBody(), routingKey);           }       }      
   | 
      任务线程,实现消息重试机制:  
  
            1       2       3       4       5       6       7       8       9       10       11       12       13       14       15       16       17       18       19       20       21       22       23       24       25       26       27       28       29       30       31       32       33       34       35       36       37       38       39       40       41       42       43       44       45       46       47       48       49       50       51       52       53       54       55       56      
   |          /**        * @time: 2019/2/18 10:10        * @author: huanglong        * @descripe: 消息重发定时器        * @version: 1.0        */       @Component       @Slf4j       public class MqScheduler {           @Autowired           private RabbitMqSender rabbitMqSender;           @Autowired           private ApprovalMqRepository failureMqRecordRepository;           @Autowired           private RedisDistributedLock redisDistributedLock;                  /**            * 每3分钟执行一次            * 将投递失败消息重新投递到rabbitmq            */           @Scheduled(cron = "* */3 * * * ?")           void push() {               List<FailureMqRecord> failureMqRecords = failureMqRecordRepository.findAll()                       .stream()                       .filter(failureMqRecord -> {                           if (failureMqRecord.getRetryTime() == 3) {                               log.error("警告!该消息已重投3次失败,请人工处理,消息记录uuid:{}", failureMqRecord.getUuid());                           }                           //过滤出重试次数不超过3次、状态为2的消息记录                           if (failureMqRecord.getRetryTime() < 3 && failureMqRecord.getStatus() == 0) {                               return true;                           }                           return false;                       })                       .collect(Collectors.toList());                      Iterator<FailureMqRecord> mqIterator = failureMqRecords.iterator();               while (mqIterator.hasNext()) {                   FailureMqRecord failureMqRecord = mqIterator.next();                   //获取,过期时间5秒,不重复获取                   if (redisDistributedLock.lock(failureMqRecord.getUuid(), 5000L, 0, 1000L)) {                       // 因为有可能上一个线程刚释放该记录的锁,就被当前先线程获取到该记录的锁,导致记录已被                       FailureMqRecord failurelatest = failureMqRecordRepository.findById(failureMqRecord.getUuid()).orElse(null);                       ApprovalPushMessage pushMessage = JSON.parseObject(failurelatest.getMessage(), ApprovalPushMessage.class);                       //当前时间距离该记录最近一次修改时间的间隔,防止上个线程重试过后,当前线程又重试一次                       long lastUpdatePeriod = System.currentTimeMillis() - failurelatest.getUpdateTime().getTime(); //距离上次更新间隔                       //重判断记录是否符合条件,重试次数小于3、状态为投递失败、距离上次重试不能少于2分钟                       if (failurelatest.getRetryTime() < 3 && failurelatest.getStatus() == 2 && lastUpdatePeriod > 2 * 60 * 1000) {                           rabbitMqSender.send("approval.create", failureMqRecord);                       }                       //释放                       redisDistributedLock.releaseLock(failureMqRecord.getUuid());                   }               }           }       }      
   | 
 聪明的小伙伴看到这里,会发现任务线程里还用到了分布式。为啥还要加分布式锁,因为是分布式架构啊,会有多个相同定时器从db里取记录处理,如果不加分布式锁,那真的要乱套了。因为redis用的多,就用redis来实现分布式锁了,zookeeper啥的,有空再研究了。redis分布式锁的代码实现,网上有很多资源,我这里就不贴了,嘿嘿
 好了,到这里rabbitmq的去重以及防丢失方案已经实现了,如果你有更好的解决方案或者指出我方案的不足,欢迎留言讨论