设计与开发
表结构
CREATE TABLE `t_product
` (
`id
` int(12) NOT NULL AUTO_INCREMENT COMMENT '产品编号',
`product_name
` varchar(60) NOT NULL COMMENT '产品名称',
`stock
` int(10) NOT NULL COMMENT '库存',
`price
` decimal(16,2) NOT NULL COMMENT '单价',
`version
` int(10) NOT NULL DEFAULT '0' COMMENT '版本号',
`note
` varchar(255) DEFAULT NULL COMMENT '备注',
PRIMARY KEY (`id
`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
COMMENT='产品信息表';
CREATE TABLE `t_purchase_record
` (
`id
` int(12) NOT NULL AUTO_INCREMENT COMMENT '编号',
`user_id
` int(12) NOT NULL COMMENT '用户编号',
`product_id
` int(12) NOT NULL COMMENT '产品编号',
`price
` decimal(16,2) NOT NULL COMMENT '价格',
`quantity
` int(12) NOT NULL COMMENT '数量',
`sum
` decimal(12,2) NOT NULL COMMENT '总价',
`purchase_date
` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '购买日期',
`note
` varchar(512) DEFAULT NULL COMMENT '备注',
PRIMARY KEY (`id
`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
COMMENT='购买信息表';
判断产品表 的产品 有没有足够的库存 支持用户的购买,如果有 则对产品 减库存然后在 将 购买信息 插入到购买记录中,如果库存不足,则返回交易失败。
pom引用
<dependency>
<groupId>org.springframework.boot
</groupId>
<artifactId>spring-boot-starter-aop
</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot
</groupId>
<artifactId>spring-boot-starter-data-redis
</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce
</groupId>
<artifactId>lettuce-core
</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients
</groupId>
<artifactId>jedis
</artifactId>
</dependency>
//连接池,可以不引用
<dependency>
<groupId>org.apache.commons
</groupId>
<artifactId>commons-pool2
</artifactId>
</dependency>
//web 必须引用
<dependency>
<groupId>org.springframework.boot
</groupId>
<artifactId>spring-boot-starter-web
</artifactId>
</dependency>
//mybatis 必须引用
<dependency>
<groupId>org.mybatis.spring.boot
</groupId>
<artifactId>mybatis-spring-boot-starter
</artifactId>
<version>1.3.2
</version>
</dependency>
//驱动必须引用
<dependency>
<groupId>mysql
</groupId>
<artifactId>mysql-connector-java
</artifactId>
<scope>runtime
</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot
</groupId>
<artifactId>spring-boot-starter-tomcat
</artifactId>
<scope>provided
</scope>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed
</groupId>
<artifactId>tomcat-embed-jasper
</artifactId>
<scope>provided
</scope>
</dependency>
<dependency>
<groupId>javax.servlet
</groupId>
<artifactId>jstl
</artifactId>
<scope>provided
</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot
</groupId>
<artifactId>spring-boot-starter-test
</artifactId>
<scope>test
</scope>
</dependency>
MyBatis 开发持久层
pojo
@Alias("product")
public class ProductPo implements Serializable {
private static final long serialVersionUID
= 3L
;
private Long id
;
private String productName
;
private int stock
;
private double price
;
private int version
;
private String note
;
}
@Alias("purchaseRecord")
public class PurchaseRecordPo implements Serializable {
private static final long serialVersionUID
= -3L
;
private Long id
;
private Long userId
;
private Long productId
;
private double price
;
private int quantity
;
private double sum
;
private Timestamp purchaseTime
;
private String note
;
}
定义 Alias 的别名
mapper文件
ProductMapper.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.springboot.chapter15.dao.ProductDao">
<select id="getProduct" parameterType="long" resultType="product">
select id, product_name as productName,
stock, price, version, note from t_product
where id=#{id}
</select>
<update id="decreaseProduct">
update t_product set stock = stock - #{quantity},
version = version +1
where id = #{id}
</update>
</mapper>
PurchaseRecordMapper.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.springboot.chapter15.dao.PurchaseRecordDao">
<insert id="insertPurchaseRecord" parameterType="purchaseRecord">
insert into t_purchase_record(
user_id, product_id, price, quantity, sum, purchase_date, note)
values(#{userId}, #{productId}, #{price}, #{quantity},
#{sum}, now(), #{note})
</insert>
</mapper>
dao
@Mapper
public interface ProductDao {
public ProductPo
getProduct(Long id
);
public int decreaseProduct(@Param("id") Long id
, @Param("quantity") int quantity
);
}
@Mapper
public interface PurchaseRecordDao {
public int insertPurchaseRecord(PurchaseRecordPo pr
);
}
开发业务层 和 控制层
service
public interface PurchaseService {
public boolean purchase(Long userId
, Long productId
, int quantity
);
boolean purchaseRedis(Long userId
, Long productId
, int quantity
);
boolean dealRedisPurchase(List
<PurchaseRecordPo> prpList
);
}
@Service
public class PurchaseServiceImpl implements PurchaseService {
@Autowired
private ProductDao productDao
= null
;
@Autowired
private PurchaseRecordDao purchaseRecordDao
= null
;
@Override
@Transactional
public boolean purchase(Long userId
, Long productId
, int quantity
) {
ProductPo product
= productDao
.getProduct(productId
);
if (product
.getStock() < quantity
) {
return false;
}
productDao
.decreaseProduct(productId
, quantity
);
PurchaseRecordPo pr
= this.initPurchaseRecord(userId
, product
, quantity
);
purchaseRecordDao
.insertPurchaseRecord(pr
);
return true;
}
private PurchaseRecordPo
initPurchaseRecord(Long userId
, ProductPo product
, int quantity
) {
PurchaseRecordPo pr
= new PurchaseRecordPo();
pr
.setNote("购买日志,时间:" + System
.currentTimeMillis());
pr
.setPrice(product
.getPrice());
pr
.setProductId(product
.getId());
pr
.setQuantity(quantity
);
double sum
= product
.getPrice() * quantity
;
pr
.setSum(sum
);
pr
.setUserId(userId
);
return pr
;
}
}
controller
@RestController
public class PurchaseController {
@Autowired
PurchaseService purchaseService
= null
;
@GetMapping("/test")
public ModelAndView
testPage() {
ModelAndView mv
= new ModelAndView("test");
return mv
;
}
@PostMapping("/purchase")
public Result
purchase(Long userId
, Long productId
, Integer quantity
) {
boolean success
= purchaseService
.purchaseRedis(userId
, productId
, quantity
);
String message
= success
? "抢购成功" : "抢购失败";
Result result
= new Result(success
, message
);
return result
;
}
class Result {
private boolean success
= false;
private String message
= null
;
public Result() {
}
public Result(boolean success
, String message
) {
this.success
= success
;
this.message
= message
;
}
}
}
配置和测试
jsp
<%@ page language="java" contentType="text/html; charset=UTF-8"
pageEncoding="UTF-8" %>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<title>购买产品测试</title>
<script type="text/javascript"
src="https://code.jquery.com/jquery-3.2.1.min.js"></script>
</head>
<!--后面需要改写这段JavaScript脚本进行测试-->
<script type="text/javascript">
var params = {
userId : 1,
productId : 1,
quantity : 3
};
// 通过POST请求后端
$.post("./purchase", params, function(result) {
alert(result.message);
});
for (var i = 1; i <= 50000; i++) {
var params = {
userId: 1,
productId: 1,
quantity: 1
};
// 通过POST请求后端,这里的JavaScript会采用异步请求
$.post("./purchase", params, function (result) {
});
}
</script>
<body>
<h1>抢购产品测试</h1>
</body>
</html>
配置
spring.mvc.view.prefix=/WEB-INF/jsp/
spring.mvc.view.suffix=.jsp
########## 数据库配置 ##########
spring.datasource.url=jdbc:mysql://localhost:3306/spring_boot_chapter15
spring.datasource.username=root
spring.datasource.password=123456
#spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.tomcat.max-idle=10
spring.datasource.tomcat.max-active=50
spring.datasource.tomcat.max-wait=10000
spring.datasource.tomcat.initial-size=5
# 采用隔离级别为读写提交
spring.datasource.tomcat.default-transaction-isolation=2
########## MyBatis配置 ##########
# 映射文件
mybatis.mapper-locations=classpath:com/springboot/chapter15/mapper/*.xml
# 扫描别名
mybatis.type-aliases-package=com.springboot.chapter15.pojo
main方法
@SpringBootApplication(scanBasePackages
= "com.springboot.chapter15")
@MapperScan(annotationClass
= Mapper
.class, basePackages
= "com.springboot.chapter15")
@EnableScheduling
public class Chapter15Application {
public static void main(String
[] args
) {
SpringApplication
.run(Chapter15Application
.class, args
);
}
}
高并发测试
线程1:读取库存为1,可购买
线程2:读取库存为1,可购买
线程1,扣减库存。此时库存为0
线程2:扣减库存。此时库存为 -1 。超发了。
线程1:插入交易记录。
线程2:插入交易记录,错误,库存已经不足。
线程2,此时并不会 感知 线程1 的这个操作。而是按照 原来读取到的1,进行扣减。
这样就会 出现 -1 。
悲观锁
共享的数据 被 多个线程 所 修改,无法保证 其 执行顺序。如果一个数据库事务 读到 产品后,就将数据 直接锁定,不允许其他线程读写,直到 当前事务完成后,才释放这条数据,则不会出现。超发的问题。
<select id="getProduct" parameterType="long" resultType="product">
select id, product_name as productName,
stock, price, version, note from t_product
where id=#{id} for update
</select>
for update ,这样 数据库事务 执行的过程中,就会锁定 查询出来的数据,其他事务将 不能再对其进行读写(其他线程执行这行代码的时候,就会进入等待)。不加锁用28s,加悲观锁 用了 33秒。加入事务2 得到商品信息的锁,那么事务 1,3,n 就必须等待 持有 商品信息的 事务 2,结束后 释放商品信息,才能去抢夺 商品信息,这样就会有大量的线程 被 挂机 和 等待。悲观锁:使用数据库内部的锁,对记录进行加锁。悲观锁:也成 独占锁 或 排他锁
乐观锁
乐观锁设计
虽然 悲观锁 可以解决 高并发的超发 现象,但并不是一个高效的方案
乐观锁:是一种,不使用 数据库锁 和 不阻塞 线程 并发 的方案
非独占锁, 无阻塞锁
一个线程 先读取 既有的商品 库存数据,保存起来,(旧值)
等到 需要对 共享数据 做修改时,会事先 将 保存的旧值库存 与 当前数据库的 库存进行比较。
如果 一致,就认为没有被修改过, 否则就认为 已经被修改过,(当前计算不被信任,不在修改数据)
保存 旧值——处理业务——旧值 与 当前数据库存 一致
——是 扣减库存——否 不执行逻辑
ABA现象 (先A在B,又A)
这个方案 就是 多线程的概念,CAS compare and swap
会引发 ABA 问题
线程1 读取到 A件,保存为A件
线程2 读取到A件,保存为A件
线程2 扣减库存C件,剩下B件。(当前数据库为A件,与线程旧值一致,成功)
线程1,计算剩余商品的价格(总价),会 按照剩余B件 计算。
线程2,取消购买,库存回退A件。
此时 线程 1 的结果是错的。
线程1 计算商品总价格的时候,当前库存 会被线程 2 所修改。
称为 ABA问题
线程1 在计算商品 总价格时,
当前 库存是一个变化的值,这样就可能出现计算错误。
共享值回退,导致了数据的不一致。
引入版本号
解决ABA,引入版本号
规定:只要操作 过程中 修改共享值,无论 业务正常 回退 还是异常
版本号 只增不减
线程1 读取版本号为1
线程2 读取版本号为 1
线程2 扣减库存C件, 剩下B件。 版本为2
线程2 取消购买,库存回退为A件。 版本为 3
线程1 ,计算商品价格 记录的是 版本为1 ,当前已经为 3 了。所以 取消业务。
<update id="decreaseProduct">
update t_product set stock = stock - #{quantity},
version = version +1
where id = #{id} and version = #{version}
</update>
and version = #{version} 判断,有没有别的事务已经修改过数据一旦 版本号 修改失败,则什么数据 也不会 触发更新
使用乐观锁,版本号处理
public int decreaseProduct(@Param("id") Long id
, @Param("quantity") int quantity
, @Param("version") int version
);
UPDATE t_product
SET stock
= stock
- 1,
version
= version
+ 1
WHERE
id
= '1'
AND version
= 1
@Transactional(isolation
=Isolation
.READ_COMMITTED
)
public boolean purchase(Long userId
, Long productId
, int quantity
) {
ProductPo product
= productDao
.getProduct(productId
);
if (product
.getStock() < quantity
) {
return false;
}
int version
= product
.getVersion();
int result
= productDao
.decreaseProduct(productId
, quantity
, version
);
if (result
== 0) {
return false;
}
PurchaseRecordPo pr
= this.initPurchaseRecord(userId
, product
, quantity
);
purchaseRecordDao
.insertPurchaseRecord(pr
);
return true;
}
耗时 27s ,5万个请求过去,还有库存。没有超发。因为加入了版本号的判断,大量的请求得到失败的结果。这个失败率比较高。
乐观锁 加入 重入机制
一旦更新失败,就重新做 一次,称乐观锁为 可重入的锁其原理:一单发现 版本号被更新,不是结束请求,而是重新做一次流程。直到成功为止会带来另一个问题:造成大量的SQL被执行
一个请求需要执行3条SQL,重入需要 3次,那么就要12条sql ,会给数据带来压力为了克服:使用 限制 时间 或 重入次数。压制过多的SQL
使用是时间戳 限制重入
一个请求 限制 100ms的生存期
100ms 内发生版本号冲突,则重试
@Override
@Transactional(isolation
= Isolation
.READ_COMMITTED
)
public boolean purchase(Long userId
, Long productId
, int quantity
) {
long start
= System
.currentTimeMillis();
while (true) {
long end
= System
.currentTimeMillis();
if (end
- start
> 100) {
return false;
}
ProductPo product
= productDao
.getProduct(productId
);
int version
= product
.getVersion();
if (product
.getStock() < quantity
) {
return false;
}
int result
= productDao
.decreaseProduct(productId
, quantity
, version
);
if (result
== 0) {
continue;
}
PurchaseRecordPo pr
= this.initPurchaseRecord(userId
, product
, quantity
);
purchaseRecordDao
.insertPurchaseRecord(pr
);
return true;
}
}
long start
= System
.currentTimeMillis();
while (true) {
long end
= System
.currentTimeMillis();
if (end
- start
> 100) {
return false;
}
if (result
== 0) {
continue;
}
return true;
}
按照时间戳 重入 也有一个弊端:系统会随自身的忙碌,而大大减少重入的次数
因此有时候也会采用 按照次数重入
按照限定次数 重入的乐观锁
@Override
@Transactional(isolation
= Isolation
.READ_COMMITTED
)
public boolean purchase(Long userId
, Long productId
, int quantity
) {
for (int i
= 0; i
< 3; i
++) {
ProductPo product
= productDao
.getProduct(productId
);
if (product
.getStock() < quantity
) {
return false;
}
int version
= product
.getVersion();
int result
= productDao
.decreaseProduct(productId
, quantity
,version
);
if (result
== 0) {
continue;
}
PurchaseRecordPo pr
= this.initPurchaseRecord(userId
, product
, quantity
);
purchaseRecordDao
.insertPurchaseRecord(pr
);
return true;
}
return false;
}
乐观锁:不使用 数据库锁的机制
不会造成线程的阻塞,只是采用多版本号 机制来实现因为版本的冲突造成了 请求失败的概率增加 ——往往需要重入的机制机制。重入又会造成 多执行SQL,可以时间戳 或限制重入次数。或者用 redis
使用redis处理高并发
数据库 是 写入磁盘的过程。
redis : 写入内存 (是 数据库的 几倍 或 数十倍)
其命令方式,运算能力比较薄弱(redis lua命令代替)。redis lua 执行中 ,具备 原子性使用 redis 去 替代 数据库作为 响应用户的数据载体要处理 redis 存储的不稳定,还需要 有一定的机制 将redis 存储的数据刷入数据库中
设计思路
先用 redis响应高并发用户的请求及时的将 数据保存到数据库,启用定时任务去查找redis,将它们保存到数据库中
redis 配置
<dependency>
<groupId>org.springframework.boot
</groupId>
<artifactId>spring-boot-starter-data-redis
</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce
</groupId>
<artifactId>lettuce-core
</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients
</groupId>
<artifactId>jedis
</artifactId>
</dependency>
#最小 活跃 最大 最大等待
spring.redis.jedis.pool
.min-idle=5
.max-active=10
.max-idle=10
.max-wait=2000
# port host pwd timeout
spring.redis
.port=6379
.host=192.168.2.198
.password=123456
.timeout=1000
自动生成,redistTemplate,StringRedisTemplate
Redis 的 Lua编程
@Autowired
StringRedisTemplate stringRedisTemplate
= null
;
String purchaseScript
=
" redis.call('sadd', KEYS[1], ARGV[2]) \n"
+ "local productPurchaseList = KEYS[2]..ARGV[2] \n"
+ "local userId = ARGV[1] \n"
+ "local product = 'product_'..ARGV[2] \n"
+ "local quantity = tonumber(ARGV[3]) \n"
+ "local stock = tonumber(redis.call('hget', product, 'stock')) \n"
+ "local price = tonumber(redis.call('hget', product, 'price')) \n"
+ "local purchase_date = ARGV[4] \n"
+ "if stock < quantity then return 0 end \n"
+ "stock = stock - quantity \n"
+ "redis.call('hset', product, 'stock', tostring(stock)) \n"
+ "local sum = price * quantity \n"
+ "local purchaseRecord = userId..','..quantity..','"
+ "..sum..','..price..','..purchase_date \n"
+ "redis.call('rpush', productPurchaseList, purchaseRecord) \n"
+ "return 1 \n";
private static final String PURCHASE_PRODUCT_LIST
= "purchase_list_";
private static final String PRODUCT_SCHEDULE_SET
= "product_schedule_set";
private String sha1
= null
;
@Override
public boolean purchaseRedis(Long userId
, Long productId
, int quantity
) {
Long purchaseDate
= System
.currentTimeMillis();
Jedis jedis
= null
;
try {
jedis
= (Jedis
) stringRedisTemplate
.getConnectionFactory().getConnection().getNativeConnection();
if (sha1
== null
) {
sha1
= jedis
.scriptLoad(purchaseScript
);
}
Object res
= jedis
.evalsha(sha1
, 2, PRODUCT_SCHEDULE_SET
,
PURCHASE_PRODUCT_LIST
, userId
+ "", productId
+ "",
quantity
+ "", purchaseDate
+ "");
Long result
= (Long
) res
;
return result
== 1;
} finally {
if (jedis
!= null
&& jedis
.isConnected()) {
jedis
.close();
}
}
}
保存购买信息将购买记录 保存到 数据库中
@Override
@Transactional(propagation
= Propagation
.REQUIRES_NEW
)
public boolean dealRedisPurchase(List
<PurchaseRecordPo> prpList
) {
for (PurchaseRecordPo prp
: prpList
) {
purchaseRecordDao
.insertPurchaseRecord(prp
);
productDao
.decreaseProduct(prp
.getProductId(), prp
.getQuantity());
}
return true;
}
定时任务,把redis中数据保存到数据
@EnableScheduling
@Service
public class TaskServiceImpl implements TaskService {
@Autowired
private StringRedisTemplate stringRedisTemplate
= null
;
@Autowired
private PurchaseService purchaseService
= null
;
private static final String PRODUCT_SCHEDULE_SET
= "product_schedule_set";
private static final String PURCHASE_PRODUCT_LIST
= "purchase_list_";
private static final int ONE_TIME_SIZE
= 1000;
@Scheduled(fixedRate
= 1000 * 60)
public void purchaseTask() {
System
.out
.println("定时任务开始......");
Set
<String> productIdList
= stringRedisTemplate
.opsForSet().members(PRODUCT_SCHEDULE_SET
);
List
<PurchaseRecordPo> prpList
= new ArrayList<>();
for (String productIdStr
: productIdList
) {
Long productId
= Long
.parseLong(productIdStr
);
String purchaseKey
= PURCHASE_PRODUCT_LIST
+ productId
;
BoundListOperations
<String, String> ops
= stringRedisTemplate
.boundListOps(purchaseKey
);
long size
= stringRedisTemplate
.opsForList().size(purchaseKey
);
Long times
= size
% ONE_TIME_SIZE
== 0 ?
size
/ ONE_TIME_SIZE
: size
/ ONE_TIME_SIZE
+ 1;
for (int i
= 0; i
< times
; i
++) {
List
<String> prList
= null
;
if (i
== 0) {
prList
= ops
.range(i
* ONE_TIME_SIZE
,
(i
+ 1) * ONE_TIME_SIZE
);
} else {
prList
= ops
.range(i
* ONE_TIME_SIZE
+ 1,
(i
+ 1) * ONE_TIME_SIZE
);
}
for (String prStr
: prList
) {
PurchaseRecordPo prp
= this.createPurchaseRecord(productId
, prStr
);
prpList
.add(prp
);
}
try {
purchaseService
.dealRedisPurchase(prpList
);
} catch (Exception ex
) {
ex
.printStackTrace();
}
prpList
.clear();
}
stringRedisTemplate
.delete(purchaseKey
);
stringRedisTemplate
.opsForSet()
.remove(PRODUCT_SCHEDULE_SET
, productIdStr
);
}
System
.out
.println("定时任务结束......");
}
private PurchaseRecordPo
createPurchaseRecord(
Long productId
, String prStr
) {
String
[] arr
= prStr
.split(",");
Long userId
= Long
.parseLong(arr
[0]);
int quantity
= Integer
.parseInt(arr
[1]);
double sum
= Double
.valueOf(arr
[2]);
double price
= Double
.valueOf(arr
[3]);
Long time
= Long
.parseLong(arr
[4]);
Timestamp purchaseTime
= new Timestamp(time
);
PurchaseRecordPo pr
= new PurchaseRecordPo();
pr
.setProductId(productId
);
pr
.setPurchaseTime(purchaseTime
);
pr
.setPrice(price
);
pr
.setQuantity(quantity
);
pr
.setSum(sum
);
pr
.setUserId(userId
);
pr
.setNote("购买日志,时间:" + purchaseTime
.getTime());
return pr
;
}
}
测试
rieds命令 执行 命令
hmset product_1 id 1 stock 3000 price 5.00
redis里面会存在键:product_1有3列:第一列:id 1 。 stock 2997。 price 5:00
从性能上来讲,只需要6s的时间,比锁 快了 数倍
使用redis 建议使用 独立的Redis 服务器,做好备份,容灾。
private static final String PURCHASE_PRODUCT_LIST
= "purchase_list_";
private static final String PRODUCT_SCHEDULE_SET
= "product_schedule_set";
执行过之后redis,product_schedule_set 为 1 (row) 1 (value)
purchase_list_1 为: 1 1,1,5,5,1593581880782
脚本的执行返回值为 1
每次抢购 product_1 stock会减少
purchase_list_1 会增加一行
定时任务结束后 product_1 之外的两个 redis 清楚