秒杀系统常见问题

超卖问题

秒杀系统主要应用在商品抢购的场景,比如:

  • 电商抢购限量商品
  • 卖演唱会的门票
  • 火车票抢座

秒杀系统抽象来说就是以下几个步骤:

  • 用户选定商品下单
  • 校验库存
  • 扣库存
  • 创建用户订单

对于秒杀系统来说,严格防止超卖是一件十分重要的事情。下面我们看一下超卖问题出现的原因与解决办法。

建立“简易”的数据库表结构

一张库存表stock,一张订单表stock_order

-- ----------------------------
-- Table structure for stock
-- ----------------------------
DROP TABLE IF EXISTS `stock`;
CREATE TABLE `stock` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(50) NOT NULL DEFAULT '' COMMENT '名称',
  `count` int(11) NOT NULL COMMENT '库存',
  `sale` int(11) NOT NULL COMMENT '已售',
  `version` int(11) NOT NULL COMMENT '乐观锁,版本号',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for stock_order
-- ----------------------------
DROP TABLE IF EXISTS `stock_order`;
CREATE TABLE `stock_order` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `sid` int(11) NOT NULL COMMENT '库存ID',
  `name` varchar(30) NOT NULL DEFAULT '' COMMENT '商品名称',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

相应代码

Controller层代码

提供一个HTTP接口: 参数为商品的Id

@RequestMapping("/createWrongOrder/{sid}")
@ResponseBody
public String createWrongOrder(@PathVariable int sid) {
    LOGGER.info("购买物品编号sid=[{}]", sid);
    int id = 0;
    try {
        id = orderService.createWrongOrder(sid);
        LOGGER.info("创建订单id: [{}]", id);
    } catch (Exception e) {
        LOGGER.error("Exception", e);
    }
    return String.valueOf(id);
}
Service层代码
@Override
public int createWrongOrder(int sid) throws Exception {
    //校验库存
    Stock stock = checkStock(sid);
    //扣库存
    saleStock(stock);
    //创建订单
    int id = createOrder(stock);
    return id;
}

private Stock checkStock(int sid) {
    Stock stock = stockService.getStockById(sid);
    if (stock.getSale().equals(stock.getCount())) {
        throw new RuntimeException("库存不足");
    }
    return stock;
}

private int saleStock(Stock stock) {
    stock.setSale(stock.getSale() + 1);
    return stockService.updateStockById(stock);
}

private int createOrder(Stock stock) {
    StockOrder order = new StockOrder();
    order.setSid(stock.getId());
    order.setName(stock.getName());
    int id = orderMapper.insertSelective(order);
    return id;
}

发起并发购买请求

https://www.cnblogs.com/stulzq/p/8971531.html
在JMeter里启动1000个线程,无延迟同时访问接口。模拟1000个人,抢购100个产品的场景。点击启动。

结果卖出了14个,库存减少了14个,但是每个请求Spring都处理了,创建了1000个订单。

避免超卖问题:更新商品库存的版本号

为了解决上面的超卖问题,我们当然可以在Service层给更新表添加一个事务,这样每个线程更新请求的时候都会先去锁表的这一行(悲观锁),更新完库存后再释放锁。可这样就太慢了,1000个线程可等不及。

我们需要乐观锁。

一个最简单的办法就是,给每个商品库存一个版本号version字段

我们修改代码:

Controller层
/**
 * 乐观锁更新库存
 * @param sid
 * @return
 */
