利用RabbitMQ实现分布式事务

标签: | 发表时间:2021-08-03 15:49 | 作者:
出处:https://www.cnblogs.com

  实现要点:1、构建本地消息表及定时任务,确保消息可靠发送;2、RabbitMQ可靠消费;3、redis保证幂等

  两个服务:订单服务和消息服务

  订单服务消息可靠发送

  使用springboot构建项目,相关代码如下

spring:
  datasource:
    druid:
      url: jdbc:postgresql://127.0.0.1:5432/test01?characterEncoding=utf-8
      username: admin
      password: 123456
      driver-class-name: org.postgresql.Driver
      initial-size: 1
      max-active: 20
      max-wait: 6000
      pool-prepared-statements: true
      max-pool-prepared-statement-per-connection-size: 20
      connection-properties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=2000
      min-idle: 1
      time-between-eviction-runs-millis: 60000
      min-evictable-idle-time-millis: 300000
      validation-query: select 1
      test-while-idle: true
      test-on-borrow: false
      test-on-return: false
  rabbitmq:
    username: guest
    password: guest
    host: 127.0.0.1
    virtual-host: /
    port: 5672
    publisher-confirms: true
server:
  port: 8087
logging:
  level:
    org.springframework.jdbc.core.JdbcTemplate: DEBUG
     <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.10</version>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.8</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
package com.jlwj.mqtransaction.bean;

import lombok.Data;

import java.io.Serializable;

/**
 * @author hehang on 2019-06-27
 * @descriptionsdf
 */

@Data
public class OrderBean implements Serializable {

    private String orderNo;
    private String orderInfo;
}
package com.jlwj.mqtransaction.service;

import com.alibaba.fastjson.JSON;
import com.jlwj.mqtransaction.bean.OrderBean;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;

/**
 * @author hehang on 2019-07-01
 * @description发动消息到MQ
 */
@Service
@Slf4j
public class RabbitMQService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @PostConstruct
    private void initRabbitTemplate(){
        //设置消息发送确认回调,发送成功后更新消息表状态
        rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> {
            log.info(String.valueOf(ack));
            if(ack){
                String sql = "update t_confirm set send_status = ? where id = ?";
                jdbcTemplate.update(sql,1,correlationData.getId());
            }
        });
    }
    public void sendMessage(OrderBean orderBean){
        rabbitTemplate.convertAndSend("orderExchange","orderRoutingKey", JSON.toJSONString(orderBean),
                message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;},
                new CorrelationData(orderBean.getOrderNo()));
    }

}
package com.jlwj.mqtransaction.service;

import com.alibaba.fastjson.JSON;
import com.jlwj.mqtransaction.bean.OrderBean;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.PostConstruct;

/**
 * @author hehang on 2019-06-27
 * @description订单服务,简单期间不再写接口
 */

@Service
@Slf4j
public class OrderService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private RabbitMQService rabbitMQService;

    //注意加事务注解
    @Transactional(propagation = Propagation.REQUIRED)
    public void save(OrderBean orderBean){
        String sql1 = "insert into t_order(order_no,order_info) values (?,?)";
        String sql2 = "insert into t_confirm(id,message_info,send_status) values (?,?,?)";
        jdbcTemplate.update(sql1,orderBean.getOrderNo(),orderBean.getOrderInfo());
        jdbcTemplate.update(sql2,orderBean.getOrderNo(), JSON.toJSONString(orderBean),0);
        rabbitMQService.sendMessage(orderBean);
    }


}
package com.jlwj.mqtransaction.service;

import com.jlwj.mqtransaction.bean.OrderBean;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * @author hehang on 2019-07-01
 * @description 定时扫描confirm表,重复发送未发送的消息
 */
@Service
@Slf4j
public class OrderScheduleService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private RabbitMQService rabbitMQService;

    //定时扫描记录表,将发送状态为0的消息再次发送,甚至可以记录重发次数,必要时人工干预,生产环境中需要单独部署定时任务
    @Scheduled(cron ="30/30 * * * * ?" )
    public void scanOrder(){
        log.info("定时扫面confirm表");
        String sql = "select o.* from t_order o join t_confirm c on o.order_no = c.id where c.send_status = 0";
        List<OrderBean> orderBeanList = jdbcTemplate.queryForList(sql, OrderBean.class);
        for (OrderBean orderBean : orderBeanList) {
            rabbitMQService.sendMessage(orderBean);
        }
    }
}

  消息服务相关代码

