当前位置: 代码迷 >> 综合 >> Seata AT模式的整体流程
  详细解决方案

Seata AT模式的整体流程

热度:68   发布时间:2024-03-10 01:27:27.0

AT模式的整体流程

at模式的整体流程

先从官网借一张图,简单描述AT模式的角色和流程
Seata 目前有四种模式,都是基于 TC(事物协调者),TM(事物管理器),RM(资源管理器) 这三个角色完成的

  1. 事务管理器发起全局事物
  2. 通过RPC调用微服务A
  3. 微服务A开启事物, 通过解析SQL,通过Druid数据源的API驱动快照的生成 。首先查询数据库获得当前数据的快照(前置镜像), 执行数据操作(更新,删除,插入), 查询数据库获得操作执行后的快照(后置镜像)
  4. 微服务A向TC发起分支事物注册, 执行回滚日志的插入, 将当前事物提交
  5. 微服务B 和微服务A一样,执行2-5的流程
  6. 如果微服务调用的过程中没有报错, 由事物管理器发起全局事物提交, 否则发起全局事物回滚
  7. 如果是成功:TC向微服务A和B发送分支事物提交请求, 将提交请求通过offer的方式插入到阻塞队列。有 AsyncWorker 异步批量处理事务的提交, AsyncWorker 异步批量删除回滚日志
  8. 如果是失败:TC向微服务A和B发送分支事物回滚请求。 根据 全局事物ID(XID)和分支事物ID(BranchId)查询回滚日志表(undo_log)获得回滚日志。 如果回滚日志存在:将后置镜像与当前数据对比,如果数据一致表示可以回滚(没有发生脏写),通过回滚日志的前置镜像生成回滚SQL, 执行数据回滚,然后删除回滚日志。 如果回滚日志不存在:插入一条状态为全局事物已完成(数据库的值是: 1 )的回滚日志, 避免另一个线程提交成功
  9. 如果失败回滚成功,向TC响应回滚成功, 如果回滚失败,向TC响应回滚失败并重试,TC会重试发起分支回滚请求(seata服务端有一个定时器,一秒调用一次,会遍历所有需要回滚的会话发起分支回滚请求)

AT模式的分支事物

一阶段提交

AT模式的一阶段流程由 数据源代理+SQL识别器 的方式实现

首先回忆jdbc的执行流程

	//通过数据源获取连接Connection connection = dataSource.getConnection();// 获得 声明PrepareStatement pst = connection.prepareStatement();// 执行SQL语句pst.executeUpdate();// 提交事务connection.commit();

AT模式对 DataSource,Connection,Statement 都做了代理

  1. dataSource 被DataSourceProxy代理, dataSource.getConnection 获得的对象是 ConnectionProxy 对象, connection.prepareStatement 获得的是 PreparedStatementProxy 对象
  2. prepareStatement.executeUpdate() 做了特殊了处理, 通过Duird数据源提供的API创建Seata的SQL识别器,SQL识别器提供了识别SQL语句的功能,用于支持Executor创建前置镜像,后置镜像。
  3. executor 构建前置镜像, 执行业务SQL,构建后置镜像, 通过前置镜像和后置镜像,XID等数据构建回滚日志对象,添加到ConnectionProxy的上下文
  4. connectionProxy.commit, 注册分支事物, 根据connectionProxy的上下文对象将回滚日志生成SQL,执行回滚日志SQL,真实连接提交,如果配置了一阶段提交报告(client.rm.reportSuccessEnable=true,默认是false),则向TC发送一阶段提交完成的请求