@RequestMapping("/createOptimisticOrder/{sid}")
@ResponseBody
public String createOptimisticOrder(@PathVariable int sid) {
    int id;
    try {
        id = orderService.createOptimisticOrder(sid);
        LOGGER.info("购买成功,剩余库存为: [{}]", id);
    } catch (Exception e) {
        LOGGER.error("购买失败:[{}]", e.getMessage());
        return "购买失败,库存不足";
    }
    return String.format("购买成功,剩余库存为:%d", id);
Service层
@Override
public int createOptimisticOrder(int sid) throws Exception {
    //校验库存
    Stock stock = checkStock(sid);
    //乐观锁更新库存
    saleStockOptimistic(stock);
    //创建订单
    int id = createOrder(stock);
    return stock.getCount() - (stock.getSale()+1);
}

private void saleStockOptimistic(Stock stock) {
    LOGGER.info("查询数据库,尝试更新库存");
    int count = stockService.updateStockByOptimistic(stock);
    if (count == 0){
        throw new RuntimeException("并发更新库存失败,version不匹配") ;
    }
}
Mapper
<update id="updateByOptimistic" parameterType="cn.monitor4all.miaoshadao.dao.Stock">
    update stock
    <set>
      sale = sale + 1,
      version = version + 1,
    </set>
    WHERE id = #{id,jdbcType=INTEGER}
    AND version = #{version,jdbcType=INTEGER}
  </update>

我们在实际减库存的SQL操作中,首先判断version是否是我们查询库存时候的version,如果是,扣减库存,成功抢购。如果发现version变了,则不更新数据库,返回抢购失败。

再次打开JMeter,把库存恢复为100,清空订单表,发起1000次请求。

这次的结果是:

卖出去了39个,version更新为了39,同时创建了39个订单。我们没有超卖,可喜可贺。

由于并发访问的原因,很多线程更新库存失败了,所以在我们这种设计下,1000个人真要是同时发起购买,只有39个幸运儿能够买到东西,但是我们防止了超卖。

令牌桶限流 + 再谈超卖

接口限流

在面临高并发的请购请求时,我们如果不对接口进行限流,可能会对后台系统造成极大的压力。尤其是对于下单的接口,过多的请求打到数据库会对系统的稳定性造成影响。

所以秒杀系统会尽量选择独立于公司其他后端系统之外进行单独部署,以免秒杀业务崩溃影响到其他系统。

除了独立部署秒杀业务之外,我们能够做的就是尽量让后台系统稳定优雅的处理大量请求。

令牌桶算法与漏桶算法

漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。

令牌桶算法不能与另外一种常见算法漏桶算法相混淆。这两种算法的主要区别在于:

漏桶算法能够强行限制数据的传输速率,而令牌桶算法在能够限制数据的平均传输速率外,还允许某种程度的突发传输。在令牌桶算法中,只要令牌桶中存在令牌,那么就允许突发地传输数据直到达到用户配置的门限,因此它适合于具有突发特性的流量。

使用Guava的RateLimiter实现令牌桶限流接口

Guava是只提供了令牌桶的一种实现,实际项目中肯定还要根据需求来使用或者自己实现,大家可以看看这篇文章:
https://segmentfault.com/a/1190000012875897

OrderController

@Controller
public class OrderController {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderController.class);

    @Autowired
    private StockService stockService;

    @Autowired
    private OrderService orderService;

    //每秒放行10个请求
    RateLimiter rateLimiter = RateLimiter.create(10);

    @RequestMapping("/createWrongOrder/{sid}")
    @ResponseBody
    public String createWrongOrder(@PathVariable int sid) {
        int id = 0;
        try {
            id = orderService.createWrongOrder(sid);
            LOGGER.info("创建订单id: [{}]", id);
        } catch (Exception e) {
            LOGGER.error("Exception", e);
        }
        return String.valueOf(id);
    }

    /**
     * 乐观锁更新库存 + 令牌桶限流
     * @param sid
     * @return
     */
    @RequestMapping("/createOptimisticOrder/{sid}")
    @ResponseBody
    public String createOptimisticOrder(@PathVariable int sid) {
        // 阻塞式获取令牌
        //LOGGER.info("等待时间" + rateLimiter.acquire());
        // 非阻塞式获取令牌
        if (!rateLimiter.tryAcquire(1000, TimeUnit.MILLISECONDS)) {
            LOGGER.warn("你被限流了,真不幸,直接返回失败");
            return "购买失败,库存不足";
        }
        int id;
        try {
            id = orderService.createOptimisticOrder(sid);
            LOGGER.info("购买成功,剩余库存为: [{}]", id);
        } catch (Exception e) {
            LOGGER.error("购买失败:[{}]", e.getMessage());
            return "购买失败,库存不足";
        }
        return String.format("购买成功,剩余库存为:%d", id);
    }
}

代码中,RateLimiter rateLimiter = RateLimiter.create(10);这里初始化了令牌桶类,每秒放行10个请求。

在接口中,可以看到有两种使用方法:

  1. 阻塞式获取令牌:请求进来后,若令牌桶里没有足够的令牌,就在这里阻塞住,等待令牌的发放。
  2. 非阻塞式获取令牌:请求进来后,若令牌桶里没有足够的令牌,会尝试等待设置好的时间(这里写了1000ms),其会自动判断在1000ms后,这个请求能不能拿到令牌,如果不能拿到,直接返回抢购失败。如果timeout设置为0,则等于阻塞时获取令牌。

再谈防止超卖

讲完了令牌桶限流算法,我们再回头思考超卖的问题,在海量请求的场景下,如果像第一篇文章那样的使用乐观锁,会导致大量的请求返回抢购失败,用户体验极差。

然而使用悲观锁,比如数据库事务,则可以让数据库一个个处理库存数修改,修改成功后再迎接下一个请求,所以在不同情况下,应该根据实际情况使用悲观锁和乐观锁。

实现不需要版本号字段的乐观锁

<update id="updateByOptimistic" parameterType="cn.monitor4all.miaoshadao.dao.Stock">
    update stock
    <set>
      sale = sale + 1,
    </set>
    WHERE id = #{id,jdbcType=INTEGER}
    AND sale = #{sale,jdbcType=INTEGER}
</update>

实现悲观锁

Controller

/**
 * 事务for update更新库存
 * @param sid
 * @return
 */
@RequestMapping("/createPessimisticOrder/{sid}")
@ResponseBody
public String createPessimisticOrder(@PathVariable int sid) {
    int id;
    try {
        id = orderService.createPessimisticOrder(sid);
        LOGGER.info("购买成功,剩余库存为: [{}]", id);
    } catch (Exception e) {
        LOGGER.error("购买失败:[{}]", e.getMessage());
        return "购买失败,库存不足";
    }
    return String.format("购买成功,剩余库存为:%d", id);
}

