跨数据库事务研究

两种方案:

1、分布式事务jta

2、事务补偿

3二阶段提交

分布式事务,记得google有篇关于存储的论文专门讲这个。分布式事务要保证的100%一致性基本不可能,特别是异构数据库。我的建议是降低实时性要求,通过对账,应答的方式识别业务失败,再进行修复,这样更具可操作性。

你可以把对每个库的操作都独立开来,一个发生异常,其他都还原。涉及还原的问题就看你自己怎么解决了,有的可以用rollback,有的则需要事先备份到临时表的。

为什么要替代分布式事务?

当我们系统的数据量很大,大都需要对数据库进行分割,部署多台数据库实例,这样就避免不了某些操作需要同时修改几个数据库实例

里的数据,为了保证数据准确性和一致性,我们大都使用分布式事务来实现(非常经典的两阶段提交协议)。

分布式事务最大的优点就是简化应用开发,对于时间紧迫并且性能要求不高的系统可以大大的提高开发效率,这也是大多开发者沉醉于

其中的主要原因。但没有十全十美的,有利必有弊,虽然开发便捷了,但是也严重的损害了系统的可用性,高性能和可扩展性,尤其对于

海量数据复杂的系统体现就更明显。

系统的可用性

系统的可用性就相当于参加分布式事务的各个数据库实例的可用性之积,数据库实例越多,可用性下降的越明显;因为参加分布式事务

的所有数据库实例都可以正常工作下,这个分布式事务才算完成,如果有一个数据库实例有故障,那这个分布式事务都会失败。

高效能和可伸缩性

对于一个分布式事务总的持续时间是操作各个数据库实例的时间之和,因为在分布式事务中每个操作是顺序执行的,这样每个事务的响

应时间就会很长;还有对于一个OLTP系统,事务都很小,一般几毫秒,当涉及到分布式事务时,节点间的网络通信时间占事务总响应时

间的比例也是不容忽视的。还有由于事务时间相对于变长了,锁定的资源的时间也就变长了。从而严重影响系统的并发性,吞吐率和可

伸缩性。

事务补偿机制

事务补偿即在事务链中的任何一个正向事务操作,都必须存在一个完全符合回滚规则的可逆事务。如果是一个完整的事务链,则必须事务链中的每一个业务服务或操作都有对应的可逆服务。对于Service服务本身无状态,也不容易实现前面讨论过的通过DTC或XA机制实现的跨应用和资源的事务管理,建立跨资源的事务上下文。因此也较难以实现真正的预提交和正式提交的分离。

在这种情况下以上面例子来说,首先调用取款服务,完全调用成功并返回,数据已经持久化。然后调用异地的存款服务,如果也调用成功,则本身无任何问题。如果调用失败,则需要调用本地注册的逆向服务(本地存款服务),如果本地存款服务调用失败,则必须考虑重试,如果约定重试次数仍然不成功,则必须log到完整的不一致信息。也可以是将本地存款服务作为消息发送到消息中间件,由消息中间件接管后续操作。

在上面方式中可以看到需要手工编写大量的代码来处理以保证事务的完整性,我们可以考虑实现一个通用的事务管理器,实现事务链和事务上下文的管理。对于事务链上的任何一个服务正向和逆向操作均在事务管理和协同器上注册,由事务管理器接管所有的事务补偿和回滚操作。

基于消息的最终一致性

在这里首先要回答的是我们需要时实时一致性还是最终一致性的问题,如果需要的是最终一致性,那么BASE策略中的基于消息的最终一致性是比较好的解决方案。这种方案真正实现了两个服务的真正解耦,解耦的关键就是异步消息和消息持久化机制。

还是以上面的例子来看。对于转账操作,原有的两个服务调用变化为第一步调用本地的取款服务,第二步发送异地取款的异步消息到消息中间件。如果第二步在本地,则保证事务的完整性基本无任何问题,即本身就是本地事务的管理机制。只要两个操作都成功即可以返回客户成功。

