当前位置: 移动技术网 > IT编程>开发语言>Java > Spring中使用atomikos+druid实现经典分布式事务的方法

Spring中使用atomikos+druid实现经典分布式事务的方法

2019年07月19日  | 移动技术网IT编程  | 我要评论

js相册,xingyangerxiao,胡闹王妃惹爱怜

经典分布式事务,是相对互联网中的柔性分布式事务而言,其特性为acid原则,包括原子性(atomictiy)、一致性(consistency)、隔离性(isolation)、持久性(durabilit):

  • 原子性:事务是一个包含一系列操作的原子操作。事务的原子性确保这些操作全部完成或者全部失败。
  • 一致性:一旦事务的所有操作结束,事务就被提交。然后你的数据和资源将处于遵循业务规则的一直状态。
  • 隔离性:因为同时在相同数据集上可能有许多事务处理,每个事务应该与其他事务隔离,避免数据破坏。
  • 持久性:一旦事务完成,他的结果应该能够承受任何系统错误(想象一下在事务提交过程中机器的电源被切断的情况)。通常,事务的结果被写入持续性存储。

xa是啥?

xa是由x/open组织提出的分布式事务的架构(或者叫协议)。xa架构主要定义了(全局)事务管理器(transaction manager)和(局部)资源管理器(resource manager)之间的接口。xa接口是双向的系统接口,在事务管理器(transaction manager)以及一个或多个资源管理器(resource manager)之间形成通信桥梁。也就是说,在基于xa的一个事务中,我们可以针对多个资源进行事务管理,例如一个系统访问多个数据库,或即访问数据库、又访问像消息中间件这样的资源。这样我们就能够实现在多个数据库和消息中间件直接实现全部提交、或全部取消的事务。xa规范不是java的规范,而是一种通用的规范,

目前各种数据库、以及很多消息中间件都支持xa规范。

jta是满足xa规范的、用于java开发的规范。所以,当我们说,使用jta实现分布式事务的时候,其实就是说,使用jta规范,实现系统内多个数据库、消息中间件等资源的事务。

jta(java transaction api),是j2ee的编程接口规范,它是xa协议的java实现。它主要定义了:

  • 一个事务管理器的接口javax.transaction.transactionmanager,定义了有关事务的开始、提交、撤回等>操作。
  • 一个满足xa规范的资源定义接口javax.transaction.xa.xaresource,一种资源如果要支持jta事务,就需要让它的资源实现该xaresource接口,并实现该接口定义的两阶段提交相关的接口。如果我们有一个应用,它使用jta接口实现事务,应用在运行的时候,就需要一个实现jta的容器,一般情况下,这是一个j2ee容器,像jboss,websphere等应用服务器。但是,也有一些独立的框架实现了jta,例如 atomikos, bitronix 都提供了jar包方式的jta实现框架。这样我们就能够在tomcat或者jetty之类的服务器上运行使用jta实现事务的应用系统。在上面的本地事务和外部事务的区别中说到,jta事务是外部事务,可以用来实现对多个资源的事务性。它正是通过每个资源实现的xaresource来进行两阶段提交的控制。感兴趣的同学可以看看这个接口的方法,除了commit, rollback等方法以外,还有end(), forget(), issamerm(), prepare()等等。光从这些接口就能够想象jta在实现两阶段事务的复杂性。

本篇以spring mvc+maven+atomikos+druid+mybatis演示分布式事务的实现。

mave 的pom.xml

<properties>
 <jdk.version>1.8</jdk.version>
 <!-- 注mysql的版本和druid的版本一定要搭配,否则会有问题,目前这两个版本是搭配好的 -->
 <mysql.version>8.0.11</mysql.version>
 <druid.version>1.1.17</druid.version>
 <spring.version>5.1.8.release</spring.version>
 <cglib.version>3.2.12</cglib.version>
 <atomikos.version>5.0.0</atomikos.version>
 <aspectjweaver.version>1.9.4</aspectjweaver.version>
 <aspectjrt.version>1.5.4</aspectjrt.version>
 <jta.version>1.1</jta.version>
 <mybatise.version>3.2.0</mybatise.version>
 <mybatis.spring>1.2.0</mybatis.spring>
 <log4j.version>1.2.17</log4j.version>
 <junit.version>4.12</junit.version>
 <cglib.version>3.2.4</cglib.version>