在Service中,给该卖商品流程加上事务:

@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
@Override
public int createPessimisticOrder(int sid){
    //校验库存(悲观锁for update)
    Stock stock = checkStockForUpdate(sid);
    //更新库存
    saleStock(stock);
    //创建订单
    int id = createOrder(stock);
    return stock.getCount() - (stock.getSale());
}

/**
 * 检查库存 ForUpdate
 * @param sid
 * @return
 */
private Stock checkStockForUpdate(int sid) {
    Stock stock = stockService.getStockByIdForUpdate(sid);
    if (stock.getSale().equals(stock.getCount())) {
        throw new RuntimeException("库存不足");
    }
    return stock;
}

/**
 * 更新库存
 * @param stock
 */
private void saleStock(Stock stock) {
    stock.setSale(stock.getSale() + 1);
    stockService.updateStockById(stock);
}

/**
 * 创建订单
 * @param stock
 * @return
 */
private int createOrder(Stock stock) {
    StockOrder order = new StockOrder();
    order.setSid(stock.getId());
    order.setName(stock.getName());
    int id = orderMapper.insertSelective(order);
    return id;
}

这里使用Spring的事务,@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED),如果遇到回滚,则返回Exception,并且事务传播使用PROPAGATION_REQUIRED–支持当前事务,如果当前没有事务,就新建一个事务。

所以,悲观锁在大量请求的请求下,有着更好的卖出成功率。但是需要注意的是,如果请求量巨大,悲观锁会导致后面的请求进行了长时间的阻塞等待,用户就必须在页面等待,很像是“假死”,可以通过配合令牌桶限流,或者是给用户显著的等待提示来优化。

抢购接口隐藏 + 单用户限制频率

抢购接口隐藏

抢购接口隐藏(接口加盐)的具体做法:

  • 每次点击秒杀按钮,先从服务器获取一个秒杀验证值(接口内判断是否到秒杀时间)。
  • Redis以缓存用户ID和商品ID为Key,秒杀地址为Value缓存验证值
  • 用户请求秒杀商品的时候,要带上秒杀验证值进行校验。

理论上来说在访问接口的时间上受到了限制,并且我们还能通过在验证值接口增加更复杂的逻辑,让获取验证值的接口并不快速返回验证值,进一步拉平普通用户和坏蛋们的下单时刻。所以接口加盐还是有用的!

加盐代码逻辑实现
代码还是使用之前的项目,我们在其上面增加两个接口:

  • 获取验证值接口
  • 携带验证值下单接口

由于之前我们只有两个表,一个stock表放库存商品,一个stockOrder订单表,放订购成功的记录。但是这次涉及到了用户,所以我们新增用户表,并且添加一个用户张三。并且在订单表中,不仅要记录商品id,同时要写入用户id。

整个SQL结构如下,讲究一个简洁,暂时不加入别的多余字段:

-- ----------------------------
-- Table structure for stock
-- ----------------------------
DROP TABLE IF EXISTS `stock`;
CREATE TABLE `stock` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(50) NOT NULL DEFAULT '' COMMENT '名称',
  `count` int(11) NOT NULL COMMENT '库存',
  `sale` int(11) NOT NULL COMMENT '已售',
  `version` int(11) NOT NULL COMMENT '乐观锁,版本号',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of stock
-- ----------------------------
INSERT INTO `stock` VALUES ('1', 'iphone', '50', '0', '0');
INSERT INTO `stock` VALUES ('2', 'mac', '10', '0', '0');

-- ----------------------------
-- Table structure for stock_order
-- ----------------------------
DROP TABLE IF EXISTS `stock_order`;
CREATE TABLE `stock_order` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `sid` int(11) NOT NULL COMMENT '库存ID',
  `name` varchar(30) NOT NULL DEFAULT '' COMMENT '商品名称',
  `user_id` int(11) NOT NULL DEFAULT '0',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of stock_order
-- ----------------------------

-- ----------------------------
-- Table structure for user
-- ----------------------------
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `user_name` varchar(255) NOT NULL DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;

-- ----------------------------
-- Records of user
-- ----------------------------
INSERT INTO `user` VALUES ('1', '张三');
获取验证值接口

该接口要求传用户id和商品id,返回验证值,并且该验证值

Controller中添加方法:

/**
 * 获取验证值
 * @return
 */
@RequestMapping(value = "/getVerifyHash", method = {RequestMethod.GET})
@ResponseBody
public String getVerifyHash(@RequestParam(value = "sid") Integer sid,
                            @RequestParam(value = "userId") Integer userId) {
    String hash;
    try {
        hash = userService.getVerifyHash(sid, userId);
    } catch (Exception e) {
        LOGGER.error("获取验证hash失败,原因:[{}]", e.getMessage());
        return "获取验证hash失败";
    }
    return String.format("请求抢购验证hash值为:%s", hash);
}

