spring+atomikos+mytatis+jpa实现分布式事务管理实战篇

最近在工作中,由于业务的发展需求,需要搭建一个类似中台的项目(拥有独立的数据库),那么跟我们项目中原本存在库,一共两个库,而数据库服务器为MYSQL,这时候就涉及到分布式事务的管理了。我也在网上找了很多解决方案,但貌似aomikos+mybatis+jta的解决方案比较少,只能参考网上的案例来集成代码到我们的项目当中,运行起来发现不定时抛出一些莫名其妙的异常,凭着自己的感觉一步步把这些坑给修补了,跑了两天也没看到异常抛出,程序也正常执行,注:我是在dubbo服务接口层以及spring quartz应用中都集成了atomikos+jta。下面来看一下集成的步骤

1)首先引入JAR包,我使用的是MAVEN来管理项目以及JAR包

<!-- transaction -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-tx</artifactId>
		</dependency>
		<dependency>
			<groupId>javax.transaction</groupId>
			<artifactId>jta</artifactId>
			<version>1.1</version>
		</dependency>
		<dependency>
			<groupId>com.atomikos</groupId>
			<artifactId>atomikos-util</artifactId>
			<version>4.0.4</version>
		</dependency>
		<dependency>
			<groupId>com.atomikos</groupId>
			<artifactId>transactions</artifactId>
			<version>4.0.4</version>
		</dependency>
		<dependency>
			<groupId>com.atomikos</groupId>
			<artifactId>transactions-jta</artifactId>
			<version>4.0.4</version>
		</dependency>
		<dependency>
			<groupId>com.atomikos</groupId>
			<artifactId>transactions-jdbc</artifactId>
			<version>4.0.4</version>
		</dependency>
		<dependency>
			<groupId>com.atomikos</groupId>
			<artifactId>transactions-api</artifactId>
			<version>4.0.4</version>
		</dependency>
		<dependency>
			<groupId>cglib</groupId>
			<artifactId>cglib-nodep</artifactId>
		</dependency>
		<!-- transaction -->

2)在你的项目class包中加入jta.properties配置文件,由于MAVEN JAVA项目,src/main/resources下面存放的在编译时,会放到class里,所以我们放在

spring+atomikos+mytatis+jpa实现分布式事务管理实战篇

而jta.properties的配置内容如下,按需调整吧

com.atomikos.icatch.service=com.atomikos.icatch.standalone.UserTransactionServiceFactory
com.atomikos.icatch.console_file_name = ${log.dir}jta/tm.out
com.atomikos.icatch.log_base_name = tmlog
com.atomikos.icatch.tm_unique_name = com.atomikos.spring.jdbc.tm
com.atomikos.icatch.console_log_level=ERROR
com.atomikos.icatch.enable_logging=false

配置完这个之后,下面就是数据源以及事务的配置了,数据源配置如下:

