引言:Mycat已经成为了一个强大的开源分布式数据库中间件产品。面对企业应用的海量数据事务处理,是目前最好的开源解决方案。但是如果想让多台机器中的数据保存一致,比较常规的解决方法是引入“协调者”来统一调度所有节点的执行。
本文选自《分布式数据库架构及企业实践——基于Mycat中间件》。mycat也是用相似的原理实现二阶段提交的,无论是raincat还是myth都会从线程的上下文找到分布式事物的事物群归属(messagegroup)(包括buddo,springcloud),无论是否是远程调用都会带上,其实mycat》raincat(raincat只有事物特性)
mycat相当于一个虚拟节点包含多数据节点,其对服务相当于一个虚拟数据库分布式系统公用这个数据库,外部看是一个数据库的分布式事物,其实mycat内部做了多个数据库(分片)的分布式事物,myth等的分布式事物是也可实现多个库之间的事物一致
随着并发量、数据量越来越大及业务已经细化到不能再按照业务划分,我们不得不使用分布式数据库提高系统的性能。在分布式系统中,各个节点在物理上都是相对独立的,每个节点上的数据操作都可以满足 ACID。但是,各独立节点之间无法知道其他节点事务的执行情况,如果想让多台机器中的数据保存一致,就必须保证所有节点上的数据操作要么全部执行成功,要么全部不执行,比较常规的解决方法是引入“协调者”来统一调度所有节点的执行。
XA 规范
X/Open 组织(即现在的 Open Group)定义了分布式事务处理模型。X/Open DTP 模型(1994)包括应用程序(AP)、事务管理器(TM)、资源管理器(RM)、通信资源管理器(CRM)四部分。事务管理器(TM)是交易中间件,资源管理器(RM)是数据库,通信资源管理器(CRM)是消息中间件。通常把一个数据库内部的事务处理看作本地事务,而分布式事务处理的对象是全局事务。全局事务是指在分布式事务处理环境中,多个数据库可能需要共同完成一个工作,这个工作就是一个全局事务。在一个事务中可能更新几个不同的数据库,此时一个数据库对自己内部所做操作的提交不仅需要本身的操作成功,还需要全局事务相关的其他数据库的操作成功。如果任一数据库的任一操作失败,则参与此事务的所有数据库所做的所有操作都必须回滚。XA就是X/Open DTP 定义的交易中间件与数据库之间的接口规范(即接口函数),交易中间件用它来通知数据库事务的开始、结束、提交、回滚等,XA 接口函数由数据库厂商提供,根据这一思想衍生出二阶段提交协议和三阶段提交协议。
二阶段提交
所谓的两个阶段是指准备阶段和提交阶段。
准备阶段指事务协调者(事务管理器)向每个参与者(资源管理器)发送准备消息,每个参与者要么直接返回失败消息(如权限验证失败),要么在本地执行事务,写本地的 redo 和undo日志但不提交,可以进一步将准备阶段分为以下三步。
(1)协调者节点向所有参与者节点询问是否可以执行提交操作(vote),并开始等待各参与者节点的响应。
(2)参与者节点执行询问发起为止的所有事务操作,并将 undo 信息和 redo 信息写入日志。
(3)各参与者节点响应协调者节点发起的询问。如果参与者节点的事务操作实际执行成功,则它返回一个“同意”消息;如果参与者节点的事务操作实际执行失败,则它返回一个“中止”消息。
提交阶段指如果协调者收到了参与者的失败消息或者超时,则直接向每个参与者发送回滚(Rollback)消息,否则发送提交(Commit)消息,参与者根据协调者的指令执行提交或者回滚操作,释放所有事务在处理过程中使用的锁资源。
二阶段提交所存在的缺点如下。
(1)同步阻塞问题,在执行过程中所有参与节点都是事务阻塞型的,当参与者占用公共资源时,其他第三方节点访问公共资源时不得不处于阻塞状态。
(2)单点故障,由于协调者的重要性,一旦协调者发生故障,则参与者会一直阻塞下去。
(3)数据不一致,在二阶段提交的第 2 个阶段中,当协调者向参与者发送 commit 请求之后发生了局部网络异常或者在发送 commit 请求的过程中协调者发生了故障,则会导致只有一部分参与者接收到了 commit 请求,而在这部分参与者在接收到 commit 请求之后就会执行commit操作,其他部分未接收到 commit 请求的机器则无法执行事务提交,于是整个分布式系统便出现了数据不一致的现象。
由于二阶段提交存在诸如同步阻塞、单点问题、数据不一致、宕机等缺陷,所以,研究者们在二阶段提交的基础上做了改进,提出了三阶段提交。
三阶段提交
三阶段提交(Three-phase commit,3PC),也叫作三阶段提交协议(Three-phase commitprotocol),是二阶段提交(2PC)的改进版本。三阶段提交把二阶段提交的准备阶段再次一分为二,这样三阶段提交就有 CanCommit、PreCommit、DoCommit 三个阶段。
(1)CanCommit 阶段:三阶段提交的 CanCommit 阶段其实和二阶段提交的准备阶段很像,协调者向参与者发送 commit 请求,参与者如果可以提交就返回 Yes 响应,否则返回 No 响应。
(2)PreCommit 阶段:协调者根据参与者的反应情况来决定是否可以记录事务的 PreCommit操作。根据响应情况,有以下两种可能。
- 假如协调者从所有参与者那里获得的反馈都是 Yes 响应,则执行事务。
- 假如有任何一个参与者向协调者发送了 No 响应,或者等待超时之后协调者都没有接到参与者的响应,则执行事务的中断。
(3)DoCommit阶段:该阶段进行真正的事务提交,也可以分为执行提交、中断事务两种执行情况。
执行提交的过程如下。
- 协调者接收到参与者发送的ACK响应后,将从预提交状态进入提交状态,并向所有参与者发送doCommit请求。
- 事务提交参与者接收到doCommit请求之后,执行正式的事务提交,并在完成事务提交之后释放所有的事务资源。
- 事务提交完之后,向协调者发送ACK响应。
- 协调者接收到所有参与者的ACK响应之后,完成事务。中断事务的过程如下。
- 协调者向所有参与者发送abort请求。
- 参与者接收到 abort 请求之后,利用其在第 2 个阶段记录的 undo 信息来执行事务的回滚操作,并在完成回滚之后释放所有的事务资源。
- 参与者完成事务回滚之后,向协调者发送 ACK 消息。
- 协调者接收到参与者反馈的 ACK 消息之后,执行事务的中断。
Mycat 中分布式事务的实现
Mycat在1.6版本以后已经完全支持 XA 分布式强事务类型了,先通过一个简单的示例来了解Mycat中XA的用法。
用户应用侧(AP)的使用流程如下:
(1)set autocommit=0
在应用层需要设置事务不能自动提交;
(2)set xa=on
在 SQL 中设置 XA 为开启状态;
(3)执行 SQL
insert into travelrecord(id,name) values(1,’N’),(6000000,’A’),(321,’D’),(13400000,’C’),(59,’E’);
(4)commit 或者 rollback
对事务进行提交(提交成功或者回滚异常)。
完整的流程图如图所示。
Mycat 内部实现侧的实现流程如下:
(1)set autocommit=0
将 MysqlConnection 中的 autocommit 设置为 false;
(2)set xa=on
在Mycat中开启 XA 事务管理器,用 MycatServer.getInstance().genXATXID()生成 XID,用XA START XID 命令进行 XA 事务开始标记,继续拼装 SQL 业务(Mycat 会将上面的 insert 数据分片到不同的节点上),拼装 XA END XID,XA PREPARE XID 最后进行 1pc 提交并记录日志到 tm.log 中,如果 1pc 阶段有异常,则直接回滚事务 XA ROLLBACK xid。
(3)在多节点 MySQL 中全部进行 2pc 提交(XA COMMIT),提交成功后,事务结束;如果有异常,则对事务进行重新提交或者回滚。
Mycat 中的 XA 分布式事务的异常处理流程如下:
(1)一阶段 commit 异常:如果 1pc 提交任意一个 mysql 节点无法提交或者异常,则全部节点的事务进行回滚,抛出异常给应用侧事务回滚。
(2)Mycat Crash Recovery
Mycat 崩溃以后,根据 tm.log 事务日志再进行重启恢复,mycat 启动后执行事务日志查找各个节点中已经 prepared 的 XA 事务,进行 commit 或者 rollback。
1. 相关类说明
通过用户应用侧发送 set xa = on ; SQL 开启 Mycat 内部 XA 事务管理器的功能,事务管理器将对 MySQL 数据库进行 XA 方式的事务管理,具体事务管理功能的实现代码如下:
- MySQLConnection:数据库连接。
- NonBlockingSession:用户连接 Session。
- MultiNodeCoordinator:协调者。
- CommitNodeHandler:分片提交处理。
- RollbackNodeHandler:分片回滚处理。
2. 代码解析
XA 事务启动的源码如下:
public class MySQLConnection extends BackendAIOConnection { //设置开启事务 private void getAutocommitCommand(StringBuilder sb, boolean autoCommit) { if (autoCommit) { sb.append("SET autocommit=1;"); } else { sb.append("SET autocommit=0;"); } } public void execute(RouteResultsetNode rrn, ServerConnection sc,boolean autocommit) throws UnsupportedEncodingException { if(!modifiedSQLExecuted && rrn.isModifySQL()) { modifiedSQLExecuted = true; } //获取当前事务 ID String xaTXID = sc.getSession2().getXaTXID(); synAndDoExecute(xaTXID, rrn, sc.getCharsetIndex(), sc.getTxIsolation(),autocommit); } …… ……//省略此处代码,建议读者参考 GitHub 仓库的 MyCAT-Server 项目的 MySQLConnection.java源码 }
用户应用侧设置手动提交以后,Mycat 会在当前连接中加入
SET autocommit=0;
将该语句加入到 StringBuffer 中,等待提交到数据库。
用户连接 Session 的源码如下:
public class NonBlockingSession implements Session { …… ……//省略此处代码,建议读者参考 GitHub 仓库的 MyCAT-Server 项目的 NonBlockingSession.java 源码 } SET XA = ON ;语句分析
用户应用侧发送该语句到 Mycat 中,由 SQL 语句解析器解析后交由 SetHandle 进行处理c.getSession2().setXATXEnabled (true);
调用 NonBlockSession 中的 setXATXEnable d 方法设置 XA 开关启动,并生成 XID,代码如下:
public void setXATXEnabled(boolean xaTXEnabled) { LOGGER.info("XA Transaction enabled ,con " + this.getSource()); if (xaTXEnabled && this.xaTXID == null) { xaTXID = genXATXID(); } }
另外,NonBlockSession 会接收来自于用户应用侧的 commit, 调用 commit 方法进行处理事务提交的逻辑。
在 commit()方法中,首先会 check 节点个数,一个节点和多个节点分为不同的处理过程,这里只讲下多个节点的处理方法 checkDistriTransaxAndExecute();
该方法会对多个节点的事务进行提交。
协调者的源码如下:
public class MultiNodeCoordinator implements ResponseHandler { …… ……//省略此处代码,建议读者参考 GitHub 仓库 MyCAT-Server 项目的 MultiNodeCoordinator.java 源码 }
在 NonBlockSession 的 checkDistriTransaxAndExecute()方法中, NonBlockSession 会话类会调用专门进行多节点协同的 MultiNodeCoordinator 类进行具体的处理,在 MultiNodeCoordinator类中,executeBatchNodeCmd 方法加入 XA 1PC 提交的处理,代码片段如下:
for (RouteResultsetNode rrn : session.getTargetKeys()) { …… if (mysqlCon.getXaStatus() == TxState.TX_STARTED_STATE){ //recovery Log participantLogEntry[started] = new ParticipantLogEntry(xaTxId,conn.getHost(),0,conn.getSchema(),((MySQLConnection) conn).getXaStatus()); String[] cmds = new String[]{"XA END " + xaTxId,"XA PREPARE " + xaTxId}; if (LOGGER.isDebugEnabled()) { LOGGER.debug("Start execute the batch cmd : "+ cmds[0] + ";" +cmds[1]+","+"current connection:"+conn.getHost()+":"+conn.getPort()); } mysqlCon.execBatchCmd(cmds); } …… }
在 MultiNodeCoordinator 类的 okResponse 方法中,则进行 2pc 的事务提交
MySQLConnection mysqlCon = (MySQLConnection) conn; switch (mysqlCon.getXaStatus()){ case TxState.TX_STARTED_STATE: if (mysqlCon.batchCmdFinished()){ String xaTxId = session.getXaTXID(); String cmd = "XA COMMIT " + xaTxId; if (LOGGER.isDebugEnabled()) { LOGGER.debug("Start execute the cmd :"+cmd+",current host:"+mysqlCon.getHost()+":"+mysqlCon.getPort()); } //recovery log CoordinatorLogEntry coordinatorLogEntry =inMemoryRepository.get(xaTxId); for(int i=0; i<coordinatorLogEntry.participants.length;i++){ LOGGER.debug("[In MemoryCoordinatorLogEntry]"+coordinatorLogEntry.participants[i]); if(coordinatorLogEntry.participants[i].resourceName.equals(conn.getSchema())){ coordinatorLogEntry.participants[i].txState =TxState.TX_PREPARED_STATE; } } inMemoryRepository.put(session.getXaTXID(),coordinatorLogEntry); fileRepository.writeCheckpoint(inMemoryRepository.getAllCoordinatorLogEntries()); //send commit mysqlCon.setXaStatus(TxState.TX_PREPARED_STATE); mysqlCon.execCmd(cmd); } return; …… }
分片事务提交处理的源码如下:
public class CommitNodeHandler implements ResponseHandler { //结束 XA public void commit(BackendConnection conn) { …… ……//省略此处代码,建议读者参考 GitHub 仓库 MyCAT-Server 项目的 CommitNodeHandler.java源码 } //提交 XA @Override public void okResponse(byte[] ok, BackendConnection conn) { …… ……//省略此处代码,建议读者参考 GitHub 仓库的 MyCAT-Server 项目的 CommitNodeHandler.java 源码 }
在 Mycat 中同样支持单节点 MySQL 数据库的 XA 事务处理,在 CommitNodeHandler 类中就是对单节点的 XA 二阶段处理,处理方式与 MultiNodeCoordinator 类同,通过 commit 方法进行 1pc 的提交,而通过 okResponse 的方法进行 2pc 阶段的事务提交。
分片事务回滚处理的源码如下:
public class RollbackNodeHandler extends MultiNodeHandler { …… ……//省略此处代码,建议读者参考 GitHub 仓库的 MyCAT-Server 项目的 RollbackNodeHandler.java 源码 }
在 RollbackNodeHandler 的 rollback 方法中加入了对 XA 事务的 rollback 处理,用户应用侧发起的 rollback 会在这个方法中进行处理。
for (final RouteResultsetNode node : session.getTargetKeys()) { …… //support the XA rollback MySQLConnection mysqlCon = (MySQLConnection) conn; if(session.getXaTXID()!=null) { String xaTxId = session.getXaTXID(); mysqlCon.execCmd("XA END " + xaTxId + ";"); mysqlCon.execCmd("XA ROLLBACK " + xaTxId + ";"); }else { conn.rollback(); } …… }
同样,该方法会对所有的 MySQL 数据库节点发起 xa rollback 指令。