Springcloud + RocketMQ 解决分布式事务

标签: springcloud rocketmq 分布 | 发表时间:2020-03-25 10:47 | 作者:holyplace
出处:https://juejin.im/tag/%E6%9E%B6%E6%9E%84

开篇思考

  1. 为什么要分布式事务?
  2. 分布式事务有哪些实现方式?哪种可靠?
  3. 分布式哪些环节会出问题?出了问题怎么应对?

站在巨人的肩膀观察和思考

随着互联网时代的高速发展,分布式成了大型系统的标配,这是时代发展的选择。大型分布式系统不是每个公司和开发人员都能够涉及的领域,因为大型系统后面都 隐藏着众多代名词:复杂,昂贵,高科技,人才云集,大战略。。。

大部分领头互联网公司甚至依托自己的分布式经验逐步建立自己的体系,并使用这套体系搭建自己的平台对内,甚至对外提供服务, 就像现在众多的云平台提供的服务,甚至有些大战略提出促进发展:大中台小前台、大炮台支援单兵作战等等。

这里提到了中台的概念,这个概念很广,都是以用户为中心的,分布式只是其中的一小部分运用,之所以强行和分布式挂钩,是想说明现在的发展趋势变了, 我们的眼界是有限的,但是完全可以站在巨人的肩膀上,利用他们的高度来提升自己的眼界,来思考,我们到底应该怎么做,怎么做适合 我们自身发展。

我认为思考能力永远是一个程序员魅力所在,善于发现和思考,能够不断的帮助我们提升。

中台齿轮图

微服务架构的优势和问题

优势:

  • 扩展性强:可根据业务需要增加服务,不影响现有的服务架构
  • 单一隔离:服务和服务之前通过远程调用,单个服务只提供单一职责的功能,开发人员可以一人一服务进行开发,互不影响
  • 高可用:服务可以集群部署,单个应用失败通过重试和熔断等措施可以提高稳定性
  • 技术选型灵活:可以根据团队特点、业务需求选择合适的技术栈,提高开发效率
  • 突破性能瓶颈:可以集群方式部署,解决单体访问峰值等处理能力的性能瓶颈
  • 降低运维成本:应对不同的场景,例如双十一,可以动态增减服务集群数量,做到按需付费,减少开支

问题:

  • 复杂度高:整体架构设计十分复杂,需要考虑到整体性、经济性、稳定性
  • 容易混肴服务界限:哪些服务需要划分微服务,哪些可以作为子模块,需要认真思考
  • 难以确保一致性:因为是链路调用,一个请求会经过多个服务,难以确保执行结果达到一致性

CAP & BASE

因为写过这两个理论的具体介绍,这里简单再说明下。还有疑问的可以看介绍 链接

distribute-cap

这两是分布式架构发展至今形成的理论基础,只要分布式都绕不开的原则。CAP 教会我们如何在设计的时候取舍,是要高可用,还是强一致性? 这个看业务的具体情况和需求分析,如果是关于资金的流转的 例如:银行转账系统,肯定会要求保证一致性的场景更多,钱嘛,你懂得,不能多也不能少,少了错了就是大事。
如果只是一些简单的商品查询,可能用户更希望是可用性,也就是我随时查看都可以看到商品,但是商品信息可以在短时间内不一致。

也就是任何的架构设计,都是根据业务场景来分析的。通常 CAP 理论不够完善明确具体实现,这时候就可以用到 BASE 理论,简单的说就是
通过基本可用和软状态,来达到最终一致性的状态。

举个例子:秒杀服务,商品秒杀成功,商品库存 -1,但是订单入库需要调用订单服务,但是我们必须要先执行减库存操作成功后再执行,
这个时候可以先提交,然后发送到可靠消息系统 MQ,MQ 发送到订单服务进行入库操作,如果入库失败则进行重试

这里有个中间状态,库存信息入库而订单服务可能失败或者进行重试状态,这里要么重试成功,最终订单成功入库;要么彻底失败,回滚库存
入库信息改为初始状态。

上面的整体流程基本就是下面介绍的基于 RocketMQ 进行的分布式事务流程。

基于可靠消息(RocketMQ)最终一致性方案

