回顾整个下单流程,我们之前做了下单减缓存库存优化以及回补库存的操作,但是因为整个下单是属于一个transaction事务,如果用户下单成功,但是之后订单入库或返回前端的过程中失败,事务回滚,会导致少卖的现象,有可能造成库存堆积
我们的解决方法就是异步消息的发送要在整个事务提交成功后再发送
OrderServiceImpl
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { @Override public void afterCommit() { //异步更新库存 boolean mqResult = itemService.asyncDecreaseStock(itemId,amount); // if(!mqResult) { // itemService.increaseStock(itemId,amount); // throw new BusinessException(EmBusinessError.MQ_SEND_FAIL); // } } });ItemServiceImpl实现方法
@Override public boolean increaseStock(Integer itemId, Integer amount) throws BusinessException { redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue()); return true; } @Override public boolean asyncDecreaseStock(Integer itemId, Integer amount) { boolean mqResult = mqProducer.asyncReduceStock(itemId,amount); return mqResult; }以上用到了TransactionSynchronizationManager来保证消息在事务提交后再发送。我们同样可以用rocketMQ自带的transactionMQProducer来发送事务型消息
在分布式系统中,我们常会遇到分布式事务的问题,除了之前用到的方法,我们还可以利用RocketMQ的事务型消息来解决分布式事务问题。首先来看RocketMQ消息的事务架构设计:
生产者执行本地事务,修改订单支付状态(下单),并且提交事务生产者发送事务消息到broker上,消息发送到broker上在没有确认之前,消息对于consumer是不可见状态(prepare状态)生产者确认事务消息,使得发送到broker上的事务消息对于消费者可见消费者获取到消息进行消费,消费完之后执行ack进行确认这中间可能会存在一个问题,生产者本地事务成功后,发送事务确认消息到broker上失败了怎么办?这个时候意味着消费者无法正常消费到这个消息。所以RocketMQ提供了消息回查机制(LocalTransactionState checkLocalTransaction(MessageExt messageExt) 方法,如果事务消息一直处于中间状态,broker会发起重试去查询broker上这个事务的处理状态。一旦发现事务处理成功,则把当前这条消息设置为可见。
RocketMQ事务消息有三种状态:
ROLLBACK_MESSAGE:回滚事务COMMIT_MESSAGE:提交事务UNKNOW:broker会定时回查Producer消息状态,直到彻底成功或失败由于分布式消息队列对于可靠性的要求比较高,所以需要保证生产者将消息发送到broker之后,保证消息是不出现丢失的,因此消息队列就少不了对于可靠性存储的要求
从主流的几种MQ消息队列采用的存储方式来看,主要会有三种
分布式KV存储,比如ActiveMQ中采用的levelDB、Redis, 这种存储方式对于消息读写能力要求不高的情况下可以使用文件系统存储,常见的比如kafka、RocketMQ、RabbitMQ都是采用消息刷盘到所部署的机器上的文件系统来做持久化,这种方案适合对于有高吞吐量要求的消息中间件,因为消息刷盘是一种高效率,高可靠、高性能的持久化方式,除非磁盘出现故障,否则一般是不会出现无法持久化的问题关系型数据库,比如ActiveMQ可以采用mysql作为消息存储,关系型数据库在单表数据量达到千万级的情况下IO性能会出现瓶颈,所以ActiveMQ并不适合于高吞吐量的消息队列场景。总的来说,对于存储效率,文件系统要优于分布式KV存储,分布式KV存储要优于关系型数据库消息的存储结构:
RocketMQ就是采用文件系统的方式来存储消息,消息的存储是由ConsumeQueue和CommitLog配合完成的。CommitLog是消息真正的物理存储文件。ConsumeQueue是消息的逻辑队列,有点类似于数据库的索引文件,里面存储的是指向CommitLog文件中消息存储的地址。每个Topic下的每个Message Queue都会对应一个ConsumeQueue文件
CommitLog:
CommitLog是用来存放消息的物理文件,每个broker上的commitLog本当前机器上的所有consumerQueue共享,不做任何的区分。CommitLog中的文件默认大小为1G,可以动态配置; 当一个文件写满以后,会生成一个新的commitlog文件。所有的Topic数据是顺序写入在CommitLog文件中的。
ConsumeQueue:
consumeQueue表示消息消费的逻辑队列,这里面包含MessageQueue在commitlog中的其实物理位置偏移量offset,消息实体内容的大小和Message Tag的hash值。
RocketMQ的消息存储采用的是混合型的存储结构,也就是Broker单个实例下的所有队列公用一个日志数据文件CommitLog。这个是和Kafka又一个不同之处。为什么不采用kafka的设计,针对不同的partition存储一个独立的物理文件呢?这是因为在kafka的设计中,一旦kafka中Topic的Partition数量过多,队列文件会过多,那么会给磁盘的IO读写造成比较大的压力,也就造成了性能瓶颈。所以RocketMQ进行了优化,消息主题统一存储在CommitLog中。当然它也有它的优缺点
优点在于:由于消息主题都是通过CommitLog来进行读写,ConsumerQueue中只存储很少的数据,所以队列更加轻量化。对于磁盘的访问是串行化从而避免了磁盘的竞争缺点在于:消息写入磁盘虽然是基于顺序写,但是读的过程确是随机的。读取一条消息会先读取ConsumeQueue,再读CommitLog,会降低消息读的效率。首先,在OrderController中先开启异步发送事务型消息的操作
//创建订单,开启异步发送事务型消息的操作 if(!mqProducer.transactionAsyncReduceStock(userModel.getId(),promoId,itemId,amount)){ throw new BusinessException(EmBusinessError.UNKNOW_ERROR,"下单失败"); }然后在mqProducer中实现transactionAsyncReduceStock方法,投递prepare消息
//事务型同步库存扣减消息 public boolean transactionAsyncReduceStock(Integer userId,Integer promoId,Integer itemId,Integer amount) { Map<String,Object> bodyMap = new HashMap<>(); bodyMap.put("itemId",itemId); bodyMap.put("amount",amount); Map<String,Object> argsMap = new HashMap<>(); argsMap.put("itemId",itemId); argsMap.put("amount",amount); argsMap.put("userId",userId); argsMap.put("promoId",promoId); //投放消息 Message message = new Message(topicName,"increase",JSON.toJSON(bodyMap).toString().getBytes(Charset.forName("UTF-8"))); TransactionSendResult sendResult = null; try { //投递prepare消息 sendResult = transactionMQProducer.sendMessageInTransaction(message,argsMap); } catch (MQClientException e) { e.printStackTrace(); return false; } if(sendResult.getLocalTransactionState()==LocalTransactionState.ROLLBACK_MESSAGE) { return false; }else if(sendResult.getLocalTransactionState()==LocalTransactionState.COMMIT_MESSAGE){ return true; }else { return false; } }在MqProducer方法内部初始化方法中实现transactionMQProducer,
@PostConstruct public void init() throws MQClientException { //mq producer初始化 producer = new DefaultMQProducer("producer_group"); producer.setNamesrvAddr(nameAddr); producer.start(); transactionMQProducer = new TransactionMQProducer("transaction_producer_group"); transactionMQProducer.setNamesrvAddr(nameAddr); transactionMQProducer.start(); transactionMQProducer.setTransactionListener(new TransactionListener() { //发送事务型消息,消息的类型是prepare,不会被consumer立即执行 @Override public LocalTransactionState executeLocalTransaction(Message message, Object arg) { //真正要做的事,创建订单 Integer itemId = (Integer) ((Map)arg).get("itemId"); Integer userId = (Integer) ((Map)arg).get("userId"); Integer promoId = (Integer) ((Map)arg).get("promoId"); Integer amount = (Integer) ((Map)arg).get("amount"); try { //这里进行订单创建 orderService.createOrder(userId,itemId,promoId,amount); } catch (BusinessException e) { e.printStackTrace(); //失败事务回滚 return LocalTransactionState.ROLLBACK_MESSAGE; } return LocalTransactionState.COMMIT_MESSAGE; } @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { //根据是否扣减库存成功,来判断要返回COMMIT,ROLLBACK还是UNKNOW String jsonString = new String(msg.getBody()); Map<String,Object> map = JSON.parseObject(jsonString); Integer itemId = (Integer) map.get("itemId"); Integer amount = (Integer) map.get("amount"); return null; } });上面有个问题就是回调checkLocalTransaction函数时,无法仅仅通过itemId和amount来确定库存是否扣减成功,所有要引入库存流水的概念
操作流水的数据类型:
主业务数据:master data ,比如商品模型itemModel操作型数据:log data新建表stock_log,生成表结构,ItemService接口中实现初始化库存流水
MqProducer
@PostConstruct public void init() throws MQClientException { //mq producer初始化 producer = new DefaultMQProducer("producer_group"); producer.setNamesrvAddr(nameAddr); producer.start(); transactionMQProducer = new TransactionMQProducer("transaction_producer_group"); transactionMQProducer.setNamesrvAddr(nameAddr); transactionMQProducer.start(); transactionMQProducer.setTransactionListener(new TransactionListener() { //发送事务型消息,消息的类型是prepare,不会被consumer立即执行 @Override public LocalTransactionState executeLocalTransaction(Message message, Object arg) { //真正要做的事,创建订单 Integer itemId = (Integer) ((Map)arg).get("itemId"); Integer userId = (Integer) ((Map)arg).get("userId"); Integer promoId = (Integer) ((Map)arg).get("promoId"); Integer amount = (Integer) ((Map)arg).get("amount"); String stockLogId = (String) ((Map)arg).get("stockLogId"); try { //这里进行订单创建 orderService.createOrder(userId,itemId,promoId,amount,stockLogId); } catch (BusinessException e) { e.printStackTrace(); //失败事务回滚,设置对应的stockLog为回滚状态 StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId); stockLogDO.setStatus(3); stockLogDOMapper.updateByPrimaryKeySelective(stockLogDO); return LocalTransactionState.ROLLBACK_MESSAGE; } return LocalTransactionState.COMMIT_MESSAGE; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { //根据是否扣减库存成功,来判断要返回COMMIT,ROLLBACK还是UNKNOW String jsonString = new String(msg.getBody()); Map<String,Object> map = JSON.parseObject(jsonString); Integer itemId = (Integer) map.get("itemId"); Integer amount = (Integer) map.get("amount"); String stockLogId = (String) map.get("stockLogId"); StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId); if(stockLogDO == null) { return LocalTransactionState.UNKNOW; } if(stockLogDO.getStatus()==2){ return LocalTransactionState.COMMIT_MESSAGE; }else if(stockLogDO.getStatus()==1){ return LocalTransactionState.UNKNOW; } return LocalTransactionState.ROLLBACK_MESSAGE; } }); } //事务型同步库存扣减消息 public boolean transactionAsyncReduceStock(Integer userId,Integer promoId,Integer itemId,Integer amount,String stockLogId) { Map<String,Object> bodyMap = new HashMap<>(); bodyMap.put("itemId",itemId); bodyMap.put("amount",amount); bodyMap.put("stockLogId",stockLogId); Map<String,Object> argsMap = new HashMap<>(); argsMap.put("itemId",itemId); argsMap.put("amount",amount); argsMap.put("userId",userId); argsMap.put("promoId",promoId); argsMap.put("stockLogId",stockLogId); //投放消息 Message message = new Message(topicName,"increase",JSON.toJSON(bodyMap).toString().getBytes(Charset.forName("UTF-8"))); TransactionSendResult sendResult = null; try { //投递prepare消息 sendResult = transactionMQProducer.sendMessageInTransaction(message,argsMap); } catch (MQClientException e) { e.printStackTrace(); return false; } if(sendResult.getLocalTransactionState()==LocalTransactionState.ROLLBACK_MESSAGE) { return false; }else if(sendResult.getLocalTransactionState()==LocalTransactionState.COMMIT_MESSAGE){ return true; }else { return false; } }设计原则:
宁可少卖,不能超卖
方案
(1)redis可以比实际数据库少
(2)超时释放(针对消息一直卡死在初始状态,会造成订单大量废弃,设置超时时间)
之前的设计还存在一个问题,当库存售罄时,还会初始化库存流水这个操作,导致之后下单失败
所以对库存售罄的情况做一个处理
库存售罄标识售罄后不去操作后续流程售罄后通知各系统售罄回补上新在ItemServiceImpl的减缓存库存中,若result == 0 ,redis内打上已售罄标识。在之后初始化库存流水之前,判断redis内是否有此key,如果有,直接返回库存不足
decreaseStock
//减缓存库存 long result = redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue()*-1); if(result > 0 ) { //更新库存成功 return true; }else if(result == 0) { //打上库存已售罄标识 redisTemplate.opsForValue().set("promo_item_stock_invalid_"+itemId,"true"); //更新库存成功 return true; }else { //更新库存失败 increaseStock(itemId,amount); return false; }OrderController在加入库存流水init状态前判断是否已售罄
//判断是否库存已售罄,若对应的售罄key存在,直接返回下单失败 if(redisTemplate.hasKey("promo_item_stock_invalid_"+itemId)){ throw new BusinessException(EmBusinessError.STOCK_NOT_ENOUGH); }销量与库存模型一样,存在数据库加行锁并加1的操作,所以也可以用类似方法优化