spring:
  rabbitmq:
    username: guest
    password: guest
    host: 127.0.0.1
    virtual-host: /
    port: 5672
    listener:
      simple:
        acknowledge-mode: manual
  redis:
    host: 127.0.0.1
      port: 6379
      timeout: 5000
      jedis:
        pool:
          max-idle: 8
          min-idle: 0
          max-active: 8
          max-wait: 1
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.8</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
package com.jlwj.messageservice.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

/**
 * @author hehang on 2019-06-27
 * @descriptionmq配置类
 */
@Configuration
public class RabbitConfig {

    /**
     * 死信队列
     * @return
     */
    @Bean
    public Queue dlQueue(){
        return QueueBuilder.durable("dlQueue")
                .build();
    }

    @Bean
    public DirectExchange dlExchange(){
        return (DirectExchange) ExchangeBuilder.directExchange("dlExchange").build();
    }
    @Bean
    public Binding dlMessageBinding(){
        return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("dlRoutingKey");
    }

    
    @Bean
    public DirectExchange messageDirectExchange() {
        return (DirectExchange) ExchangeBuilder.directExchange("orderExchange")
                .durable(true)
                .build();
    }

    @Bean
    public Queue messageQueue() {
        return QueueBuilder.durable("orderQueue")
                //配置死信
                .withArgument("x-dead-letter-exchange","dlExchange")
                .withArgument("x-dead-letter-routing-key","dlRoutingKey")
                .build();
    }

    @Bean
    public Binding messageBinding() {
        return BindingBuilder.bind(messageQueue())
                .to(messageDirectExchange())
                .with("orderRoutingKey");
    }

}
package com.jlwj.messageservice.listener;

import com.alibaba.fastjson.JSON;
import com.jlwj.messageservice.bean.OrderBean;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author hehang on 2019-06-28
 * @description订单监听
 */
@Component
@Slf4j

public class OrderListener {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;


    @RabbitListener(queues = "orderQueue")
    public void HandlerMessage(Channel channel, @Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long tag,
                                @Header(AmqpHeaders.REDELIVERED) boolean reDelivered ) throws IOException {
        log.info(message);
        OrderBean orderBean = JSON.parseObject(message,OrderBean.class);
        try {
            log.info("收到的消息为{}",JSON.toJSONString(orderBean));
            //保证幂等性
            if(stringRedisTemplate.opsForValue().get(orderBean.getOrderNo())==null){
                sendMessage(orderBean);
                stringRedisTemplate.opsForValue().set(orderBean.getOrderNo(),"1");
            }
            channel.basicAck(tag,false);
        } catch (Exception e) {
            if(reDelivered){
                log.info("消息已重复处理失败:{}",message);
                channel.basicReject(tag,false);
            }else{
                log.error("消息处理失败",e);
                //重新入队一次
                channel.basicNack(tag,false,true);
            }

        }
    }


    private void sendMessage(OrderBean orderBean)throws Exception{
        if(orderBean.getOrderNo().equals("0007")){
            int a =3/0;
        }
        log.info("模拟发送短信");
    }
}
package com.jlwj.messageservice.listener;

import com.alibaba.fastjson.JSON;
import com.jlwj.messageservice.bean.OrderBean;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author hehang on 2019-06-28
 * @description订单监听
 */
@Component
@Slf4j

public class DlListener {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;


    @RabbitListener(queues = "dlQueue")
    public void HandlerMessage(Channel channel, Message message, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        log.info(new String(message.getBody()));
        //人工处理死信队列中的消息
        handlerDl(new String(message.getBody()));
        channel.basicAck(tag,false);
    }


    private void handlerDl(String message){
        log.info("发送邮件,请求人工干预:{}",message);
    }
}

相关 [利用 rabbitmq 分布] 推荐:

利用RabbitMQ实现分布式事务

- -
  实现要点:1、构建本地消息表及定时任务,确保消息可靠发送;2、RabbitMQ可靠消费;3、redis保证幂等.   两个服务:订单服务和消息服务.   使用springboot构建项目,相关代码如下. //设置消息发送确认回调,发送成功后更新消息表状态. //定时扫描记录表,将发送状态为0的消息再次发送,甚至可以记录重发次数,必要时人工干预,生产环境中需要单独部署定时任务.

【架构】关于RabbitMQ