distribute-cap

RocketMQ 是一个来自阿里巴巴的分布式消息中间件,于 2012 年开源。
​RocketMQ 事务消息设计则主要是为了解决 Producer 端的消息发送与本地事务执行的原子性问题,RocketMQ 的设计中 broker 与
producer 端的双向通信能力,使得 broker 天生可以作为一个事务协调者存在;而 RocketMQ 本身提供的存储机制为事务消息提供了
持久化能力;RocketMQ 的高可用机制以及可靠消息设计则为事务消息在系统发生异常时依然能够保证达成事务的最终一致性。

使用场景分析

流程举例(中国银行 -- 转账 -- 人民银行):

  1. 中国银行要扣款 - 100 ,准备 HalfMsg,消息中携带中国银行 - 100 的信息
  2. 中国银行 HalfMsg 成功发送后,执行数据库本地事务,在自己的库中 - 100
  3. 查看本地事务执行情况,成功则 commit 消息;失败则回滚不发送消息
  4. 人民银行系统订阅了 MQ,MQ 确保消息发送到人民银行系统
  5. 人民银行系统执行本地事务,数据库中 + 100

问题分析:

  1. half 消息没有发送成功:不会进入本地事务执行逻辑,且 MQ 没有消息
  2. 本地事务失败,rollback half 消息:MQ 只有 half 消息,不会被其他服务消费,此时可以回滚清除消息
  3. 本地事务成功,half 消息 commit 失败:MQ 定时查询本地事务状态,本地事务成功则继续 commit MQ 消息
  4. 消费者消费 MQ 消息失败:事务没有执行,或者事务执行失败,基于 ACK 的重试机制重试,知道消费成功,如果还是不行,记录日志持久化,预警查看问题是执行回滚还是更新

如何在项目中使用

以上主要流程都是 RocketMQ 实现,对用户使用来说,用户需要实现的部分是:①本地事务执行 ;②本地事务回查方法
因此代码中的实现只需关注本地事务的执行状态即可。下面贴出具体实现方式

pom 依赖

springcloud pom 的基础上,添加依赖,完整的 springcloud alibaba pom 可以参考
这里

    
 <dependency> <groupId>com.alibaba.cloudgroupId> <artifactId>spring-cloud-starter-stream-rocketmqartifactId> dependency>  
复制代码

模块添加

    
@SpringBootApplication  
@EnableDiscoveryClient  
@EnableFeignClients  
@MapperScan("com.holy.nacosconsumerone.dao")  
@EnableBinding({ MySource.class })  
public class NacosConsumerOneApplication {  
  
 public static void main(String[] args) { SpringApplication.run(NacosConsumerOneApplication.class, args); }}  
复制代码

mysource

  public interface MySource {  
  
 @Output("output1") MessageChannel output1();  
 @Output("output2") MessageChannel output2();  
 @Output("output3") MessageChannel output3();  
 @Output("output4") MessageChannel output4();  
 }```  
  
sendservice   
```java  
/**  
 * @author */@Service  
public class SenderService {  
  
 @Autowired private MySource source;  
 public void send(String msg) throws Exception { source.output1().send(MessageBuilder.withPayload(msg).build()); }  
 public  void sendTransactionalMsg(T msg, int num) throws Exception { MessageBuilder builder = MessageBuilder.withPayload(msg) .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON); builder.setHeader("tx-header-num", String.valueOf(num)); builder.setHeader(RocketMQHeaders.TAGS, "binder"); Message message = builder.build(); source.output2().send(message); }  
}  
复制代码

划重点 : MQ 事务监听类实现 RocketMQLocalTransactionListener

  import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;  
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;  
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;  
import org.springframework.messaging.Message;  
  