UserService中添加方法:

@Override
public String getVerifyHash(Integer sid, Integer userId) throws Exception {

    // 验证是否在抢购时间内
    LOGGER.info("请自行验证是否在抢购时间内");


    // 检查用户合法性
    User user = userMapper.selectByPrimaryKey(userId.longValue());
    if (user == null) {
        throw new Exception("用户不存在");
    }
    LOGGER.info("用户信息:[{}]", user.toString());

    // 检查商品合法性
    Stock stock = stockService.getStockById(sid);
    if (stock == null) {
        throw new Exception("商品不存在");
    }
    LOGGER.info("商品信息:[{}]", stock.toString());

    // 生成hash
    String verify = SALT + sid + userId;
    String verifyHash = DigestUtils.md5DigestAsHex(verify.getBytes());

    // 将hash和用户商品信息存入redis
    String hashKey = CacheKey.HASH_KEY.getKey() + "_" + sid + "_" + userId;
    stringRedisTemplate.opsForValue().set(hashKey, verifyHash, 3600, TimeUnit.SECONDS);
    LOGGER.info("Redis写入:[{}] [{}]", hashKey, verifyHash);
    return verifyHash;
}

一个Cache常量枚举类CacheKey:

package cn.monitor4all.miaoshadao.utils;

public enum CacheKey {
    HASH_KEY("miaosha_hash"),
    LIMIT_KEY("miaosha_limit");

    private String key;

    private CacheKey(String key) {
        this.key = key;
    }
    public String getKey() {
        return key;
    }
}

可以看到在Service中,我们拿到用户id和商品id后,会检查商品和用户信息是否在表中存在,并且会验证现在的时间(我这里为了简化,只是写了一行LOGGER,大家可以根据需求自行实现)。在这样的条件过滤下,才会给出hash值。并且将Hash值写入了Redis中,缓存3600秒(1小时),如果用户拿到这个hash值一小时内没下单,则需要重新获取hash值。

想一下,这个hash值,如果每次都按照商品+用户的信息来md5,是不是不太安全呢。毕竟用户id并不一定是用户不知道的(就比如我这种用自增id存储的,肯定不安全),而商品id,万一也泄露了出去,那么如果再知到我们是简单的md5,那直接就把hash算出来了!

在代码里,我给hash值加了个前缀,也就是一个salt(盐),相当于给这个固定的字符串撒了一把盐,这个盐是HASH_KEY("miaosha_hash"),写死在了代码里。这样黑产只要不猜到这个盐,就没办法算出来hash值。

这也只是一种例子,实际中,你可以把盐放在其他地方, 并且不断变化,或者结合时间戳,这样就算自己的程序员也没法知道hash值的原本字符串是什么了。

携带验证值下单接口

用户在前台拿到了验证值后,点击下单按钮,前端携带着特征值,即可进行下单操作。

Controller中添加方法:

/**
 * 要求验证的抢购接口
 * @param sid
 * @return
 */
@RequestMapping(value = "/createOrderWithVerifiedUrl", method = {RequestMethod.GET})
@ResponseBody
public String createOrderWithVerifiedUrl(@RequestParam(value = "sid") Integer sid,
                                         @RequestParam(value = "userId") Integer userId,
                                         @RequestParam(value = "verifyHash") String verifyHash) {
    int stockLeft;
    try {
        stockLeft = orderService.createVerifiedOrder(sid, userId, verifyHash);
        LOGGER.info("购买成功,剩余库存为: [{}]", stockLeft);
    } catch (Exception e) {
        LOGGER.error("购买失败:[{}]", e.getMessage());
        return e.getMessage();
    }
    return String.format("购买成功,剩余库存为:%d", stockLeft);
}

OrderService中添加方法:

@Override
public int createVerifiedOrder(Integer sid, Integer userId, String verifyHash) throws Exception {

    // 验证是否在抢购时间内
    LOGGER.info("请自行验证是否在抢购时间内,假设此处验证成功");

    // 验证hash值合法性
    String hashKey = CacheKey.HASH_KEY.getKey() + "_" + sid + "_" + userId;
    String verifyHashInRedis = stringRedisTemplate.opsForValue().get(hashKey);
    if (!verifyHash.equals(verifyHashInRedis)) {
        throw new Exception("hash值与Redis中不符合");
    }
    LOGGER.info("验证hash值合法性成功");

    // 检查用户合法性
    User user = userMapper.selectByPrimaryKey(userId.longValue());
    if (user == null) {
        throw new Exception("用户不存在");
    }
    LOGGER.info("用户信息验证成功:[{}]", user.toString());

    // 检查商品合法性
    Stock stock = stockService.getStockById(sid);
    if (stock == null) {
        throw new Exception("商品不存在");
    }
    LOGGER.info("商品信息验证成功:[{}]", stock.toString());

    //乐观锁更新库存
    saleStockOptimistic(stock);
    LOGGER.info("乐观锁更新库存成功");

    //创建订单
    createOrderWithUserInfo(stock, userId);
    LOGGER.info("创建订单成功");

    return stock.getCount() - (stock.getSale()+1);
}