prepareStatement.executeUpdate

 public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,StatementProxy<S> statementProxy,StatementCallback<T, S> statementCallback,Object... args) throws SQLException {
    // 不需要全局锁或者不是分支模式,执行原始的 statement.executeif (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
    // Just work as original statementreturn statementCallback.execute(statementProxy.getTargetStatement(), args);}String dbType = statementProxy.getConnectionProxy().getDbType();// 通过SQL访问者工厂创建SQL识别器, Seata在1.3.0的识别器是面向Druid编程if (CollectionUtils.isEmpty(sqlRecognizers)) {
    sqlRecognizers = SQLVisitorFactory.get(statementProxy.getTargetSQL(),dbType);}// 执行器Executor<T> executor;if (CollectionUtils.isEmpty(sqlRecognizers)) {
    executor = new PlainExecutor<>(statementProxy, statementCallback);} else {
    if (sqlRecognizers.size() == 1) {
    SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);switch (sqlRecognizer.getSQLType()) {
    case INSERT:// 1.3.0 支持Mysql,Oracle,PGSql 的插入执行器executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,new Class[]{
    StatementProxy.class, StatementCallback.class, SQLRecognizer.class},new Object[]{
    statementProxy, statementCallback, sqlRecognizer});break;case UPDATE:executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);break;case DELETE:executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);break;case SELECT_FOR_UPDATE:executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);break;default:executor = new PlainExecutor<>(statementProxy, statementCallback);break;}} else {
    executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);}}T rs;try {
    // 执行器去执行rs = executor.execute(args);} catch (Throwable ex) {
    if (!(ex instanceof SQLException)) {
    // Turn other exception into SQLExceptionex = new SQLException(ex);}throw (SQLException) ex;}return rs;}

executor 的执行

 @Overridepublic T execute(Object... args) throws Throwable {
    String xid = RootContext.getXID();// 绑定全局事物ID到代理连接if (xid != null) {
    statementProxy.getConnectionProxy().bind(xid);}// 设置全局锁的状态statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());// 执行return doExecute(args);}@Overridepublic T doExecute(Object... args) throws Throwable {
    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();// 判断当前连接是否开启了自动提交, 这里看executeAutoCommitFalse的部分。 // 开启自动提交的部分关掉自动提交,然后调用了下面的部分,然后恢复自动提交为trueif (connectionProxy.getAutoCommit()) {
    return executeAutoCommitTrue(args);} else {
    return executeAutoCommitFalse(args);}}// 执行自动提交protected T executeAutoCommitFalse(Object[] args) throws Exception {
    if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && getTableMeta().getPrimaryKeyOnlyName().size() > 1) {
    throw new NotSupportYetException("multi pk only support mysql!");}// 抽象方法, 子类Mysql,Oracle,PGSql 会知道如何构建前置镜像TableRecords beforeImage = beforeImage();// 执行业务SQLT result = statementCallback.execute(statementProxy.getTargetStatement(), args);// 通过前置镜像构建后置镜像TableRecords afterImage = afterImage(beforeImage);// 通过前置镜像和后置镜像生成回滚日志,插入到代理连接的上下文prepareUndoLog(beforeImage, afterImage);return result;}protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
    // 如果前置镜像为空,并且后置镜像也是空,就不用构建回滚日志了if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {
    return;}ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;String lockKeys = buildLockKey(lockKeyRecords);// 添加lockKeyconnectionProxy.appendLockKey(lockKeys);// 构建回滚日志SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);// 将回滚日志添加到代理连接的上下文中connectionProxy.appendUndoLog(sqlUndoLog);}

代理连接提交

 private void processGlobalTransactionCommit() throws SQLException {
    try {
    // 注册分支事物register();} catch (TransactionException e) {
    recognizeLockKeyConflictException(e, context.buildLockKeys());}try {
    // 插入回滚日志UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);// 真实连接提交targetConnection.commit();} catch (Throwable ex) {
    LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);report(false);throw new SQLException(ex);}// 是否报告一阶段提交完成,默认为falseif (IS_REPORT_SUCCESS_ENABLE) {
    report(true);}context.reset();}

二阶段提交

AT模式的资源管理器(RMHandlerAT) 接受事物协调者(TC)的分支提交请求

  1. 由资源管理器(RMHandlerAT)执行分支提交请求
  2. AT模式的资源管理器内部由异步工作器(asyncWorker)执行, 将请求用非阻塞(offer)的方式插入到blockingQueue中
  3. asyncWorker内部有一个定时器, 1秒钟执行一次(在上次执行完之后)。 定时器不停的用非阻塞的(poll)方式从阻塞队列中获取数据,然后批量删除回滚日志