- - 学习笔记
1      什么是RabbitMQ. RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗. 消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然:. 例如一个日志系统,很容易使用RabbitMQ简化工作量,一个Consumer可以进行消息的正常处理,另一个Consumer负责对消息进行日志记录,只要在程序中指定两个Consumer所监听的queue以相同的方式绑定到同一个exchange即可,剩下的消息分发工作由RabbitMQ完成.

RabbitMQ (三) 发布/订阅

- - CSDN博客架构设计推荐文章
转发请标明出处: http://blog.csdn.net/lmj623565791/article/details/37657225. 本系列教程主要来自于官网入门教程的翻译,然后自己进行了部分的修改与实验,内容仅供参考. 上一篇博客中,我们实现了工作队列,并且我们的工作队列中的一个任务只会发给一个工作者,除非某个工作者未完成任务意外被杀死,会转发给另外的工作者,如果你还不了解: RabbitMQ (二)工作队列.

rabbitmq java client api详解

- - 五四陈科学院
以下内容由 [五四陈科学院]提供. AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现. 每个rabbitmq-server叫做一个Broker,等着tcp连接进入. 在rabbitmq-server进程内有Exchange,定义了这个消息的发送类型. Queue是进程内的逻辑队列,有多个,有名字.

RabbitMQ:镜像队列Mirrored queue

- - 飞翔的荷兰人
        在上一节 《RabbitMQ集群类型一:在单节点上构建built-in内置集群》中我们已经学习过:在集群环境中,队列只有元数据会在集群的所有节点同步,但队列中的数据只会存在于一个节点,数据没有冗余且容易丢,甚至在durable的情况下,如果所在的服务器节点宕机,就要等待节点恢复才能继续提供消息服务.

抽取rabbitmq网络层做的echo server

- 2sin18 - codedump
传说rabbitmq网络层实现的优雅高效,于是我就尝试着将其中的网络层抽取出来,模拟着做了一个echo服务器,代码放在这里.. rabbitmq的做法是内置状态机,通过切换callback的形式处理不同的业务,这样只有一个子进程处理一个链接,性能提高不少.. 测试这个echo服务器的客户端我使用的是telnet,telnet输入的数据会自动在后面加上”\r\n”发送到对端,于是代码中以这个来判断是否接收了一条消息,抽取出来回复给对端..

RabbitMQ关键性问题调研

- - Java - 编程语言 - ITeye博客
摘要:本篇是本人对RabbitMQ使用的关键性问题进行的调研,如性能上限、数据存储、集群等,.             具体的 RabbitMQ概念、使用方法、SpringAMQP配置,假设读者已有了基础. 1.1  RabbitMQ数据速率问题. 在边读边写的情况下:速率只与网络带宽正相关,网络使用率最高能达到接近100%,并且数据使用率很高(90%以上).

[转][RabbitMQ+Python入门经典] 兔子和兔子窝

- lostsnow - heiyeluren的blog(黑夜路人的开源世界)
高级消息队列协议(AMQP1)是一个异步消息传递所使用的应用层协议规范. AMQP的原始用途只是为金融界提供一个可以彼此协作的消息协议,而现在的目标则是为通用消息队列架构提供通用构建工具. RabbitMQ作为一个工业级的消息队列中间件,基于AMQP协议的实现,由erlang语言编写. 本文讲解 RabbitMQ+Python 的使用.

使用django+celery+RabbitMQ实现异步执行

- MetUrMaker - idv2
RabbitMQ大家应该不陌生,著名的消息队列嘛. 可惜我最近才听说它的大名,了解之后不禁惊呼,世界上居然还有这种东西. 立刻觉得手里有了锤子,就看什么都是钉子了,主网站不愿意干的操作统统扔给RabbitMQ去做吧. 言归正传,先介绍一下这篇文章的应用场景吧. 我们知道大型网站的性能非常重要,然而有时不得不做一些相当耗时的操作.

Python 的服务器推送解决方案:Orbited + RabbitMQ

- 非狐外传 - python.cn(jobs, news)
最近公司要用到服务器推送技术,google了一下,nodejs固然好,但是公司的东西都是python搞的,. 所以选择了python的 Orbited +  RabbitMQ,无奈Orbited文档极其缺乏,所以要做下笔记. 以下都是在windows平台上搞的测试. RabbitMQ:先要装好Erlang,然后下RabbitMQ的win版exe文件安装,由于要用到stomp协议,.