分布式事务框架seata落地实践

标签: 未分类 前端 | 发表时间:2021-06-22 18:54 | 作者:youdao
出处:http://techblog.youdao.com

前言

seata是阿里巴巴研发的一套开源分布式事务框架,提供了AT、TCC、SAGA 和 XA 几种事务模式。本文以精品课项目组的物流后台服务为例,介绍seata框架落地的过程,遇到的问题以及解决方案。

作者/ 邓新伟

编辑/ 网易有道

有道精品课教务系统是基于springcloud的分布式集群服务。在实际业务中,存在许多分布式事务场景。然而传统的事务框架是无法实现全局事务的。长期以来,我们的分布式场景的一致性,往往指的是放弃强一致性,保证最终一致性。

我们从调研中发现,seata框架既可以满足业务需求,灵活兼容多种事务模式,又可以实现数据强一致性。

本文以 物流业务为例,记录了在实际业务中落地seata框架落地的过程中遇到的一些问题以及解决方案,供大家学习讨论~欢迎大家在留言区讨论交流

1. 基础信息

  • · seata版本:1.4

  • · 微服务框架:springcloud

  • · 注册中心:consul

2.基本框架

2.1 基本组件

seata框架分为3个组件:

  • · TC (Transaction Coordinator) -事务协调者 (即seata-server)

维护全局和分支事务的状态,驱动全局事务提交或回滚。

  • · TM (Transaction Manager) -事务管理器 (在client上,发起事务的服务)

定义全局事务的范围:开始全局事务、提交或回滚全局事务。

  • · RM (Resource Manager) - 资源管理器 (在client)

管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚

2.2. 部署seata-server(TC)

在官网下载 seata 服务端,解压后执行bin/seata-server.sh即可启动。

seata-server 有2个配置文件:registry.conf 与 file.conf。而 registry.conf 文件决定了 seata-server 使用的注册中心配置和配置信息获取方式。

我们使用 consul 做注册中心,因此需要在registry.conf文件中,需要修改以下配置:

代码1

其中需要注意的是,如果需要高可用部署,seata获取配置信息的方式就必须是注册中心,此时file.conf就没用了。

(当然,需要事先把file.conf文件中的配置信息迁移到consul中)

  store {
  ## store mode: file、db、redis
  mode = "db"

... ...
  ## database store property
  ## 如果使用数据库模式,需要配置数据库连接设置
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
    datasource = "druid"
    ## mysql/oracle/postgresql/h2/oceanbase etc.
    dbType = "mysql"
    driverClassName = "com.mysql.jdbc.Driver"
    url = "jdbc:mysql://***线上数据库地址***/seata"
    user = "******"
    password = "******"
    minConn = 5
    maxConn = 100
    ## 这里的三张表需要提前在数据库建好
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"
    queryLimit = 100
    maxWait = 5000
  }
... ...
}

service {
  #vgroup->rgroup
  vgroupMapping.tx-seata="seata-server"
  default.grouplist="127.0.0.1:8091"
  #degrade current not support
  enableDegrade = false
  #disable
  disable = false
  max.commit.retry.timeout = "-1"
  max.rollback.retry.timeout = "-1"
}

其中, global_tablebranch_tablelock_table三张表需要提前在数据库中建好。

2.3 配置client端(RM与TM)

每个使用seata框架的服务都需要引入seata组件:

代码3

每个服务都同样需要配置file.conf与registry.conf文件,放在resource目录下。registry.conf与server的保持一致。在file.conf文件中,除了db配置外,还需要进行client参数的配置:

代码4

在application.yml文件中添加seata配置:

代码5

另外,还需要替换项目的数据源,

代码6

至此,client端的配置也已经完成了。

3. 功能演示

一个分布式的全局事务,整体是两阶段提交的模型。

全局事务是由若干分支事务组成的,

分支事务要满足两阶段提交的模型要求,即需要每个分支事务都具备自己的:

  • · 一阶段 prepare 行为

  • · 二阶段commit 或 rollback 行为

根据两阶段行为模式的不同,我们将分支事务划分为 Automatic (Branch) Transaction Mode **和 TCC (Branch) Transaction Mode.**

3.1 AT模式