由于解耦,我们看到客户得到成功返回的时候,如果是上面一种情况则异地卡马上就能查询账户存款增加。而第二种情况则不一定,因为本身是一种异步处理机制。消息中间件得到消息后会去对消息解析,然后调用异地银行提供的存款服务进行存款,如果服务调用失败则进行重试。

异地银行存款操作不应该长久地出现异常而无法使用,因此一旦发现异常我们可以迅速的解决,消息中间件中异常服务自然会进行重试以保证事务的最终一致性。这种方式假设问题一定可以解决,在不到万不得已的情况下本地的取款服务一般不进行可逆操作。

在本地取款到异地存款两个服务调用之间,会存在一个真空期,这段时间相关现金不在任何一个账户,而只是在一个事务的中间状态,但是客户并不关心这个,只要在约定的时间保证事务最终的一致性即可。

关于等幂操作的问题

重复调用多次产生的业务结果与调用一次产生的业务结果相同,简单点讲所有提供的业务服务,不管是正向还是逆向的业务服务,都必须要支持重试。因为服务调用失败这种异常必须考虑到,不能因为服务的多次调用而导致业务数据的累计增加或减少。

关于是否可以补偿的问题

在这里我们谈的是多个跨系统的业务服务组合成一个分布式事务,因此在对事务进行补偿的时候必须要考虑客户需要的是否一定是最终一致性。客户对中间阶段出现的不一致的承受度是如何的。

在上面的例子来看,如果采用事务补偿机制,基本可以是做到准实时的补偿,不会有太大的影响。而如果采用基于消息的最终一致性方式,则可能整个周期比较长,需要较长的时间才能给得到最终的一致性。比如周六转款,客户可能下周一才得到通知转账不成功而进行了回退,那么就必须要考虑客户是否能给忍受。

其次对于前面讨论,如果真正需要的是实时的一致性,那么即使采用事务补偿机制,也无法达到实时的一致性。即很可能在两个业务服务调用中间,客户前台业务操作对持久化的数据进行了其它额外的操作。在这种模式下,我们不得不考虑需要在数据库表增加业务状态锁的问题,即整个事务没有完整提交并成功前,第一个业务服务调用虽然持久化在数据库,但是仍然是一个中间状态,需要通过业务锁来标记,控制相关的业务操作和行为。但是在这种模式下无疑增加了整个分布式业务系统的复杂度。

消息队列-状态表方案和分布式事务的对比

对于时间紧迫或者对性能要求不高的系统,应采用分布式事务加快开发效率;对于时间需求不是很紧,对性能要求很高的系统,

应考虑使用消息队列方案。所以时间与便捷,性能与扩展是需要仔细衡量的,找好中间的平衡点;对于原来使用分布式事务,

且系统已趋于稳定,性能要求高的系统,则可以使用消息队列-状态表方案进行重构来优化性能。

两阶段提交(2PC)

阿里也在用二阶段提交

两阶段提交协议可以保证数据的强一致性,许多分布式关系型数据管理系统采用此协议来完成分布式事务。它是协调所有分布式原子事务参与者,并决定提交或取消(回滚)的分布式算法。同时也是解决一致性问题的算法。该算法能够解决很多的临时性系统故障(包括进程、网络节点、通信等故障),被广泛地使用。但是,它并不能够通过配置来解决所有的故障,在某些情况下它还需要人为的参与才能解决问题。

顾名思义,两阶段提交分为以下两个阶段:

1)PreparePhase(准备节点)

2)CommitPhase(提交阶段)

1)PreparePhase

在请求阶段,协调者将通知事务参与者准备提交或取消事务,然后进入表决过程。在表决过程中,参与者将告知协调者自己的决策:同意(事务参与者本地作业执行成功)或取消(本地作业执行故障)。

为了完成准准备阶段,除了commitpointsite外,其它的数据库节点按照以下步骤执行:

每个节点检查自己是否被其它节点所引用,如果有,就通知这些节点准备提交(进入Prepare阶段)。