</properties>
<dependencies>
 <dependency>
 <groupid>org.mybatis</groupid>
 <artifactid>mybatis</artifactid>
 <version>${mybatise.version}</version>
 </dependency>
 <dependency>
 <groupid>org.mybatis</groupid>
 <artifactid>mybatis-spring</artifactid>
 <version>${mybatis.spring}</version>
 </dependency>
 <dependency>
 <groupid>com.atomikos</groupid>
 <artifactid>atomikos-util</artifactid>
 <version>${atomikos.version}</version>
 </dependency>
 <dependency>
 <groupid>com.atomikos</groupid>
 <artifactid>transactions</artifactid>
 <version>${atomikos.version}</version>
 </dependency>
 <dependency>
 <groupid>com.atomikos</groupid>
 <artifactid>transactions-jta</artifactid>
 <version>${atomikos.version}</version>
 </dependency>
 <dependency>
 <groupid>com.atomikos</groupid>
 <artifactid>transactions-jdbc</artifactid>
 <version>${atomikos.version}</version>
 </dependency>
 <dependency>
 <groupid>com.atomikos</groupid>
 <artifactid>transactions-api</artifactid>
 <version>${atomikos.version}</version>
 </dependency>
 <dependency>
 <groupid>javax.transaction</groupid>
 <artifactid>jta</artifactid>
 <version>${jta.version}</version>
 </dependency>
 <dependency>
 <groupid>cglib</groupid>
 <artifactid>cglib-nodep</artifactid>
 <version>${cglib.version}</version>
 </dependency>
 <dependency>
 <groupid>org.springframework</groupid>
 <artifactid>spring-test</artifactid>
 <version>${spring.version}</version>
 </dependency>
 <dependency>
 <groupid>org.springframework</groupid>
 <artifactid>spring-web</artifactid>
 <version>${spring.version}</version>
 </dependency>
 <dependency>
 <groupid>org.springframework</groupid>
 <artifactid>spring-tx</artifactid>
 <version>${spring.version}</version>
 </dependency>
 <dependency>
 <groupid>org.springframework</groupid>
 <artifactid>spring-beans</artifactid>
 <version>${spring.version}</version>
 </dependency>
 <dependency>
 <groupid>org.springframework</groupid>
 <artifactid>spring-jdbc</artifactid>
 <version>${spring.version}</version>
 </dependency>
 <dependency>
 <groupid>org.springframework</groupid>
 <artifactid>spring-webmvc</artifactid>
 <version>${spring.version}</version>
 </dependency>
 <dependency>
 <groupid>org.springframework</groupid>
 <artifactid>spring-orm</artifactid>
 <version>${spring.version}</version>
 </dependency>
 <dependency>
 <groupid>org.springframework</groupid>
 <artifactid>spring-context-support</artifactid>
 <version>${spring.version}</version>
 </dependency>
 <!-- spring aop依赖jar -->
 <dependency>
 <groupid>org.aspectj</groupid>
 <artifactid>aspectjweaver</artifactid>
 <version>${aspectjweaver.version}</version>
 </dependency>

 <dependency>
 <groupid>aspectj</groupid>
 <artifactid>aspectjrt</artifactid>
 <version>${aspectjrt.version}</version>
 </dependency>
 <!-- cglib -->
 <dependency>
 <groupid>cglib</groupid>
 <artifactid>cglib</artifactid>
 <version>${cglib.version}</version>
 </dependency>
 <dependency>
 <groupid>mysql</groupid>
 <artifactid>mysql-connector-java</artifactid>
 <version>${mysql.version}</version>
 </dependency>
 <dependency>
 <groupid>com.alibaba</groupid>
 <artifactid>druid</artifactid>
 <version>${druid.version}</version>
 </dependency>
 <dependency>
 <groupid>junit</groupid>
 <artifactid>junit</artifactid>
 <version>${junit.version}</version>
 <scopte>test</scope>
 </dependency>
