RocketMQ 9:事务消息

    技术2022-07-20  83

    1.事务消息实现思想

    RocketMQ 事务消息,是指发送消息事件和其他事件需要同时成功或同失败。比如银行转账, A 银行的某账户要转一万元到 B 银行的某账户。A 银 行发送“B 银行账户增加一万元” 这个消息,要和“从 A 银行账户扣除一万元”这个操作同时成功或者同时失败。 RocketMQ 采用两阶段提交的方式实现事务消息,TransactionMQProducer 处理上面情况的流程是,先发一个“准备从 B 银行账户增加一万元”的消息, 发送成功后做从 A 银行账户扣除一万元的操作 ,根据操作结果是否成功,确定之前的“准备从 B 银行账户增加一万元”的消息是做 commit 还是 rollback , RocketMQ 实现的具体流程如下: 1)发送方向 RocketMQ 发送“待确认”(Prepare)消息; 2 ) RocketMQ 将收到的“待确认”(一般写入一个 HalfTopic 主题<RMQ_SYS_TRANS_HALF_TOPIC>)消息持化成功后, 向发送方回复消息已经发送成功,此 时第一阶段消息发送完成。 发送方开始执行本地事件逻辑; 3)发送方根据事件执行结果向 RocketMQ 发送二次确认( Commit 还是 Rollback)消息 RocketMQ 收到 Commit 则将第一阶段消息标记为可投递(这些 消息才会进入生产时发送实际的主题 RealTopic),订阅方将能够收到该消息;收到 Rollback 状态则删除第一阶段的消息,订阅方接收不到该消息; 4)如果出现异常情况,步骤 3 提交的二次确认最终未到达 RocketMQ,服务器在经过固定时间段后将对“待确认”消息、发起回查请求; 5)发送方收到消息回查请求后(如果发送一阶段消息的 Producer 不能工作,回查请求将被发送到和 Producer 在同一个 Group 里的其他 Producer ), 通过检查对应消息的本地事件执行结果返回 Commit Roolback 状态。

    2.两阶段提交

    提交半事务是一个阶段,提交全事务和事务回查是另外一个阶段,所以称之为两阶段提交。

    3.事务状态回查机制

    RocketMQ 通过 TransactionalMessageCheckService 线程定时去检测 RMQ_SYS_ TRANS_ HALF_TOPIC 主题中的消息,回查消息的事务状态TransactionalMessageCheckService 的检测频率默认为 1 分钟,可通过在 broker.conf 文件中设置 transactionChecklnterval 来改变默认值,单位为毫秒。

    4.创建事务监听类TransactionListenerImpl,实现TransactionListener

    package org.example.transaction; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; /** * 事务监听机制,主要是执行事务以及事务回查 **/ public class TransactionListenerImpl implements TransactionListener { //确保原子操作 private AtomicInteger transactionIndex = new AtomicInteger(0); //使用transactionId private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<String, Integer>(); //执行事务(图中第3阶段) public LocalTransactionState executeLocalTransaction(Message message, Object o) { //执行本地事务开始 int value = transactionIndex.getAndIncrement(); //业务处理 System.out.println("执行本地事务开始:" + value); int status = value % 3; localTrans.put(message.getTransactionId(), status); switch (status) { case 0: //LocalTransactionState.UNKNOW表示未知的事件,需要RocketMQ进一步服务业务进行确认该交易的处理 return LocalTransactionState.UNKNOW;//消息0,3,6,9进入事务回查 case 1: return LocalTransactionState.COMMIT_MESSAGE;//消息1,4,7成功commit case 2: return LocalTransactionState.ROLLBACK_MESSAGE;//消息2,5,8被抛弃 default: return LocalTransactionState.COMMIT_MESSAGE; } //执行本地事务结束 } //该方法用于RocketMQ与业务确认未提交事务的状态(一分钟执行一次)(图中第7阶段) public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { System.out.println("事务回查-----UNKNOW"); Integer status = localTrans.get(messageExt.getTransactionId()); //业务处理(1分钟) // int mod = messageExt.getTransactionId().hashCode() % 2; // if (status != null) { // switch (mod) { // case 0: // return LocalTransactionState.ROLLBACK_MESSAGE; // case 1: // return LocalTransactionState.COMMIT_MESSAGE; // default: // return LocalTransactionState.COMMIT_MESSAGE; // } // } return LocalTransactionState.COMMIT_MESSAGE; } }

    5.创建生产者TransactionProducer

    package org.example.transaction; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.util.concurrent.*; /** * 事务消息的生产者 **/ public class TransactionProducer { public static void main(String[] args) throws Exception { TransactionListener transactionListener = new TransactionListenerImpl(); //支持事务的生产者 TransactionMQProducer producer = new TransactionMQProducer("transaction_producer"); producer.setNamesrvAddr("192.168.42.112:9876"); //设置用于事务消息处理的线程池 ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(200), new ThreadFactory() { public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client_transaction_msg_check_thread"); return thread; } }); producer.setExecutorService(executorService); //设置事务监听器,监听器需要实现org.apache.rocketmq.client.producer.TransactionListener接口 //监听器中实现需要处理的业务逻辑,以及MQ中未确认的事务与业务的确认逻辑 producer.setTransactionListener(transactionListener); producer.start(); //生成不同的Tag,用于模拟不同的处理场景 String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 10; i++) { Message msg = new Message("TransactionTopic", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); //以事务发送消息,并在事务消息被成功预写入到RocketMQ中后,执行用户定义的业务逻辑 //业务逻辑执行完之后,再实现业务消息的提交逻辑 SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.println(sendResult.toString()); Thread.sleep(10); } //延长生产时间,用于调用事务回查checkLocalTransaction方法 for (int i = 0; i < 100000; i++) { Thread.sleep(1000); } producer.shutdown(); } }

    6.创建消费者TransactionConsumer

    package org.example.transaction; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.util.Date; import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit; /** * 消费者-事务消息的消费者 */ public class TransactionConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_producer"); consumer.setNamesrvAddr("192.168.42.112:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe("TransactionTopic", "*"); consumer.registerMessageListener(new MessageListenerOrderly() { private Random random = new Random(); public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { //设置自动提交 context.setAutoCommit(true); for (MessageExt msg : msgs) { System.out.println(new Date()+"获取到消息开始消费:"+msg+",content:"+new String(msg.getBody())); } try { //模拟业务处理 TimeUnit.SECONDS.sleep(random.nextInt(5)); } catch (Exception e) { e.printStackTrace(); //返回处理失败,该消息后续可以继续被消费 return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } //返回处理成功,该消息就不会被再次投递过来了 return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer Started"); } }

    7.启动消费者,再启动生产者,通过生产者打印可以看出有4次“事务回查-----UNKNOW”的打印,分别表示消息0,3,6,9进入回查

    8.查看消费者打印

    通过打印可以看出,消息1,4,7成功commit;消息0,3,6,9第一次进入事务回查,第二次成功commit;最后都被消费者消费

    Processed: 0.010, SQL: 9