SpringCloudAlibaba-Seata分布式事务

一、什么是Seata

  Seata是一款开源的分布式事务解决方案,提供分布式框架的分布式事务处理,为用户提供了 AT,TCC,SAGA和XA事务模式,为用户打造一站式的分布式事务解决方案

  在了解分布式事务之前我们要先了解一个协议,就是2PC 两阶段提交协议,目前市面上面的多种分布式解决方案也都是采用了这种协议,那什么是 2PC 两阶段提交协议呢。

  2PC 就是将事务的提交分为两个阶段:perpare(预处理)、commit(提交)两个阶段。

  在分布式事务中会先由事务协调者进行预处理阶段,就是先向事务的参与者发出通知,事务的参与者接到通知后会进行预处理判断是否满足提交事务的条件,预处理后会返回给事务协调者一个应答。当所有的参与者都处理完成并回复应答就会进行下一个阶段,也就是提交或回滚的阶段

图片加载失败

二、Seata的三大角色

  1. TC(Transaction coordinator) - 事务协调者

    维护全局和分支事务的状态,驱动全局事务提交或回滚

  2. TM(Transaction Manager) - 事务管理器

    定义全局事务的范围:开始、提交、回滚全局事务

  3. RM(Resource Manager) - 资源管理器

    管理分支实物处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚

其中,TC 为单独部署的 Server 服务端,TM和RM为嵌入到应用中的 Client 客户端

三、AT模式与TCC模式

1、AT模式(auto transcation)

  AT 模式是一种无入侵的分布式事务解决方案,阿里Seata框架实现了该模式。

  在 AT 模式下,用户只需要关注自己的 “业务SQL”,用户的“业务SQL”作为第一阶段,Seata框架会自动生成事务的二阶段提交和回滚操作。

1.一阶段

  在一阶段,Seata会拦截“业务SQL”,首先解析 SQL 语义,找到“业务SQL”要更新的业务数据,在业务数据被更新前,将其保存成一个“before image”,然后执行“业务SQL”更新业务数据,在业务数据被更新后,再将其保存成“after image”,最后生成一个行锁,避免其他业务操作读取到这条数据。以上操作全部在一个数据库事务中完成,这样保证了一阶段提交原子性。

  这就是一阶段提交,可以理解为在一阶段就已经将我们想要执行的SQL执行了,不过是同时保存了SQL执行前后的数据快照以及行锁。执行前后的快照数据就是保证数据的回滚、提交。行锁则是因为我们的SQL已经执行了,但是整个分布式事务还未完成,防止其他事务读取。

图片加载失败

2.二阶段提交

  二阶段如果是提交的话,因为“业务SQL”在一阶段已经提交至数据库,所以 Seata 框架只需要将一阶段保存的快照数据和行锁删除掉,就完成数据清理即可。

图片加载失败

3.二阶段回滚

  二阶段如果是回滚的话,Seata 就需要回滚一阶段已经执行的“业务SQL”,还原业务数据。回滚的方式就是使用一阶段中创建的“before image”快照数据来还原业务数据;但是还原之前要先校验脏写,对比“数据库当前业务数据”与“after image”,如果两份数据完全一致就证明没有脏写,可以还原业务数据,如果不一致就证明又脏写,出现脏写就需要转人工处理了。

  其实脏写基本上是不可能发生的,因为在一阶段中执行“业务SQL“后已经进行了行锁的操作。

图片加载失败

2、TCC模式(Try Confirm Cancel)

1.二阶段提交处理

  TCC 模式需要用户根据自己的业务场景实现Try、Confirm 和 Cancel 三个操作;事务发起方在第一阶段执行 Try 方式,在第二阶段提交执行 Confirm 方法,二阶段回滚执行 Cancel 方法。

  就是 TCC 模式就是在第一阶段会通知所有事务的参与者尝试执行,准备所需资源。第二阶段执行业务修改数据库,发生错误进行回滚,恢复已修改数据库。

  例如:支付订单与减库存的业务场景

  • 一阶段:将订单的状态修改为 “支付中”,库存服务记录需要减少的库存数量
  • 二阶段:
    • 提交:将订单的状态修改为 “已支付”,库存服务则减少相应的库存数量
    • 取消:如果在一阶段或者二阶段提交的业务处理中发生错误,则进行回滚操作,也就是将订单状态恢复,将已经减少的库存加回去或者清空一阶段中记录的需要减少的库存数量。

图片加载失败

四、Seata服务端搭建

  上面提到过 Seata 共有三个角色:TC、TM、RM,TC(Server端)为单独的服务部署,TM和RM由业务系统服务集成

Seata-server下载地址:这里可以根据框架使用的CloudAlibaba版本对应选择适配的版本下载使用

