利用RabbitMQ实现分布式事务
摘要:实现要点:1、构建本地消息表及定时任务,确保消息可靠发送;2、RabbitMQ 可靠消费;3、Redis 保证幂等。涉及两个服务:订单服务和消息服务。使用 Spring Boot 构建项目,相关代码如下。订单服务设置消息发送确认回调,发送成功后更新消息表状态;同时定时扫描记录表,将发送状态为 0 的消息再次发送,甚至可以记录重发次数,必要时人工干预。生产环境中需要单独部署定时任务。
实现要点:1、构建本地消息表及定时任务,确保消息可靠发送;2、RabbitMQ 可靠消费;3、使用 Redis 保证消费幂等。
两个服务:订单服务和消息服务
订单服务消息可靠发送
使用 Spring Boot 构建项目,相关配置与代码如下:
# Order service configuration.
spring:
  datasource:
    # Druid connection pool pointing at the local PostgreSQL database "test01".
    druid:
      url: jdbc:postgresql://127.0.0.1:5432/test01?characterEncoding=utf-8
      username: admin
      password: 123456
      driver-class-name: org.postgresql.Driver
      initial-size: 1
      max-active: 20
      max-wait: 6000
      pool-prepared-statements: true
      max-pool-prepared-statement-per-connection-size: 20
      connection-properties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=2000
      min-idle: 1
      time-between-eviction-runs-millis: 60000
      min-evictable-idle-time-millis: 300000
      validation-query: select 1
      test-while-idle: true
      test-on-borrow: false
      test-on-return: false
  rabbitmq:
    username: guest
    password: guest
    host: 127.0.0.1
    virtual-host: /
    port: 5672
    # Publisher confirms must stay enabled: RabbitMQService registers a
    # ConfirmCallback that marks the t_confirm row as sent.
    publisher-confirms: true
server:
  port: 8087
logging:
  level:
    # Log the SQL statements executed through JdbcTemplate.
    org.springframework.jdbc.core.JdbcTemplate: DEBUG
<!-- Order service dependencies. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid-spring-boot-starter</artifactId>
    <version>1.1.10</version>
</dependency>
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.8</version>
</dependency>
<!-- fastjson 1.2.47 is affected by the autoType deserialization vulnerability;
     1.2.83 is the patched 1.x release and keeps the same JSON API. -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
package com.jlwj.mqtransaction.bean;

import lombok.Data;

import java.io.Serializable;

/**
 * Order payload shared by the order service's persistence and messaging code.
 * Accessors, {@code equals}/{@code hashCode} and {@code toString} are generated
 * by Lombok's {@code @Data}.
 *
 * @author hehang on 2019-06-27
 */
@Data
public class OrderBean implements Serializable {

    /**
     * Order number. The order service also uses it as the id of the local
     * message record and as the publisher-confirm correlation id.
     */
    private String orderNo;