单用户限制频率

假设我们做好了接口隐藏,但是像我上面说的,总有无聊的人会写一个复杂的脚本,先请求hash值,再立刻请求购买,如果你的app下单按钮做的很差,大家都要开抢后0.5秒才能请求成功,那可能会让脚本依然能够在大家前面抢购成功。

我们需要在做一个额外的措施,来限制单个用户的抢购频率。

其实很简单的就能想到用redis给每个用户做访问统计,甚至是带上商品id,对单个商品做访问统计,这都是可行的。

我们先实现一个对用户的访问频率限制,我们在用户申请下单时,检查用户的访问次数,超过访问次数,则不让他下单!

使用Redis/Memcached

我们使用外部缓存来解决问题,这样即便是分布式的秒杀系统,请求被随意分流的情况下,也能做到精准的控制每个用户的访问次数。

Controller中添加方法:

/**
 * 要求验证的抢购接口 + 单用户限制访问频率
 * @param sid
 * @return
 */
@RequestMapping(value = "/createOrderWithVerifiedUrlAndLimit", method = {RequestMethod.GET})
@ResponseBody
public String createOrderWithVerifiedUrlAndLimit(@RequestParam(value = "sid") Integer sid,
                                                 @RequestParam(value = "userId") Integer userId,
                                                 @RequestParam(value = "verifyHash") String verifyHash) {
    int stockLeft;
    try {
        int count = userService.addUserCount(userId);
        LOGGER.info("用户截至该次的访问次数为: [{}]", count);
        boolean isBanned = userService.getUserIsBanned(userId);
        if (isBanned) {
            return "购买失败,超过频率限制";
        }
        stockLeft = orderService.createVerifiedOrder(sid, userId, verifyHash);
        LOGGER.info("购买成功,剩余库存为: [{}]", stockLeft);
    } catch (Exception e) {
        LOGGER.error("购买失败:[{}]", e.getMessage());
        return e.getMessage();
    }
    return String.format("购买成功,剩余库存为:%d", stockLeft);
}

UserService中增加两个方法:

  • addUserCount:每当访问订单接口,则增加一次访问次数,写入Redis
  • getUserIsBanned:从Redis读出该用户的访问次数,超过10次则不让购买了!不能让张三做法外狂徒。
@Override
    public int addUserCount(Integer userId) throws Exception {
        String limitKey = CacheKey.LIMIT_KEY.getKey() + "_" + userId;
        String limitNum = stringRedisTemplate.opsForValue().get(limitKey);
        int limit = -1;
        if (limitNum == null) {
            stringRedisTemplate.opsForValue().set(limitKey, "0", 3600, TimeUnit.SECONDS);
        } else {
            limit = Integer.parseInt(limitNum) + 1;
            stringRedisTemplate.opsForValue().set(limitKey, String.valueOf(limit), 3600, TimeUnit.SECONDS);
        }
        return limit;
    }

    @Override
    public boolean getUserIsBanned(Integer userId) {
        String limitKey = CacheKey.LIMIT_KEY.getKey() + "_" + userId;
        String limitNum = stringRedisTemplate.opsForValue().get(limitKey);
        if (limitNum == null) {
            LOGGER.error("该用户没有访问申请验证值记录,疑似异常");
            return true;
        }
        return Integer.parseInt(limitNum) > ALLOW_COUNT;
    }
能否不用Redis/Memcached实现用户访问频率统计

如果你说你不愿意用redis,有什么办法能够实现访问频率统计吗,有呀,如果你放弃分布式的部署服务,那么你可以在内存中存储访问次数,比如:

  • Google Guava的内存缓存
  • 状态模式

缓存与数据库双写问题的争议

缓存热点数据

在秒杀实际的业务中,一定有很多需要做缓存的场景,比如售卖的商品,包括名称,详情等。访问量很大的数据,可以算是“热点”数据了,尤其是一些读取量远大于写入量的数据,更应该被缓存,而不应该让请求打到数据库上。

缓存量大但又不常变化的数据,比如详情,评论等。对于那些经常变化的数据,其实并不适合缓存,一方面会增加系统的复杂性(缓存的更新,缓存脏数据),另一方面也给系统带来一定的不稳定性(缓存系统的维护)。

「但一些极端情况下,你需要将一些会变动的数据进行缓存,比如想要页面显示准实时的库存数,或者其他一些特殊业务场景。这时候你需要保证缓存不能(一直)有脏数据。

上缓存的优点:

  • 能够缩短服务的响应时间,给用户带来更好的体验。
  • 能够增大系统的吞吐量,依然能够提升用户体验。
  • 减轻数据库的压力,防止高峰期数据库被压垮,导致整个线上服务BOOM!