每个节点检查自己运行的事务,如果发现本地运行的事务没有修改数据的操作(只读),则跳过后面的步骤,直接返回一个readonly给全局协调器。

如果事务需要修改数据,则为事务分配相应的资源用于保证修改的正常进行。

当上面的工作都成功后,给全局协调器返回准备就绪的信息,反之,则返回失败的信息。

2)CommitPhase

在该阶段,协调者将基于第一个阶段的投票结果进行决策:提交或取消。当且仅当所有的参与者同意提交事务协调者才通知所有的参与者提交事务,否则协调者将通知所有的参与者取消事务。参与者在接收到协调者发来的消息后将执行响应的操作。

提交阶段按下面的步骤进行:

全局协调器通知commitpointsite进行提交。

commitpointsite提交,完成后通知全局协调器。

全局协调器通知其它节点进行提交。

其它节点各自提交本地事务,完成后释放锁和资源。

其它节点通知全局协调器提交完成。

3)结束阶段

全局协调器通知commitpointsite说所有节点提交完成。

commitpointsite数据库释放和事务相关的所有资源,然后通知全局协调器。

全局协调器释放自己持有的资源。

分布式事务结束

一般情况下,两阶段提交机制都能较好的运行,当在事务进行过程中,有参与者宕机时,重启以后,可以通过询问其他参与者或者协调者,从而知道这个事务到底提交了没有。当然,这一切的前提都是各个参与者在进行每一步操作时,都会事先写入日志。

唯一一个两阶段提交不能解决的困境是:当协调者在发出commit消息后宕机,而唯一收到这条命令的一个参与者也宕机了,这个时候这个事务就处于一个未知的状态,没有人知道这个事务到底是提交了还是未提交,从而需要数据库管理员的介入,防止数据库进入一个不一致的状态。当然,如果有一个前提是:所有节点或者网络的异常最终都会恢复,那么这个问题就不存在了,协调者和参与者最终会重启,其他节点也最终会收到commit的信息。这也符合CAP理论。

以下来自http://blog.csdn.net/ithomer/article/details/10859235

spring的分布式事务

分布式事务是指操作多个数据库之间的事务,spring的org.springframework.transaction.jta.JtaTransactionManager,提供了分布式事务支持。如果使用WAS的JTA支持,把它的属性改为WebSphere对应的TransactionManager。

在tomcat下,是没有分布式事务的,不过可以借助于第三方软件jotm(JavaOpenTransactionManager)和AtomikosTransactionsEssentials实现,在spring中分布式事务是通过jta(jotm,atomikos)来进行实现。

1、http://jotm.objectweb.org/

2、http://www.atomikos.com/Main/TransactionsEssentials

一、使用JOTM例子

(1)Dao及实现

GenericDao接口:

[java]viewplaincopyprint?

publicinterfaceGenericDao{

publicintsave(Stringds,Stringsql,Object[]obj)throwsException;

publicintfindRowCount(Stringds,Stringsql);

}

GenericDaoImpl实现:

[java]viewplaincopyprint?

publicclassGenericDaoImplimplementsGenericDao{

privateJdbcTemplatejdbcTemplateA;

privateJdbcTemplatejdbcTemplateB;

publicvoidsetJdbcTemplateA(JdbcTemplatejdbcTemplate){

this.jdbcTemplateA=jdbcTemplate;

}

publicvoidsetJdbcTemplateB(JdbcTemplatejdbcTemplate){

this.jdbcTemplateB=jdbcTemplate;

}

publicintsave(Stringds,Stringsql,Object[]obj)throwsException{

if(null==ds||"".equals(ds))return-1;

try{

if(ds.equals("A")){

returnthis.jdbcTemplateA.update(sql,obj);

}else{

returnthis.jdbcTemplateB.update(sql,obj);

}

}catch(Exceptione){

e.printStackTrace();

thrownewException("执行"+ds+"数据库时失败!");

}

}

publicintfindRowCount(Stringds,Stringsql){

if(null==ds||"".equals(ds))return-1;

if(ds.equals("A")){

returnthis.jdbcTemplateA.queryForInt(sql);

}else{

returnthis.jdbcTemplateB.queryForInt(sql);

}

}

}