<bean id="abstractXADataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean"
		init-method="init" destroy-method="close" abstract="true">
		<property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
		<property name="poolSize" value="10" />
		<!--min-pool-size 最小连接数-->
		<property name="minPoolSize" value="10" />
		<!--max-pool-size 最大连接数 -->
		<property name="maxPoolSize" value="30" />
		<!--获取连接失败重新获等待最大时间,在这个时间内如果有可用连接,将返回-->
		<property name="borrowConnectionTimeout" value="60" />
		 <!-- 如果不设置这个值,Atomikos使用默认的300秒(即5分钟),那么在处理大批量数据读取的时候,
		 一旦超过5分钟,就会抛出类似 Resultset is close 的错误 -->
		<property name="reapTimeout" value="20" />
		<!-- max-idle-time 最大闲置时间,超过最小连接池连接的连接将将关闭 -->
		<property name="maxIdleTime" value="60" />
		<!-- maintenance-interval 连接回收时间 -->
		<property name="maintenanceInterval" value="60" />
		<!-- login-timeout java数据库连接池,最大可等待获取datasouce的时间 -->
		<property name="loginTimeout" value="60" />
		<property name="testQuery" value="SELECT 1" />
		<!-- max-lifetime 连接最大存活时间 -->
		<property name="maxLifetime" value="60"></property>
	</bean>
	<!-- WS数据源配置, 使用DBCP数据库连接池 -->
	<bean id="saleDataSource" parent="abstractXADataSource">
	 <property name="uniqueResourceName" value="saleDB" /> 
	 <property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
	 <property name="xaProperties">
 <props>
 <prop key="url">${jdbc.url}</prop>
 <prop key="password">${jdbc.password}</prop>
 <prop key="user">${jdbc.username}</prop>
				<prop key="autoReconnect">true</prop>
				<prop key="pinGlobalTxToPhysicalConnection">true</prop>
 </props>
 </property>
	</bean>

	<!-- 主数据源配置, 使用DBCP数据库连接池 -->
	<bean id="masterDataSource" parent="abstractXADataSource">
	 <property name="uniqueResourceName" value="masterDB" /> 
	 <property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" /> 
	 <property name="xaProperties">
 <props>
 <prop key="url">${csmjdbc.url}</prop>
 <prop key="password">${jdbc.password}</prop>
 <prop key="user">${jdbc.username}</prop>
				<prop key="autoReconnect">true</prop>
				<prop key="pinGlobalTxToPhysicalConnection">true</prop>
 </props>
 </property>
	</bean>

	<!-- 主库MyBatis配置 -->
	<bean id="sqlSessionFactoryMain" class="org.mybatis.spring.SqlSessionFactoryBean">
		<property name="dataSource" ref="masterDataSource" />
		<!-- 自动扫描entity目录, 省掉Configuration.xml里的手工配置 -->
		<property name="typeAliasesPackage" value="${type.aliases.package:com.csair}" />
		<!-- 显式指定Mapper文件位置 -->
		<property name="mapperLocations" value="classpath*:/main/dao/mapper/**/*.xml" />
		<property name="typeHandlersPackage"
			value="${type.handlers.package:com.csair.diamond.repository.mybatis.handler}" />
		<property name="configLocation" value="classpath:/META-INF/mybatis-config.xml" />
	</bean>


	<!-- WS库的MyBatis配置 -->
	<bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
		<property name="dataSource" ref="saleDataSource" />
		<!-- 自动扫描entity目录, 省掉Configuration.xml里的手工配置 -->
		<property name="typeAliasesPackage" value="${type.aliases.package:com.csair}" />
		<!-- 显式指定Mapper文件位置 -->
		<property name="mapperLocations" value="classpath*:/dao/mapper/**/*.xml" />
		<property name="typeHandlersPackage"
			value="${type.handlers.package:com.csair.diamond.repository.mybatis.handler}" />
		<property name="plugins">
			<list>
				<ref bean="mybatisSqlInjectionHandlerInterceptor" />
				<ref bean="mybatisStatementHandlerInterceptor" />
				<ref bean="mybatisResultSetHandlerInterceptor" />
			</list>
		</property>
		<property name="configLocation" value="classpath:/META-INF/mybatis-config.xml" />
	</bean>
	<bean id="mybatisStatementHandlerInterceptor"
		class="com.csair.diamond.repository.mybatis.interceptor.StatementHandlerInterceptor">
		<property name="dialectClass"
			value="com.csair.diamond.repository.mybatis.MySqlDialect" />
	</bean>
	<bean id="mybatisResultSetHandlerInterceptor"
		class="com.csair.diamond.repository.mybatis.interceptor.ResultSetHandlerInterceptor">
	</bean>
	<bean id="mybatisSqlInjectionHandlerInterceptor"
		class="com.csair.diamond.repository.mybatis.interceptor.SqlInjectionHandlerInterceptor">
	</bean>
	<!-- 扫描basePackage下所有以@Repository标识的 接口 -->
	<bean name = "mapperScannerConfigurer" class="org.mybatis.spring.mapper.MapperScannerConfigurer">
		<property name="basePackage"
			value="${mapper.scanner.base.package:com.csair.csm.dao}" />
		<property name="annotationClass"
			value="com.csair.diamond.repository.annotation.Repository" />
		<!-- <property name="sqlSessionFactory" ref="sqlSessionFactory" /> -->
		<property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" />
	</bean>

	<bean name = "mainMapperScannerConfigurer" class="org.mybatis.spring.mapper.MapperScannerConfigurer">
		<property name="basePackage"
			value="com.csair.csm.main.dao" />
		<property name="annotationClass"
			value="com.csair.diamond.repository.annotation.Repository" />
		<property name="sqlSessionFactoryBeanName" value="sqlSessionFactoryMain" />
	</bean>