上了缓存,也会引入很多额外的问题:

  • 缓存有多种选型,是内存缓存,memcached还是redis,你是否都熟悉,如果不熟悉,无疑增加了维护的难度(本来是个纯洁的数据库系统)。
  • 缓存系统也要考虑分布式,比如redis的分布式缓存还会有很多坑,无疑增加了系统的复杂性。
  • 在特殊场景下,如果对缓存的准确性有非常高的要求,就必须考虑「缓存和数据库的一致性问题」。

缓存和数据库双写一致性

不使用更新缓存而是删除缓存

原因一:线程安全角度
同时有请求A和请求B进行更新操作,那么会出现

(1)线程A更新了数据库
(2)线程B更新了数据库
(3)线程B更新了缓存
(4)线程A更新了缓存

这就出现请求A更新缓存应该比请求B更新缓存早才对,但是因为网络等原因,B却比A更早更新了缓存。这就导致了脏数据,因此不考虑。

原因二:业务场景角度
有如下两点:

(1)如果你是一个写数据库场景比较多,而读数据场景比较少的业务需求,采用这种方案就会导致,数据压根还没读到,缓存就被频繁的更新,浪费性能。
(2)如果你写入数据库的值,并不是直接写入缓存的,而是要经过一系列复杂的计算再写入缓存。那么,每次写入数据库后,都再次计算写入缓存的值,无疑是浪费性能的。显然,删除缓存更为适合。

其实如果业务非常简单,只是去数据库拿一个值,写入缓存,那么更新缓存也是可以的。但是,淘汰缓存操作简单,并且带来的副作用只是增加了一次cache miss,建议作为通用的处理方式。

先删除缓存,还是先操作数据库?

对于一个不能保证事务性的操作,一定涉及“哪个任务先做,哪个任务后做”的问题,解决这个问题的方向是:如果出现不一致,谁先做对业务的影响较小,就谁先执行。

假设先淘汰缓存,再写数据库:第一步淘汰缓存成功,第二步写数据库失败,则只会引发一次Cache miss。

假设先写数据库,再淘汰缓存:第一步写数据库操作成功,第二步淘汰缓存失败,则会出现DB中是新数据,Cache中是旧数据,数据不一致。

假如先删缓存,再更新数据库,该方案会导致请求数据不一致,比如同时有一个请求A进行更新操作,另一个请求B进行查询操作。那么会出现如下情形:

(1)请求A进行写操作,删除缓存
(2)请求B查询发现缓存不存在
(3)请求B去数据库查询得到旧值
(4)请求B将旧值写入缓存
(5)请求A将新值写入数据库

上述情况就会导致不一致的情形出现。而且,如果不采用给缓存设置过期时间策略,该数据永远都是脏数据。

所以先删缓存,再更新数据库并不是一劳永逸的解决方案,再看看先更新数据库,再删缓存

先更新数据库,再删缓存这种情况不存在并发问题么?不是的。假设这会有两个请求,一个请求A做查询操作,一个请求B做更新操作,那么会有如下情形产生

(1)缓存刚好失效
(2)请求A查询数据库,得一个旧值
(3)请求B将新值写入数据库
(4)请求B删除缓存
(5)请求A将查到的旧值写入缓存

如果发生上述情况,确实是会发生脏数据。然而,发生这种情况的概率又有多少呢?发生上述情况有一个先天性条件,就是步骤(3)的写数据库操作比步骤(2)的读数据库操作耗时更短,才有可能使得步骤(4)先于步骤(5)。可是,大家想想,数据库的读操作的速度远快于写操作的(不然做读写分离干嘛,做读写分离的意义就是因为读操作比较快,耗资源少),因此步骤(3)耗时比步骤(2)更短,这一情形很难出现。

所以,如果你想实现基础的缓存数据库双写一致的逻辑,那么在大多数情况下,在不想做过多设计,增加太大工作量的情况下,请先更新数据库,再删缓存!

我一定要数据库和缓存数据一致怎么办

没有办法做到绝对的一致性,这是由CAP理论决定的,缓存系统适用的场景就是非强一致性的场景,所以它属于CAP中的AP

所以,我们得委曲求全,可以去做到BASE理论中说的「最终一致性」

最终一致性强调的是系统中所有的数据副本,在经过一段时间的同步后,最终能够达到一个一致的状态。因此,最终一致性的本质是需要系统保证最终数据能够达到一致,而不需要实时保证系统数据的强一致性

延时双删

上文我们提到,在先删除缓存,再更新数据库的情况下,如果不采用给缓存设置过期时间策略,该数据永远都是脏数据。那么延时双删怎么解决这个问题呢?

(1)先淘汰缓存
(2)再写数据库(这两步和原来一样)
(3)休眠1秒,再次淘汰缓存

这么做,可以将1秒内所造成的缓存脏数据,再次删除。

针对上面的情形,读者应该自行评估自己的项目的读数据业务逻辑的耗时。然后写数据的休眠时间则在读数据业务逻辑的耗时基础上,加几百ms即可。这么做的目的,就是确保读请求结束,写请求可以删除读请求造成的缓存脏数据。

如果你用了mysql的读写分离架构怎么办?在这种情况下,造成数据不一致的原因如下,还是两个请求,一个请求A进行更新操作,另一个请求B进行查询操作。