/**  
 * 对需要使用分布式事务的消息发送接口监听 * 根据事务消息分组来致性 * ①本地事务先执行,根据业务情况执行提交、回滚操作 * ②本地事务回查 * * @author */@RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup", corePoolSize = 5,  
 maximumPoolSize = 10)public class TransactionListenerImpl implements RocketMQLocalTransactionListener {  
  
 /** * 执行本地事务 * ①事务执行成功,commit * ②事务执行失败,rollback * ③回查发送消息,unknown * * @param msg  
 * @param arg  
 * @return */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { Object num = msg.getHeaders().get("tx-header-num"); try { // 本地业务代码,事务执行 if ("1".equals(num)) { System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " unknown"); return RocketMQLocalTransactionState.UNKNOWN; } else if ("2".equals(num)) { throw new Exception("Exception for RocketMQ rollback"); } System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " commit"); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { System.out.println(e.getMessage()); System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " rollback"); return RocketMQLocalTransactionState.ROLLBACK; }  
 }  
 /** * 执行本地事务回查,当状态为 UNKNOW 会执行这个方法,回查间隔时间差不多一分钟。 * * 业务代码用来检查事务当前状态,是否执行完成,如果完成就执行 COMMIT * * @param msg  
 * @return */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // 检查本地事务 System.out.println("check: " + new String((byte[]) msg.getPayload())); return RocketMQLocalTransactionState.COMMIT; }  
}  
复制代码

controller 测试:

  /**  
 * @author holy */@RestController  
@RequestMapping("/rocketMQ")  
public class RocketMQController {  
  
 @Resource private SenderService senderService;  
 @GetMapping(value = "/transactionMsg") public Object rocketMQTX(int num,String msg) { try { senderService.sendTransactionalMsg(msg,num); } catch (Exception e) { e.printStackTrace(); } return "OK" + num; }  
}  
复制代码

配置文件:

  spring:
    application:
        name: service-consumer
    cloud:
        stream:
          bindings:
              output1:
                  content-type: application/json
                  destination: test-topic
              output2:
                  content-type: application/json
                  destination: TransactionTopic
              output3:
                  content-type: text/plain
                  destination: pull-topic
          rocketmq:
              bindings:
                  output1:
                      producer:
                          group: binder-group
                          sync: true
                  output2:
                      producer:
                          group: myTxProducerGroup
                          transactional: true
                  output3:
                      producer:
                          group: pull-binder-group
              binder:
                name-server: 192.168.244.89:9876
复制代码

完整的代码的可以看我的完整项目: 项目地址

后续思考

本地事务复杂,执行查询时间太久如何处理?

针对不同的情况,其实我们只要认真分析场景,自然可以设计出对应的解决办法。
有些时候我们更希望通过一张事务执行情况表来判断事务的整体实行情况,比如业务比较复杂的时候,需要更新很多的表信息,这时候
使用事务表根据 TransactionID 来记录事务执行情况反而更贴合实际的使用场景。

如果我不是用 rocketMQ ,可以通过其他的 MQ 来实现分布式事务处理吗?

其实这个也是没有问题,大体思路就是封装新服务,专门用来检查事务执行情况,根据事务状态来决定是否发送消息到 MQ。

有好的建议欢迎在下方留言,大家一起讨论一起进步。优秀的评论会被选出并赠送奖励

喜欢文章请关注我

程序领域
程序领域(id:think-holy) 作者:holy 程序汪一只

公众号 赞赏码

相关 [springcloud rocketmq 分布] 推荐:

Springcloud + RocketMQ 解决分布式事务

- - 掘金架构
分布式事务有哪些实现方式. 随着互联网时代的高速发展,分布式成了大型系统的标配,这是时代发展的选择. 大型分布式系统不是每个公司和开发人员都能够涉及的领域,因为大型系统后面都 隐藏着众多代名词:复杂,昂贵,高科技,人才云集,大战略. 大部分领头互联网公司甚至依托自己的分布式经验逐步建立自己的体系,并使用这套体系搭建自己的平台对内,甚至对外提供服务, 就像现在众多的云平台提供的服务,甚至有些大战略提出促进发展:大中台小前台、大炮台支援单兵作战等等.

分布式开放消息系统(RocketMQ)的原理与实践

- - 编程语言 - ITeye博客
备注:1.如果您此前未接触过RocketMQ,请先阅读附录部分,以便了解RocketMQ的整体架构和相关术语2.文中的MQServer与Broker表示同一概念.   分布式消息系统作为实现分布式系统可扩展、可伸缩性的关键组件,需要具有高吞吐量、高可用等特点. 而谈到消息系统的设计,就回避不了两个问题:  .

RocketMQ与Kafka对比(18项差异)

- - 开源软件 - ITeye博客
为了方便大家选型,整理一份RocketMQ与Kafka的对比文档,文中如有错误之处,欢迎来函指正. RocketMQ支持异步实时刷盘,同步刷盘,同步Replication,异步Replication. Kafka使用异步刷盘方式,异步Replication. 总结:RocketMQ的同步刷盘在单机可靠性上比Kafka更高,不会因为操作系统Crash,导致数据丢失.

基于springcloud实现的灰度发布

- -
基于springcloud实现的灰度发布. gray-config-server 配置中心. 端口:6007,方便起见直接读取配置文件,生产环境可以读取git. 先启动配置中心,所有服务的配置(包括注册中心的地址)均从配置中心读取. gray-xxx-service 服务消费者. 调用服务提供者和服务提供者,验证是否进入灰度服务.

SpringCloud项目接入Jaeger(下) - 掘金

- -
spring-cloud-sleuth这个组件时,会面临两个问题. 首先是日志中无法显示traceId和spanId这些链路信息,其次是不能在用. spring-cloud-sleuth所提供的方式进行链路传值. spring-cloud-sleuth是将traceId等链路信息保存在. slf4j的MDC(Mapped Diagnostic Contexts)中,然后通过%X{traceId}这种方式将traceId提取出来,比如打印到控制台的默认格式是:.

SpringCloud Gateway与k8s_zhangjunli的博客-CSDN博客

- -
接下来的内容由以下几部分组成:. 什么是SpringCloud Gateway. SpringCloud Gateway实战参考. kubernetes上的SpringCloud Gateway. 开发k8sgatewaydemo. 什么是SpringCloud Gateway. SpringCloud Gateway是SpringCloud技术栈下的网关服务框架,在基于SpringCloud的微服务环境中,外部请求会到达SpringCloud Gateway应用,该应用对请求做转发、过滤、鉴权、熔断等前置操作,一个典型的请求响应流程如下所示:.

国信证券 RocketMQ + Spring Boot 在配置热加载的实践

- - IT瘾-dev
在进行微服务架构研发的过程中,不少研发人员都提出配置热加载的需求,一方面是为了提升研发效率,开发人员修改配置不需要重启服务;另一方面是为了线上紧急事件,可以快速恢复,例如线上数据库 down 了,可以紧急启动降级开关,通过配置热加载动态生效,降低处理事件的时间. 于是我采用 rocketmq 消息队列,实现一套解耦的配置热加载机制.

RocketMQ 在使用上的一些排坑和优化

- - 掘金架构
RocketMQ 在我们的项目中使用非常广泛,在使用的过程中,也遇到了很多的问题. 比如没有多环境的隔离,在多个版本同时开发送测的情况下,互相干扰严重. RocketMQ 的投递可能会失败,导致丢失消息. 另外开源版本的 RocketMQ 不支持任意时间精度的延时消息,仅支持特定的 level. 在使用的过程中,我们做了一些针对性的优化,整理出了这篇文章.

消息幂等(去重)通用解决方案,RocketMQ

- - 薛定谔的风口猪
消息中间件是分布式系统常用的组件,无论是异步化、解耦、削峰等都有广泛的应用价值. 我们通常会认为,消息中间件是一个可靠的组件——这里所谓的可靠是指,只要我把消息成功投递到了消息中间件,消息就不会丢失,即消息肯定会至少保证消息能被消费者成功消费一次,这是消息中间件最基本的特性之一,也就是我们常说的“AT LEAST ONCE”,即消息至少会被“成功消费一遍”.

将应用从 SpringCloud 迁移到 k8s - Rason's Blog

- -
最近花了几天时间看了一下 k8s 和 istio 的文档,对容器化运维以及服务网格有了基础的了解. 俗话说读万卷书不如行万里路,于是先尝试用 minikube 练一下手,将现有了一个 Spring Cloud 项目迁移到 k8s 上来. 粗略地整理了一个整个流程,主要有以下几个改动点:. 安装 kubectl 和 minikube.