利用Spring提供的事务监听ApplicationEvent完成事件

    技术2022-07-11  102

    前段时间开发项目时,碰到一个数据库事务还没提交,但是发送MQ已经被消费者消费,导致了数据不同步问题。

     具体是这样子的,一般我们会在@Service类中去处理数据库的操作及其他服务处理,一般都会在涉及到数据库的增删改的方法上添加@Transactional注解,表示这个方法被托管给spring处理事务。当整个流程执行完毕没有发生异常时,事务才会进行提交,这时候我在方法结束前,添加了MQ发送

    this.amqpTemplate.convertAndSend(AmqpExchange.XXX_CHANGE, AmqpExchange.XXX_CHANGE + "_ROUTING", message);

    这时候问题就出现了,当你发送的是数据id时,消费者需要使用这个id去数据库中查询一遍,但我们的代码虽然已经执行完毕,但事务还没有提交,数据库中关于这条数据并没有发生改变,所以这时候消费者获取到的数据就是我们还未操作之前的数据,这时候就会有问题。

    那么解决这个问题的方式很简单,就是使用Spring为我们提供的事务监听:

    首先需要定义一个事务监听事件,继承自ApplicationEvent:

    import org.springframework.context.ApplicationEvent; /** * 事务监听事件 * @author hexm * @date 2020/6/9 14:15 */ public class AlterTransactionalEvent extends ApplicationEvent { public AlterTransactionalEvent(Apply source) { super(source); } @Override public Apply getSource() { return (Apply) super.getSource(); } //注意这里我们添加了一个没有形参和返回值的方法接口,方便我们后续使用lambda表达式直接使用 @FunctionalInterface public interface Apply { /** * 执行 */ void apply(); } }

    然后我们接着创建一个监听类来监听这个事件:

    import org.springframework.stereotype.Component; import org.springframework.transaction.event.TransactionPhase; import org.springframework.transaction.event.TransactionalEventListener; /** * 事务监听 * * @author hexm * @date 2020/6/9 14:18 */ @Component public class AlterTransactionListener { @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) public void onHandler(AlterTransactionalEvent event) { AlterTransactionalEvent.Apply apply = event.getSource(); if (apply != null) { // 因为我们传入的是一个lambda表达式,所以可以像使用匿名函数一样去调用这个执行过程 apply.apply(); } } } phase = TransactionPhase.AFTER_COMMIT 表示触发事件后,提交已成功完成,这个也是默认项,不添加也可以。 此外还有BEFORE_COMMIT提交前、AFTER_ROLLBACK回滚、AFTER_COMPLETION不管是不是成功提交都执行。

    定义完成了监听器之后,就可以愉快的使用了:

    @Service public class Test{ @Autowired private AmqpTemplate amqpTemplate; @Autowired private ApplicationEventPublisher applicationEventPublisher; @Transactional(rollbackFor = Exception.class) public void deleteById(Long id){ // 执行数据库操作 ... //发送事务监听的事件,这里充分利用jdk8的lambda表达式的优势,将处理任务一并发送 applicationEventPublisher.publishEvent(new AlterTransactionalEvent(() -> { // 触发事务完成事件,执行发送MQ Message msg = new Message (id); //这个Message是自定义的,实际上可以是任意类型 this.amqpTemplate.convertAndSend(AmqpExchange.XXX_CHANGE, AmqpExchange.XXX_CHANGE + "_ROUTING", msg); })); } }

    这样就完成了事件监听的过程,需要注意的是,这里注入的是ApplicationEventPublisher 类来推送事件

    Processed: 0.012, SQL: 9