(1)请求A进行写操作,删除缓存
(2)请求A将数据写入数据库了,
(3)请求B查询缓存发现,缓存没有值
(4)请求B去从库查询,这时,还没有完成主从同步,因此查询到的是旧值
(5)请求B将旧值写入缓存
(6)数据库完成主从同步,从库变为新值

上述情形,就是数据不一致的原因。还是使用双删延时策略。只是,睡眠时间修改为在主从同步的延时时间基础上,加几百ms。

采用这种同步淘汰策略,吞吐量降低怎么办?那就将第二次删除作为异步的。自己起一个线程,异步删除。这样,写的请求就不用沉睡一段时间后了,再返回。这么做,加大吞吐量。

所以在先删除缓存,再更新数据库的情况下,可以使用延时双删的策略,来保证脏数据只会存活一段时间,就会被准确的数据覆盖。

在先更新数据库,再删缓存的情况下,缓存出现脏数据的情况虽然可能性极小,但也会出现。我们依然可以用延时双删策略,在请求A对缓存写入了脏的旧值之后,再次删除缓存。来保证去掉脏缓存。

删缓存失败了怎么办:重试机制

看似问题都已经解决了,但其实,还有一个问题没有考虑到,那就是删除缓存的操作,失败了怎么办?比如延时双删的时候,第二次缓存删除失败了,那不还是没有清除脏数据吗?

方案一:
(1)更新数据库数据;
(2)缓存因为种种问题删除失败
(3)将需要删除的key发送至消息队列
(4)自己消费消息,获得需要删除的key
(5)继续重试删除操作,直到成功

然而,该方案有一个缺点,对业务线代码造成大量的侵入。

方案二:
(1)更新数据库数据
(2)数据库会将操作信息写入binlog日志当中(而读取binlog的中间件,可以采用阿里开源的canal)
(3)订阅程序提取出所需要的数据以及key
(4)另起一段非业务代码,获得该信息
(5)尝试删除缓存操作,发现删除失败
(6)将这些信息发送至消息队列
(7)重新从消息队列中获得该数据,重试操作。

🔗参考文档

如何优雅的实现订单异步处理

简单的订单异步处理实现

在秒杀系统用户进行抢购的过程中,由于在同一时间会有大量请求涌入服务器,如果每个请求都立即访问数据库进行扣减库存+写入订单的操作,对数据库的压力是巨大的。

如何减轻数据库的压力呢,我们将每一条秒杀的请求存入消息队列(例如RabbitMQ)中,放入消息队列后,给用户返回类似“抢购请求发送成功”的结果。而在消息队列中,我们将收到的下订单请求一个个的写入数据库中,比起多线程同步修改数据库的操作,大大缓解了数据库的连接压力,最主要的好处就表现在数据库连接的减少:

  • 同步方式:大量请求快速占满数据库框架开启的数据库连接池,同时修改数据库,导致数据库读写性能骤减。
  • 异步方式:一条条消息以顺序的方式写入数据库,连接数几乎不变(当然,也取决于消息队列消费者的数量

这种实现可以理解为是一中流量削峰:让数据库按照他的处理能力,从消息队列中拿取消息进行处理。

我们在源码仓库里,新增一个controller对外接口:

/**
 * 下单接口:异步处理订单
 * @param sid
 * @return
 */
@RequestMapping(value = "/createUserOrderWithMq", method = {RequestMethod.GET})
@ResponseBody
public String createUserOrderWithMq(@RequestParam(value = "sid") Integer sid,
                              @RequestParam(value = "userId") Integer userId) {
    try {
        // 检查缓存中该用户是否已经下单过
        Boolean hasOrder = orderService.checkUserOrderInfoInCache(sid, userId);
        if (hasOrder != null && hasOrder) {
            LOGGER.info("该用户已经抢购过");
            return "你已经抢购过了,不要太贪心.....";
        }
        // 没有下单过,检查缓存中商品是否还有库存
        LOGGER.info("没有抢购过,检查缓存中商品是否还有库存");
        Integer count = stockService.getStockCount(sid);
        if (count == 0) {
            return "秒杀请求失败,库存不足.....";
        }

        // 有库存,则将用户id和商品id封装为消息体传给消息队列处理
        // 注意这里的有库存和已经下单都是缓存中的结论,存在不可靠性,在消息队列中会查表再次验证
        LOGGER.info("有库存:[{}]", count);
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("sid", sid);
        jsonObject.put("userId", userId);
        sendToOrderQueue(jsonObject.toJSONString());
        return "秒杀请求提交成功";
    } catch (Exception e) {
        LOGGER.error("下单接口:异步处理订单异常:", e);
        return "秒杀请求失败,服务器正忙.....";
    }
}

createUserOrderWithMq接口整体流程如下:

  • 检查缓存中该用户是否已经下单过:在消息队列下单成功后写入redis一条用户id和商品id绑定的数据
  • 没有下单过,检查缓存中商品是否还有库存
  • 缓存中如果有库存,则将用户id和商品id封装为消息体「传给消息队列处理」

注意:这里的「有库存和已经下单」都是缓存中的结论,存在不可靠性,在消息队列中会查表再次验证,「作为兜底逻辑」

消息队列是如何接收消息的呢?我们新建一个消息队列,采用第四篇文中使用过的RabbitMQ,我再稍微贴一下整个创建RabbitMQ的流程把

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
@Configuration
public class RabbitMqConfig {

    @Bean
    public Queue orderQueue() {
        return new Queue("orderQueue");
    }

}

添加一个消费者:

@Component
@RabbitListener(queues = "orderQueue")
public class OrderMqReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMqReceiver.class);

    @Autowired
    private StockService stockService;

    @Autowired
    private OrderService orderService;

    @RabbitHandler
    public void process(String message) {
        LOGGER.info("OrderMqReceiver收到消息开始用户下单流程: " + message);
        JSONObject jsonObject = JSONObject.parseObject(message);
        try {
            orderService.createOrderByMq(jsonObject.getInteger("sid"),jsonObject.getInteger("userId"));
        } catch (Exception e) {
            LOGGER.error("消息处理异常:", e);
        }
    }
}

