读写分离情况下的读自己写一致性

问题

分布式系统的一致性模型包括:强一致性、弱一致性、最终一致性,以及一些最终一致性的变种,如因果一致性、读自己写一致性等。

有个项目,数据存放在主从同步的mysql数据库中,写操作统一落在主库上。由于主从数据库物理位置相距较远(分别在国内机房和国外机房),同步的网络延迟较大,所以有可能因为网络延迟的原因,在读写分离的情况下,读不到自己刚才所写的数据(读从库)。例如用户在国外机房的应用中写完数据后,在延迟时间窗口内立即读,此时写数据还未同步到从库,读从库失败。也就是通常的读写分离方案在这种场景下解决不了读自己写一致性问题。读自己写一致性中的‘自己’可以是进程或系统、也可以是用户。在我们的项目中,’自己‘指的是系统中的用户,用户在系统中发布消息后,要能在消息列表中看到刚才发布的消息。

 
读写分离情况下的读自己写一致性
  

方案

   在通常的读写分离基础上考虑同步延迟窗口:用户在写操作(insert、update、delete)之后进行读(查询),如果是在写操作的同步延迟窗口之内读,则读取主库,其他情况下读从库。如果一个事务中有写操作,不管是否有读操作,肯定是操作主库。

   
读写分离情况下的读自己写一致性
 

 

Java实现

    使用spring AbstractRoutingDataSource(用于读写分离),Java 注解(Annotation)和aop和thread local。

   Spring提供了AbstractRoutingDataSource抽象类用于多数据源的访问也就是分库,可以继承该类覆盖来实现主从数据库的分库访问。
determineCurrentLookupKey
()方法来实现读写分离。

   RwResourceDesc注解如下,它用在service façade类(façade设计模式)的方法上,用于描述service方法涉及到哪些表或资源的读写。Controller类会调用service façade类的方法,façade类再调用service层的方法。

@Target({ElementType.METHOD})

@Retention(RetentionPolicy.RUNTIME)

public @interface RwResourceDesc {

       String[] readResources() default ""; //读哪些表

       String[] writeResources() default "";//写哪些表

}

例如OrderFacede.java中的下单方法,读product表,写order表

@RwResourceDesc(readResources = { "product" }, writeResources = { "order"} )

public void placeOrder(…) throws Exception {…}

 


读写分离情况下的读自己写一致性
  

Spring数据访问的xml配置大致如下:

<!-- 前面介绍的aop advice  -->

<bean id="contextAwareAdvice class="com.xxx.aop.advice.RWContextAwareAdvice"></bean>

   

<tx:advice id="txAdvice" transaction-manager="transactionManager">

<!-- the transactional semantics... -->

  <tx:attributes>

     <!-- all methods starting with 'get' are read-only -->

     <tx:method name="get*" read-only="true" propagation="SUPPORTS"/>

     <tx:method name="load*" read-only="true" propagation="SUPPORTS"/>

     <tx:method name="list*" read-only="true" propagation="SUPPORTS"/>

     <!-- other methods use the default transaction settings (see below) -->

     <tx:method name="*" propagation="REQUIRED"/>

  </tx:attributes>

</tx:advice>

      

<aop:config>

   <aop:pointcut id="serviceFacadeOperation" expression="execution(* com.xxx.service.facade.*.*(..))"/>

   <!—注意order属性的大小,决定advice的调用顺序  -->

   <aop:advisor  order="1" advice-ref="contextAwareAdvice"  pointcut-ref="serviceFacadeOperation"/>

   <aop:advisor  order="2" advice-ref="txAdvice" pointcut-ref="serviceFacadeOperation"/>

</aop:config>

 

 

aop advice类:

public class RWContextAwareAdvice implements MethodInterceptor {

  //注入该属性

  private ContextAwareManager contextAwareManager;

      

  public void setContextAwareManager(ContextAwareManager contextAwareManager) {

            this.contextAwareManager = contextAwareManager;

  }

 

  public Object invoke(MethodInvocation invocation) throws Throwable {

     ContextAware context = contextAwareManager.getContext();

     RwResourceDesc rwResourceDesc =                          invocation.getMethod().getAnnotation(RwResourceDesc.class);

     if(null != rwResourceDesc){

         //如果有写,则强制读主库

         if(!rwResourceDesc.writeResources()[0].isEmpty()){

             //设置操作主库标志

             context.forceMaster();

         }

         else{

              //没有写,只有读。判断是读主库还是读从库

              if(!rwResourceDesc.readResources()[0].isEmpty()){

         //以context id加readResources中的表名作为key,如果redis中存在该key,                           if(context.matchChangedResourceTypeForRead(

                       rwResourceDesc.readResources())){

                        //读操作在‘自己’的写操作时间窗口之内,读主库

                     context.forceMaster();

                    }

              }

          }

      }

            

     try{

         Object result = invocation.proceed();

         return result;

     }

     finally{

         if(null != rwResourceDesc){

            //如果有写,则记录'自己'写了哪些资源,将key、value放到redis中,过期时间同value。

            if(!rwResourceDesc.writeResources()[0].isEmpty()){

                             context.recordChangedResourceType(rwResourceDesc.writeResources());

                             }

            }

        //复位thread local中的context id和读主库标志。

        contextAware.resetContext();

        }

             

  }

}

 

  使用RwResourceDesc注解来标示方法中读写的表有哪些,如果是使用myibatis作为dao框架,也可以写个myibatis拦截器捕获sql语句来判断读写的表。

相关推荐