AT 模式基于支持本地ACID事务的关系型数据库:

  • · 一阶段 prepare行为:在本地事务中,一并提交业务数据更新和相应回滚日志记录。
  • · 二阶段 commit 行为:马上成功结束,自动异步批量清理回滚日志。
  • · 二阶段 rollback 行为:通过回滚日志,自动生成补偿操作,完成数据回滚

直接在需要添加全局事务的方法中加上注解@GlobalTransactional

代码7

注意:同@Transactional一样,@GlobalTransactional若要生效也要满足:

  • · 目标函数必须为public类型

  • · 同一类内方法调用时,调用目标函数的方法必须通过springBeanName.method的形式来调用,不能使用this直接调用内部方法

3.2TCC模式

TCC 模式是支持把自定义的分支事务纳入到全局事务的管理中。

  • · 一阶段 prepare 行为:调用自定义的 prepare 逻辑。
  • · 二阶段 commit 行为:调用自定义的 commit 逻辑。
  • · 二阶段 rollback 行为:调用自定义的 rollback 逻辑。

首先编写一个TCC服务接口:

  @LocalTCC
public interface BusinessAction {
    @TwoPhaseBusinessAction(name = "doBusiness", commitMethod = "commit", rollbackMethod = "rollback")
    boolean doBusiness(BusinessActionContext businessActionContext,
                       @BusinessActionContextParameter(paramName = "message") String msg);

    boolean commit(BusinessActionContext businessActionContext);

    boolean rollback(BusinessActionContext businessActionContext);
}

其中,BusinessActionContext为全局事务上下文,可以从此对象中获取全局事务相关信息(如果是发起全局事务方,传入null后自动生成),然后实现该接口:

  
@Slf4j
@Service
public class BusinessActionImpl implements BusinessAction {

