Springcloud + RocketMQ 解决分布式事务

标签: springcloud rocketmq 分布 | 发表时间:2020-03-25 10:47 | 作者:holyplace
出处:https://juejin.im/tag/%E6%9E%B6%E6%9E%84

开篇思考

  1. 为什么要分布式事务?
  2. 分布式事务有哪些实现方式?哪种可靠?
  3. 分布式哪些环节会出问题?出了问题怎么应对?

站在巨人的肩膀观察和思考

随着互联网时代的高速发展,分布式成了大型系统的标配,这是时代发展的选择。大型分布式系统不是每个公司和开发人员都能够涉及的领域,因为大型系统后面都 隐藏着众多代名词:复杂,昂贵,高科技,人才云集,大战略。。。

大部分领头互联网公司甚至依托自己的分布式经验逐步建立自己的体系,并使用这套体系搭建自己的平台对内,甚至对外提供服务, 就像现在众多的云平台提供的服务,甚至有些大战略提出促进发展:大中台小前台、大炮台支援单兵作战等等。

这里提到了中台的概念,这个概念很广,都是以用户为中心的,分布式只是其中的一小部分运用,之所以强行和分布式挂钩,是想说明现在的发展趋势变了, 我们的眼界是有限的,但是完全可以站在巨人的肩膀上,利用他们的高度来提升自己的眼界,来思考,我们到底应该怎么做,怎么做适合 我们自身发展。

我认为思考能力永远是一个程序员魅力所在,善于发现和思考,能够不断的帮助我们提升。

中台齿轮图

微服务架构的优势和问题

优势:

  • 扩展性强:可根据业务需要增加服务,不影响现有的服务架构
  • 单一隔离:服务和服务之前通过远程调用,单个服务只提供单一职责的功能,开发人员可以一人一服务进行开发,互不影响
  • 高可用:服务可以集群部署,单个应用失败通过重试和熔断等措施可以提高稳定性
  • 技术选型灵活:可以根据团队特点、业务需求选择合适的技术栈,提高开发效率
  • 突破性能瓶颈:可以集群方式部署,解决单体访问峰值等处理能力的性能瓶颈
  • 降低运维成本:应对不同的场景,例如双十一,可以动态增减服务集群数量,做到按需付费,减少开支

问题:

  • 复杂度高:整体架构设计十分复杂,需要考虑到整体性、经济性、稳定性
  • 容易混肴服务界限:哪些服务需要划分微服务,哪些可以作为子模块,需要认真思考
  • 难以确保一致性:因为是链路调用,一个请求会经过多个服务,难以确保执行结果达到一致性

CAP & BASE

因为写过这两个理论的具体介绍,这里简单再说明下。还有疑问的可以看介绍 链接

distribute-cap

这两是分布式架构发展至今形成的理论基础,只要分布式都绕不开的原则。CAP 教会我们如何在设计的时候取舍,是要高可用,还是强一致性? 这个看业务的具体情况和需求分析,如果是关于资金的流转的 例如:银行转账系统,肯定会要求保证一致性的场景更多,钱嘛,你懂得,不能多也不能少,少了错了就是大事。
如果只是一些简单的商品查询,可能用户更希望是可用性,也就是我随时查看都可以看到商品,但是商品信息可以在短时间内不一致。

也就是任何的架构设计,都是根据业务场景来分析的。通常 CAP 理论不够完善明确具体实现,这时候就可以用到 BASE 理论,简单的说就是
通过基本可用和软状态,来达到最终一致性的状态。

举个例子:秒杀服务,商品秒杀成功,商品库存 -1,但是订单入库需要调用订单服务,但是我们必须要先执行减库存操作成功后再执行,
这个时候可以先提交,然后发送到可靠消息系统 MQ,MQ 发送到订单服务进行入库操作,如果入库失败则进行重试

这里有个中间状态,库存信息入库而订单服务可能失败或者进行重试状态,这里要么重试成功,最终订单成功入库;要么彻底失败,回滚库存
入库信息改为初始状态。

上面的整体流程基本就是下面介绍的基于 RocketMQ 进行的分布式事务流程。

基于可靠消息(RocketMQ)最终一致性方案