(2)Service及实现

UserService接口:

[java]viewplaincopyprint?

publicinterfaceUserService{

publicvoidsaveUser()throwsException;

}

UserServiceImpl实现:

[java]viewplaincopyprint?

publicclassUserServiceImplimplementsUserService{

privateGenericDaogenericDao;

publicvoidsetGenericDao(GenericDaogenericDao){

this.genericDao=genericDao;

}

publicvoidsaveUser()throwsException{

Stringusername="user_"+Math.round(Math.random()*10000);

System.out.println(userName);

StringBuildersql=newStringBuilder();

sql.append("insertintot_user(username,gender)values(?,?);");

Object[]objs=newObject[]{userName,"1"};

genericDao.save("A",sql.toString(),objs);

sql.delete(0,sql.length());

sql.append("insertintot_user(name,sex)values(?,?);");

objs=newObject[]{userName,"男的"};//值超出范围

genericDao.save("B",sql.toString(),objs);

}

}

(3)applicationContext-jotm.xml

[java]viewplaincopyprint?

<?xmlversion="1.0"encoding="UTF-8"?>

<beansxmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns:context="http://www.springframework.org/schema/context"

xmlns:aop="http://www.springframework.org/schema/aop"

xmlns:tx="http://www.springframework.org/schema/tx"

xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-2.5.xsd

http://www.springframework.org/schema/contexthttp://www.springframework.org/schema/context/spring-context-2.5.xsd

http://www.springframework.org/schema/txhttp://www.springframework.org/schema/tx/spring-tx-2.5.xsd

http://www.springframework.org/schema/aophttp://www.springframework.org/schema/aop/spring-aop-2.5.xsd">

<description>springJTA</description>

<!--指定Spring配置中用到的属性文件-->

<beanid="propertyConfig"

class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">

<propertyname="locations">

<list>

<value>classpath:jdbc.properties</value>

</list>

</property>

</bean>

<!--JOTM实例-->

<beanid="jotm"class="org.springframework.transaction.jta.JotmFactoryBean">

<propertyname="defaultTimeout"value="500000"/>

</bean>

<!--JTA事务管理器-->

<beanid="jtaTransactionManager"class="org.springframework.transaction.jta.JtaTransactionManager">

<propertyname="userTransaction"ref="jotm"/>

</bean>

<!--数据源A-->

<beanid="dataSourceA"class="org.enhydra.jdbc.pool.StandardXAPoolDataSource"destroy-method="shutdown">

<propertyname="dataSource">

<beanclass="org.enhydra.jdbc.standard.StandardXADataSource"destroy-method="shutdown">

<propertyname="transactionManager"ref="jotm"/>

<propertyname="driverName"value="${jdbc.driver}"/>

<propertyname="url"value="${jdbc.url}"/>

</bean>

</property>

<propertyname="user"value="${jdbc.username}"/>

<propertyname="password"value="${jdbc.password}"/>

</bean>

<!--数据源B-->

<beanid="dataSourceB"class="org.enhydra.jdbc.pool.StandardXAPoolDataSource"destroy-method="shutdown">

<propertyname="dataSource">

<beanclass="org.enhydra.jdbc.standard.StandardXADataSource"destroy-method="shutdown">

<propertyname="transactionManager"ref="jotm"/>

<propertyname="driverName"value="${jdbc2.driver}"/>

<propertyname="url"value="${jdbc2.url}"/>

</bean>

</property>

<propertyname="user"value="${jdbc2.username}"/>

<propertyname="password"value="${jdbc2.password}"/>

</bean>

<beanid="jdbcTemplateA"

class="org.springframework.jdbc.core.JdbcTemplate">

<propertyname="dataSource"ref="dataSourceA"/>