事务的配置如下

<!-- atomikos事务管理器 -->
	<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close">
 <!-- close()时是否强制终止事务 -->
 <property name="forceShutdown">
 <value>true</value>
 </property>
 </bean>

 <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
 <property name="transactionTimeout" value="300" />
 </bean>
 	<!-- spring 事务管理器 --> 
 <bean id="springTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
 <property name="transactionManager" ref="atomikosTransactionManager" />
 <property name="userTransaction" ref="atomikosUserTransaction" />
 <!-- 必须设置,否则程序出现异常 JtaTransactionManager does not support custom isolation levels by default -->
 <property name="allowCustomIsolationLevels" value="true"/> 
 </bean>
 	<tx:annotation-driven transaction-manager="springTransactionManager" proxy-target-class="true" />

spring-quartz配置文件如下:

<bean id="quartzSchedulerSupplier"
		class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
		<property name="dataSource">
			<ref bean="saleDataSource" />
		</property>
		<property name="applicationContextSchedulerContextKey" value="applicationContextKey" />
		<property name="configLocation" value="classpath:quartz.properties" />
		<property name="triggers">
			<list>
				<ref bean="triggerSupplier" />
				<ref bean="triggerProduct" />
				<ref bean="triggerPicture" />
			</list>
		</property>
	</bean>

我是使用事务的注解形式回滚事务,我原本是采用切入点来定义某个包下面的所有类以哪些方法开头是有事务并且回滚,哪些是没有事务的,但是集成spring quartz会发现事务开启不一致的问题,spring quartz也有自己的数据源,一套自己的事务,如果不用事务注解,定时任务会开启自己的本地事务,而定时任务调用服务又是一个XA的事务,这时候就会抛事务不统一。

Caused by: com.atomikos.datasource.ResourceException: XA resource 'mysql/first': resume for XID '3137322E31362E33312E38332E746D30303030313030303131:3137322E31362E33312E38332E746D31' raised -9: the XA resource is currently involved in a local (non-XA) transaction

解决方案:采用事务注解,并且在定时任务入口类标注上注解。

如果发现以下这种错误异常问题,主要是系统有两个事务,一个事务提交,一个事务未提交,导致事务冲突,建议采用事务注解在最外层方法标注上。

Cannot call method 'commit' while a global transaction is runing(因为我是继承了定时任务,在方法内部手动开启事务,当我方法内部事务还未提交时,定时任务自己内部的事务就提交了,这时候就会抛这个异常,如果集成定时任务,建议不要手动开启事务)

spring+atomikos+mytatis+jpa实现分布式事务管理实战篇

如果在运行期抛[ERROR][2016-11-03 10:17:30,771][com.atomikos.recovery.imp.CachedRepository]Corrupted log file - restart JVM

com.atomikos.recovery.LogReadException: java.lang.ArrayIndexOutOfBoundsException: 1 at ,解决方案就是把atomikos的jar包版本升级到4.0.4即可。

如果在运行期发现一开始是正常,后期频繁抛如下的错误,根据我的填坑经验,这个是数据库连接池选型不对的问题,以及配置连接池属性跟数据库的配置不一致。

2019-05-06 17:57:37.865 [ Atomikos:2 ] - [ WARN ] [com.atomikos.recovery.xa.XaResourceRecoveryManager : 40] - Error while retrieving xids from resource - will retry later...