    /** Order details. */
    private String orderInfo;
}
package com.jlwj.mqtransaction.service; import com.alibaba.fastjson.JSON; import com.jlwj.mqtransaction.bean.OrderBean; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; /** * @author hehang on 2019-07-01 * @description发动消息到MQ */ @Service @Slf4j public class RabbitMQService { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private JdbcTemplate jdbcTemplate; @PostConstruct private void initRabbitTemplate(){ //设置消息发送确认回调,发送成功后更新消息表状态 rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> { log.info(String.valueOf(ack)); if(ack){ String sql = "update t_confirm set send_status = ? where id = ?"; jdbcTemplate.update(sql,1,correlationData.getId()); } }); } public void sendMessage(OrderBean orderBean){ rabbitTemplate.convertAndSend("orderExchange","orderRoutingKey", JSON.toJSONString(orderBean), message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;}, new CorrelationData(orderBean.getOrderNo())); } }
package com.jlwj.mqtransaction.service; import com.alibaba.fastjson.JSON; import com.jlwj.mqtransaction.bean.OrderBean; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; import javax.annotation.PostConstruct; /** * @author hehang on 2019-06-27 * @description订单服务,简单期间不再写接口 */ @Service @Slf4j public class OrderService { @Autowired private JdbcTemplate jdbcTemplate; @Autowired private RabbitMQService rabbitMQService; //注意加事务注解 @Transactional(propagation = Propagation.REQUIRED) public void save(OrderBean orderBean){ String sql1 = "insert into t_order(order_no,order_info) values (?,?)"; String sql2 = "insert into t_confirm(id,message_info,send_status) values (?,?,?)"; jdbcTemplate.update(sql1,orderBean.getOrderNo(),orderBean.getOrderInfo()); jdbcTemplate.update(sql2,orderBean.getOrderNo(), JSON.toJSONString(orderBean),0); rabbitMQService.sendMessage(orderBean); } }
package com.jlwj.mqtransaction.service; import com.jlwj.mqtransaction.bean.OrderBean; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import java.util.List; /** * @author hehang on 2019-07-01 * @description 定时扫描confirm表,重复发送未发送的消息 */ @Service @Slf4j public class OrderScheduleService { @Autowired private JdbcTemplate jdbcTemplate; @Autowired private RabbitMQService rabbitMQService; //定时扫描记录表,将发送状态为0的消息再次发送,甚至可以记录重发次数,必要时人工干预,生产环境中需要单独部署定时任务 @Scheduled(cron ="30/30 * * * * ?" ) public void scanOrder(){ log.info("定时扫面confirm表"); String sql = "select o.* from t_order o join t_confirm c on o.order_no = c.id where c.send_status = 0"; List<OrderBean> orderBeanList = jdbcTemplate.queryForList(sql, OrderBean.class); for (OrderBean orderBean : orderBeanList) { rabbitMQService.sendMessage(orderBean); } } }
消息服务相关代码
# Message service configuration.
spring:
  rabbitmq:
    username: guest
    password: guest
    host: 127.0.0.1
    virtual-host: /
    port: 5672
    listener:
      simple:
        # The listeners ack/nack/reject deliveries themselves through the Channel.
        acknowledge-mode: manual
  redis:
    host: 127.0.0.1
    port: 6379
    timeout: 5000
    jedis:
      pool:
        max-idle: 8
        min-idle: 0
        max-active: 8
        # NOTE(review): no unit is given for max-wait; confirm the intended wait time.
        max-wait: 1
<!-- Message service dependencies. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.8</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<!-- fastjson 1.2.47 is affected by the autoType deserialization vulnerability;
     this service parses broker-supplied JSON, so use the patched 1.2.83 release. -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
package com.jlwj.messageservice.config; import org.springframework.amqp.core.*; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; /** * @author hehang on 2019-06-27 * @descriptionmq配置类 */ @Configuration public class RabbitConfig { /** * 死信队列 * @return */ @Bean public Queue dlQueue(){ return QueueBuilder.durable("dlQueue") .build(); } @Bean public DirectExchange dlExchange(){ return (DirectExchange) ExchangeBuilder.directExchange("dlExchange").build(); } @Bean public Binding dlMessageBinding(){ return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("dlRoutingKey"); } @Bean public DirectExchange messageDirectExchange() { return (DirectExchange) ExchangeBuilder.directExchange("orderExchange") .durable(true) .build(); } @Bean public Queue messageQueue() { return QueueBuilder.durable("orderQueue") //配置死信 .withArgument("x-dead-letter-exchange","dlExchange") .withArgument("x-dead-letter-routing-key","dlRoutingKey") .build(); } @Bean public Binding messageBinding() { return BindingBuilder.bind(messageQueue()) .to(messageDirectExchange()) .with("orderRoutingKey"); } }
package com.jlwj.messageservice.listener; import com.alibaba.fastjson.JSON; import com.jlwj.messageservice.bean.OrderBean; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import java.io.IOException; /** * @author hehang on 2019-06-28 * @description订单监听 */ @Component @Slf4j public class OrderListener { @Autowired private StringRedisTemplate stringRedisTemplate; @RabbitListener(queues = "orderQueue") public void HandlerMessage(Channel channel, @Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long tag, @Header(AmqpHeaders.REDELIVERED) boolean reDelivered ) throws IOException { log.info(message); OrderBean orderBean = JSON.parseObject(message,OrderBean.class); try { log.info("收到的消息为{}",JSON.toJSONString(orderBean)); //保证幂等性 if(stringRedisTemplate.opsForValue().get(orderBean.getOrderNo())==null){ sendMessage(orderBean); stringRedisTemplate.opsForValue().set(orderBean.getOrderNo(),"1"); } channel.basicAck(tag,false); } catch (Exception e) { if(reDelivered){ log.info("消息已重复处理失败:{}",message); channel.basicReject(tag,false); }else{ log.error("消息处理失败",e); //重新入队一次 channel.basicNack(tag,false,true); } } } private void sendMessage(OrderBean orderBean)throws Exception{ if(orderBean.getOrderNo().equals("0007")){ int a =3/0; } log.info("模拟发送短信"); } }
package com.jlwj.messageservice.listener; import com.alibaba.fastjson.JSON; import com.jlwj.messageservice.bean.OrderBean; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import java.io.IOException; /** * @author hehang on 2019-06-28 * @description订单监听 */ @Component @Slf4j public class DlListener { @Autowired private StringRedisTemplate stringRedisTemplate; @RabbitListener(queues = "dlQueue") public void HandlerMessage(Channel channel, Message message, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException { log.info(new String(message.getBody())); //人工处理死信队列中的消息 handlerDl(new String(message.getBody())); channel.basicAck(tag,false); } private void handlerDl(String message){ log.info("发送邮件,请求人工干预:{}",message); } }