</bean>

<beanid="jdbcTemplateB"

class="org.springframework.jdbc.core.JdbcTemplate">

<propertyname="dataSource"ref="dataSourceB"/>

</bean>

<!--事务切面配置-->

<aop:config>

<aop:pointcutid="pointCut"

expression="execution(*com.logcd.service..*.*(..))"/><!--包及其子包下的所有方法-->

<aop:advisorpointcut-ref="pointCut"advice-ref="txAdvice"/>

<aop:advisorpointcut="execution(**..common.service..*.*(..))"advice-ref="txAdvice"/>

</aop:config>

<!--通知配置-->

<tx:adviceid="txAdvice"transaction-manager="jtaTransactionManager">

<tx:attributes>

<tx:methodname="delete*"rollback-for="Exception"/>

<tx:methodname="save*"rollback-for="Exception"/>

<tx:methodname="update*"rollback-for="Exception"/>

<tx:methodname="find*"read-only="true"rollback-for="Exception"/>

</tx:attributes>

</tx:advice>

<beanid="genericDao"class="com.logcd.dao.impl.GenericDaoImpl"autowire="byName"></bean>

<beanid="userService"class="com.logcd.service.impl.UserServiceImpl"autowire="byName"></bean>

</beans>

(4)测试

[java]viewplaincopyprint?

publicclassTestUserService{

privatestaticUserServiceuserService;

@BeforeClass

publicstaticvoidinit(){

ApplicationContextapp=newClassPathXmlApplicationContext("applicationContext-jotm.xml");

userService=(UserService)app.getBean("userService");

}

@Test

publicvoidsave(){

System.out.println("begin...");

try{

userService.saveUser();

}catch(Exceptione){

System.out.println(e.getMessage());

}

System.out.println("finish...");

}

}

二、关于使用atomikos实现

(1)数据源配置

[java]viewplaincopyprint?

<beanid="dataSourceA"class="com.atomikos.jdbc.SimpleDataSourceBean"init-method="init"destroy-method="close">

<propertyname="uniqueResourceName">

<value>${datasource.uniqueResourceName}</value>

</property>

<propertyname="xaDataSourceClassName">

<value>${database.driver_class}</value>

</property>

<propertyname="xaDataSourceProperties">

<value>URL=${database.url};user=${database.username};password=${database.password}</value>

</property>

<propertyname="exclusiveConnectionMode">

<value>${connection.exclusive.mode}</value>

</property>

<propertyname="connectionPoolSize">

<value>${connection.pool.size}</value>

</property>

<propertyname="connectionTimeout">

<value>${connection.timeout}</value>

</property>

<propertyname="validatingQuery">

<value>SELECT1</value>

</property>

</bean>

(2)、事务配置

[java]viewplaincopyprint?

<beanid="atomikosTransactionManager"class="com.atomikos.icatch.jta.UserTransactionManager"

init-method="init"destroy-method="close">

<propertyname="forceShutdown"value="true"/>

</bean>

<beanid="atomikosUserTransaction"class="com.atomikos.icatch.jta.UserTransactionImp">

<propertyname="transactionTimeout"value="${transaction.timeout}"/>

</bean>

<!--JTA事务管理器-->

<beanid="springTransactionManager"class="org.springframework.transaction.jta.JtaTransactionManager">

<propertyname="transactionManager"ref="atomikosTransactionManager"/>

<propertyname="userTransaction"ref="atomikosUserTransaction"/>

</bean>

<!--事务切面配置-->

<aop:config>

<aop:pointcutid="serviceOperation"expression="execution(**..service*..*(..))"/>

<aop:advisorpointcut-ref="serviceOperation"advice-ref="txAdvice"/>

</aop:config>

<!--通知配置-->

<tx:adviceid="txAdvice"transaction-manager="springTransactionManager">

<tx:attributes>

<tx:methodname="*"rollback-for="Exception"/>

</tx:attributes>

</tx:advice>

相关推荐