暢購商城(十四):秒殺系統「下」
發表時(shí)間:2020-10-19
發布人(rén):融晨科技
浏覽次數:78
好好學習,天天向上(shàng)
本文已收錄至我的(de)Github倉庫DayDayUP:github.com/RobodLee/DayDayUP,歡迎Star
- 暢購商城(一):環境搭建
- 暢購商城(二):分布式文件系統FastDFS
- 暢購商城(三):商品管理
- 暢購商城(四):Lua、OpenResty、Canal實現廣告緩存與同步
- 暢購商城(五):Elasticsearch實現商品搜索
- 暢購商城(六):商品搜索
- 暢購商城(七):Thymeleaf實現靜态頁
- 暢購商城(八):微服務網關和(hé / huò)JWT令牌
- 暢購商城(九):Spring Security Oauth2
- 暢購商城(十):購物車
- 暢購商城(十一):訂單
- 暢購商城(十二):接入微信支付
- 暢購商城(十三):秒殺系統「上(shàng)」
- 暢購商城(十四):秒殺系統「下」
防止秒殺重複排隊
回顧一下上(shàng)一篇文章中講到(dào)的(de)下單的(de)流程。當用戶點擊下單之(zhī)後,用戶名和(hé / huò)商品id就(jiù)會組裝成一個(gè)SeckillStatus對象存入Redis隊列中等待被處理,這(zhè)個(gè)過程叫做排隊。所以(yǐ)說(shuō),隻要(yào / yāo)用戶點擊了(le/liǎo)一次下單後不(bù)論最後是(shì)否下單成功,他(tā)都會進入到(dào)排隊的(de)狀态。如果用戶重複點擊下單,那麽Redis隊列中就(jiù)會有很多個(gè)相同的(de)SeckillStatus對象,也(yě)就(jiù)是(shì)一個(gè)用戶排隊多次,這(zhè)顯然是(shì)不(bù)符合邏輯的(de),一個(gè)用戶應該隻能排隊一次。
爲(wéi / wèi)了(le/liǎo)避免用戶重複排隊的(de)情況,可以(yǐ)爲(wéi / wèi)每個(gè)用戶在(zài)Redis中設置一個(gè)自增值,每次排隊的(de)時(shí)候加1,如果大(dà)于(yú)1,說(shuō)明重複排隊了(le/liǎo),那麽直接抛出(chū)異常,告訴用戶重複排隊了(le/liǎo)。
//SeckillOrderServiceImpl
@Override
public boolean add(Long id, String time, String username) {
Long increment = redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_QUEUE_COUNT).increment(username, 1);
if (increment>1) { //記錄指定hashkey的(de)增量,大(dà)于(yú)1說(shuō)明排隊次數超過1次,重複排隊
throw new RuntimeException("重複排隊");
}
…………
}
這(zhè)段代碼中,添加了(le/liǎo)對用戶重複排隊的(de)判斷,先自增1,再進行判斷。這(zhè)裏的(de)key設置的(de)是(shì)username,因爲(wéi / wèi)一個(gè)用戶隻能下單一件商品,如果去下單其它商品,同樣也(yě)是(shì)重複排隊。
測試了(le/liǎo)一下,是(shì)成功的(de)。但是(shì)有一個(gè)問題:如果用戶在(zài)這(zhè)裏排隊未成功,該怎麽清理排隊信息呢?這(zhè)個(gè)下一步就(jiù)會說(shuō),接着往下看👇
并發超賣問題解決
現在(zài)的(de)代碼看似很完美,但是(shì)漏洞百出(chū),比如就(jiù)存在(zài)并發超賣的(de)問題。爲(wéi / wèi)什麽這(zhè)麽說(shuō),看代碼說(shuō)話:
這(zhè)個(gè)是(shì)多線程下單的(de)方法,流程是(shì)查庫存——>下單——>減庫存。假如現在(zài)有件商品還剩1件,正好有多個(gè)線程同時(shí)走到(dào)了(le/liǎo)查詢庫存這(zhè)一步,結果查出(chū)來(lái)都是(shì)一件,然後這(zhè)三個(gè)線程就(jiù)可以(yǐ)往下接着走,最後三個(gè)線程都成功下單了(le/liǎo),不(bù)就(jiù)多賣了(le/liǎo)兩件嘛。所以(yǐ)這(zhè)段代碼還存在(zài)問題,那怎麽解決呢?可不(bù)可以(yǐ)采用加鎖的(de)方法,不(bù)可以(yǐ)。因爲(wéi / wèi)如果是(shì)在(zài)集群環境下,一台機器上(shàng)多個(gè)線程走到(dào)了(le/liǎo)同一步确實可以(yǐ)鎖住防止超賣,但是(shì)不(bù)同機器上(shàng)的(de)線程走到(dào)了(le/liǎo)同一部就(jiù)鎖不(bù)住了(le/liǎo)。
所以(yǐ)可以(yǐ)采用Redis隊列的(de)方式去解決。
給每個(gè)sku創建一個(gè)隊列,比如id爲(wéi / wèi)4399的(de)商品數量爲(wéi / wèi)4,那麽就(jiù)在(zài)4399的(de)隊列裏放入4件商品。然後每次查詢就(jiù)從隊列裏去取,假如現在(zài)有五個(gè)線程去查庫存,因爲(wéi / wèi)隻有4件商品,所以(yǐ)5個(gè)線程隻有4個(gè)線程能夠查詢出(chū)庫存。因爲(wéi / wèi)Redis是(shì)單線程的(de),所以(yǐ)不(bù)會出(chū)現多個(gè)線程同時(shí)訪問數據出(chū)錯的(de)情況,這(zhè)樣就(jiù)可以(yǐ)避免并發超賣的(de)問題。
之(zhī)前在(zài)SeckillGoodsPushTask中隻是(shì)将商品存入Redis中,現在(zài)再加一步,爲(wéi / wèi)每個(gè)sku都創建一個(gè)隊列并存入庫存數量的(de)數據到(dào)隊列中。
//定時(shí)将秒殺商品加載到(dào)redis中
@Scheduled(cron = "0/5 * * * * ?")
public void loadGoodsPushRedis() {
…………
for (SeckillGoods seckillGood : seckillGoods) {
boundHashOperations.put(seckillGood.getId(),seckillGood); //把商品存入到(dào)redis
redisTemplate.boundListOps(SystemConstants.SEC_KILL_GOODS_COUNT_LIST + seckillGood.getId())
.leftPushAll(getGoodsNumber(seckillGood.getNum())); //存到(dào)Redis隊列
}
}
}
//獲取秒殺商品數量的(de)數組
public Byte[] getGoodsNumber(int num) {
Byte[] arr = new Byte[num];
for (int i = 0; i < num; i++) {
arr[i] = '0';
}
return arr;
}
隊列的(de)内容就(jiù)是(shì)商品數量的(de)Byte,視頻中用的(de)是(shì)商品id,但是(shì)商品id是(shì)Long型的(de),Byte比Long要(yào / yāo)省空間,而(ér)且放什麽無所謂關鍵是(shì)放幾個(gè),所以(yǐ)我就(jiù)放了(le/liǎo)對應數量的(de)Byte進去。
接下來(lái)就(jiù)該在(zài)下單之(zhī)前獲取庫存的(de)信息:
@Async
public void createOrder() {
…………
//從秒殺商品隊列中獲取數據,如果獲取不(bù)到(dào)則說(shuō)明已經賣完了(le/liǎo),清除掉排隊信息
Object o = redisTemplate.boundListOps(SystemConstants.SEC_KILL_GOODS_COUNT_LIST + seckillGoods.getId())
.rightPop();
if (o == null) {
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_QUEUE_COUNT).delete(seckillStatus.getUsername()); //清除排隊隊列
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_STATUS_KEY).delete(seckillStatus.getUsername()); //排隊狀态隊列
return;
}
//創建秒殺訂單
…………
}
如果商品庫存不(bù)足,那麽應該清除掉排隊的(de)信息,否則用戶該商品下不(bù)了(le/liǎo)單還不(bù)能下單其它商品。這(zhè)裏将排隊的(de)隊列以(yǐ)及查詢狀态的(de)隊列清除了(le/liǎo)。
同步庫存不(bù)精準問題
解決了(le/liǎo)并發超賣的(de)問題之(zhī)後,還有一個(gè)庫存數量不(bù)精準的(de)問題。這(zhè)個(gè)問題出(chū)現的(de)原因和(hé / huò)超賣問題類似,假如現在(zài)同時(shí)有兩個(gè)線程下單完成了(le/liǎo)開始遞減庫存,A線程查詢出(chū)庫存有3個(gè),B線程也(yě)查詢出(chū)庫存有3個(gè),然後它們同時(shí)遞減,都是(shì)2個(gè),寫到(dào)了(le/liǎo)數據庫中。其實此時(shí)庫存應該還剩一個(gè)。
解決的(de)辦法也(yě)很簡單,因爲(wéi / wèi)現在(zài)是(shì)調用**seckillGoods.getStockCount()**查詢出(chū)的(de)庫存,那我們就(jiù)不(bù)用這(zhè)個(gè)查詢,直接用上(shàng)一節中的(de)隊列,隊列中剩餘多少就(jiù)說(shuō)明現在(zài)的(de)庫存是(shì)多少,絕對準确。
@Async
public void createOrder() {
…………
//減庫存,如果庫存沒了(le/liǎo)就(jiù)從redis中删除,并将庫存數據寫到(dào)MySQL中
//seckillGoods.setStockCount(seckillGoods.getStockCount()-1);
Long size = redisTemplate.boundListOps(SystemConstants.SEC_KILL_GOODS_COUNT_LIST + seckillGoods.getId()).size();//獲取庫存
//if (seckillGoods.getStockCount() <= 0) {
seckillGoods.setNum(size.intValue());
if (size <= 0) {
seckillGoodsBoundHashOps.delete(seckillStatus.getGoodsId());
seckillGoodsMapper.updateByPrimaryKeySelective(seckillGoods);
} else {
seckillGoodsBoundHashOps.put(seckillStatus.getGoodsId(),seckillGoods);
}
//創建秒殺訂單
…………
}
秒殺支付
改造二維碼創建及支付結果通知方法
秒殺支付的(de)流程和(hé / huò)之(zhī)前做的(de)類似,隻不(bù)過現在(zài)秒殺訂單的(de)支付狀态發送到(dào)Queue2中,普通訂單還是(shì)發送到(dào)queue1中,但是(shì)我們怎麽知道(dào)該将訂單的(de)支付狀态發送給queue1還是(shì)queue2呢?如果微信服務器可以(yǐ)将MQ隊列的(de)exchange和(hé / huò)routingKey返回給我們就(jiù)好了(le/liǎo),這(zhè)樣我們就(jiù)可以(yǐ)動态地(dì / de)指定要(yào / yāo)發送的(de)MQ了(le/liǎo)。
從微信支付的(de)官方文檔中我們可以(yǐ)知道(dào)在(zài)創建二維碼和(hé / huò)接收支付結果的(de)參數中都有一個(gè)attach參數
這(zhè)個(gè)是(shì)自定義的(de)數據,也(yě)就(jiù)是(shì)說(shuō)我們在(zài)創建二維碼的(de)時(shí)候發送給微信服務器什麽,返回支付結果的(de)時(shí)候就(jiù)會返回給我們什麽。所以(yǐ)在(zài)創建二維碼的(de)時(shí)候由前端将指定的(de)exchange和(hé / huò)routingKey發送給後端,然後再添加到(dào)attach參數中。就(jiù)可以(yǐ)實現将不(bù)同的(de)訂單動态地(dì / de)發送到(dào)指定的(de)隊列了(le/liǎo)。
普通訂單:exchange:exchange.order routingKey:routing.order
秒殺訂單:exchange:exchange.seckill_order routingKey:routing.seckill_order
由于(yú)之(zhī)前寫的(de)createNative方法是(shì)接收一個(gè)order對象,所以(yǐ)在(zài)Order裏面添加兩個(gè)字段:
private String exchange; //mq交換機的(de)名稱
private String routingKey; //mq的(de)路由鍵
修改之(zhī)前**createNative()**的(de)代碼,添加attach參數,
@Override
public Map<String, String> createNative(Order order) {
…………
//獲取exchange和(hé / huò)routingKey,封裝程map集合,添加到(dào)attach參數中
String exchange = order.getExchange();
String routingKey = order.getRoutingKey();
Map<String,String> attachMap = new HashMap<>(2);
attachMap.put("exchange",exchange);
attachMap.put("routingKey",routingKey);
String attach = JSON.toJSONString(attachMap);
map.put("attach",attach);
…………
}
然後再修改**WeChatPayController.notifyUrl()**方法,從服務器返回的(de)Map集合中獲取attach,并從attach中獲取exchange和(hé / huò)routingKey。
@RequestMapping("/notify/url")
public String notifyUrl(HttpServletRequest request) throws Exception {
…………
Map<String, String> xmlMap = WXPayUtil.xmlToMap(xmlString);
String attach = xmlMap.get("attach");
Map<String, String> attachMap = JSONObject.parseObject(attach, Map.class);
//将java對象轉換成amqp消息發送出(chū)去,調用的(de)是(shì)send方法
//rabbitTemplate.convertAndSend("exchange.order","routing.order", xmlString);
rabbitTemplate.convertAndSend(attachMap.get("exchange"),attachMap.get("routingKey"), xmlString);
…………
}
監聽秒殺
前面已經将消息發送到(dào)消息隊列中了(le/liǎo)現在(zài)就(jiù)可以(yǐ)去監聽消息隊列了(le/liǎo)。
從流程圖中可以(yǐ)看到(dào),在(zài)寫監聽的(de)方法之(zhī)前,需要(yào / yāo)有兩個(gè)方法:改訂單狀态和(hé / huò)删除訂單。
SeckillOrderServiceImpl.updatePayStatus
public void updatePayStatus(String username, String transactionId, String endTime) {
//從Redis中将訂單信息查詢出(chū)來(lái)
SeckillOrder order = (SeckillOrder) redisTemplate
.boundHashOps(SystemConstants.SEC_KILL_ORDER_KEY)
.get(username);
if (order != null) {
try {
order.setStatus("1");
order.setTransactionId(transactionId);
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
order.setPayTime(simpleDateFormat.parse(endTime));
seckillOrderMapper.insertSelective(order); //将訂單信息存到(dào)mysql中
//删除redis中的(de)訂單信息
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_ORDER_KEY).delete(username);
//删除用戶的(de)排隊信息
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_QUEUE_COUNT).delete(username); //清除排隊隊列
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_STATUS_KEY).delete(username); //排隊狀态隊列
} catch (ParseException e) {
e.printStackTrace();
}
}
}
首先将訂單信息從Redis中查詢出(chū)來(lái),将訂單狀态改爲(wéi / wèi)已支付,然後将交易流水号和(hé / huò)支付時(shí)間補充完整存入MySQL。這(zhè)時(shí)候交易已經完成了(le/liǎo),可以(yǐ)将訂單信息從Redis中删除,并将用戶的(de)排隊信息也(yě)一并删除。
SeckillOrderServiceImpl.deleteOrder
public void deleteOrder(String username) {
//删除Redis中的(de)訂單
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_ORDER_KEY).delete(username);
//删除用戶的(de)排隊信息
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_QUEUE_COUNT).delete(username); //清除排隊隊列
redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_STATUS_KEY).delete(username); //排隊狀态隊列
//查詢出(chū)秒殺的(de)狀态信息
SeckillStatus seckillStatus = (SeckillStatus) redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_STATUS_KEY)
.get(username);
//回滾庫存
SeckillGoods seckillGoods = (SeckillGoods) redisTemplate
.boundHashOps(SystemConstants.SEC_KILL_GOODS_PREFIX + seckillStatus.getTime())
.get(seckillStatus.getGoodsId());
if (seckillGoods == null) {
seckillGoodsMapper.selectByPrimaryKey(seckillGoods.getId());
seckillGoods.setStockCount(seckillGoods.getStockCount()+1);
seckillGoodsMapper.updateByPrimaryKeySelective(seckillGoods);
} else {
seckillGoods.setStockCount(seckillGoods.getStockCount()+1);
}
redisTemplate
.boundHashOps(SystemConstants.SEC_KILL_GOODS_PREFIX + seckillStatus.getTime())
.put(seckillGoods.getId(),seckillGoods);
//将商品放入隊列
redisTemplate.boundListOps(SystemConstants.SEC_KILL_GOODS_COUNT_LIST + seckillGoods.getId())
.leftPush("0");
}
支付失敗了(le/liǎo),應該将訂單删除掉。首先将Redis中的(de)訂單删除,然後删除用戶的(de)排隊信息。接着回滾庫存,如果Redis中沒有則說(shuō)明已經賣完了(le/liǎo),就(jiù)從MySQL中查詢出(chū)來(lái)然後将商品數量加1再存入MySQL;如果Redis中有數據就(jiù)将Redis中的(de)商品數量加1即可。上(shàng)面講防止并發超賣的(de)時(shí)候不(bù)是(shì)爲(wéi / wèi)每個(gè)商品都在(zài)Redis隊列中存放了(le/liǎo)一下麽,所以(yǐ)最後将商品放回到(dào)隊列中。
SeckillMessageListener
@Component
@RabbitListener(queues = "queue.seckillorder")
public class SeckillMessageListener {
@Autowired
private SeckillOrderService seckillOrderService;
//https://pay.weixin.qq.com/wiki/doc/api/native.php?chapter=9_7&index=8
@RabbitHandler
public void getMessage(String message) {
try {
Map<String, String> resultMap = JSON.parseObject(message,Map.class);
String returnCode = resultMap.get("return_code"); //狀态碼
if ("SUCCESS".equals(returnCode)) {
String resultCode = resultMap.get("result_code"); //業務結果
String attach = resultMap.get("attach");
Map<String,String> attachMap = JSON.parseObject(attach,Map.class);
if ("SUCCESS".equals(resultCode)) {
//改訂單狀态
seckillOrderService.updatePayStatus(attachMap.get("username"),
resultMap.get("transaction_id"),resultMap.get("time_end"));
} else {
//删除訂單
seckillOrderService.deleteOrder(attachMap.get("username"));
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
這(zhè)個(gè)方法使用來(lái)監聽秒殺隊列的(de)消息的(de),exchange和(hé / huò)queue需要(yào / yāo)我們手動地(dì / de)在(zài)RabbitMQ的(de)網頁中創建并進行綁定
在(zài)該方法中,首先去讀取狀态碼和(hé / huò)業務結果,如果都爲(wéi / wèi)“SUCCESS”的(de)話則說(shuō)明訂單支付成功,修改訂單的(de)狀态。反之(zhī)訂單支付失敗,删除訂單。
總結
文章鴿了(le/liǎo)快一個(gè)月了(le/liǎo),終于(yú)補上(shàng)了(le/liǎo),主要(yào / yāo)是(shì)上(shàng)篇文章寫完後就(jiù)在(zài)做個(gè)小東西,然後就(jiù)是(shì)國(guó)慶節放假,在(zài)家待着有點懶。回校後又在(zài)參加電賽,沒時(shí)間。所以(yǐ)一路鴿到(dào)現在(zài)。
這(zhè)篇文章主要(yào / yāo)是(shì)将之(zhī)前的(de)秒殺流程進行一個(gè)完善,實現了(le/liǎo)防止秒殺重複排隊,解決并發超賣的(de)問題,并解決了(le/liǎo)同步庫存不(bù)精準的(de)問題。最後實現了(le/liǎo)秒殺支付。
碼字不(bù)易,可以(yǐ)的(de)話,給我來(lái)個(gè)
點贊
,收藏
,關注
如果你喜歡我的(de)文章,歡迎關注微信公衆号 『 R o b o d 』
代碼:https://github.com/RobodLee/changgou
本文已收錄至我的(de)Github倉庫DayDayUP:github.com/RobodLee/DayDayUP,歡迎Star