com.mysql.jdbc.jdbc2.optional.MysqlXAException: No operations allowed after connection closed.

at com.mysql.jdbc.jdbc2.optional.MysqlXAConnection.mapXAExceptionFromSQLException(MysqlXAConnection.java:608)

at com.mysql.jdbc.jdbc2.optional.MysqlXAConnection.recover(MysqlXAConnection.java:335)

at com.mysql.jdbc.jdbc2.optional.MysqlXAConnection.recover(MysqlXAConnection.java:255)

at com.atomikos.datasource.xa.RecoveryScan.recoverXids(RecoveryScan.java:32)

at com.atomikos.recovery.xa.XaResourceRecoveryManager.retrievePreparedXidsFromXaResource(XaResourceRecoveryManager.java:158)

at com.atomikos.recovery.xa.XaResourceRecoveryManager.recover(XaResourceRecoveryManager.java:67)

at com.atomikos.datasource.xa.XATransactionalResource.recover(XATransactionalResource.java:451)

at com.atomikos.icatch.imp.TransactionServiceImp.performRecovery(TransactionServiceImp.java:490)

at com.atomikos.icatch.imp.TransactionServiceImp.access$000(TransactionServiceImp.java:56)

at com.atomikos.icatch.imp.TransactionServiceImp$1.alarm(TransactionServiceImp.java:471)

at com.atomikos.timing.PooledAlarmTimer.notifyListeners(PooledAlarmTimer.java:95)

at com.atomikos.timing.PooledAlarmTimer.run(PooledAlarmTimer.java:82)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

一开始xaDataSourceClassName属性值我是采用com.alibaba.druid.pool.xa.DruidXADataSource

<bean id="abstractXADataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean"
		init-method="init" destroy-method="close" abstract="true">
		<property name="xaDataSourceClassName" value="com.alibaba.druid.pool.xa.DruidXADataSource" />
		<property name="poolSize" value="10" />
		<property name="minPoolSize" value="10" />
		<property name="maxPoolSize" value="30" />
		<property name="borrowConnectionTimeout" value="60" />
		<property name="reapTimeout" value="20" />
		<property name="maxIdleTime" value="60" />
		<property name="maintenanceInterval" value="60" />
		<property name="loginTimeout" value="60" />
		<property name="testQuery" value="SELECT 1" />
	</bean>

后来采用xaDataSourceClassName为com.mysql.jdbc.jdbc2.optional.MysqlXADataSource

<bean id="abstractXADataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean"
		init-method="init" destroy-method="close" abstract="true">
		<property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
		<property name="poolSize" value="10" />
		<!--min-pool-size 最小连接数-->
		<property name="minPoolSize" value="10" />
		<!--max-pool-size 最大连接数 -->
		<property name="maxPoolSize" value="30" />
		<!--获取连接失败重新获等待最大时间,在这个时间内如果有可用连接,将返回-->
		<property name="borrowConnectionTimeout" value="60" />
		 <!-- 如果不设置这个值,Atomikos使用默认的300秒(即5分钟),那么在处理大批量数据读取的时候,
		 一旦超过5分钟,就会抛出类似 Resultset is close 的错误 -->
		<property name="reapTimeout" value="20" />
		<!-- max-idle-time 最大闲置时间,超过最小连接池连接的连接将将关闭 -->
		<property name="maxIdleTime" value="60" />
		<!-- maintenance-interval 连接回收时间 -->
		<property name="maintenanceInterval" value="60" />
		<!-- login-timeout java数据库连接池,最大可等待获取datasouce的时间 -->
		<property name="loginTimeout" value="60" />
		<property name="testQuery" value="SELECT 1" />
		<!-- max-lifetime 连接最大存活时间 -->
		<property name="maxLifetime" value="60"></property>
	</bean>

作者:huangkejie

原文:https://my.oschina.net/u/3155476/blog/3047302

相关推荐