真正的下单的操作,在service中完成,我们在orderService中新建createOrderByMq方法:

@Override
public void createOrderByMq(Integer sid, Integer userId) throws Exception {

    Stock stock;
    //校验库存(不要学我在trycatch中做逻辑处理,这样是不优雅的。这里这样处理是为了兼容之前的秒杀系统文章)
    try {
        stock = checkStock(sid);
    } catch (Exception e) {
        LOGGER.info("库存不足!");
        return;
    }
    //乐观锁更新库存
    boolean updateStock = saleStockOptimistic(stock);
    if (!updateStock) {
        LOGGER.warn("扣减库存失败,库存已经为0");
        return;
    }

    LOGGER.info("扣减库存成功,剩余库存:[{}]", stock.getCount() - stock.getSale() - 1);
    stockService.delStockCountCache(sid);
    LOGGER.info("删除库存缓存");

    //创建订单
    LOGGER.info("写入订单至数据库");
    createOrderWithUserInfoInDB(stock, userId);
    LOGGER.info("写入订单至缓存供查询");
    createOrderWithUserInfoInCache(stock, userId);
    LOGGER.info("下单完成");

}

真正的下单的操作流程为:

  • 校验数据库库存
  • 乐观锁更新库存(其他之前讲到的锁也可以啦)
  • 写入订单至数据库
  • 「写入订单和用户信息至缓存供查询」:写入后,在外层接口便可以通过判断redis中是否存在用户和商品的抢购信息,来直接给用户返回“你已经抢购过”的消息。

我是如何在redis中记录商品和用户的关系的呢,我使用了set集合,key是商品id,而value则是用户id的集合,当然这样有一些不合理之处:

  • 这种结构默认了一个用户只能抢购一次这个商品
  • 使用set集合,在用户过多后,每次检查需要遍历set,用户过多有性能问题
@Override
    public Boolean checkUserOrderInfoInCache(Integer sid, Integer userId) throws Exception {
        String key = CacheKey.USER_HAS_ORDER.getKey() + "_" + sid;
        LOGGER.info("检查用户Id:[{}] 是否抢购过商品Id:[{}] 检查Key:[{}]", userId, sid, key);
        return stringRedisTemplate.opsForSet().isMember(key, userId.toString());
    }

更加优雅的实现

我们实现了上面的异步处理后,用户那边得到的结果是怎么样的呢?

用户点击了提交订单,收到了消息:您的订单已经提交成功。然后用户啥也没看见,也没有订单号,用户开始慌了,点到了自己的个人中心——已付款。发现居然没有订单!(因为可能还在队列中处理)

这样的话,用户可能马上就要开始投诉了!太不人性化了,我们不能只为了开发方便,舍弃了用户体验!

所以我们要改进一下,如何改进呢?其实很简单:

  • 让前端在提交订单后,显示一个“排队中”
  • 同时,前端不断请求 检查用户和商品是否已经有订单 的接口,如果得到订单已经处理完成的消息,页面跳转抢购成功。
/**
 * 检查缓存中用户是否已经生成订单
 * @param sid
 * @return
 */
@RequestMapping(value = "/checkOrderByUserIdInCache", method = {RequestMethod.GET})
@ResponseBody
public String checkOrderByUserIdInCache(@RequestParam(value = "sid") Integer sid,
                              @RequestParam(value = "userId") Integer userId) {
    // 检查缓存中该用户是否已经下单过
    try {
        Boolean hasOrder = orderService.checkUserOrderInfoInCache(sid, userId);
        if (hasOrder != null && hasOrder) {
            return "恭喜您,已经抢购成功!";
        }
    } catch (Exception e) {
        LOGGER.error("检查订单异常:", e);
    }
    return "很抱歉,你的订单尚未生成,继续排队吧您嘞。";
}

一条小咸鱼