</dependencies>

spring-application-context.xml

<?xml version="1.0" encoding="utf-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" 
  xmlns:p="http://www.springframework.org/schema/p"
  xmlns:tx="http://www.springframework.org/schema/tx"
  xmlns:context="http://www.springframework.org/schema/context"
  xmlns:aop="http://www.springframework.org/schema/aop"
  xmlns:mvc="http://www.springframework.org/schema/mvc"
  xmlns:task="http://www.springframework.org/schema/task"
  xsi:schemalocation="http://www.springframework.org/schema/beans 
  http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
  http://www.springframework.org/schema/mvc
  http://www.springframework.org/schema/mvc/spring-mvc-3.1.xsd
  http://www.springframework.org/schema/context 
  http://www.springframework.org/schema/context/spring-context-3.1.xsd
  http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
  http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
  http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd">
  
   <!-- 1. 数据源配置 -->
  <context:property-placeholder location="classpath*:*.properties" file-encoding="utf8" />
 <bean id="utf8" class="java.lang.string">
 <constructor-arg value="utf-8"></constructor-arg>
 </bean>
  <!-- 开启异步任务(同时开启定时器注解扫描) -->
  <task:annotation-driven />
  <!-- 使用@aspectj风格的切面声明 -->
  <!-- <aop:aspectj-autoproxy/> -->
  <!-- 使用annotation自动注册bean -->
  <!-- 在主容器中不扫描@controller注解,在springmvc中只扫描@controller注解 -->
  <context:component-scan base-package="net.xiake6"><!-- base-package 如果多个,用“,”分隔 --> 
    <context:exclude-filter type="annotation" expression="org.springframework.stereotype.controller" />
  </context:component-scan> 
  <!-- 引入mybatis配置 -->
  <!-- <import resource="spring-mybatis-atomikos-druid.xml"/> -->
  <import resource="spring-mybatis-atomikos-druid.xml"/>
</beans>

spring-mybatis-atomikos-druid.xml

