RocketMQ事务消息原理和实现

    技术2022-07-12  87

    写在前面

    本文参考RocketMQ github文档和《RocketMQ技术内幕》书籍相关内容。

     

    事务消息原理

    RocketMQ事务消息流程概要

    RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示。

    上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。

     

    1.事务消息发送及提交:

    (1) 发送消息(half消息)。

    (2) 服务端响应消息写入结果。

    (3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。

    (4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)【实际使用时这一步一般都是不进行提交或回滚】

     

    2.补偿流程:

    (1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”

    (2) Producer收到回查消息,检查回查消息对应的本地事务的状态

    (3) 根据本地事务状态,重新Commit或者Rollback

    其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。

     

    RocketMQ事务消息设计

    1.事务消息在一阶段对用户不可见

    在RocketMQ事务消息流程中,1阶段时,消息就会发送到broker端,但是在消息提交之前,该消息对消费者是不可见的。这样就是避免消费者消费到事务未提交的数据,类似于数据库隔离级别的 读已提交 级别,避免脏读。

    实现原理是这样的:

        如果消息是half消息,则备份该消息的主题与消费队列信息,然后将该消息topic改为RMQ_SYS_TRANS_HALF_TOPIC,消费队列置为0.

    由于消费者并未订阅该主题,因此不会拉取到该消息。

        然后RocketMQ会开启一个定时任务,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。

    2.Commit和Rollback操作以及Op消息的引入

        在完成一阶段写入一条对用户不可见的消息后,二阶段如果是Commit操作,则需要让消息对用户可见;

        如果是Rollback则需要撤销一阶段的消息。先说Rollback的情况。对于Rollback,本身一阶段的消息对用户是不可见的,其实不需要真正撤销消息(实际上RocketMQ也无法去真正的删除一条消息,因为是顺序写文件的)。但是区别于这条消息没有确定状态(Pending状态,事务悬而未决),需要一个操作来标识这条消息的最终状态。

        RocketMQ事务消息方案中引入了Op消息的概念,用Op消息标识事务消息已经确定的状态(Commit或者Rollback)。如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。引入Op消息后,事务消息无论是Commit或者Rollback都会记录一个Op操作。Commit相对于Rollback只是在写入Op消息前创建Half消息的索引。

    3.事务提交处理

        首先从结束事务请求命令中获取消息的物理偏移量( commitlogOffset ),其实现逻辑TransactionalMessageService#.commitMessage 实现    然后恢复消息的主题 消费队列,构建新的消息对象,由 TransactionalMessageService#endMessageTransaction 实现    然后将消息再次存储在 commitlog 文件中,此时的消息主题则为业务方发送的消息,将被转发到对应的消息消费队列,供消息消费者消费,其实现由 TransactionalMessageService#sendFinalMessage 实现    消息存储后,删除 prepare 消息,其实现方法并不是真正的删除,而是将 prepare消息存储到 RMQ_SYS_TRANS_OP_HALF TOPIC 主题中,表示该事务消息( prepare 状态的消息)已经处理过(提交或回滚),为未处理的事务进行事务回查提供查找依据    事务的回滚与提交的唯一差别是无须将消息恢复原主题,直接删除 prepare 消息即可,同样是将预处理消息存储在 RMQ_SYS_TRANS_OP_HALF _TOPIC 主题中,表示已处理过该消息

    4.补偿机制,事务回查

        如果在RocketMQ事务消息的二阶段过程中失败了,例如在做Commit操作时,出现网络问题导致Commit失败,那么需要通过一定的策略使这条消息最终被Commit。

        RocketMQ采用了一种补偿机制,称为“回查”。Broker端对未确定状态的消息发起回查,将消息发送到对应的Producer端(同一个Group的Producer),由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback。

        Broker端通过对比Half消息和Op消息进行事务消息的回查并且推进CheckPoint(记录那些事务消息的状态是确定的)。

        值得注意的是,rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,rocketmq默认回滚该消息。

        

    事务消息发送过程【从源码入手】

    TransactionMQProducer

    RocketMQ提供的事务消息发送API是TransactionMQProducer类

    该类主要有下面两个关键属性

    private ExecutorService executorService;//事务状态回查异步执行线程池 private TransactionListener transactionListener;//事务监听器

    其中transactionListener是我们创建事务消息生产者时需要注册的,用以执行和回查本地事务。

    public interface TransactionListener {     /**      * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.      * 当发送事务性prepare(half)消息成功时,将调用此方法以执行本地事务。      * @param msg Half(prepare) message      * @param arg Custom business parameter      * @return Transaction state      */     LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);     /**      * When no response to prepare(half) message. broker will send check message to check the transaction status, and this      * method will be invoked to get local transaction status.      * 本地事务状态回查      * @param msg Check message      * @return Transaction state      */     LocalTransactionState checkLocalTransaction(final MessageExt msg); }

    事务消息的发送实际调用的还是DefaultMQProducerImpl的sendMessageInTransaction()方法

    public TransactionSendResult sendMessageInTransaction(final Message msg,     final LocalTransactionExecuter localTransactionExecuter, final Object arg)     throws MQClientException {     //获取事务监听器,如果没有注册事务监听器的话则直接抛出异常     TransactionListener transactionListener = getCheckListener();     if (null == localTransactionExecuter && null == transactionListener) {         throw new MQClientException("tranExecutor is null", null);     }     // ignore DelayTimeLevel parameter     //清除延时属性     if (msg.getDelayTimeLevel() != 0) {         MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);     }     Validators.checkMessage(msg, this.defaultMQProducer);     //为消息设置属性     SendResult sendResult = null;     //设置TRAN_MSG 为 true     MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");     //设置PGROUP     MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());     try {         //发送消息         sendResult = this.send(msg);     } catch (Exception e) {         throw new MQClientException("send message Exception", e);     }     LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;     Throwable localException = null;     switch (sendResult.getSendStatus()) {         //消息发送成功则执行本地事务         case SEND_OK: {             try {                 if (sendResult.getTransactionId() != null) {                     msg.putUserProperty("__transactionId__", sendResult.getTransactionId());                 }                 String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);                 if (null != transactionId && !"".equals(transactionId)) {                     msg.setTransactionId(transactionId);                 }                 if (null != localTransactionExecuter) { //这个已经不使用了,使用下面的新的事务API                     localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);                 } else if (transactionListener != null) {                     log.debug("Used new transaction API");                     //执行本地事务                     localTransactionState = transactionListener.executeLocalTransaction(msg, arg);                 }                 if (null == localTransactionState) {                     localTransactionState = LocalTransactionState.UNKNOW;                 }                 if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {                     log.info("executeLocalTransactionBranch return {}", localTransactionState);                     log.info(msg.toString());                 }             } catch (Throwable e) {                 log.info("executeLocalTransactionBranch exception", e);                 log.info(msg.toString());                 localException = e;             }         }         break;         //如果消息发送失败,则设置本次事务状态为回滚         case FLUSH_DISK_TIMEOUT:         case FLUSH_SLAVE_TIMEOUT:         case SLAVE_NOT_AVAILABLE:             localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;             break;         default:             break;     }     try {         //结束事务,根据本地事务执行状态执行提交、回滚或暂不处理         this.endTransaction(sendResult, localTransactionState, localException);     } catch (Exception e) {         log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);     } ... }

     

    TransactionListener#executeLocalTransaction()方法返回的执行状态有以下三种

    public enum LocalTransactionState {     COMMIT_MESSAGE,//提交事务,     ROLLBACK_MESSAGE,//回滚事务     UNKNOW,//结束事务,但不做任何处理 }

    一般在使用时,我们在executeLocalTransaction()方法中,返回UNKNOW,根据《RocketMQ技术内幕》书中所说

    由于this.endTransaction 方法执行时,其业务事务还未提交,故而在调用executeLocalTranscation方法时返回UNKNOW,事务具体提交还是回滚通过事务状态回查方法来获取。

     

    Processed: 0.025, SQL: 9