tx1 二阶段全局提交,释放全局锁。tx2 拿到全局锁,提交本地事务。
如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。 此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。 因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题。
SELECT FOR UPDATE 语句的执行会申请 全局锁 ,如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回。 出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。
version: "3.1"
services:
  seata-server:
    image: seataio/seata-server:latest
    hostname: seata-server
    ports:
      - "7091:7091"
      - "8091:8091"
    environment:
      - SEATA_PORT=8091
      - STORE_MODE=file

Console: http://localhost:7091/#/Overview (the default username and password are admin/admin)
订单服务调用库存服务和账户余额服务进行相应的扣减,并最终生成订单。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>seata</artifactId> <groupId>com.et</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>seata-order</artifactId> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-autoconfigure</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.48</version> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>2.1.2</version> </dependency> <dependency> <groupId>io.seata</groupId> <artifactId>seata-spring-boot-starter</artifactId> <version>1.1.0</version> </dependency> <dependency> <groupId>io.seata</groupId> <artifactId>seata-http</artifactId> <version>1.1.0</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.8</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies> </project> package com.et.seata.order.controller; import com.et.seata.order.service.OrderService; import org.springframework.beans.factory.annotation.Autowired; import 
org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import java.io.IOException; import java.util.HashMap; import java.util.Map; @RestController public class HelloWorldController { @Autowired private OrderService orderService; @PostMapping("/create") public Map<String, Object> createOrder(@RequestParam("userId") Long userId, @RequestParam("productId") Long productId, @RequestParam("price") Integer price) throws IOException { Map<String, Object> map = new HashMap<>(); map.put("msg", "HelloWorld"); map.put("reuslt", orderService.createOrder(userId,productId,price)); return map; } } package com.et.seata.order.service; import com.alibaba.fastjson.JSONObject; import com.et.seata.order.dao.OrderDao; import com.et.seata.order.dto.OrderDO; import io.seata.core.context.RootContext; import io.seata.integration.http.DefaultHttpExecutor; import io.seata.spring.annotation.GlobalTransactional; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpResponse; import org.apache.http.util.EntityUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.IOException; /** * @author liuhaihua * @version 1.0 * @ClassName OrderServiceImpl * @Description todo * @date 2024/08/08/ 13:53 */ @Slf4j @Service public class OrderServiceImpl implements OrderService{ @Autowired OrderDao orderDao; @Override @GlobalTransactional // <1> public Integer createOrder(Long userId, Long productId, Integer price) throws IOException { Integer amount = 1; // 购买数量,暂时设置为 1。 log.info("[createOrder] 当前 XID: {}", RootContext.getXID()); // <2> 扣减库存 this.reduceStock(productId, amount); // <3> 扣减余额 this.reduceBalance(userId, price); // <4> 保存订单 log.info("[createOrder] 保存订单"); return this.saveOrder(userId,productId,price,amount); } private Integer 
saveOrder(Long userId, Long productId, Integer price,Integer amount){ // <4> 保存订单 OrderDO order = new OrderDO(); order.setUserId(userId); order.setProductId(productId); order.setPayAmount(amount * price); orderDao.saveOrder(order); log.info("[createOrder] 保存订单: {}", order.getId()); return order.getId(); } private void reduceStock(Long productId, Integer amount) throws IOException { // 参数拼接 JSONObject params = new JSONObject().fluentPut("productId", String.valueOf(productId)) .fluentPut("amount", String.valueOf(amount)); // 执行调用 HttpResponse response = DefaultHttpExecutor.getInstance().executePost("http://127.0.0.1:8082", "/stock", params, HttpResponse.class); // 解析结果 Boolean success = Boolean.valueOf(EntityUtils.toString(response.getEntity())); if (!success) { throw new RuntimeException("扣除库存失败"); } } private void reduceBalance(Long userId, Integer price) throws IOException { // 参数拼接 JSONObject params = new JSONObject().fluentPut("userId", String.valueOf(userId)) .fluentPut("price", String.valueOf(price)); // 执行调用 HttpResponse response = DefaultHttpExecutor.getInstance().executePost("http://127.0.0.1:8083", "/balance", params, HttpResponse.class); // 解析结果 Boolean success = Boolean.valueOf(EntityUtils.toString(response.getEntity())); if (!success) { throw new RuntimeException("扣除余额失败"); } } } server: port: 8081 # 端口 spring: application: name: order-service datasource: url: jdbc:mysql://127.0.0.1:3306/seata_order?useSSL=false&useUnicode=true&characterEncoding=UTF-8 driver-class-name: com.mysql.jdbc.Driver username: root password: 123456 # Seata 配置项,对应 SeataProperties 类 seata: application-id: ${spring.application.name} # Seata 应用编号,默认为 ${spring.application.name} tx-service-group: ${spring.application.name}-group # Seata 事务组编号,用于 TC 集群名 # 服务配置项,对应 ServiceProperties 类 service: # 虚拟组和分组的映射 vgroup-mapping: order-service-group: default # 分组和 Seata 服务的映射 grouplist: default: 127.0.0.1:8091 package com.et.seata.product.controller; import 
com.et.seata.product.dto.ProductReduceStockDTO; import com.et.seata.product.service.ProductService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; @RestController @Slf4j public class ProductController { @Autowired ProductService productService; @PostMapping("/stock") public Boolean reduceStock(@RequestBody ProductReduceStockDTO productReduceStockDTO) { log.info("[reduceStock] 收到减少库存请求, 商品:{}, 价格:{}", productReduceStockDTO.getProductId(), productReduceStockDTO.getAmount()); try { productService.reduceStock(productReduceStockDTO.getProductId(), productReduceStockDTO.getAmount()); // 正常扣除库存,返回 true return true; } catch (Exception e) { // 失败扣除库存,返回 false return false; } } } package com.et.seata.product.service; import com.et.seata.product.dao.ProductDao; import io.seata.core.context.RootContext; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @Service @Slf4j public class ProductServiceImpl implements ProductService { @Autowired private ProductDao productDao; @Override @Transactional // <1> 开启新事物 public void reduceStock(Long productId, Integer amount) throws Exception { log.info("[reduceStock] 当前 XID: {}", RootContext.getXID()); // <2> 检查库存 checkStock(productId, amount); log.info("[reduceStock] 开始扣减 {} 库存", productId); // <3> 扣减库存 int updateCount = productDao.reduceStock(productId, amount); // 扣除成功 if (updateCount == 0) { log.warn("[reduceStock] 扣除 {} 库存失败", productId); throw new Exception("库存不足"); } // 扣除失败 log.info("[reduceStock] 扣除 {} 库存成功", productId); } private void checkStock(Long productId, Integer requiredAmount) throws Exception { log.info("[checkStock] 检查 {} 库存", 
productId); Integer stock = productDao.getStock(productId); if (stock < requiredAmount) { log.warn("[checkStock] {} 库存不足,当前库存: {}", productId, stock); throw new Exception("库存不足"); } } } package com.et.seata.product.dao; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; import org.apache.ibatis.annotations.Update; import org.springframework.stereotype.Repository; @Mapper @Repository public interface ProductDao { /** * 获取库存 * * @param productId 商品编号 * @return 库存 */ @Select("SELECT stock FROM product WHERE id = #{productId}") Integer getStock(@Param("productId") Long productId); /** * 扣减库存 * * @param productId 商品编号 * @param amount 扣减数量 * @return 影响记录行数 */ @Update("UPDATE product SET stock = stock - #{amount} WHERE id = #{productId} AND stock >= #{amount}") int reduceStock(@Param("productId") Long productId, @Param("amount") Integer amount); } package com.et.seata.balance.controller; import com.et.seata.balance.dto.AccountReduceBalanceDTO; import com.et.seata.balance.service.AccountService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; import java.util.Map; @RestController @Slf4j public class AccountController { @Autowired private AccountService accountService; @PostMapping("/balance") public Boolean reduceBalance(@RequestBody AccountReduceBalanceDTO accountReduceBalanceDTO) { log.info("[reduceBalance] 收到减少余额请求, 用户:{}, 金额:{}", accountReduceBalanceDTO.getUserId(), accountReduceBalanceDTO.getPrice()); try { accountService.reduceBalance(accountReduceBalanceDTO.getUserId(), accountReduceBalanceDTO.getPrice()); // 正常扣除余额,返回 true return true; } catch (Exception e) { // 
失败扣除余额,返回 false return false; } } } package com.et.seata.balance.service; import com.et.seata.balance.dao.AccountDao; import io.seata.core.context.RootContext; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; @Service @Slf4j public class AccountServiceImpl implements AccountService { @Autowired private AccountDao accountDao; @Override @Transactional(propagation = Propagation.REQUIRES_NEW) // <1> 开启新事物 public void reduceBalance(Long userId, Integer price) throws Exception { log.info("[reduceBalance] 当前 XID: {}", RootContext.getXID()); // <2> 检查余额 checkBalance(userId, price); log.info("[reduceBalance] 开始扣减用户 {} 余额", userId); // <3> 扣除余额 int updateCount = accountDao.reduceBalance(price); // 扣除成功 if (updateCount == 0) { log.warn("[reduceBalance] 扣除用户 {} 余额失败", userId); throw new Exception("余额不足"); } log.info("[reduceBalance] 扣除用户 {} 余额成功", userId); } private void checkBalance(Long userId, Integer price) throws Exception { log.info("[checkBalance] 检查用户 {} 余额", userId); Integer balance = accountDao.getBalance(userId); if (balance < price) { log.warn("[checkBalance] 用户 {} 余额不足,当前余额:{}", userId, balance); throw new Exception("余额不足"); } } } package com.et.seata.balance.dao; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; import org.apache.ibatis.annotations.Update; import org.springframework.stereotype.Repository; @Mapper @Repository public interface AccountDao { /** * 获取账户余额 * * @param userId 用户 ID * @return 账户余额 */ @Select("SELECT balance FROM account WHERE id = #{userId}") Integer getBalance(@Param("userId") Long userId); /** * 扣减余额 * * @param price 需要扣减的数目 * @return 影响记录行数 */ @Update("UPDATE account SET balance = balance - #{price} WHERE id = 1 AND balance >= 
${price}") int reduceBalance(@Param("price") Integer price); }
可以看到控制台输出回滚日志:
2024-08-08 22:00:59.467 INFO 35051 --- [tch_RMROLE_1_16] i.s.core.rpc.netty.RmMessageListener : onMessage:xid=172.22.0.3:8091:27573281007513609,branchId=27573281007513610,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/seata_storage,applicationData=null
2024-08-08 22:00:59.467 INFO 35051 --- [tch_RMROLE_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacking: 172.22.0.3:8091:27573281007513609 27573281007513610 jdbc:mysql://127.0.0.1:3306/seata_storage
2024-08-08 22:00:59.503 INFO 35051 --- [tch_RMROLE_1_16] i.s.r.d.undo.AbstractUndoLogManager : xid 172.22.0.3:8091:27573281007513609 branch 27573281007513610, undo_log deleted with GlobalFinished
2024-08-08 22:00:59.511 INFO 35051 --- [tch_RMROLE_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacked result: PhaseTwo_Rollbacked