    @Transactional(rollbackFor = Exception.class)
    @Override
    public boolean doBusiness(BusinessActionContext businessActionContext, String msg) {
        log.info("准备do business:{}",msg);
        return true;
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public boolean commit(BusinessActionContext businessActionContext) {
        log.info("business已经commit");
        return true;
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public boolean rollback(BusinessActionContext businessActionContext) {
        log.info("business已经rollback");
        return true;
    }
}

最后,开启全局事务方法同AT模式。

代码10

4. 遇到的问题

4.1 client TM/RM 无法注册到TC

在部署seata项目时常常会遇到这样的问题:在本地调试时一切正常,但是当试图部署到线上时,总是在clinet端提示注册TC端失败。

  • · 这是因为client需要先通过服务发现,找到注册中心里seata-server的服务信息,然后再与seata-server建立连接。不过线上的consul采用了多数据中心模式,在调用consul api时,必须加上dc参数项,否则将无法返回正确的服务信息;然而,seata提供的consul服务发现组件似乎并不支持dc参数的配置。

  • · 还有一个原因也会导致client无法连接到TC:seata的consul客户端在调用服务状态监控api时,使用了wait与index参数,从而使consul查询进入了阻塞查询模式。此时client对consul中要查询的key做监听,只有当key发生变化或者达到最大请求时间时,才会返回结果。貌似由于consul版本的问题,这个阻塞查询并没有监听到key的变化,反而会让服务发现的线程陷入无限等待之中,自然也就无法让client获取到server的注册信息了。

4.2高可用部署

seata服务的高可用部署 只支持注册中心模式。因此,我们需要想办法将file.conf文件以键值对的形式存到consul中。

遗憾的是,consul并没有显式支持namespace,我们只能在put请求中用“/”为分隔符起到类似的效果。当然,seata框架也没有考虑到这一点。所以我们需要修改源码中的Configuration接口与RegistryProvider接口的consul实现类,增加namespace属性

4.3 global_log与branch_log

TC在想mysql插入日志数据时,偶尔会报:

代码11

application_data字段其实就是对业务数据的记录。官方给出的建表语句是这样的:

代码12

显然,VARCHAR(2000)的大小是不合适的, utf8的格式也是不合适的。所以我们需要修改seata关于 数据源连接的部分代码:

  // connectionInitSql设置
    protected Set<string> getConnectionInitSqls(){
        Set</string><string> set = new HashSet<>();
        String connectionInitSqls = CONFIG.getConfig(ConfigurationKeys.STORE_DB_CONNECTION_INIT_SQLS);
        if(StringUtils.isNotEmpty(connectionInitSqls)) {
            String[] strs = connectionInitSqls.split(",");
            for(String s:strs){
                set.add(s);
            }
        }
        // 默认支持utf8mb4
        set.add("set names utf8mb4");
        return set;
    }
</string>

5.自定义开发

5.1利用SPI机制编写自定义组件

seata基于java的spi机制提供了自定义实现接口的功能,我们只需要在自己的服务中,根据seata的接口写好自己的实现类即可。

SPI(Service Provider Interface)是JDK内置的服务发现机制,用在不同模块间通过接口调用服务,避免对具体服务服务接口具体实现类的耦合。比如JDBC的数据库驱动模块,不同数据库连接驱动接口相同但实现类不同,在使用SPI机制以前调用驱动代码需要直接在类里采用Class.forName(具体实现类全名)的方式调用,这样调用方依赖了具体的驱动实现,在替换驱动实现时要修改代码。

ConsulRegistryProvider为例:

  • ConsulRegistryServiceImpl
  // 增加DC和namespace
    private static String NAMESPACE;
    private static String DC;

    private ConsulConfiguration() {
        Config registryCongig = ConfigFactory.parseResources("registry.conf");
        NAMESPACE = registryCongig.getString("config.consul.namespace");
        DC = CommonSeataConfiguration.getDatacenter();
        consulNotifierExecutor = new ThreadPoolExecutor(THREAD_POOL_NUM, THREAD_POOL_NUM, Integer.MAX_VALUE,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
                new NamedThreadFactory("consul-config-executor", THREAD_POOL_NUM));
    }
    ... ...
// 同时在getHealthyServices中,删除请求参数wait&index    
    /**
     * get healthy services
     *
     * @param service
     * @return
     */
    private Response<List<HealthService>> getHealthyServices(String service, long index, long watchTimeout) {
        return getConsulClient().getHealthServices(service, HealthServicesRequest.newBuilder()
                .setTag(SERVICE_TAG)
                .setDatacenter(DC)
                .setPassing(true)
                .build());
    }
  • ConsulRegistryProvider 注意order要大于seata包中的默认值1,seata类加载器会优先加载order更大的实现类

代码15

  • 然后在META-INF 的services目录下添加:io.seata.discovery.registry.RegistryProvider

代码16

这样就可以替换seata包中的实现了。

5.2 common-seata工具包

对于这些自定义实现类,以及一些公共client配置,我们可以统一封装到一个工具包下:

图1

这样,其他项目只需要引入这个工具包,就可以无需繁琐的配置,直接使用了。

gradle引入common包:

代码17

6. 落地实例

以一个物流场景为例: 业务架构

  • · logistics-server (物流服务)

  • · logistics-k3c-server (物流-金蝶客户端,封装调用金蝶服务的api)

  • · elasticsearch

业务背景:logistics 执行领用单新增,在 elasticsearch 中更新数据,同时通过 rpc 调用 logistics-k3c 的金蝶出库方法,生成金蝶单据,

图2

问题:如果elasticsearch单据更新出现异常,金蝶单据将无法回滚,造成数据不一致的问题。

在部署完seata线上服务后,只需要在logistics与logistics-k3c中分别引入 common-seata工具包

logistics服务

logicstics 服务

logistics-k3c

由于我们新增单据接口是调用金蝶的服务,所以这里使用TCC模式构建事务接口

  • · 首先创建StaffoutstockCreateAction接口
  @LocalTCC
public interface StaffoutstockCreateAction {
    @TwoPhaseBusinessAction(name = "staffoutstockCreate")
    boolean create(BusinessActionContext businessActionContext,
                       @BusinessActionContextParameter(paramName = "staffOutStock") StaffOutStock staffOutStock,
                       @BusinessActionContextParameter(paramName = "materialNum") List<Triple<Integer, Integer, Integer>> materialNum);

    boolean commit(BusinessActionContext businessActionContext);

    boolean rollback(BusinessActionContext businessActionContext);

}
  • · 接口实现StaffoutstockCreateActionImpl
  @Slf4j
@Service
public class StaffoutstockCreateActionImpl implements StaffoutstockCreateAction {

    @Autowired
    private K3cAction4Staffoutstock k3cAction4Staffoutstock;

    @SneakyThrows
    @Transactional(rollbackFor = Exception.class)
    @Override
    public boolean create(BusinessActionContext businessActionContext, StaffOutStock staffOutStock, List<Triple<Integer, Integer, Integer>> materialNum) {
        //金蝶单据新增
        k3cAction4Staffoutstock.staffoutstockAuditPass(staffOutStock, materialNum);
        return true;
    }

    @SneakyThrows
    @Transactional(rollbackFor = Exception.class)
    @Override
    public boolean commit(BusinessActionContext businessActionContext) {
        Map<String, Object> context = businessActionContext.getActionContext();
        JSONObject staffOutStockJson = (JSONObject) context.get("staffOutStock");
        // 如果尝试新增成功,commit不做任何事
        StaffOutStock staffOutStock = staffOutStockJson.toJavaObject(StaffOutStock.class);
        log.info("staffoutstock {} commit successfully!", staffOutStock.id);
        return true;
    }

    @SneakyThrows
    @Transactional(rollbackFor = Exception.class)
    @Override
    public boolean rollback(BusinessActionContext businessActionContext) {
        Map<String, Object> context = businessActionContext.getActionContext();
        JSONObject staffOutStockJson = (JSONObject) context.get("staffOutStock");
        StaffOutStock staffOutStock = staffOutStockJson.toJavaObject(StaffOutStock.class);
        // 这里调用金蝶单据删除接口进行回滚
        k3cAction4Staffoutstock.staffoutstockRollback(staffOutStock);
        log.info("staffoutstock {} rollback successfully!", staffOutStock.id);
        return true;
    }
}
  • · 封装为业务方法
  /**
     * 项目组领用&报废的审核通过:新增其他出库单
     * 该方法使用seata-TCC方案实现全局事务
     * @param staffOutStock
     * @param materialNum
     */

    @Transactional
    public void staffoutstockAuditPassWithTranscation(StaffOutStock staffOutStock,
                                                      List<Triple<Integer, Integer, Integer>> materialNum){
        staffoutstockCreateAction.create(null, staffOutStock, materialNum);
    }
  • · k3c API实现类

代码22

这样,一个 基于TCC的全局事务链路就建立起来了。

当全局事务 执行成功时,我们可以在 server 中看到打印的日志:

图3

如果全局事务 执行失败,会进行回滚,此时会执行接口中的rollback,调用金蝶接口删除生成的单据,

图4

7. 总结

本文以seata框架的部署与使用为主线,记录了 seata框架运用的一些 关键步骤与技术细节,并针对项目落地时遇到的一些的技术问题提供了解决方案。

在后续的推文中,我们还将继续以 seata 框架的源码解析为主线,向大家介绍 seata 实现分布式事务的核心原理与技术细节。

相关 [分布 框架 seata] 推荐:

分布式事务框架seata落地实践

- - 有道技术沙龙博客
seata是阿里巴巴研发的一套开源分布式事务框架,提供了AT、TCC、SAGA 和 XA 几种事务模式. 本文以精品课项目组的物流后台服务为例,介绍seata框架落地的过程,遇到的问题以及解决方案. 有道精品课教务系统是基于springcloud的分布式集群服务. 在实际业务中,存在许多分布式事务场景.

Seata-AT 如何保证分布式事务一致性

- - 掘金 后端
作者 | 陈健斌(funkye) github id: a364176773 来源| 阿里巴巴云原生公众号. Seata 是一款开源的分布式事务解决方案,star 高达 18100+,社区活跃度极高,致力于在微服务架构下提供高性能和简单易用的分布式事务服务,本文将剖析 Seata-AT 的实现原理,让用户对 AT 模式有更深入的认识.

Seata分布式事务解决方案详解

- - 掘金 架构
什么是分布式事务,这里就不做解释了,介绍一下下面的常用分布式事务解决方案. Seata分布式事务框架:阿里巴巴2019年开源的分布式事务解决方案. 本文会详细分析AT和TCC原理以及对比,SAGA和XA暂时不在本文讨论中,后续会补上. 提一嘴,Saga不存在并发执行问题,因为Saga本质上是一个责任链模式,在同一个线程上有严格的先后执行驱动顺序.

ShardingSphere x Seata,一致性更强的分布式数据库中间件

- - IT瘾-dev
日前,分布式数据库中间件 ShardingSphere 将Seata 分布式事务能力进行整合,旨在打造一致性更强的分布式数据库中间件. 数据库领域,分布式事务的实现主要包含:两阶段的 XA 和 BASE 柔性事务. XA 事务底层,依赖于具体的数据库厂商对 XA 两阶段提交协议的支持. 通常,XA 协议通过在 Prepare 和 Commit 阶段进行 2PL(2 阶段锁),保证了分布式事务的 ACID,适用于短事务及非云化环境(云化环境下一次 IO 操作大概需要 20ms,两阶段锁会锁住资源长达 40ms,因此热点行上的事务的 TPS 会降到 25/s 左右,非云化环境通常一次 IO 只需几毫秒,因此锁热点数据的时间相对较低).

Spring Cloud Alibaba | 微服务分布式事务之Seata - 极客挖掘机 - 博客园

- -
Spring Cloud Alibaba | 微服务分布式事务之Seata. 本篇实战所使用Spring有关版本:. 在构建微服务的过程中,不管是使用什么框架、组件来构建,都绕不开一个问题,跨服务的业务操作如何保持数据一致性. 首先,设想一个传统的单体应用,无论多少内部调用,最后终归是在同一个数据库上进行操作来完成一向业务操作,如图:.

分布式框架Dubbo

- - Linux - 操作系统 - ITeye博客
互联网的发展,网站应用的规模不断扩大,常规的垂直应用架构已无法应对,分布式服务架构以及流动计算架构势在必行,Dubbo是一个分布式服务框架,在这种情况下诞生的. 现在核心业务抽取出来,作为独立的服务,使前端应用能更快速和稳定的响应. 大规模服务化之前,应用可能只是通过RMI或Hessian等工具,简单的暴露和引用远程服务,通过配置服务的URL地址进行调用,通过F5等硬件进行负载均衡.

Seata 在蚂蚁国际银行业务的落地实践

- - 掘金 架构
文|李乔(花名:南桥)、李宗杰(花名:白鹰). 李乔:蚂蚁集团高级开发工程师,负责蚂蚁境外银行支付结算系统开发. 李宗杰:蚂蚁集团技术专家,负责蚂蚁分布式事务中间件研发. 本文 11580 字 阅读 25 分钟. 蚂蚁国际境外银行业务正在部分迁移至阿里云,原内部使用的 SOFA 技术栈无法在阿里云上得到支持.

分布式流式处理框架:Storm

- - 标点符
Storm是一个免费开源、分布式、高容错的实时计算系统. 它与其他大数据解决方案的不同之处在于它的处理方式. Hadoop 在本质上是一个批处理系统,数据被引入 Hadoop 文件系统 (HDFS) 并分发到各个节点进行处理. 当处理完成时,结果数据返回到 HDFS 供始发者使用. Hadoop的高吞吐,海量数据处理的能力使得人们可以方便地处理海量数据.

分布式服务框架:Zookeeper

- - 标点符
Zookeeper是一个高性能,分布式的,开源分布式应用协调服务. 它提供了简单原始的功能,分布式应用可以基于它实现更高级的服务,比如同步,配置管理,集群管理,名空间. 它被设计为易于编程,使用文件系统目录树作为数据模型. 服务端跑在java上,提供java和C的客户端API. Zookeeper是Google的Chubby一个开源的实现,是高有效和可靠的协同工作系统,Zookeeper能够用来leader选举,配置信息维护等,在一个分布式的环境中,需要一个Master实例或存储一些配置信息,确保文件写入的一致性等.

分布式应用框架 Dapr

- - IT瘾-dev
微服务架构已成为构建云原生应用程序的标准,微服务架构提供了令人信服的好处,包括可伸缩性,松散的服务耦合和独立部署,但是这种方法的成本很高,需要了解和熟练掌握分布式系统. 为了使用所有开发人员能够使用任何语言和任何框架轻松地构建便携式微服务应用程序,无论是开发新项目还是迁移现有代码. Dapr是一种可移植的,事件驱动的,无服务器运行时,用于构建跨云和边缘的分布式应用程序.