1、配置Seata-Server

  1. 先将我们下载好的Seata-Server解压,解压后对应的目录如下

    图片加载失败

  2. 修改服务配置文件

    • 我们进入到seata/conf/目录下,可以看到application.yml,这个文件名相信大家都非常熟悉。
    • 打开application.example.yml我们可以看到相应的示例配置,我们可以复制里面我们需要的配置使用
  3. 配置中心和注册中心配置框架对应的即可,我这边都是选择的nacos

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    server:
    port: 7091

    spring:
    application:
    name: seata-server

    logging:
    config: classpath:logback-spring.xml
    file:
    path: ${user.home}/logs/seata
    extend:
    logstash-appender:
    destination: 127.0.0.1:4560
    kafka-appender:
    bootstrap-servers: 127.0.0.1:9092
    topic: logback_to_logstash

    console:
    user:
    username: seata
    password: seata

    seata:
    config:
    # support: nacos, consul, apollo, zk, etcd3
    config:
    # support: nacos 、 consul 、 apollo 、 zk 、 etcd3
    type: nacos
    nacos:
    server-addr: 192.168.0.107:8848
    namespace:
    group: SEATA_GROUP
    username: nacos
    password: nacos
    ##if use MSE Nacos with auth, mutex with username/password attribute
    #access-key: ""
    #secret-key: ""
    data-id: seataServer.properties
    registry:
    # support: nacos, eureka, redis, zk, consul, etcd3, sofa
    type: nacos
    preferred-networks: 30.240.*
    nacos:
    application: seata-server
    server-addr: 192.168.0.107:8848
    group: SEATA_GROUP
    namespace:
    cluster: default
    username: nacos
    password: nacos
    ##if use MSE Nacos with auth, mutex with username/password attribute
    #access-key: ""
    #secret-key: ""
    store:
    # support: file 、 db 、 redis
    mode: db
    session:
    mode: db
    lock:
    mode: db
    db:
    datasource: druid
    db-type: mysql
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://数据库ip:3306/seata-server??useUnicode=true&rewriteBatchedStatements=true&serverTimezone=GMT%2B8
    user: root
    password: root
    min-conn: 5
    max-conn: 100
    global-table: global_table
    branch-table: branch_table
    lock-table: lock_table
    distributed-lock-table: distributed_lock
    query-limit: 100
    max-wait: 5000
    # server:
    # service-port: 8091 #If not configured, the default is '${server.port} + 1000'
    security:
    secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
    tokenValidityInMilliseconds: 1800000
    ignore:
    urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.ico,/console-fe/public/**,/api/v1/auth/login

  4. Server端存储模式支持三种:

    • file:(默认)单机模式,全局事务会话信息内存中读写并持久化到本地文件root.data,性能较高(默认)
    • db:高可用模式,全局事务会话信息通过db共享,性能差些
    • redis:Seata-Server1.3及以上版本支持,性能较高,但是存在事务信息丢失的风险,请提前配置适合当前场景的redis持久化配置
  5. 我这边使用的存储模式是db,所以还要在数据库总创建相应的数据库,对应的sql脚本我们也可以在我们的服务包中找到,目录为:/seata/script/server/db

  6. 修改配置中心的配置文件,目录为:/seata/conf/下的conf.txt文件,将其中的存储模式修改为和我们之前配置的yml一致。还有一点就是conf.txt配置文件中有一个项配置service.vgroupMapping.default_tx_group=default

  7. 将配置中心的配置文件加载到nacos中需要执行/seata/script/config-center/nacos/目录下的nacos-config.sh

    这里需要注意,执行这个脚本默认的nacos地址是本机。可以根据参数进行指定

    1
    sh xxxxx/script/config-center/nacos/nacos-config.sh -h localhost -p 8848 -g SEATA_GROUP -t 5a3c7d6c-f497-4d68-a71a-2e5e3340b3ca -u username -w password

    也可以执行nacos-config-interactive.sh这个脚本手动输入。

  8. 启动成功后可以在 Nacos 的服务列表中看到我们的服务已经注册进去了。

2、在应用服务中集成seata-client

Seata-server 服务端已经启动成功,接下来只需要在我们的微服务中集成 Seata-client 就可以使用分布式事务了。

  1. 引入依赖

    1
    2
    3
    4
    <dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    </dependency>
  2. 配置yml配置文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    seata:
    # 事务服务分组
    tx-service-group: default_tx_group
    # 注册中心配置
    registry:
    type: nacos
    nacos:
    application: seata-server
    group: SEATA_GROUP
    server-addr: 192.168.0.107:5000/nacos/
    username: nacos
    password: nacos
    # 配置中心配置
    config:
    type: nacos
    nacos:
    application: seata-server
    group: SEATA_GROUP
    server-addr: 192.168.0.107:5000/nacos/
    username: nacos
    password: nacos

    到这里 Seata 的服务端以及项目中集成的配置就都完成了,接下来我们就可以使用了。

五、Seata 分布式事务示例

  要使用 Seata 的分布式事务只需要在想要使用的地方使用 @GlobalTransactional() 注解即可,至于这个注解都可以加入什么参数自己点进去在结合一下数据库中的数据就明白了。

下面只展示 Seata 的用法示例,至于其余配置就不在这里废话了。

  • 创建需要用的表

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    create table `order`
    (
    id int auto_increment
    primary key,
    proid int null comment '商品id',
    total_money int null comment '总金额',
    status int null comment '0:待付款,1:已付款'
    );

    create table stock
    (
    id int auto_increment
    primary key,
    proid int null,
    count int null
    );
  • 创建 undo_log表(SEATA AT 模式需要 UNDO_LOG 表,在事务参与者也就是我们的应用服务连接的库中创建)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    -- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
    CREATE TABLE `undo_log` (
    `id` bigint(20) NOT NULL AUTO_INCREMENT,
    `branch_id` bigint(20) NOT NULL,
    `xid` varchar(100) NOT NULL,
    `context` varchar(128) NOT NULL,
    `rollback_info` longblob NOT NULL,
    `log_status` int(11) NOT NULL,
    `log_created` datetime NOT NULL,
    `log_modified` datetime NOT NULL,
    `ext` varchar(100) DEFAULT NULL,
    PRIMARY KEY (`id`),
    UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
  • 创建订单实体类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    @Data
    @TableName("`order`")
    public class Order {

    @TableId(value = "id", type = IdType.AUTO)
    private Integer id;

    @TableField("proid")
    private Integer proid;

    @TableField("total_money")
    private Integer totalMoney;

    @TableField("status")
    private Integer status;
    }
  • 创建库存实体类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @Data
    @TableName("stock")
    public class Stock {

    @TableId(value = "id", type = IdType.AUTO)
    private Integer id;

    @TableField("proid")
    private Integer proid;

    @TableField("count")
    private Integer count;
    }
  • 因为是示例,而且用的是mybatisPlus,所以 searvice 接口、实现类和 mapper.xml 数据库映射文件就不创建了。

  • 创建下订单接口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    @GlobalTransactional()
    @GetMapping("/addOrder")
    public String addOrder() {
    Order order = new Order();
    order.setProid(11);
    order.setTotalMoney(1000);
    order.setStatus(0);
    Integer orderResult = orderMapper.insert(order);
    if (orderResult > 0) {
    log.info("----》订单插入成功");
    }
    String stockResult = stockService.rdStock(11, 1);
    // 模拟报错,实现事务回滚
    int i = 1 / 0;
    return stockResult;
    }
  • 创建扣减库存接口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    @GetMapping("/rdStock")
    public String rdStock(@RequestParam("proid") Integer proid, @RequestParam("num") Integer num) {
    Stock stock = stockMapper.selectOne(new LambdaQueryWrapper<>(Stock.class).eq(Stock::getProid, proid));
    System.out.println(stock);
    stock.setCount(stock.getCount() - num);
    Integer stockResult = stockMapper.updateById(stock);
    if (stockResult > 0) {
    return "库存更新成功";
    }
    return "库存更新失败";
    }
  • 创建 order 订单服务调用扣减库存接口的 Fegin 接口

    1
    2
    3
    4
    5
    @FeignClient(name = "stock", path = "/stock", fallback = StockSentinelFallback.class)
    public interface Stock {
    @GetMapping("/rdStock")
    public String rdStock(@RequestParam("proid") Integer proid, @RequestParam("num") Integer num);
    }

到这里就可以开始测试了:

  1. 首先我们可以先不使用 @GlobalTransactional() 而是使用 @Transactional() 本地事务来测试,你会发现库存是扣减了,但是订单并没有成功插入

  2. 再换成 @GlobalTransactional() 分布式事务来测试,你会发现 分布式事务 已经成功了。

  3. 接下来可以在订单接口中打断点 debug 一下,查看 seata-server 的数据库中的数据,主要查看一下三张表

    • global_table(全局事务表,当你调用接口的那一刻就在这个表中创建对应的记录, @GlobalTransactional() 会生成一个 xid 与之绑定)
    • branch_table(分支事务表,这个是当你服务中的事务参与者加入事务的时候就会在这张表创建对应的数据,就是,你新增了一个订单,后面的逻辑还没有执行,这个表里面就会有对应 订单这个事务参与者的信息)
    • lock_table(数据锁表,这张表主要记录了添加了行锁的表,在这里面你会看到一个 pk 的字段,这就是对应的主键)
  4. 当然还有 undo_log 表也要看一下

      undo_log表会根据事务参与者创建相应的数据,例如我们的事务参与者是 order 的新增,和 stock 的更新,那么对应的就会创建两条undo_log日志,我们可以查看 rollback_info 字段的值,你会发现记录了对应的表的 before_image 和 after_image 信息,那么当我们的程序报错 事务回滚的时候就会根据这个信息进行逆向执行。

  5. 如果我们的代码没有问题,事务成功提交后,seata-server中的数据以及undo_log的数据都会被删除掉

问题记录

  1. 启动seata服务端的时候可以是用-h 指定本机ip或者可以设置服务器本机,不然的话注册到nacos中会是 127.0.0.1,或者17.xx.xx.xx这种导致服务连接seata-server不成功