distribute-cap

RocketMQ 是一个来自阿里巴巴的分布式消息中间件,于 2012 年开源。
​RocketMQ 事务消息设计则主要是为了解决 Producer 端的消息发送与本地事务执行的原子性问题,RocketMQ 的设计中 broker 与
producer 端的双向通信能力,使得 broker 天生可以作为一个事务协调者存在;而 RocketMQ 本身提供的存储机制为事务消息提供了
持久化能力;RocketMQ 的高可用机制以及可靠消息设计则为事务消息在系统发生异常时依然能够保证达成事务的最终一致性。

使用场景分析

流程举例(中国银行 -- 转账 -- 人民银行):

  1. 中国银行要扣款 - 100 ,准备 HalfMsg,消息中携带中国银行 - 100 的信息
  2. 中国银行 HalfMsg 成功发送后,执行数据库本地事务,在自己的库中 - 100
  3. 查看本地事务执行情况,成功则 commit 消息;失败则回滚不发送消息
  4. 人民银行系统订阅了 MQ,MQ 确保消息发送到人民银行系统
  5. 人民银行系统执行本地事务,数据库中 + 100

问题分析:

  1. half 消息没有发送成功:不会进入本地事务执行逻辑,且 MQ 没有消息
  2. 本地事务失败,rollback half 消息:MQ 只有 half 消息,不会被其他服务消费,此时可以回滚清除消息
  3. 本地事务成功,half 消息 commit 失败:MQ 定时查询本地事务状态,本地事务成功则继续 commit MQ 消息
  4. 消费者消费 MQ 消息失败:事务没有执行,或者事务执行失败,基于 ACK 的重试机制重试,知道消费成功,如果还是不行,记录日志持久化,预警查看问题是执行回滚还是更新

如何在项目中使用

以上主要流程都是 RocketMQ 实现,对用户使用来说,用户需要实现的部分是:①本地事务执行 ;②本地事务回查方法
因此代码中的实现只需关注本地事务的执行状态即可。下面贴出具体实现方式

pom 依赖

springcloud pom 的基础上,添加依赖,完整的 springcloud alibaba pom 可以参考
这里

    
 <dependency> <groupId>com.alibaba.cloudgroupId> <artifactId>spring-cloud-starter-stream-rocketmqartifactId> dependency>  
复制代码

模块添加

    
@SpringBootApplication  
@EnableDiscoveryClient  
@EnableFeignClients  
@MapperScan("com.holy.nacosconsumerone.dao")  
@EnableBinding({ MySource.class })  
public class NacosConsumerOneApplication {  
  
 public static void main(String[] args) { SpringApplication.run(NacosConsumerOneApplication.class, args); }}  
复制代码

mysource

  public interface MySource {  
  
 @Output("output1") MessageChannel output1();  
 @Output("output2") MessageChannel output2();  
 @Output("output3") MessageChannel output3();  
 @Output("output4") MessageChannel output4();  
 }```  
  
sendservice   
```java  
/**  
 * @author */@Service  
public class SenderService {  
  
 @Autowired private MySource source;  
 public void send(String msg) throws Exception { source.output1().send(MessageBuilder.withPayload(msg).build()); }  
 public  void sendTransactionalMsg(T msg, int num) throws Exception { MessageBuilder builder = MessageBuilder.withPayload(msg) .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON); builder.setHeader("tx-header-num", String.valueOf(num)); builder.setHeader(RocketMQHeaders.TAGS, "binder"); Message message = builder.build(); source.output2().send(message); }  
}  
复制代码

划重点 : MQ 事务监听类实现 RocketMQLocalTransactionListener

  import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;  
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;  
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;  
import org.springframework.messaging.Message;  
  