<?xml version="1.0" encoding="utf-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" 
  xmlns:jee="http://www.springframework.org/schema/jee"
  xmlns:tx="http://www.springframework.org/schema/tx" 
  xmlns:context="http://www.springframework.org/schema/context" 
  xmlns:aop="http://www.springframework.org/schema/aop"
  xsi:schemalocation="http://www.springframework.org/schema/beans 
  http://www.springframework.org/schema/beans/spring-beans-3.0.xsd 
  http://www.springframework.org/schema/tx 
  http://www.springframework.org/schema/tx/spring-tx-3.0.xsd 
  http://www.springframework.org/schema/jee 
  http://www.springframework.org/schema/jee/spring-jee-3.0.xsd 
  http://www.springframework.org/schema/aop 
  http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
  http://www.springframework.org/schema/context 
  http://www.springframework.org/schema/context/spring-context-3.0.xsd"
  default-lazy-init="true">  

 <context:annotation-config />
 <!-- 使用druid使为xa数据源 -->
  <bean id="abstractxadatasource" class="com.atomikos.jdbc.atomikosdatasourcebean" init-method="init" destroy-method="close" abstract="true">
    <property name="xadatasourceclassname" value="com.alibaba.druid.pool.xa.druidxadatasource"/>
    <property name="xaproperties">
      <props>
       <prop key="driverclassname">${jdbc.driverclassname}</prop>
       <!-- 配置初始化大小、最小、最大 -->
  <prop key="initialsize">10</prop>
  <prop key="minidle">3</prop>
  <prop key="maxactive">100</prop>
  <!-- 配置获取连接等待超时的时间 -->
  <prop key="maxwait">60000</prop>
  <!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
  <prop key="timebetweenevictionrunsmillis">60000</prop>
  <!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
  <prop key="minevictableidletimemillis">300000</prop>
  <prop key="validationquery">select 'x'</prop>
  <prop key="testwhileidle">true</prop>
  <prop key="testonborrow">false</prop>
  <prop key="testonreturn">false</prop>
  <!-- 配置监控统计拦截的filters -->
  <prop key="filters">stat</prop>
      </props>
    </property>
  </bean>
  <!-- 配置数据源一 -->
  <bean id="datasourceone" parent="abstractxadatasource">
    <property name="uniqueresourcename">
      <value>datasourceone</value>
    </property>
    <property name="xaproperties">
      <props>
        <prop key="url">${jdbc.url}</prop>
        <prop key="username">${jdbc.username}</prop>
        <prop key="password">${jdbc.password}</prop>
      </props>
    </property>
  </bean>
  <!--配置数据源二-->
  <bean id="datasourcetwo" parent="abstractxadatasource">
    <property name="uniqueresourcename">
      <value>datasourcetwo</value>
    </property>
    <property name="xaproperties">
      <props>
        <prop key="url">${jdbc.two.url}</prop>
        <prop key="username">${jdbc.two.username}</prop>
        <prop key="password">${jdbc.two.password}</prop>
      </props>
    </property>
  </bean>
  
  <!--mybatis的相关配置-->
  <bean id="sqlsessionfactoryone" class="org.mybatis.spring.sqlsessionfactorybean">
    <property name="datasource" ref="datasourceone"/>
    <property name="mapperlocations" value="classpath*:mapping/ds1/*.xml"/>
  </bean>
  
  <bean id="sqlsessionfactorytwo" class="org.mybatis.spring.sqlsessionfactorybean">
    <property name="datasource" ref="datasourcetwo"/>
    <property name="mapperlocations" value="classpath*:mapping/ds2/*.xml"/>
  </bean>
  
  <!--配置mybatis映射文件自动扫描-->
  <bean class="org.mybatis.spring.mapper.mapperscannerconfigurer">
    <property name="basepackage" value="net.xiake6.dao.ds1"/>
    <property name="sqlsessionfactorybeanname" value="sqlsessionfactoryone"/>
  </bean>

  <bean class="org.mybatis.spring.mapper.mapperscannerconfigurer">
    <property name="basepackage" value="net.xiake6.dao.ds2"/>
    <property name="sqlsessionfactorybeanname" value="sqlsessionfactorytwo"/>
  </bean>

  <!--配置分布式事务-->
  <bean id="atomikostransactionmanager" class="com.atomikos.icatch.jta.usertransactionmanager" init-method="init" destroy-method="close">
    <property name="forceshutdown" value="false"/>
  </bean>
  <bean id="atomikosusertransaction" class="com.atomikos.icatch.jta.usertransactionimp">
    <property name="transactiontimeout" value="3000"/>
  </bean>
  <!--jta事务管理器-->
  <bean id="jtatransactionmanager" class="org.springframework.transaction.jta.jtatransactionmanager">
    <property name="transactionmanager">
      <ref bean="atomikostransactionmanager"/>
    </property>
    <property name="usertransaction">
      <ref bean="atomikosusertransaction"/>
    </property>
    <property name="allowcustomisolationlevels" value="true"/>
  </bean>
  
  
  <aop:config proxy-target-class="true"> 
    <aop:advisor pointcut="execution(* *net.xiake6.service..*(..))" advice-ref="txadvice" /> 
  </aop:config> 
  <tx:advice id="txadvice" transaction-manager="jtatransactionmanager"> 
    <tx:attributes> 
      <tx:method name="insert*" propagation="required" read-only="false" rollback-for="*exception"/>
      <tx:method name="add*" propagation="required" read-only="false" rollback-for="*exception" /> 
      <tx:method name="save*" propagation="required" read-only="false" rollback-for="*exception" />  
      <tx:method name="delete*" propagation="required" read-only="false" rollback-for="*exception" /> 
      <tx:method name="del*" propagation="required" read-only="false" rollback-for="*exception" /> 
      <tx:method name="update*" propagation="required" read-only="false" rollback-for="*exception" />
      <tx:method name="select*" propagation="required" read-only="true" />
      <tx:method name="query" propagation="required" read-only="true" />
    </tx:attributes> 
  </tx:advice>
  
  <!-- 配置事务管理 -->
  <tx:annotation-driven transaction-manager="jtatransactionmanager" />
