基于本地日志的可靠消息型事务


一、背景

业务经常会使用消息中间件进行上下游解耦或者异步削峰填谷的目的。通常处理流程是如下:

  1. 操作本地数据库,完成自身业务逻辑
  2. 发送消息,通知下游业务

上述过程可能会出现操作数据库成功,发送消息失败的情形,这就导致上下游数据不一致了。为了解决上述问题,业务可以配置定时任务,对发送失败的消息进行重试,直到消息发送成功,当然下游业务必须保证接口幂等。
该方案虽然能实现最终一致性,每条业务线实现上述方案,研发成本比较高,故我们希望调研一套通用的可靠消息通知型事物。

二、目标

设计一套基于本地事务的可靠消息通知型事物方案,降低业务研发成本;同时尽可能降低对业务代码的侵入性,业务通过注解即可接入。

三、技术设计

/**
 * Created by yeming on 2021/3/2.
 */
@Service
public class LocalTransactionRetryService {
    @Autowired
    private OrderService orderService;
  
    @SwanReliableNotifyTransactional(
            domain = "yeming_test_retry",
            callbackMethod = "sendBizMsg",
            retryFor = RuntimeException.class
    )
    public Object reliableInform(Object param) {
        Object object = orderService.process();
        RetryContext context = RetryTransactionContextUtil.getRetryContext();
        context.putMtraceParams("key", object.toString());
        return object;
    }

    public void sendBizMsg(LocalRetryTransaction transaction, Throwable e) {
        RetryContext context = RetryTransactionContextUtil.getRetryContext();
        Map<String, String> mtraceParams =  context.getMtraceParams();
        String value = mtraceParams.get("key");
        System.out.println("sendBizMsg success");
    }
}

方案整体上分为以下步骤:

  1. 框架启动:扫描特定注解的方法,基于AOP对该方法织入统一的事务处理逻辑;
  2. 事务运行时:首先运行上述切面逻辑,开启Spring事务,在执行业务方法reliableInform成功后,在同一个事务里面插入事务日志,方法执行完成后,Spring事务自动关闭;然后通过反射调用callbackMethod指定的方法,成功后删除事务日志
  3. 事务异常重试(可选):扫描步骤2中的事务日志表,若扫描到事务日志,则调用sendBizMsg方法,直到该方法成功后,删除事务日志。
  4. 若事务多次重试失败,事务日志会上传服务端,并大象通知,业务可以在Swan管理平台人工重试

3.1、事务启动过程

  1. 框架启动时候,扫描是否有方法标注了@SwanReliableNotifyTransactional
  2. 注解,如果没有则跳过后续AOP处理流程,继续后续启动过程
  3. 扫描到上述注解后,解析出注解的全局事务名称domain和回调方法callbackMethod(该方法就配置成背景中提到的发送消息方法的名称)
  4. 检查是否配置了Spring事务管理器 DataSourceTransactionManager ,如果没有配置该注解,则直接抛出异常,终止启动流程(由于本方案基于Spring事务实现)
  5. 获取事务管理器关联的DataSource
  6. 判断DataSource类型,这里我们只支持Zebra ShardDataSource和GroupDataSource ,不支持 ZebraRoutingDataSource在内的其他DataSource
  7. 如果是GroupDataSource,或者虽然是ShardDataSource,但是是单库分表,则获取该DataSource对应的 SqlSessionFactory ,然后构建操作事务日志的Dao,方便后续操作事务日志
  8. 如果是ShardDataSource,我们需要通过反射获取ShardDataSource 中关联的真实操作不同分库的 GroupDataSource,并基于AOP对GroupDataSource 的 getConnection 方法进行拦截

重点介绍下上述 getConnection 方法:我们每次的数据库操作,都会首先调用DataSource的 getConnection 方法获取一个链接,上文提到我们基于Spring事务的方案,Spring事务每次调用 getConnection方法获取链接后,会将该链接绑定到当前线程,后续同一个线程中的操作使用同一根链接自然能保证事务。对于分库分表事务 ShardDataSource,后续同线程调用 getConnection方法 获取到的是 ShardConnection,这里由于还执行到解析业务分表规则逻辑,故不知道后续会访问哪一个分库,故我们采用代理 ShardDataSource 中不同分库对应的 GroupDataSource 对象的方案

  1. 如果调用了GroupDataSource的 非 getConnection方法,则执行被代理对象自身方法
  2. 如果调用了GroupDataSource的getConnection方法,在获取到connection后,将该connection绑定到当前线程,后续插入事务日志直接从当前线程获取连接,即可保证在同一事务。