/**  
 * 对需要使用分布式事务的消息发送接口监听 * 根据事务消息分组来致性 * ①本地事务先执行,根据业务情况执行提交、回滚操作 * ②本地事务回查 * * @author */@RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup", corePoolSize = 5,  
 maximumPoolSize = 10)public class TransactionListenerImpl implements RocketMQLocalTransactionListener {  
  
 /** * 执行本地事务 * ①事务执行成功,commit * ②事务执行失败,rollback * ③回查发送消息,unknown * * @param msg  
 * @param arg  
 * @return */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { Object num = msg.getHeaders().get("tx-header-num"); try { // 本地业务代码,事务执行 if ("1".equals(num)) { System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " unknown"); return RocketMQLocalTransactionState.UNKNOWN; } else if ("2".equals(num)) { throw new Exception("Exception for RocketMQ rollback"); } System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " commit"); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { System.out.println(e.getMessage()); System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " rollback"); return RocketMQLocalTransactionState.ROLLBACK; }  
 }  
 /** * 执行本地事务回查,当状态为 UNKNOW 会执行这个方法,回查间隔时间差不多一分钟。 * * 业务代码用来检查事务当前状态,是否执行完成,如果完成就执行 COMMIT * * @param msg  
 * @return */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // 检查本地事务 System.out.println("check: " + new String((byte[]) msg.getPayload())); return RocketMQLocalTransactionState.COMMIT; }  
}  
复制代码

controller 测试:

  /**  
 * @author holy */@RestController  
@RequestMapping("/rocketMQ")  
public class RocketMQController {  
  
 @Resource private SenderService senderService;  
 @GetMapping(value = "/transactionMsg") public Object rocketMQTX(int num,String msg) { try { senderService.sendTransactionalMsg(msg,num); } catch (Exception e) { e.printStackTrace(); } return "OK" + num; }  
}  
复制代码

配置文件:

  spring:
    application:
        name: service-consumer
    cloud:
        stream:
          bindings:
              output1:
                  content-type: application/json
                  destination: test-topic
              output2:
                  content-type: application/json
                  destination: TransactionTopic
              output3:
                  content-type: text/plain
                  destination: pull-topic
          rocketmq:
              bindings:
                  output1:
                      producer:
                          group: binder-group
                          sync: true
                  output2:
                      producer:
                          group: myTxProducerGroup
                          transactional: true
                  output3:
                      producer:
                          group: pull-binder-group
              binder:
                name-server: 192.168.244.89:9876
复制代码

完整的代码的可以看我的完整项目: 项目地址

后续思考

本地事务复杂,执行查询时间太久如何处理?

针对不同的情况,其实我们只要认真分析场景,自然可以设计出对应的解决办法。
有些时候我们更希望通过一张事务执行情况表来判断事务的整体实行情况,比如业务比较复杂的时候,需要更新很多的表信息,这时候
使用事务表根据 TransactionID 来记录事务执行情况反而更贴合实际的使用场景。

如果我不是用 rocketMQ ,可以通过其他的 MQ 来实现分布式事务处理吗?

其实这个也是没有问题,大体思路就是封装新服务,专门用来检查事务执行情况,根据事务状态来决定是否发送消息到 MQ。

有好的建议欢迎在下方留言,大家一起讨论一起进步。优秀的评论会被选出并赠送奖励

喜欢文章请关注我

程序领域
程序领域(id:think-holy) 作者:holy 程序汪一只

公众号 赞赏码

相关 [springcloud rocketmq 分布] 推荐:

Springcloud + RocketMQ 解决分布式事务

- - 掘金架构
分布式事务有哪些实现方式. 随着互联网时代的高速发展,分布式成了大型系统的标配,这是时代发展的选择. 大型分布式系统不是每个公司和开发人员都能够涉及的领域,因为大型系统后面都 隐藏着众多代名词:复杂,昂贵,高科技,人才云集,大战略. 大部分领头互联网公司甚至依托自己的分布式经验逐步建立自己的体系,并使用这套体系搭建自己的平台对内,甚至对外提供服务, 就像现在众多的云平台提供的服务,甚至有些大战略提出促进发展:大中台小前台、大炮台支援单兵作战等等.

分布式开放消息系统(RocketMQ)的原理与实践

- - 编程语言 - ITeye博客
备注:1.如果您此前未接触过RocketMQ,请先阅读附录部分,以便了解RocketMQ的整体架构和相关术语2.文中的MQServer与Broker表示同一概念.   分布式消息系统作为实现分布式系统可扩展、可伸缩性的关键组件,需要具有高吞吐量、高可用等特点. 而谈到消息系统的设计,就回避不了两个问题:  .

基于springcloud实现的灰度发布

- -
基于springcloud实现的灰度发布. gray-config-server 配置中心. 端口:6007,方便起见直接读取配置文件,生产环境可以读取git. 先启动配置中心,所有服务的配置(包括注册中心的地址)均从配置中心读取. gray-xxx-service 服务消费者. 调用服务提供者和服务提供者,验证是否进入灰度服务.

SpringCloud项目接入Jaeger(下) - 掘金

- -
spring-cloud-sleuth这个组件时,会面临两个问题. 首先是日志中无法显示traceId和spanId这些链路信息,其次是不能在用. spring-cloud-sleuth所提供的方式进行链路传值. spring-cloud-sleuth是将traceId等链路信息保存在. slf4j的MDC(Mapped Diagnostic Contexts)中,然后通过%X{traceId}这种方式将traceId提取出来,比如打印到控制台的默认格式是:.

RocketMQ与Kafka对比(18项差异)

- - 开源软件 - ITeye博客
为了方便大家选型,整理一份RocketMQ与Kafka的对比文档,文中如有错误之处,欢迎来函指正. RocketMQ支持异步实时刷盘,同步刷盘,同步Replication,异步Replication. Kafka使用异步刷盘方式,异步Replication. 总结:RocketMQ的同步刷盘在单机可靠性上比Kafka更高,不会因为操作系统Crash,导致数据丢失.

国信证券 RocketMQ + Spring Boot 在配置热加载的实践

- - IT瘾-dev
在进行微服务架构研发的过程中,不少研发人员都提出配置热加载的需求,一方面是为了提升研发效率,开发人员修改配置不需要重启服务;另一方面是为了线上紧急事件,可以快速恢复,例如线上数据库 down 了,可以紧急启动降级开关,通过配置热加载动态生效,降低处理事件的时间. 于是我采用 rocketmq 消息队列,实现一套解耦的配置热加载机制.

RocketMQ 在使用上的一些排坑和优化

- - 掘金架构
RocketMQ 在我们的项目中使用非常广泛,在使用的过程中,也遇到了很多的问题. 比如没有多环境的隔离,在多个版本同时开发送测的情况下,互相干扰严重. RocketMQ 的投递可能会失败,导致丢失消息. 另外开源版本的 RocketMQ 不支持任意时间精度的延时消息,仅支持特定的 level. 在使用的过程中,我们做了一些针对性的优化,整理出了这篇文章.

SpringCloud基础教程(五)-配置中心热生效和高可用

- - 掘金后端
 我的博客: 兰陵笑笑生,欢迎浏览博客.  上一章 SpringCloud基础教程(四)-配置中心入门当中,我们在对Eureka的有了基本的基础认识之上,深入的了解Eureka高可用集群和其他的生产环境中用到的一些配置. 本章将开始了解分布式环境下的配置中心.  在实际的项目运行中,我们会根据实际需求修改配置内容,那么有没有一种方式,能够在不启动服务组件的情况向让配置文件动态的生效呢,Spring Cloud Conifg中提供了一种方式了.

SpringCloud灰度发布实践(附源码) - 微服务实践 - SegmentFault 思否

- -
在平时的业务开发过程中,后端服务与服务之间的调用往往通过. resttemplate两种方式. 但是我们在调用服务的时候往往只需要写服务名就可以做到路由到具体的服务,这其中的原理相比大家都知道是. ribbon组件帮我们做了负载均衡的功能. 灰度的核心就是路由,如果我们能够重写ribbon默认的负载均衡算法是不是就意味着我们能够控制服务的转发呢.

netty无缝切换rabbitmq、activemq、rocketmq实现聊天室单聊、群聊功能

- - SegmentFault 最新的文章
netty的pipeline处理链上的handler:需要IdleStateHandler心跳检测channel是否有效,以及处理登录认证的UserAuthHandler和消息处理MessageHandler. //将多个消息转换成单一的消息对象. //支持异步发送大的码流,一般用于发送文件流. //检测链路是否读空闲,配合心跳handler检测channel是否正常.