</beans>

jdbc.properties

#mysql 6.*以上
jdbc.driverclassname = com.mysql.cj.jdbc.driver
jdbc.url = jdbc:mysql://127.0.0.1:3306/test?servertimezone=utc&useunicode=true&characterencoding=utf-8&pinglobaltxtophysicalconnection=true&usessl=false
jdbc.username =root
jdbc.password =root

jdbc.two.url = jdbc:mysql://127.0.0.1:3306/test?servertimezone=utc&useunicode=true&characterencoding=utf-8&pinglobaltxtophysicalconnection=true&usessl=false
jdbc.two.username =root
jdbc.two.password =root

jta.properties

com.atomikos.icatch.service=com.atomikos.icatch.standalone.usertransactionservicefactory
com.atomikos.icatch.console_file_name=tm.release.out
com.atomikos.icatch.log_base_name=tm.releaselog
com.atomikos.icatch.tm_unique_name=com.atomikos.spring.jdbc.tm.release
com.atomikos.icatch.console_log_level=info

testinsert.java

@contextconfiguration(value = {"classpath:spring-application-context.xml"})
@runwith(springjunit4classrunner.class)
public class testinsert {
 private logger logger = loggerfactory.getlogger(testinsert.class);
 @autowired
 private batchinsertservice batchinsertservice;

 @test
 public void insert(){
 
 long starttime = system.currenttimemillis();
 user user = new user();
 user.setname("user_"+(int)(math.random()*100));
 user.setage((int)(math.random()*100));
 
 custinfo info = new custinfo();
 info.setphone("123456789"+(int)(math.random()*100));
 batchinsertservice.insert(user,info);
 
 long endtime = system.currenttimemillis();
 logger.info("共耗时:{}毫秒",endtime -starttime);
 }
}

batchinsertservice.java

@service
public class batchinsertservice {
 private logger logger = loggerfactory.getlogger(batchinsertservice.class);
 @autowired
 private userservice userservice;
 @autowired
 private custinfoservice custinfoservice;
 @transactional(rollbackfor= {exception.class,runtimeexception.class})
 public void insert(user user,custinfo custinfo) {
 int insertuser = userservice.insert(user);
 logger.info("insertuser={}",insertuser);
 int insertcustinfo = custinfoservice.insert(custinfo);
 logger.info("insertcustinfo={}",insertcustinfo);
 }
}

userservice.java

@service
public class userservice {
 @autowired
 private usermapper usermapper;
 
 public int insert(user record) {
 int result = usermapper.insert(record);
 return result;
 }
 
}

custinfoservice.java

@service
public class custinfoservice {
 @autowired
 private custinfomapper custinfomapper;

 public int insert(custinfo record) {
 int result = custinfomapper.insert(record);
 long now = system.currenttimemillis();
 // 模拟一个异常
 if (now % 2 == 0) {
  throw new runtimeexception("custinfomapper throws test insert exception");
 }
 return result;
 }
}

mapper和bean等就不列出来了,完成的示例工程在github: https://github.com/fenglibin/druidwithatomikos

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持移动技术网。

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网