数据源管理器的分支事物提交

	// dataSourceManager 的 branchCommit@Overridepublic BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,String applicationData) throws TransactionException {
    // 由异步工作器代理,执行分支提交return asyncWorker.branchCommit(branchType, xid, branchId, resourceId, applicationData);}// asyncWorker 的 branchCommit@Overridepublic BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,String applicationData) throws TransactionException {
    // 用非阻塞的方式二阶段上下文到阻塞队列if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
    LOGGER.warn("Async commit buffer is FULL. Rejected branch [{}/{}] will be handled by housekeeping later.", branchId, xid);}return BranchStatus.PhaseTwo_Committed;}

批量删除回滚日志

  private void doBranchCommits() {
    if (ASYNC_COMMIT_BUFFER.isEmpty()) {
    return;}Map<String, List<Phase2Context>> mappedContexts = new HashMap<>(DEFAULT_RESOURCE_SIZE);List<Phase2Context> contextsGroupedByResourceId;while (!ASYNC_COMMIT_BUFFER.isEmpty()) {
    // 从阻塞队列批量获取二阶段上下文Phase2Context commitContext = ASYNC_COMMIT_BUFFER.poll();contextsGroupedByResourceId = CollectionUtils.computeIfAbsent(mappedContexts, commitContext.resourceId, key -> new ArrayList<>());contextsGroupedByResourceId.add(commitContext);}// 省略遍历 mappedContexts 获得xids,branchIds 的代码,和大量的try,catch 和无关代码//批量删除回滚日志, 构造一个删除语句: delete from undu_log where xid in (?) and branch_id in (?)UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(xids, branchIds, conn);}

二阶段回滚

二阶段回滚由事物协调者(TC)发起, 微服务的资源管理器执行的操作

AT模式由 RMHandlerAT#handle(BranchRollbackRequest request) 处理

  1. 通过全局事物ID(xid)和分支事物id(branchId)查询回滚日志表(undo_log)获得回滚日志
  2. 通过数据库类型和回滚日志创建执行器(Executor)
  3. 由执行器驱动数据回滚, 首先进行数据验证,验证通过则回滚
    • 如果相等就不用执行数据回滚,然后对比前置镜像和当前对象,
    • 如果相等就不用执行数据回滚,
    • 如果后置镜像和当前对象不相等就抛出脏数据检查异常,
    • 如果后置镜像和当前对象相等,执行数据回滚。
  4. 如果查询到了回滚日志, 删除回滚日志。 如果没查询到回滚日志, 插入一条状态全局事物已完成的回滚日志 。

执行器的数据验证

  protected boolean dataValidationAndGoOn(Connection conn) throws SQLException {
    TableRecords beforeRecords = sqlUndoLog.getBeforeImage();TableRecords afterRecords = sqlUndoLog.getAfterImage();// 对比前置镜像和后置镜像, 相同则表示验证失败,验证失败就不做数据回滚Result<Boolean> beforeEqualsAfterResult = DataCompareUtils.isRecordsEquals(beforeRecords, afterRecords);if (beforeEqualsAfterResult.getResult()) {
    if (LOGGER.isInfoEnabled()) {
    LOGGER.info("Stop rollback because there is no data change " +"between the before data snapshot and the after data snapshot.");}// no need continue undo.return false;}//查询当前数据TableRecords currentRecords = queryCurrentRecords(conn);// 对比后置镜像和当前数据Result<Boolean> afterEqualsCurrentResult = DataCompareUtils.isRecordsEquals(afterRecords, currentRecords);if (!afterEqualsCurrentResult.getResult()) {
    // 前置镜像和当前数据一样, 验证失败,数据不回滚Result<Boolean> beforeEqualsCurrentResult = DataCompareUtils.isRecordsEquals(beforeRecords, currentRecords);if (beforeEqualsCurrentResult.getResult()) {
    return false;} else {
    // 发生了脏写, 抛出异常throw new SQLException("Has dirty records when undo.");}}return true;}

删除或者插入并发日志

// 如果通过xid和branchId查询回滚日志表的数据是存在的
if (exists) {
    
//删除回滚日志deleteUndoLog(xid, branchId, conn);// 提交事务conn.commit();if (LOGGER.isInfoEnabled()) {
    LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId,State.GlobalFinished.name());}} else {
    // 通过全局事物已完成的回滚日志, 全局事物已完成的状态码: 1insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);conn.commit();if (LOGGER.isInfoEnabled()) {
    LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId,State.GlobalFinished.name());}}
  相关解决方案