聚焦Java性能优化 打造亿级流量秒杀系统【学习笔记】06

    技术2022-07-10  116

    文章目录

    本章目标7-1 交易性能瓶颈7-2 交易验证优化7-3 活动缓存库存方案一(重点)库存行锁优化扣减库存缓存化(方案一)异步同步数据库(方案二)异步消息队列rocketmq 库存数据库最终一致性保证 7-5 活动缓存库存方案二分布式事务 7-7 rocketmq安装7-8 缓存库存接入异步化

    本章目标

    掌握高效交易验证方式掌握缓存库存模型

    7-1 交易性能瓶颈

    jmeter压测(对活动下单过程进行压测,采用post请求,设置传入参数,性能发现下单avarage大约2s,tps500,交易验证主要完全依赖数据库的操作)交易验证完全依赖数据库库存行锁后置处理逻辑

    7-2 交易验证优化

    用户风控策略优化:策略缓存模型化

    在开始交易后,针对活动实时信息和用户实时信息的验证,目的是为了风控策略,检查用户账号是否异常,是否异地登陆,策略是:通过异步的方式将用户模型写入缓存,与实时信息做一致性检验,做到风控策略

    活动校验策略优化:引入活动发布流程,模型缓存化,紧急下线能力

    实时活动的缓存存在一个问题:如果后台修改活动信息(修改活动结束时间),但redis的缓存还处于正常有效期,用户依然可以以活动价格秒杀商品,因此需要有紧急下线的能力。对应的策略是:在活动开始前半个小时发布活动,对缓存预热,同时后台设计一个紧急下线的接口,清除redis缓存,那么用户下单时就会去数据库查询活动的最新信息了

    Jmeter压测效果:avg:800 吞吐量tps:800左右

    7-3 活动缓存库存方案一(重点)

    库存行锁优化

    首先回顾我们之前减库存的操作:

    <update id="decreaseStock"> update item_stock set stock = stock - #{amount} where item_id = #{itemId} and stock >= #{amount} </update>

    库存的数量就是stock-amount 条件是商品itemId和stock的大小大于amount,条件是item_id要加上唯一索引,这样查询的时候为数据库加上行锁,否则是数据库表锁

    扣减库存缓存化(方案一)

    方案是:我们要将扣减库存的操作发生在缓存而不是数据库中,缓存的扣减时间相对较少

    首先要:(1)活动发布同时同步库存进缓存

    ​ (2)下单交易减缓库存

    PromoService 接口中添加活动发布接口

    //活动发布 void publishPromo(Integer promoId);

    PromoServiceImpl实现类(默认获取活动id以及商品信息的时候库存不发生变化)

    @Override public void publishPromo(Integer promoId) { //通过活动id获取活动 PromoDO promoDO = promoDOMapper.selectByPrimaryKey(promoId); if(promoDO.getItemId()==null || promoDO.getItemId().intValue()==0) { return; } ItemModel itemModel = itemService.getItemById(promoDO.getItemId()); //将库存同步到redis内 redisTemplate.opsForValue().set("promo_item_stock_"+itemModel.getId(),itemModel.getStock()); }

    前端写发布活动的controller接口

    @RequestMapping(value = "/publishpromo",method = RequestMethod.GET) //浏览时服务端用GET请求 @ResponseBody public CommonReturnType publishpromo(@RequestParam(name = "id") Integer id) { promoService.publishPromo(id); return CommonReturnType.create(null); }

    更新ItemServiceImpl里更新redis减库存的操作

    //减缓存库存 long result = redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue()*-1); if(result > 0 ) { //更新库存成功 return true; }else { //更新库存失败 return false; }

    做到缓存减库存操作,但这样还存在数据库记录不一致的情况

    异步同步数据库(方案二)

    采用异步消息队列的方式,将异步扣减的消息同步给消息的consumer端,并由消息的consunmer端完成数据库扣减的操作

    (1)活动发布同步库存进缓存

    (2)下单交易减缓存库存

    (3)异步消息扣减数据库内存

    异步消息队列rocketmq

    采用异步队列可以既能让C端用户完成购买商品的高效体验,又能保证数据库的一致性。

    常见的异步消息中间件用到的有ActiveMQ(实现java的AMS)、Kafka(基于流式处理)、RocketMQ是阿里巴巴基于Kafka改造的一种异步消息队列

    高性能,高并发,分布式消息中间件典型应用场景:分布式事务,异步解耦

    RocketMQ主要有 Producer端,负责向Broker发送消息;Consumer端,多个consumer组成一个ConsumerGroup,每个消息会有一个Group里的consumer来消费;Broker由topic和MessageQueue组成,消息隶属于某个topic,一个topic可能由一个或多个topic管理

    消息队列也是面试官经常会问到的内容,想了解消息队列的原理可以看看这篇新手也能看懂消息队列的文章,写的真心不错,强推!!还有这篇面试会涉及到的文章

    库存数据库最终一致性保证

    7-5 活动缓存库存方案二

    首先NameServer相当于一个注册管理器,Broker1向NameServer发出注册请求,NameServer记录broker1:ip以及它负责的topicA,topicA负责的queue1和queue2

    Producer连接NameServer发现broker1,会向topicA为主题的broker1投递消息,采用负载轮询向queue投递;

    Consumer抓取负责的topicA,与queue建立长连接,当有消息时,唤醒,拉取对应的message,没有消息就等待,这种方式叫做长轮询

    一个consumer对应一个group,会平均划分,如果出现consumer过多,会有空闲。一个项目中会出现多个不同的ConsumerGroup,比如订单系统、商品系统等。若一个queue被多个consumer消费,会存在锁竞争机制,rocketmq采用的策略是以queue为单位平均分配,尽量保证consumer与queue数量相等。

    多个Broker会有主从复制机制,用于应对broker1异常,nameserver将broker2设为主库,通知producer以及consumer端去接管,Broker1和Broker2的数据可以是同步也可以是异步的

    分布式事务

    分布式设计CAP三方面,一致性、可用性、分区容忍性

    Soft-state软状态:保证最终结果的一致性,不保证中间过程的一致。比如缓存中的库存和数据库中的库存肯可能会有不一致情况发送

    7-7 rocketmq安装

    rocketmq 官网下载压缩包 上传到服务器

    //解压 unzip rocketmq-all-4.7.1-source-release.zip //启动Name Server nohup sh bin/mqnamesrv & //查看日志 tail -f ~/logs/rocketmqlogs/namesrv.log //启动broker nohup sh bin/mqbroker -n localhost:9876 & //发生报错就修改配置文件broker的容量大小重新启动

    发送和接受消息

    > export NAMESRV_ADDR=localhost:9876 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer SendResult [sendStatus=SEND_OK, msgId= ... > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer ConsumeMessageThread_%d Receive New Messages: [MessageExt...

    7-8 缓存库存接入异步化

    新建mq package MqConsumer和MqProducer类

    配置appliaction

    #设置rocketmq mq.nameserver.addr=数据库服务器ip地址:9876 mq.topicname=stock

    MqProducer

    @Component public class MqProducer { private DefaultMQProducer producer; //声明value注解,引入配置变量 @Value("${mq.nameserver.addr}") private String nameAddr; @Value("${mq.topicname}") private String topicName; @PostConstruct public void init() throws MQClientException { //mq producer初始化 producer = new DefaultMQProducer("producer_group"); producer.setNamesrvAddr(nameAddr); producer.start(); } //同步库存扣减消息 public boolean asyncReduceStock(Integer itemId,Integer amount) { Map<String,Object> bodyMap = new HashMap<>(); bodyMap.put("itemId",itemId); bodyMap.put("amount",amount); //投放消息 Message message = new Message(topicName,"increase",JSON.toJSON(bodyMap).toString().getBytes(Charset.forName("UTF-8"))); try { producer.send(message); } catch (MQClientException e) { e.printStackTrace(); return false; } catch (RemotingException e) { e.printStackTrace(); return false; } catch (MQBrokerException e) { e.printStackTrace(); return false; } catch (InterruptedException e) { e.printStackTrace(); return false; } return true; } }

    ItemServiceImpl实现缓存减库存以及发送消息减数据库库存

    @Override @Transactional public boolean decreaseStock(Integer itemId, Integer amount) { //影响行数 //int affectedRow = itemStockDOMapper.decreaseStock(itemId,amount); //减缓存库存 long result = redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue()*-1); if(result >= 0 ) { //更新库存成功,发送消息,减数据库库存 boolean mqResult = mqProducer.asyncReduceStock(itemId,amount); if(!mqResult) { //发送消息失败,回补库存 redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue()); return false; } return true; }else { //更新库存失败 redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue()); return false; }

    MqConsumer

    @Autowired private ItemStockDOMapper itemStockDOMapper; @PostConstruct public void init() throws MQClientException { consumer = new DefaultMQPushConsumer("stock_consumer_group"); consumer.setNamesrvAddr(nameAddr); //订阅所有消息 consumer.subscribe(topicName,"*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) { //实现库存真正到数据库内扣减的逻辑 Message msg = msgs.get(0); String jsonString = new String(msg.getBody()); Map<String,Object> map = JSON.parseObject(jsonString); Integer itemId = (Integer) map.get("itemId"); Integer amound = (Integer) map.get("amount"); itemStockDOMapper.decreaseStock(itemId,amound); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }

    异步同步数据库还是会出现以下几个问题:

    异步消息发送失败扣减操作执行失败下单失败(用户退单)无法正确回补库存
    Processed: 0.009, SQL: 9