for(final Map.Entry<String, DataSource> entry : dataSourcePool.entrySet()){
    ProxyFactory proxyFactory=new ProxyFactory();
    proxyFactory.setTarget(entry.getValue());

    MethodInterceptor methodInterceptor = new MethodInterceptor() {
        @Override
        //只有ShardDataSource才会调用invoke方法
        public Object invoke(MethodInvocation invocation) throws Throwable {
            //仅仅拦截getConnection方法
            if(invocation.getMethod().getName().equals("getConnection")){
                //只有SwanReliableNotifyTransactional标注的方法才会把链接绑定到当前线程
                if(LocalTransactionContext.isLocalTransactionStart()){
                    Connection connection = entry.getValue().getConnection();
                    LocalTransactionContext.bindConnection(connection);
                    LocalTransactionContext.bindDataSource(entry.getValue());

                    return connection;
                }
            }

            //非SwanReliableNotifyTransactional标注的方法走原有逻辑
            return invocation.proceed();
        }
    };

    proxyFactory.addAdvice(methodInterceptor);
    Object proxy = proxyFactory.getProxy();
    map.put(entry.getKey(), proxy);
}

3.2、事务运行时

  1. 判断当前方法是否存在事务注解,若不存在,则执行被代理对象自身方法
  2. 开启Spring事务(这里利用Spring事务传播特性,如果业务方法使用Spring默认注解,将会自动添加到同一事务),执行业务操作,然后获取当前线程绑定的Connection,若是分库分表情况,由于框架启动时候我们代理了ShardDataSource中的GroupDataSource getConnection方法,该方法获取Connect后会绑定到当前线程,故此时我们能获取到与业务操作同库的Connection,基于该Connection插入事务日志(分库分表场景业务方法必须保证只能访问同一个库);若获取不到Connection,说明是单库场景,我们直接使用事务日志Dao插入事务日志。上述操作保证事务日志的写入和业务在同一个事务中,若失败,则抛出异常,事务回滚
    @Transactional
    public Object createTransaction(MethodInvocation userMethodInvocation, LocalRetryTransaction transaction) throws Throwable {
      //执行事务注解标记的业务真实方法逻辑
      Object object = userMethodInvocation.proceed();
    
      Connection connection = LocalTransactionContext.getConnection();
      if(connection != null){
        //分库分表情况下直接获取线程绑定的GroupConnection操作
        SortedMap<Integer, Object> params = buildShardParams(transaction);
        JdbcUtils.executeUpdate(connection, RetryDBRepository.INSERT_SQL, params);
      }else{
        //非分库分表情况走原有swanLocalTransactionDao逻辑
        Map<String, Object> params = buildGroupParams(transaction);
        swanLocalTransactionDao.insertTransactionLog(params);
      }
    
      return  object;
    }
  3. 执行业务定义的callBack方法,若执行失败,则抛出异常(这里业务评估下是否需要抛出异常)。如果业务对性能有要求,callBack方法可以支持异步操作
  4. 删除事务日志,若执行失败,catch住异常。

上述步骤2执行成功后,业务逻辑实际成功,后续3和4由框架重试保证最终执行成功,故要求业务callback方法支持幂等。


文章作者: 叶明
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 叶明 !
评论
 上一篇
IDEA调试Elasticsearch源码 IDEA调试Elasticsearch源码
本文首先会介绍在Mac上如何通过docker来安装es单机版本和集群版本,然后重点叙述如何使用IDEA编译Elasticsearch源码并进行调试
2021-10-22
下一篇 
Mybatis如何从DAO到SQL Mybatis如何从DAO到SQL
MyBatis 是一款优秀的持久层框架,它支持自定义 SQL、存储过程以及高级映射。在项目中通过配置不同的mapper和xml,然后即可方便的对数据库进行操作。对于Mybatis的使用方法大家应该都很熟悉,本文不再赘述,本文重点叙述Mybatis的原理,详细分析Mybatis如何完成从mapper到sql的转化过程
2021-04-03
  目录