使用nhmicro提供的micro-datasource嵌入式的解决微服务架构中分布式事务问题

服务器

浏览数:62

2020-6-22

应用原理: 使用micro-datasource数据源使事务与线程解耦,通过groupid在其他线程进行事务提交或回滚。 多个系统需要统一提交时,通过activemq发送提交消息(含有groupid),各系统收到消息后进行统一提交或回滚。

micro-datasource数据源与Mybatis或hibernate或jdbcTemplate等orm框架可以整合使用 原理是micro-datasource包中提供了路由数据源方案,通过aop动态切换普通数据源和分布式数据源 使用普通数据源时仍接受传统事务管理器管理

jar包下载: 需要使用nh-micro-datasource.jar 依赖

log4j.jar\org.springframework.beans.jar\org.springframework.aop.jar\org.springframework.core.jar\aopalliance.jar

<dependency>
<groupId>com.github.jeffreyning</groupId>
<artifactId>nh-micro-datasource</artifactId>
<version>1.0.0-RELEASE</version>
</dependency>

jms通知功能需要使用nh-micro-datasource-msg.jar 依赖geronimo-j2ee-management.jar/geronimo-jms.jar/activemq-core.jar

<dependency>
<groupId>com.github.jeffreyning</groupId>
<artifactId>nh-micro-datasource-msg</artifactId>
<version>1.0.0-RELEASE</version>
</dependency>

分布式数据源配置样例:

<!– micro分布式数据源 -->
     <bean id="local_xa_dataSource" class="com.nh.micro.datasource.MicroXaDataSourceFactory" factory-method="createDataSource" init-method=“init”>
       <!– 多个micro分布式数据源实例时可设置不同的dataSourceId 默认为default -->
        <constructor-arg value=“default”/>
       <property name="url" value="${database.url}" />
        <property name="username" value="${database.user}" />
        <property name="password" value="${database.password}" />
        <property name="minSize" value=“5" />
        <property name="maxSize" value=“20" />
       <property name=“dirverClassName” value=“com.mysql.jdbc.Driver” />
       <property name=“validationQuery” value=“select 'x' from dual” />
     </bean>
 <!-- micro动态切换数据源配置 -->
  <bean id="dynamic_xa_dataSource" class="com.nh.micro.datasource.MicroDynamicDataSource" >
  <property name="targetDataSources">   
  <map key-type="java.lang.String">
<!– 设置目标数据源为分布式事务数据源 --> 
  <entry key="local_xa_dataSource" value-ref="local_xa_dataSource"></entry>    
  </map>  
  </property> 
<!-- 默认目标数据源为主库普通数据源 --> 
  <property name="defaultTargetDataSource" ref="dataSource"/>
  </bean>
 <!-- define the Mybatis SqlSessionFactory -->
    <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
<!– Mybatis引入micro动态切换数据源实例 -->
        <property name="dataSource" ref="dynamic_xa_dataSource"/>
        <property name="typeAliasesPackage" value="foo.model"/>
    </bean>
 <aop:config>
        <aop:pointcut id="testPointcut" expression="execution( * foo.repository.TestRep.*(..))"/>
        <aop:advisor pointcut-ref="testPointcut" advice-ref="dataSourceAdvice"/>
    </aop:config>

设置切换分布式数据源的aop:

<!– 设置service层或dao层aop用来动态切换数据源 -->
<bean class="com.nh.micro.datasource.DataSourceAdvice" id="dataSourceAdvice">
        <property name="readMethodList">
            <list>
            </list>
        </property>
</bean>
<aop:config>
        <aop:pointcut id="testPointcut" expression="execution( * foo.repository.TestRep.*(..))"/>
        <aop:advisor pointcut-ref="testPointcut" advice-ref="dataSourceAdvice"/>
    </aop:config>

代码中通过注解设置哪些方法需切换为分布式数据源:

//Mybatis的Dao接口代码示例,使用@ChangeDataSource注解决定是否切换为分布式事务数据源
package foo.repository;
import java.util.Map;
import com.nh.micro.datasource.ChangeDataSource;
public interface TestRep {
@ChangeDataSource(name="local_xa_dataSource")
public int updateInfo(Map paramMap);
@ChangeDataSource(name="local_xa_dataSource")
public int insertInfo(Map paramMap);
}

执行过程样例:

//设置xaGroupId和xaBranchId
MicroXaDataSource.setXid(groupId,branchId);
//从Spring中取dao接口对象调用相关业务方法
TestRep testRep=MicroContextHolder.getContext().getBean("testRep");
Map paramMap=new HashMap();
paramMap.put("meta_key", metaKey);
paramMap.put("id", id);
testRep.insertInfo(paramMap);
//可以在其他的线程中根据xaGroupId提交或回滚分布式事务
MicroXaDataSourceFactory.getDataSourceInstance(“default”).commit(groupid);

设置事务提交消息接收和发送对象:

//设置activemq发送对象,发送commit/rollback命令给其他系统
<bean class="com.nh.micro.datasource.msg.MicroDataSourceJmsReceiver" init-method="init">
<property name="jmsUrl" value="tcp://10.10.xx.xx:61616"></property>
</bean>
//通知其他系统提交的命令是(groupid为参数)
//commit
MicroDataSourceJmsSender.sendXaMsg("commit", groupId);
//rollback
MicroDataSourceJmsSender.sendXaMsg(“rollback", groupId);
//设置activemq接收对象,接收从其他系统发来的commit/rollback命令,MicroDataSourceJmsReceiver内部收到消息后会负责根据groupid提交或回滚事务
<bean class="com.nh.micro.datasource.msg.MicroDataSourceJmsSender">
<property name="jmsUrl" value="tcp://10.10.xx.xx:61616"></property>
</bean>

作者:杰睿宁