1.事务消息实现思想
RocketMQ 事务消息,是指发送消息事件和其他事件需要同时成功或同失败。比如银行转账, A 银行的某账户要转一万元到 B 银行的某账户。A 银 行发送“B 银行账户增加一万元” 这个消息,要和“从 A 银行账户扣除一万元”这个操作同时成功或者同时失败。
RocketMQ 采用两阶段提交的方式实现事务消息,TransactionMQProducer 处理上面情况的流程是,先发一个“准备从 B 银行账户增加一万元”的消息, 发送成功后做从 A 银行账户扣除一万元的操作 ,根据操作结果是否成功,确定之前的“准备从 B 银行账户增加一万元”的消息是做 commit 还是 rollback , RocketMQ 实现的具体流程如下: 1)发送方向 RocketMQ 发送“待确认”(Prepare)消息; 2 ) RocketMQ 将收到的“待确认”(一般写入一个 HalfTopic 主题<RMQ_SYS_TRANS_HALF_TOPIC>)消息持化成功后, 向发送方回复消息已经发送成功,此 时第一阶段消息发送完成。 发送方开始执行本地事件逻辑; 3)发送方根据事件执行结果向 RocketMQ 发送二次确认( Commit 还是 Rollback)消息 RocketMQ 收到 Commit 则将第一阶段消息标记为可投递(这些 消息才会进入生产时发送实际的主题 RealTopic),订阅方将能够收到该消息;收到 Rollback 状态则删除第一阶段的消息,订阅方接收不到该消息; 4)如果出现异常情况,步骤 3 提交的二次确认最终未到达 RocketMQ,服务器在经过固定时间段后将对“待确认”消息、发起回查请求; 5)发送方收到消息回查请求后(如果发送一阶段消息的 Producer 不能工作,回查请求将被发送到和 Producer 在同一个 Group 里的其他 Producer ), 通过检查对应消息的本地事件执行结果返回 Commit Roolback 状态。
2.两阶段提交
提交半事务是一个阶段,提交全事务和事务回查是另外一个阶段,所以称之为两阶段提交。
3.事务状态回查机制
RocketMQ 通过 TransactionalMessageCheckService 线程定时去检测 RMQ_SYS_ TRANS_ HALF_TOPIC 主题中的消息,回查消息的事务状态TransactionalMessageCheckService 的检测频率默认为 1 分钟,可通过在 broker.conf 文件中设置 transactionChecklnterval 来改变默认值,单位为毫秒。
4.创建事务监听类TransactionListenerImpl,实现TransactionListener
package org
.example
.transaction
;
import org
.apache
.rocketmq
.client
.producer
.LocalTransactionState
;
import org
.apache
.rocketmq
.client
.producer
.TransactionListener
;
import org
.apache
.rocketmq
.common
.message
.Message
;
import org
.apache
.rocketmq
.common
.message
.MessageExt
;
import java
.util
.concurrent
.ConcurrentHashMap
;
import java
.util
.concurrent
.atomic
.AtomicInteger
;
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex
= new AtomicInteger(0);
private ConcurrentHashMap
<String, Integer> localTrans
= new ConcurrentHashMap<String, Integer>();
public LocalTransactionState
executeLocalTransaction(Message message
, Object o
) {
int value
= transactionIndex
.getAndIncrement();
System
.out
.println("执行本地事务开始:" + value
);
int status
= value
% 3;
localTrans
.put(message
.getTransactionId(), status
);
switch (status
) {
case 0:
return LocalTransactionState
.UNKNOW
;
case 1:
return LocalTransactionState
.COMMIT_MESSAGE
;
case 2:
return LocalTransactionState
.ROLLBACK_MESSAGE
;
default:
return LocalTransactionState
.COMMIT_MESSAGE
;
}
}
public LocalTransactionState
checkLocalTransaction(MessageExt messageExt
) {
System
.out
.println("事务回查-----UNKNOW");
Integer status
= localTrans
.get(messageExt
.getTransactionId());
return LocalTransactionState
.COMMIT_MESSAGE
;
}
}
5.创建生产者TransactionProducer
package org
.example
.transaction
;
import org
.apache
.rocketmq
.client
.producer
.SendResult
;
import org
.apache
.rocketmq
.client
.producer
.TransactionListener
;
import org
.apache
.rocketmq
.client
.producer
.TransactionMQProducer
;
import org
.apache
.rocketmq
.common
.message
.Message
;
import org
.apache
.rocketmq
.remoting
.common
.RemotingHelper
;
import java
.util
.concurrent
.*
;
public class TransactionProducer {
public static void main(String
[] args
) throws Exception
{
TransactionListener transactionListener
= new TransactionListenerImpl();
TransactionMQProducer producer
= new TransactionMQProducer("transaction_producer");
producer
.setNamesrvAddr("192.168.42.112:9876");
ExecutorService executorService
= new ThreadPoolExecutor(2, 5, 100, TimeUnit
.SECONDS
,
new ArrayBlockingQueue<Runnable>(200), new ThreadFactory() {
public Thread
newThread(Runnable r
) {
Thread thread
= new Thread(r
);
thread
.setName("client_transaction_msg_check_thread");
return thread
;
}
});
producer
.setExecutorService(executorService
);
producer
.setTransactionListener(transactionListener
);
producer
.start();
String
[] tags
= new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i
= 0; i
< 10; i
++) {
Message msg
= new Message("TransactionTopic", tags
[i
% tags
.length
], "KEY" + i
,
("Hello RocketMQ" + i
).getBytes(RemotingHelper
.DEFAULT_CHARSET
));
SendResult sendResult
= producer
.sendMessageInTransaction(msg
, null
);
System
.out
.println(sendResult
.toString());
Thread
.sleep(10);
}
for (int i
= 0; i
< 100000; i
++) {
Thread
.sleep(1000);
}
producer
.shutdown();
}
}
6.创建消费者TransactionConsumer
package org
.example
.transaction
;
import org
.apache
.rocketmq
.client
.consumer
.DefaultMQPushConsumer
;
import org
.apache
.rocketmq
.client
.consumer
.listener
.ConsumeOrderlyContext
;
import org
.apache
.rocketmq
.client
.consumer
.listener
.ConsumeOrderlyStatus
;
import org
.apache
.rocketmq
.client
.consumer
.listener
.MessageListenerOrderly
;
import org
.apache
.rocketmq
.client
.exception
.MQClientException
;
import org
.apache
.rocketmq
.common
.consumer
.ConsumeFromWhere
;
import org
.apache
.rocketmq
.common
.message
.MessageExt
;
import java
.util
.Date
;
import java
.util
.List
;
import java
.util
.Random
;
import java
.util
.concurrent
.TimeUnit
;
public class TransactionConsumer {
public static void main(String
[] args
) throws MQClientException
{
DefaultMQPushConsumer consumer
= new DefaultMQPushConsumer("transaction_producer");
consumer
.setNamesrvAddr("192.168.42.112:9876");
consumer
.setConsumeFromWhere(ConsumeFromWhere
.CONSUME_FROM_LAST_OFFSET
);
consumer
.subscribe("TransactionTopic", "*");
consumer
.registerMessageListener(new MessageListenerOrderly() {
private Random random
= new Random();
public ConsumeOrderlyStatus
consumeMessage(List
<MessageExt> msgs
, ConsumeOrderlyContext context
) {
context
.setAutoCommit(true);
for (MessageExt msg
: msgs
) {
System
.out
.println(new Date()+"获取到消息开始消费:"+msg
+",content:"+new String(msg
.getBody()));
}
try {
TimeUnit
.SECONDS
.sleep(random
.nextInt(5));
} catch (Exception e
) {
e
.printStackTrace();
return ConsumeOrderlyStatus
.SUSPEND_CURRENT_QUEUE_A_MOMENT
;
}
return ConsumeOrderlyStatus
.SUCCESS
;
}
});
consumer
.start();
System
.out
.println("Consumer Started");
}
}
7.启动消费者,再启动生产者,通过生产者打印可以看出有4次“事务回查-----UNKNOW”的打印,分别表示消息0,3,6,9进入回查
8.查看消费者打印
通过打印可以看出,消息1,4,7成功commit;消息0,3,6,9第一次进入事务回查,第二次成功commit;最后都被消费者消费