黑马点评


黑马点评

MySQL表

  • tb_user:用户表
  • tb_user_info:用户详情表
  • tb_shop:商户信息表
  • tb_shop_type:商户类型表
  • tb_blog:用户日记表(达人探店日记)
  • tb_follow:用户关注表
  • tb_voucher:优惠券表
  • tb_voucher_order:优惠券的订单表

当前模型

手机或者app端发起请求,请求我们的nginx服务器,nginx基于七层模型走的是HTTP协议,可以实现基于Lua直接绕开tomcat访问redis,也可以作为静态资源服务器,轻松扛下上万并发, 负载均衡到下游tomcat服务器,打散流量,我们都知道一台4核8G的tomcat,在优化和处理简单业务的加持下,大不了就处理1000左右的并发, 经过nginx的负载均衡分流后,利用集群支撑起整个项目,同时nginx在部署了前端项目后,更是可以做到动静分离,进一步降低tomcat服务的压力,这些功能都得靠nginx起作用,所以nginx是整个项目中重要的一环。

在tomcat支撑起并发流量后,我们如果让tomcat直接去访问Mysql,根据经验Mysql企业级服务器只要上点并发,一般是16或32 核心cpu,32 或64G内存,像企业级mysql加上固态硬盘能够支撑的并发,大概就是4000起~7000左右,上万并发, 瞬间就会让Mysql服务器的cpu,硬盘全部打满,容易崩溃,所以我们在高并发场景下,会选择使用mysql集群,同时为了进一步降低Mysql的压力,同时增加访问的性能,我们也会加入Redis,同时使用Redis集群对外提供更好的服务。

用户鉴权

cookie 是一个非常具体的东西,指的就是浏览器里面能永久存储的一种数据,仅仅是浏览器实现的一种数据存储功能。

cookie由服务器生成,发送给浏览器,浏览器把cookie以kv形式保存到某个目录下的文本文件内,下一次请求同一网站时会把该cookie发送给服务器。

由于cookie是存在客户端上的,所以浏览器加入了一些限制确保cookie不会被恶意使用,同时不会占据太多磁盘空间,所以每个域的cookie数量是有限的。

Cookie 技术主要用于:

  • 身份验证:保存用户登录状态,实现持久登录。
  • 会话管理:追踪用户的会话信息,例如购物车内容。
  • 个性化设置:存储用户偏好设置,以提供个性化的用户体验。

Cookie 的相关操作和属性可以通过 JavaScript 的 document.cookie 对象进行访问和修改。可以设置 Cookie 的过期时间、作用域、路径等属性,以控制其有效性和访问范围。

Session

Session(会话)是一种在服务器端存储用户状态和数据的机制。它通过在服务器端创建一个唯一的会话标识(通常是一个 Session ID),并将该标识发送给客户端,实现在不同的请求中跟踪和管理用户的状态。

当用户访问一个网站时,服务器会为该用户创建一个会话,并将会话 ID 存储在 Cookie 中,或者通过其他方式将其发送给客户端。客户端的浏览器会自动在后续的请求中将会话 ID 发送给服务器,在服务器端找到对应的会话数据。

会话数据存储在服务器端的临时存储区域,通常是在服务器的内存中或在数据库中。服务器会根据会话 ID 来检索和更新相应的数据,从而实现用户状态的管理和维护。

Session 技术的主要作用:

  • 身份验证:存储用户登录信息和权限,实现用户认证。
  • 会话管理:追踪用户的操作和活动,在请求之间保持连续性。
  • 数据存储:存储用户数据,例如购物车、表单数据等。

需要注意的是,为了保护用户数据的安全性和隐私,开发人员应采取适当的安全措施:

  • 使用安全的传输协议(如 HTTPS)来保护会话数据在网络传输中的安全性。
  • 针对会话 ID 的保护,包括生成安全的随机会话 ID、设置合适的过期时间,以及通过 HttpOnly 和 Secure 标志来防止跨站脚本攻击和会话劫持。

Token

Token(令牌)是一种在身份验证和授权中使用的字符串,用于验证客户端的身份和权限。它作为客户端和服务器之间进行安全通信的一种方式,来确保用户的身份和访问权限。

Token 的主要特点是它是无状态的,即服务器不需要在存储设备中维护任何信息。服务器在生成 Token 后,将其发送给客户端,客户端在以后的请求中将 Token 作为身份凭证发送给服务器。服务器通过验证 Token 的有效性,来确认客户端的身份和权限。

Token 的生成和验证过程一般包含以下步骤:

  • 客户端向服务器发送身份验证请求,通常是提供用户名和密码。
  • 服务器验证客户端提供的身份信息,并生成一个 Token。
  • 服务器将生成的 Token 发送给客户端,客户端将其保存。
  • 客户端在以后的请求中将 Token 添加到请求头中,作为身份凭证。
  • 服务器在接收到请求时,验证 Token 的有效性和权限,并相应地处理请求。

Token 的主要作用:

  • 身份验证:通过 Token 来验证客户端的身份,替代传统的基于会话的身份验证机制,不需要在服务器端存储会话信息,减轻服务器的负担。
  • 授权:服务器可以根据 Token 中的信息来判断客户端的权限,决定是否允许相应的操作。
  • 单点登录(SSO):Token 可以在不同的应用程序之间共享,实现用户的单点登录功能。

Token 可以有不同的类型和格式,常见的包括 JSON Web Token(JWT)、OAuth 2.0 的访问令牌等。同时,Token 也可以通过加密和签名等方式来保证其安全性。

短信登录

发送验证码

1、用户提交手机号
2、校验手机号是否合法
3、生成验证码
4、将生成的验证码保存到session中,用于后续的验证
5、发送验证码给用户

接口地址:/user/code

请求方式:POST

请求数据类型:application/json

请求参数:

参数名称 参数说明 请求类型 是否必须 数据类型 schema
phone query true string
session query true HttpSession

响应示例:

{
	"data": {},
	"errorMsg": "",
	"success": true,
	"total": 0
}

登录/注册

1、提交手机号和验证码
2、校验验证码
3、根据手机号查询数据库信息
4、用户存在就保存到session,否则就创建新用户并保存到数据库,最后也保存到session中

接口地址:/user/login

请求方式:POST

请求数据类型:application/json

请求参数:

参数名称 参数说明 请求类型 是否必须 数据类型 schema
loginForm loginForm body true LoginFormDTO LoginFormDTO
  code false string
  password false string
  phone false string
session query true HttpSession

校验登录状态-拦截器

首先我们要知道怎么基于session进行校验,session是基于cookie的(每一个session的id都会保存到cookie中),当用户访问的时候会携带cookie,所以我们可以根据cookie中的session_id来查询session中是否有这个用户:

1、用户发送请求并携带cookie
2、从session中获取用户
3、判断用户是否存在:
(1)没有这个用户就拦截
(2)有这个用户就保存用户信息到ThreadLocal用于登录缓存(ThreadLocal是一个线程域对象,每一个请求到达服务都会是一个独立线程,直接保存到本地变量会出现并发修改的安全问题,而ThreadLocal会将数据保存到每个线程内部,在线程内部创建一个Map来进行保存),保存完后就放行该用户即可

基于session校验需要在跨各种组件和页面的时候都访问session,编写重复的校验代码,增加了访问session的开销。所以使用ThreadLocal,使得每一个线程只需要访问一次session。

@Configuration
public class MvcConfig implements WebMvcConfigurer {
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new LoginInterceptor())
                .excludePathPatterns(   // 白名单
                        "/shop/**",
                        "/shop-type/**",
                        "/voucher/**",
                        "/upload/**",
                        "/blog/hot",
                        "/user/code",
                        "/user/login"
                );
    }
}

public class LoginInterceptor implements HandlerInterceptor {

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // 获取客户端请求传进来的session
        HttpSession session = request.getSession();
        // 获取user
        Object user = session.getAttribute("user");
        // 用户不存在,拦截,返回401状态码
        if (user == null) {
            response.setStatus(401);
            return false;
        }
        // 用户存在session中,保存到localthread,放行
        UserHolder.saveUser((UserDTO) user);
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        // 线程执行后移除用户
        UserHolder.removeUser();
    }
}

集群的session共享问题

session共享问题:多台Tomcat并不共享session存储空间,当请求切换到不同Tomcat服务时导致数据丢失的问题。
这是因为我们为了我们将来系统的高并发性,就需要水平拓展,形成负载均衡的集群每个Tomcat都会有一个对应的session。当我们在某一台Tomcat上进行登录以后,第二次登录的时候,要是被负载均衡到了另一台Tomcat,就会造成没办法获得之前登录时的session,就没办法再做验证了。
这个问题听起来好像也挺容易解决,如果每台Tomcat都互相拷贝,保存相同的数据,那肯定就不至于发生如上的问题,但是这样的解决方式太浪费空间了,而且拷贝的过程还是比较费时的,如果这时候已经有访问请求,就可能会出现数据不一致的情况。
因此,我们的session信息共享的解决方案应该满足以下特点:
1、数据共享
2、内存存储
3、key-value结构
这时候我们就回到了Redis了,我们知道Redis是独立于Tomcat的,单独进行存储,且任何一台Tomcat都可以访问到Redis,因此可以实现数据共享

基于Redis实现共享session

发送验证码

验证码保存到redis中,Redis的结构是key-value的,且value是很多种类型的,在这里我们选择最简单的String类型即可。

一个需要考虑的问题是key的选取,在session中我们选用了“code”来作为key,但在这里却不行。这是因为每一个不同的浏览器在发送请求的时候都会有一个不同的独立的session,也就是说Tomcat的内部维护了很多的session,互相之间是不会干扰的。但是Redis是一个共享的内存空间,如果直接使用key会造成覆盖,所以我们不能直接选用“code”来作为key。因此我们可以用手机号作为key(因为同一账号同一时刻仅在一个平台登录)。

@Resource
private StringRedisTemplate stringRedisTemplate;

@Override
public Result sendCode(String phone, HttpSession session) {
    // 校验手机号,正则,不合法则返回错误信息
    if (RegexUtils.isPhoneInvalid(phone)) {
        return Result.fail("手机号格式错误");
    }
    // 生成验证码
    String code = RandomUtil.randomNumbers(6);
    // 保存验证码到redis,用phone为key,限制有效期5min
    //        session.setAttribute("code", code);  这个是存到session
    stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY+phone, code, LOGIN_CODE_TTL, TimeUnit.MINUTES);
    // 发送验证码,调用第三方平台
    log.debug("发送验证码:" + code);
    return Result.ok();
}
验证码登录与注册

最终的用户信息不再保存到session中,而是保存都Redis中去了,同样要考虑key跟value的选择:
(1)value的选取:我们要保存的是用户的信息,这是一个对象。我们用Hash结构是最合适的。
(2)key的选取:这里并不建议用phone作为key,而是以随机token(服务器生成的令牌)为key来存储用户数据,因为这里的token要存到前端,使用手机号为key不安全,容易泄露。

在之前我们校验登录状态的时候,是从cookie中获取session再得到用户信息,而现在我们校验登录的时候要访问的凭证就是这个随机token了,但Tomcat不会将这个token自动写到浏览器上面。所以我们把数据保存到Redis以后还需要手动的把token返回到前端,流程就得修改:

1、提交手机号和验证码
2、校验验证码
3、根据手机号查询用户信息
4、用户保存到Redis
5、返回token给客户端(重要一步)

@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
    String phone = loginForm.getPhone();
    // 校验手机号
    if (RegexUtils.isPhoneInvalid(phone)) {
        return Result.fail("手机号格式错误");
    }
    // 校验验证码,从redis中获取验证码
    //        Object cacheCode = session.getAttribute("code");
    String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);
    String code = loginForm.getCode();
    // 不一致,报错
    if (cacheCode == null || !cacheCode.toString().equals(code)) {
        return Result.fail("验证码错误");
    }
    // 一致,查数据库tb_user
    User user = query().eq("phone", phone).one();
    // 不存在,创建新用户
    if (user == null) {
        user = createUserWithPhone(phone);
    }
    // 保存用户信息DTO(id,昵称,头像信息)到redis,需要生成token
    String token = UUID.randomUUID().toString();
    UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
    // 将user对象转为map类型,拆分所有字段,以哈希结构存到redis里 -- 对象中的数据类型都应该是string
    Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),  // 将Java对象(Bean)转换为Map类型
                                                     CopyOptions.create()    // 对象转换时的配置项
                                                     .setIgnoreNullValue(true)   // 是否忽略空值,当源对象的值为null时,true: 忽略而不注入此值,false: 注入null
                                                     .setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString()));    // 对转换过程中的字段值进行编辑处理,对象值转为string
    String user_token = LOGIN_USER_KEY + token;
    // 这里long id直接转string会报错
    stringRedisTemplate.opsForHash().putAll(user_token, userMap);
    // 设置token有效期(超过30分钟没有访问就删除)
    stringRedisTemplate.expire(user_token, LOGIN_USER_TTL, TimeUnit.MINUTES);
    //        session.setAttribute("user", BeanUtil.copyProperties(user, UserDTO.class));
    return Result.ok(token);
}
校验登录状态

我们不再是从浏览器中的cookie指定的session来获取用户信息,而是以随机token为key来从Redis中获取信息,流程如下:

1、用户发送请求并携带token
2、从Redis中获取用户(以随机token为key)
3、判断用户是否存在:
(1)没有这个用户就拦截
(2)有这个用户就保存用户信息到ThreadLocal,并放行

public class LoginInterceptor implements HandlerInterceptor {
    // 这个拦截器类不是由spring管理的,所以需要手动构造stringRedisTemplate而不能用依赖注入
    // 通过调用该类的spring类注入stringRedisTemplate,然后传递参数到拦截器来完成构造
    private StringRedisTemplate stringRedisTemplate;

    public LoginInterceptor(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // 获取客户端请求头传进来的token
        String token = request.getHeader("authorization");
//        HttpSession session = request.getSession();
        // token为空或仅包含空白字符
        if (StrUtil.isBlank(token)) {
            response.setStatus(401);
            return false;
        }
        // 基于token构造redis中存的用户信息的key
        String key = RedisConstants.LOGIN_USER_KEY + token;
        // 基于token获取redis中存的user
        Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
        // 用户不存在,拦截,返回401状态码
        if (userMap.isEmpty()) {
            response.setStatus(401);
            return false;
        }
        UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
        // 用户存在,保存到localthread,放行
        UserHolder.saveUser(userDTO);
        // 更新过期时间 -- 也就是每次访问新的页面,进行拦截器判断,更新过期时间
        stringRedisTemplate.expire(key, RedisConstants.LOGIN_USER_TTL, TimeUnit.MINUTES);
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        // 线程执行后移除用户
        UserHolder.removeUser();
    }
}
解决状态登录刷新的问题——登录拦截器的优化

上述代码实现完还有一点小问题,之前的拦截器并不会拦截掉一切路径,而是所有需要登录的路径,那么会出现一个问题:我们的首页并不需要登录就可以直接访问,那么已经登录过的用户一直在首页进行操作,拦截器中的登录状态并不会刷新,就可能造成明明一直在操作系统,却被视为不算是在登录状态。
解决方法是再加上一个拦截器,用户的请求要先经过这个拦截器,这个拦截器会拦截一切的路径,所以我们可以在这个拦截器里面进行token有效期的刷新操作:

1、获取token
2、查询Redis的用户
3、保存到ThreadLocal
4、刷新token有效期
5、放行

这样的话,一切的请求都会触发刷新的操作。那么之前的拦截器只需要查询ThreadLocal的用户,存在则继续,不存在则拦截。

新的拦截一切的拦截器做用户校验,存到threadlocal并放行一切:

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
    // 获取客户端请求头传进来的token
    String token = request.getHeader("authorization");
    // token为空或仅包含空白字符,直接放行
    if (StrUtil.isBlank(token)) {
        return true;
    }
    // 基于token构造redis中存的用户信息的key
    String key = RedisConstants.LOGIN_USER_KEY + token;
    // 基于key获取redis中存的user
    Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
    // 用户不存在,直接放行
    if (userMap.isEmpty()) {
        return true;
    }
    UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
    // 用户存在,保存到localthread,更新放行
    UserHolder.saveUser(userDTO);
    // 更新过期时间
    stringRedisTemplate.expire(key, RedisConstants.LOGIN_USER_TTL, TimeUnit.MINUTES);
    return true;
}

旧的拦截器只检查threadlocal有没有用户:

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
    // 在login拦截器中只需要校验threadlocal里面有没有存用户,没有则拦截
    if (UserHolder.getUser() == null) {
        response.setStatus(401);
        return false;
    }
    return true;
}

==商户查询缓存==

什么是缓存

缓存:数据交换的缓冲区(Cache),是存储数据的临时地方,读写性能高。俗称的缓存就是缓冲区内的数据,一般从数据库中获取,存储于本地代码(例如:

1:Static final ConcurrentHashMap<K,V> map = new ConcurrentHashMap<>(); 本地用于高并发
例2:static final Cache<K,V> USER_CACHE = CacheBuilder.newBuilder().build(); 用于redis等缓存
例3:Static final Map<K,V> map =  new HashMap(); 本地缓存

由于其被Static修饰,所以随着类的加载而被加载到内存之中,作为本地缓存,由于其又被final修饰,所以其引用(例3:map)和对象(例3:new HashMap())之间的关系是固定的,不能改变,因此不用担心赋值(=)导致缓存失效;

我们的浏览器有浏览器缓存,在浏览器未命中数据,就会在tomcat的应用层缓存中取数据,再没有命中的话就去数据库进行查询检索。
缓存的作用:
1、降低后端负载
2、提高读写效率,降低响应时间
缓存的成本:
1、数据的一致性成本
2、代码维护成本(解决一致性问题的时候带来的代码复杂)
3、运维的成本

商户查询添加缓存

如果直接查数据库,逻辑是这样的。速度慢

@GetMapping("/{id}")
public Result queryShopById(@PathVariable("id") Long id) {
    //这里是直接查询数据库
    return shopService.queryById(id);
}

改成缓存,用string类型存商户json信息

@Override
    public Result queryById(Long id) {
        // 查缓存,若存在直接返回,不存在就查数据库
        String key = CACHE_SHOP_KEY + id;
        String shopJson = stringRedisTemplate.opsForValue().get(key);
        if (StrUtil.isNotBlank(shopJson)) {
            // 存在店铺信息,反序列化json为bean对象并返回
            Shop shop = JSONUtil.toBean(shopJson, Shop.class);
            return  Result.ok(shop);
        }
        // 查数据库
        Shop shop = getById(id);
        if (shop == null)
            return Result.fail("店铺不存在");
        // 数据库中存在,将店铺信息序列化为json字符串,存储到redis缓存中
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop));
        return Result.ok(shop);
    }

商户类型序列缓存

shopTypeController

@GetMapping("list")
public Result queryTypeList() {
    //        List<ShopType> typeList = typeService
    //                .query().orderByAsc("sort").list();
    return typeService.queryShopTypeList();
}

shopTypeServiceImpl

@Service
public class ShopTypeServiceImpl extends ServiceImpl<ShopTypeMapper, ShopType> implements IShopTypeService {
    @Resource
    StringRedisTemplate stringRedisTemplate;
    @Override
    public Result queryShopTypeList() {
        // 要返回的商户列表
        List<ShopType> typeList = new ArrayList<>();
        // 存到redis中的商户类型list,是商户这个类型的json字符串的list集合
        List<String> redisTypeList = new ArrayList<>();

        // 查询redis缓存
        Long size = stringRedisTemplate.opsForList().size(CACHE_SHOPTYPE_KEY);
        if (size != 0) {    // 缓存有数据就返回
            redisTypeList = stringRedisTemplate.opsForList().range(CACHE_SHOPTYPE_KEY, 0, size);
            // 利用stream流和map将string类型的json list转换为bean list
            typeList = redisTypeList.stream().map(s->{
                return JSONUtil.toBean(s, ShopType.class);
            }).collect(Collectors.toList());
            return Result.ok(typeList);
        }

        // 缓存没有,查数据库,存缓存,返回
        typeList = query().orderByAsc("sort").list();
        
        if (typeList == null || typeList.isEmpty())
            return Result.fail("不存在商户类型数据");

        // 查到的bean list通过stream流转为string list,存到缓存
        redisTypeList = typeList.stream().map(shopType->{
            return JSONUtil.toJsonStr(shopType);
        }).collect(Collectors.toList());
        stringRedisTemplate.opsForList().rightPushAll(CACHE_SHOPTYPE_KEY, redisTypeList);

        return Result.ok(typeList);
    }
}

缓存更新策略

上面的作用模型可能会造成数据一致性问题,当我们对数据库进行修改的时候,缓存并没有同步进行修改,页面在缓存中获取数据的时候,其实并不是最新的数据。这肯定是不允许的。

下面是缓存更新策略:

内存淘汰 超时剔除 主动更新
说明 不用自己维护,利用Redis的内存淘汰机制,内存不足时自动淘汰部分数据,下次查询时更新缓存 给缓存数据添加TTL时间,到期后自动删除缓存。下次查询即可实现缓存的更新 自己编写业务逻辑,在修改数据库的同时,更新缓存
一致性 一般
维护成本

上述的策略选择要根据具体的业务场景:
1、低一致性需求/数据很少更新:使用内存淘汰机制。例如店铺类型的查询缓存。
2、高一致性需求:主动更新,以超时剔除作兜底方案。例如店铺详情查询的缓存。

主动更新策略

1、Cache Aside Pattern(最常用) – 由缓存的调用者,在更新数据库的同时更新缓存
2、Read/Write Through Pattern – 缓存与数据库整合为一个服务,由服务来维护一致性(调用者不知道服务内部)。调用者调用该服务无需关注一致性问题。但这种服务的成本肯定是很高的。
3、Write Behind Caching Pattern(写回) – 调用者只操作缓存由其它线程异步的将缓存数据持久化到数据库,保证最终一致。
比如我们一直对缓存进行更新,更新10次以后轮到这个线程工作,就维护一下数据库的数据为更新10次后的数据,中途的其他9次更新操作根本不重要,这样的性能显然是很高的。这种方式当然也有很大问题,比如长期的数据不一致、缓存宕机造成的严重后果等。

操作缓存和数据库时要考虑如下几个问题:

  • 删除缓存还是更新缓存?
    • 更新缓存:每次更新数据库都更新缓存,无效写操作较多。(写多读少的情况,假设更新100次数据,读1次数据,这边会有100次无效更新缓存)
    • 删除缓存:更新数据库时让缓存失效,查询时再更新缓存更优) 。(写多读少时,更新100次也只需要删一次缓存,避免无效写操作)
  • 如何保证缓存和数据库的操作的同时成功或失败?(原子性)
    - 单体系统:事务控制,将缓存与数据库操作放在一个事务
    - 分布式系统:利用TCC等分布式事务方案
  • 先操作缓存还是先操作数据库?(线程安全)
    - 先删除缓存,再操作数据库
    - 先操作数据库,再删除缓存 - 由于 redis 的速度远比MySQL要快,所以方案二为优选

先删缓存再操作数据库的线程安全问题:正常情况如左图,两个线程得到的数据一致。异常情况下如果线程2(读操作)在线程1写数据库之前查缓存,就会使得缓存与数据库不同步。因为写MySQL数据库操作比查Redis缓存操作慢,所以异常情况很容易发生。

先操作数据库再删缓存的线程安全问题:正常情况如左图。异常情况如右图。异常情况发生条件是(1)线程并行(2)缓存失效(3)在线程1(读操作)查数据库过程中,线程2完成了更新数据库和删缓存两个操作。由于MySQL写数据比查数据慢,在微秒级别内完成更新数据库和删除缓存概率很低。所以这个方式更好。

image-20240516122155989

总结Redis读写策略

读操作:

  • 缓存命中则直接返回
  • 缓存未命中则查询数据库,并写入缓存,设定超时时间

写操作:

  • 先写数据库,然后再删除缓存
  • 要确保数据库与缓存操作的原子性

商铺缓存与双写一致

现在我们要给查询商铺的缓存添加主动更新超时剔除策略。
修改ShopController的业务逻辑满足:
(1)根据id查询店铺,没命中就查数据库,然后写入缓存,并设置超时时间
(2)根据id修改店铺,先操作数据库,再删除缓存

@Override
@Transactional  // 通过事务控制数据库和缓存操作的原子性
public Result update(Shop shop) {
    Long id = shop.getId();
    if (id == null)
        return Result.fail("店铺不存在");
    // 更新数据库
    updateById(shop);
    // 删除缓存
    stringRedisTemplate.delete(CACHE_SHOP_KEY + id);
    return Result.ok();
}

缓存穿透、雪崩、击穿

缓存穿透

缓存穿透是指客户端请求的数据在缓存和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。不断发起这样的请求,给数据库带来巨大压力。过程即:
(1)客户端访问Redis,未命中 (2)接着访问数据库,未命中
这样的话,如果有人恶意多线程地访问不存在的内容,可能就把我们的系统弄垮了。

解决方案

1、缓存空对象:
(1)客户端请求Redis,未命中
(2)接着访问数据库,未命中
(3)数据库将空值null缓存到Redis里
这样如果继续访问的话,就会访问Redis了,不会一直去对数据库造成攻击,尽管访问Redis以后返回的内容是NULL。
优点:实现简单,维护方便
缺点额外内存消耗(每次进行不同的访问,都创建null,不过设置TTL可以解决);可能造成短期的不一致(设置为NULL之后,数据库真的新增了这个数据,不过设置TTL可以有效缓解这种情况的出现概率)

2、布隆过滤:
这其实是一种算法,它在客户端与Redis交互之间加了一个布隆过滤器
(1)用户请求布隆过滤器,不存在就直接拒绝
(2)存在的话就放行,让客户端去访问Redis,有就返回,没有就访问数据库
布隆过滤器存储的一系列的二进制位,这种二进制数是先对数据库数据进行某种哈希运算以后再转成二进制存储到布隆过滤器的,具体原理可以自行查询,这种算法实现方式决定了过滤器存在概率性:如果过滤器返回不存在,那就是不存在;如果返回存在,那就不一定了。

优点内存占用较少,没有多余key
缺点:实现复杂(不过Redis里面存在,可以简化开发);存在误判可能

因为布隆过滤器存在误判,所以我们的开发过程中,会选择缓存空对象的方式来解决缓存穿透。

image-20240516130239055

缓存穿透的其他解决方案:

  • 增强id的复杂度,避免被猜测id规律
  • 做好数据的基础格式校验
  • 加强用户权限校验
  • 做好热点参数的限流

解决商铺查询的缓存穿透问题

1、我们需要在之前业务流程环节中增加缓存空对象的环节,即可解决,也就是根据id查询数据库的时候,判断商铺不存在之后,不再直接结束,而是将空值写入Redis。
2、那么我们之后的查询,可以在缓存中查询出null值,因此我们的查询就需要对查询出来的值进行判断,不是空值的话才能返回商铺信息到前端。

image-20240516214714449

缓存雪崩

缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。

正常情况下,大量请求会到达Redis,少数请求到达数据库。而Redis一旦宕机,或者Redis中的大量key都因为TTL到期而失效了,这时候的很多请求都会指向数据库。
针对这个问题,我们可以提出一些解决方案:
1、给不同的key的TTL添加随机值,避免大量的key在同一个小时段内失效
2、利用Redis集群提高服务的可用性(Redis哨兵机制可以实现服务的监控,发现宕机的主Redis,就可以立刻将从Redis替代上去),这个内容相对比较高级,在之后讲。
3、给缓存业务添加降级限流策略(微服务部分)(如果整个集群的Redis全部都宕机了,我们可以提前做容错处理,当这些Redis都失效的时候,我们要及时的拒绝请求,防止大量请求到达数据库)
4、给业务添加多级缓存

缓存击穿

缓存击穿也叫作热点key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。

我们可以这么理解,网站中有一些内容是非常的重要的,很可能在同一时段被多个用户给同时访问,也就是高并发访问,而这个被高并发访问的key失效了,这时候访问就会到达数据库,大量请求到达数据库是很危险的,很容易造成缓存雪崩。
即便数据库比较坚强,也有可能用户进行访问的内容是很复杂的,可能涉及到了多表查询,也可能其转换到Redis中进行存储的时候需要进行一系列的业务。当缓存重建业务复杂的时候,如此大的请求在那一瞬间给数据库带来的冲击是非常巨大的。

缓存击穿问题,有两种比较主流的解决方法:
1、互斥锁
2、逻辑过期

互斥锁

1、当第一个线程未命中缓存的时候,获取互斥锁,直到这个线程查询数据库完,并且重建了缓存数据并存入Redis,才能释放互斥锁;
2、后面的线程在缓存数据存入Redis的过程中,同样会发生查询Redis未命中的情况,那么这些线程无法获得互斥锁,只能进行休眠,休眠一段时间后再重试,直到锁被解开(Redis中已经有数据了)。

逻辑过期

缓存击穿会出现的原因,其实无非就是TTL到期,Redis失效了,因此我们可以不给其设置TTL。但是我们该如何知道key过期了呢?我们要给这个key设置一个逻辑过期,类似:

KEY VALUE
wxj:user:1 {name:“Jack”, age:21, expire:151467}

这里的expire不是TTL,而是我们添加到Redis之前设定的,用代码逻辑来进行维护

那么这个key一旦存储到了Redis里面,没有任何干预的情况下是永不过期的。

也就是说有线程在查询缓存的时候,代码逻辑里发现逻辑时间过期了,我们也直接把旧数据返还给客户端,同时数据更新交给另一个线程去做。此时有更多线程要查缓存,因为拿不到锁所以直接返回旧数据,避免了等待。

毕竟已经是高并发,一时的旧数据在很多时候也能接受,在我看来这是一种牺牲策略,客户端无须等待新数据到来,当然了,旧数据迟早要进行修改,但数据的更新操作完全可以交给其他线程,这样可以提高效率

总结

解决方案 优点 缺点
互斥锁 没有额外内存消耗;保持一致性;实现简单 线程要等待,性能受影响;可能死锁
逻辑过期 线程无需等待,性能较好 不保证一致性;有额外内存消耗;实现复杂
image-20240516215835297

解决商铺查询的缓存击穿问题

互斥锁

加互斥锁。获取锁与释放锁的方法定义如下:

private boolean tryLock(String key) {
    // 利用redis setnx - 向Redis中添加一个key,只用当key不存在的时候才添加并返回1
    Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
    // 直接返回flag的话,拆箱可能拿到空指针
    return BooleanUtil.isTrue(flag);
}

private void unlock(String key) {
    stringRedisTemplate.delete(key);
}

整合了缓存穿透和缓存击穿的代码。和前面的代码相比主要是在没有命中缓存,且没有存储空值的情况下,在查数据库前要先获取互斥锁。防止多个线程同时打到数据库。拿到锁以后查了数据库同步重建缓存返回数据,也就是这个线程的任务完成后,才能释放锁给其他线程用

private Shop queryWithMutex(Long id) {
    // 查缓存,若存在直接返回,不存在就查数据库
    String key = CACHE_SHOP_KEY + id;
    String shopJson = stringRedisTemplate.opsForValue().get(key);
    if (StrUtil.isNotBlank(shopJson)) {
        // 存在店铺信息,反序列化json为bean对象并返回
        Shop shop = JSONUtil.toBean(shopJson, Shop.class);
        return  shop;
    }
    // 如果没有命中缓存。命中空值,也就是shopJson是“” 空字符串,而非null,返回错误
    if (shopJson != null) {
        return null;
    }

    // ====== 没缓存,没空值,需要查数据库 -- 查之前先尝试获取互斥锁,拿到锁以后才能查数据库 ======
    String lockKey = "lock:shop:" + id; // 每个商户都对应一个锁
    try {
        boolean isLock = tryLock(lockKey);
        // 判断是否获取成功
        if (!isLock){   // 失败则休眠并重试(递归)
            Thread.sleep(50);   // 单位:ms
            // 如果担心递归造成爆栈,可以用循环,一样的
            return queryWithMutex(id);
        }
        // 拿到了锁,查数据库
        Shop shop = getById(id);
        // 模拟重建的延时,测试用
        Thread.sleep(200);

        if (shop == null) {
            // 空值有效期相对更短
            stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
            return null;

        }
        // 数据库中存在,将店铺信息序列化为json字符串,存储到redis缓存中,并添加超时剔除
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);
        return shop;
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }finally {
        unlock(lockKey);    // 释放锁
    }
}

使用Jmeter做并发测试,设定并发量为1000。配置好路径和接口。

image-20240516223930023

运行完以后打开查看结果树:

image-20240516224240314

打开汇总报告可以看到我们的吞吐量为204

image-20240516224424100

如此大的数据量打下去,但是我们的日志显示我们的数据库只执行了一次的查询语句:

image-20240516224134785

说明我们已经使用互斥锁成功避免了缓存击穿。

逻辑过期

其实通常情况,因为是热点key,一般都是会出现在Redis里面的,且因为我们没有设置TTL,所以热点key是一定会一直存在的,但为了严谨起见,还是在判定缓存未命中的时候返回空。

主要流程:

1、我们判断一下缓存是否逻辑过期了,如果没有过期,我们直接返回信息到客户端即可
2、如果缓存逻辑过期了,这个线程就尝试获取互斥锁,如果获取成功,说明它是第一个访问Redis的这个过期key的线程,那么这个线程要做2件事:
(1)返回这个旧数据给客户,虽然数据是旧的,但是这是一种暂时的牺牲
(2)开辟新的线程来进行缓存数据的重建,重建完毕就释放这个互斥锁
3、除了第2种情况说的这个线程,其他线程在知道自己访问的数据过期之后,获取互斥锁都会失败,那么这时候只需要直接返还数据给客户就好了,可能是旧数据,也可能是新数据(第一个线程释放锁或者缓存数据重建成功了)

image-20240517164654459

首先我们要对Shop类增加逻辑过期时间这样一个字段,一种方案是直接添加,这种会违背开闭原则,一种是可以新增加一个类,并且类中包含了逻辑过期时间expireTime,但是该怎么把这个属性添加到Shop里面呢?可以让Shop继承这个类,就可以获得这个类中的属性,但同样会修改Shop这个类的源代码,同样违背开闭原则,所以最好的方法就是用关联来代替继承:

@Data
public class RedisData {
    /**
     *  万能的逻辑过期数据类
     */
    private LocalDateTime expireTime;
    private Object data;
}

商铺查询逻辑:

// 线程池
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
public Shop queryWithLogicalExpire(Long id){
    // 查缓存,若不存在直接返回空
    String key = CACHE_SHOP_KEY + id;
    String shopJson = stringRedisTemplate.opsForValue().get(key);
    if (StrUtil.isBlank(shopJson)) {
        return null;
    }
    // 命中,将json反序列化为对象,判断过期时间。
    RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
    // 获取到的redisdata存的数据本质是jsonobject类型
    JSONObject obj = (JSONObject) redisData.getData();
    Shop shop = JSONUtil.toBean(obj, Shop.class);
    LocalDateTime expireTime = redisData.getExpireTime();
    // 没过期直接返回
    if (expireTime.isAfter(LocalDateTime.now()))
        return shop;

    // 过期了重建缓存,尝试获取互斥锁
    String lockKey = LOCK_SHOP_KEY + id; // 每个商户都对应一个锁
    boolean isLock = tryLock(lockKey);
    // 判断是否获取成功
    if (!isLock){   // 失败说明已经有别的线程处理了,不需要再尝试重建
        return shop;
    }
    // 拿到了锁,开启独立线程重建缓存
    CACHE_REBUILD_EXECUTOR.submit(() -> {
        // 为了便于测试缓存重建机制设置20秒逻辑缓存
        try {
            this.saveShop2Redis(id, 20L);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }finally {
            //释放锁
            unlock(lockKey);
        }
    });
    return shop;
}

使用Jmeter测试,100个线程,1s执行完。查看idea后台,可以发现我们只执行了一次重构,说明只有一个线程操作了数据库,其他返回的在重建前是旧数据,重建完是新数据。这也证明了逻辑过期方法会造成短暂的数据不一致的情况。

image-20240517172349168

缓存工具封装

基于StringRedisTemplate封装一个缓存工具类,可以满足以下的需求:
1、将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间。 — 存储对象
2、将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题。 — 存储逻辑过期对象
3、根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题。 — 取对象
4、根据指定的key查询缓存,并反序列化指定类型,需要利用逻辑过期解决缓存击穿问题。 — 取逻辑过期对象
1+3:存储普通对象;2+4:存储热点数据

@Slf4j
@Component
public class CacheClient {

    private final StringRedisTemplate stringRedisTemplate;

    public CacheClient(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    // 将任意Java对象序列化为json并存储在string类型的key中
    public void set(String key, Object value, Long time, TimeUnit unit) {
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, unit);
    }

    // 带逻辑过期的存储方法
    public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit) {
        // 逻辑过期类
        RedisData redisData = new RedisData();
        redisData.setData(value);
        // 设置逻辑过期时间
        redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
    }

    /**
     * 解决缓存穿透,这里用泛型接收任意类型id和对象类型,以便返回对应类型bean
     * 参数:key的前缀,对应对象的id,以及对象类型
     * 这里的id也同样是泛型,因为没办法保证用户传过来的id类型是Int还是Long或者其他
     * 如果redis不存在,需要查询数据库,通用的函数根本不知道从数据库的哪张表进行查询(如果写到service可以利用mybatis-plus getById查对应类型),这里要自行传入 -- 函数式编程
     * Function<T, R>表示有参数有返回值的类型,“dbFallback”表示数据库降级逻辑,代表查询Redis失败后要去做的后备方案
     */
    public <R, ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) {
        String key = keyPrefix + id;
        String json = stringRedisTemplate.opsForValue().get(key);
        if (StrUtil.isNotBlank(json))
            return JSONUtil.toBean(json, type);
        if (json != null) return null;
        // 传入的函数,参数T=id,返回值R为对象类型
        R r = dbFallback.apply(id);
        //不存在,返回错误
        if (r == null){
            //存一个null到Redis中,避免缓存穿透
            stringRedisTemplate.opsForValue().set(key, "", 2, TimeUnit.MINUTES);
            return null;
        }
        //存在,写入Redis,直接用set方法
        this.set(key, r, time, unit);
        //返回
        return r;

    }

    //逻辑过期解决缓存击穿问题
    private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);

    public <ID, R> R queryWithLogicalExpire(String keyPrefix, String lockKeyPrefix, ID id, Class<R> type,Function<ID, R> dbFallback, Long time, TimeUnit unit){
        String key = keyPrefix + id;
        String json = stringRedisTemplate.opsForValue().get(key);
        // 判断redis中是否存在
        if (StrUtil.isBlank(json)) {
            // 未命中,直接返回
            return null;
        }
        // 命中,先把json反序列化成对象
        RedisData redisData = JSONUtil.toBean(json, RedisData.class);
        JSONObject data = (JSONObject) redisData.getData();
        // 获取到对象类型的r
        R r = JSONUtil.toBean(data, type);
        LocalDateTime expireTime = redisData.getExpireTime();
        // 判断是否过期,未过期直接返回r
        if (expireTime.isAfter(LocalDateTime.now())){
            return r;
        }
        // 已过期,缓存重建。尝试获取互斥锁。互斥锁key由参数传递
        String lockKey = lockKeyPrefix + id;
        boolean isLock = tryLock(lockKey);
        //判断是否获取锁成功
        if (isLock){
            //成功获取锁,开启独立线程来实现缓存重建,用线程池来做
            CACHE_REBUILD_EXECUTOR.submit(() -> {
                try {
                    //查询数据库,这里依旧使用函数式编程
                    R r1 = dbFallback.apply(id);
                    //写入Redis
                    this.setWithLogicalExpire(key, r1, time, unit);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }finally {
                    //释放锁
                    unlock(lockKey);
                }
            });
        }
        //没有拿到锁,直接返回信息
        return r;
    }

    private boolean tryLock(String key){
        // opsForValue里面没有真正的setNx,而是setIfAbsent,表示如果不存在就执行set
        Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
        return BooleanUtil.isTrue(flag);
    }

    private void unlock(String key){
        stringRedisTemplate.delete(key);
    }
}

工具类调用

public Result queryById(Long id) {
    // 缓存穿透
    // Shop shop = queryWithPassTrough(id);
    // 缓存击穿 -- 互斥锁
    // Shop shop = queryWithMutex(id);
    // 缓存击穿 -- 逻辑过期
    // Shop shop = queryWithLogicalExpire(id);
    // 封装的工具类,解决缓存穿透
    // Shop shop = cacheClient.queryWithPassThrough(CACHE_SHOP_KEY, id, Shop.class, this::getById, CACHE_SHOP_TTL, TimeUnit.MINUTES);
    // 封装的工具类,解决缓存击穿
    Shop shop = cacheClient.queryWithLogicalExpire(CACHE_SHOP_KEY, LOCK_SHOP_KEY, id, Shop.class, this::getById, 10L, TimeUnit.SECONDS);
    if (shop == null)
        return Result.fail("商铺不存在");
    return Result.ok(shop);
}

==优惠券秒杀==

全局唯一ID

每个店铺都可以发布优惠券(代金券),当用户抢购的时候,就会生成订单并且保存到tb_voucher_order这张表中:

CREATE TABLE `tb_voucher_order`  (
  `id` bigint(20) NOT NULL COMMENT '主键',
  `user_id` bigint(20) UNSIGNED NOT NULL COMMENT '下单的用户id',
  `voucher_id` bigint(20) UNSIGNED NOT NULL COMMENT '购买的代金券id',
  `pay_type` tinyint(1) UNSIGNED NOT NULL DEFAULT 1 COMMENT '支付方式 1:余额支付;2:支付宝;3:微信',
  `status` tinyint(1) UNSIGNED NOT NULL DEFAULT 1 COMMENT '订单状态,1:未支付;2:已支付;3:已核销;4:已取消;5:退款中;6:已退款',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '下单时间',
  `pay_time` timestamp NULL DEFAULT NULL COMMENT '支付时间',
  `use_time` timestamp NULL DEFAULT NULL COMMENT '核销时间',
  `refund_time` timestamp NULL DEFAULT NULL COMMENT '退款时间',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Compact;

可以发现,我们的主键ID没有使用自增长,这是因为如果使用数据库自增ID就会存在一些问题:
1、ID的规律性太明显,容易让别人猜测到信息(比如商城在一天时间内,卖出了多少单)
2、受单表数据量的限制(订单可能数据非常大,可能会分多表进行存储,但表的自增长相互之间不受影响,所以不同表之间可能会出现ID相同的情况,也就是说这种时候会违背ID的唯一性,这显然是不可以的)
全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:1、唯一性 2、高可用 3、高性能 4、递增性 5、安全性

除了第5点,Redis及其数据结构已经可以直接满足前4点的要求了,为了增加ID的安全性,不要直接使用Redis自增的数值,而是拼接一些其他信息,最终我们将ID组成定义为64位的二进制数,分别是1位符号位,31位时间戳,32位序列号。

Redis实现

@Component
public class RedisIdWorker {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 开始时间戳由main函数运行得到
     */
    public static final long BEGIN_TIMESTAMP = 1716076800L;
    /**
     * 序列号的位数:32bit,秒内的计数器,支持每秒产生2^32个不同ID
     */
    public static final int COUNT_BITS = 32;

    public long nextId(String keyPrefix){
        //获得当前时间
        LocalDateTime now = LocalDateTime.now();
        long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
        //生成时间戳:31bit,以秒为单位,可以使用69年
        long timestamp = nowSecond - BEGIN_TIMESTAMP;
        /**
         * 接下来生成序列号
         * 我们的key的设置除了加上icr表示是自增长的,还需要在最后拼接一个日期字符串
         * 这是因为我们的序列号上限是2^32,并不大,如果每天的key都是一样的,这是很有可能超过上限的
         * 在后面拼接一个日期字符串,可以保证每一天的key都是不一样的,而且一天内也基本不可能到达2^32的上限
         * 这样做还有一个好处,我们以后可以根据每天或者每月来查看value值,起到统计效果
         */
        //获取当前日期,精确到天
        String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
        long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);

        //拼接并返回,这里灵活用位运算
        return timestamp << COUNT_BITS | count;
    }

    public static void main(String[] args) {
        //定义时间为2024年5月19日00:00:00
        LocalDateTime time = LocalDateTime.of(2024, 5, 19, 0, 0, 0);
        //将时间变成变成秒数的形式
        long second = time.toEpochSecond(ZoneOffset.UTC);
        //在这里运行出来的时间作为BEGIN_TIMESETAMP
        System.out.println(second);
    }
}

全局唯一ID生成策略

  • UUID:优点使用简单,不依赖其他组件,不影响数据库扩展;缺点数据库索引效率低,太过于无意义.用户不友好,空间占用大,集群环境容易重复
  • Redis自增:优点利用redis操作原子性的特征,可以保证在并发的时候不会重复,拓展性强,可以方便的结合业务进行处理;缺点增加一侧网络开销 需要对reids服务实现高可用
  • snowflake算法:优点性能较优,速度快 无需第三方依赖,实现也简单 可以根据实际情况调整和拓展算法,方便灵活;缺点依赖时间机器,如果发生回拨会导致生成id重复
  • 数据库自增:优点无需编码,性能也过得去,索引友好;缺点大表不能做水平分表,否则插入删除易出现问题,依赖前期规划,拓展麻烦 依赖mysql内部维护自增锁,高并发下插入数据影响性能

秒杀数据表

每个店铺都可以发布优惠券,分为平价券和特价券,平价券可以任意购买,而特价券需要秒杀抢购,表关系如下

tb_voucher(平价与秒杀券都有):优惠券基本信息(金额,规则等):上面的type可以表示标识出是平价券还是特价券,如果是特价券我们也需要一些特定的信息,因此我们会专门拓展出一张表。
tb_seckill_voucher(对应秒杀券):优惠券库存、开始抢购时间、结束抢购时间(特价券需要此表)

CREATE TABLE `tb_voucher`  (
  `id` bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键',
  `shop_id` bigint(20) UNSIGNED NULL DEFAULT NULL COMMENT '商铺id',
  `title` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '代金券标题',
  `sub_title` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '副标题',
  `rules` varchar(1024) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '使用规则',
  `pay_value` bigint(10) UNSIGNED NOT NULL COMMENT '支付金额,单位是分。例如200代表2元',
  `actual_value` bigint(10) NOT NULL COMMENT '抵扣金额,单位是分。例如200代表2元',
  `type` tinyint(1) UNSIGNED NOT NULL DEFAULT 0 COMMENT '0,普通券;1,秒杀券',
  `status` tinyint(1) UNSIGNED NOT NULL DEFAULT 1 COMMENT '1,上架; 2,下架; 3,过期',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 10 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Compact;

CREATE TABLE `tb_seckill_voucher`  (
  `voucher_id` bigint(20) UNSIGNED NOT NULL COMMENT '关联的优惠券的id',
  `stock` int(8) NOT NULL COMMENT '库存',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `begin_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '生效时间',
  `end_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '失效时间',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`voucher_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '秒杀优惠券表,与优惠券是一对一关系' ROW_FORMAT = Compact;

添加优惠券

在VoucherController中提供一个接口,调用就可以实现添加普通券。mapper中普通券除了定义对应tb_voucher的内容,还添加了库存、生效时间和失效时间字段。这样添加普通券和添加秒杀券都可以通过这个类传递对象,传递过来以后如果是普通券,这几个字段默认exsit=false(也就是没有该字段到数据库表的映射),直接利用VoucherServiceImpl的save方法保存即可。如果是秒杀券,就首先save到tb_voucher,再对库存、生效时间、失效时间等手动保存,利用seckillVoucherService添加到tb_seckill_voucher表。也就实现了通过前端一次传递,就在后端保存两张表的信息,简化代码逻辑。

image-20240519145646945

实现秒杀下单

接口地址:/voucher-order/seckill/{id}

请求方式:POST

请求参数:

参数名称 参数说明 请求类型 是否必须 数据类型 schema
id 优惠券id path true integer(int64)

返回值:订单id

基础功能实现(不考虑并发):

@Override
@Transactional  // 涉及多表操作,添加事务以便回滚
public Result seckillVoucher(Long voucherId) {
    // 查询优惠券
    SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
    if (voucher == null) {
        return Result.fail("秒杀券不存在!");
    }
    // 判断时间区间
    if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
        return Result.fail("秒杀尚未开始!");
    }else if (voucher.getEndTime().isBefore((LocalDateTime.now()))) {
        return Result.fail("秒杀已经结束!");
    }

    // 判断库存
    if (voucher.getStock() < 1) {
        return Result.fail("库存不足!");
    }
    // 扣减库存
    boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).update();
    if (!success) {
        return Result.fail("库存不足!");
    }

    // 创建订单
    VoucherOrder voucherOrder = new VoucherOrder();
    long orderId = redisIdWorker.nextId("order");
    voucherOrder.setId(orderId);
    // 从threadlocal中取用户id
    Long userId = UserHolder.getUser().getId();
    if (userId == null) {
        return Result.fail("用户未登录!");
    }
    voucherOrder.setUserId(userId);
    voucherOrder.setVoucherId(voucherId);
    // 保存订单
    save(voucherOrder);

    // 返回订单id
    return Result.ok(orderId);
}

库存超卖问题

以上代码存在线程安全问题。高并发场景下会有库存超卖 – 在库存为1的时候,多个线程同时查询库存,得到库存1,后面的线程读到的都是脏数据。

超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁:而对于加锁,我们通常有两种解决方案:

悲观锁:认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行。(Synchronized、Lock等)

乐观锁:认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其它线程对数据做了修改。(如果没有修改,那就是安全的;如果已经被其他线程修改说明发生了安全问题,此时可以重试或异常

显然乐观锁的性能会好很多,但是实现起来会更复杂,我们要处理好关键的一点,那就是更新数据的时候,该如何去判断有没有其它线程对数据做了修改

乐观锁的实现方式有2种方法(其实思想相同):
1、版本号法:
给数据增加一个字段version,初始值为1,每次我们要修改库存量之前都需要先查询库存量与版本号,然后线程执行SQL语句,执行SQL语句必须要确定数据库中的这条数据的版本号就是查询出来的版本号,如果不相同说明有其他线程修改了数据,导致当前数据的版本号与之前查询的不一样:
2、CAS法
上面的方法加一个版本号其实是一种标识,但是我们不一定要借助version,实际上我们可以直接依靠库存量来做标识,在对数据库进行修改的时候,我们要首先判断当前数据的库存量与之前线程查询出来的库存量是否相同,不相同则说明发生线程安全问题,不能修改

乐观锁解决超卖

我们选用CAS法来解决超卖,根据上述思想,我们只需要在SQL语句那增加一个判断库存量的条件:

// 扣减库存
boolean success = seckillVoucherService.update()
    .setSql("stock = stock - 1")
    .eq("voucher_id", voucherId)
    .eq("stock", voucher.getStock())
    .update();

用Jmeter测试发现200个线程抢100张票,只卖出了20+张票。

问题:假设stock=100,当线程查询出来的stock与数据库的stock不一致的时候,并不能说明票卖完了,只能说明其他线程抢先了票。理论上库存量大概率不为0,该线程还是应该要能够实现买票操作,但全都因为查询的stock与数据库不一致导致有大量线程买票失败。

修改:只需要查询数据库中的stock是否大于0即可。— .gt("stock", 0)

boolean success = seckillVoucherService.update()
    .setSql("stock = stock - 1")
    .eq("voucher_id", voucherId)
    .gt("stock", 0)	// /where id = ? and stock > 0
    .update();

重新测试发现能够正好下单100份秒杀券。

但是这不代表乐观锁就是完美的,很显然代码逻辑中要操作数据库,大量的线程就会给数据库带来压力,仅仅使用乐观锁在更高并发的场景下还是不太够的。

一人一单

需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单。

解决:只需要判断当前尝试抢优惠券的线程,其用户id在订单表中是否已经存在了,如果存在则不允许下单。

这里仍然会有多线程问题,由于这段逻辑不涉及数据库更新而是要插入数据,所以加悲观锁。这里锁放在函数内部,因为如果在方法上加锁,那么不管是哪个用户来下单都变成串行了,所以我们需要在内部调用synchronized,并指定锁定对象为userId。

悲观锁

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
    @Resource
    private ISeckillVoucherService seckillVoucherService;
    @Resource
    private RedisIdWorker redisIdWorker;
    @Override
    public Result seckillVoucher(Long voucherId) {
        // 查询优惠券
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
        if (voucher == null) {
            return Result.fail("秒杀券不存在!");
        }
        // 判断时间区间
        if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
            return Result.fail("秒杀尚未开始!");
        }else if (voucher.getEndTime().isBefore((LocalDateTime.now()))) {
            return Result.fail("秒杀已经结束!");
        }

        // 判断库存
        if (voucher.getStock() < 1) {
            return Result.fail("库存不足!");
        }
        return createVoucherOrder(voucherId);
    }

    @Transactional
    public Result createVoucherOrder(Long voucherId) {
        // 从threadlocal中取用户id
        Long userId = UserHolder.getUser().getId();

        /**
         * userId值一样的,我们用同一把锁,即为同一个用户的多线程访问加锁。
         * 但是每个请求一来,我们的id对象都是全新的(toString得到的是新的string对象)
         * 所以要加上intern()方法,从字符串常量池中返回字符串的规范表示,保证多个请求访问一个用户字符串时得到的是同样的地址
         */
        synchronized (userId.toString().intern()) {
            // 查询用户是否已经下单了对应优惠券
            long count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
            if (count > 0) {
                return Result.fail("用户已经购买!");
            }

            // 没买过,扣减库存
            boolean success = seckillVoucherService.update()
                .setSql("stock = stock - 1")
                .eq("voucher_id", voucherId)
                .gt("stock", 0)
                .update();
            if (!success) {
                return Result.fail("库存不足!");
            }

            // 创建订单
            long orderId = redisIdWorker.nextId("order");
            VoucherOrder voucherOrder = new VoucherOrder();
            voucherOrder.setId(orderId);
            voucherOrder.setUserId(userId);
            voucherOrder.setVoucherId(voucherId);
            // 保存订单
            save(voucherOrder);

            // 返回订单id
            return Result.ok(orderId);
        }
    }
}

需要注意一个细节,上面代码还是会发生并发安全问题:

我们这边的整个函数已经是被Spring托管了,所以事务的提交会在函数执行完毕之后,也就是说我们会先释放锁,再提交事务,当我们事务还没有提交完成,修改数据还没写入数据库,却又有其他线程进来了,再次发生线程并发问题。

事务与锁顺序

所以,锁的范围太小了,我们应该要把整个函数都锁起来。这样就是先获取锁,提交事务,再释放锁。

image-20240519174917100

事务生效问题

直接调用createVoucherOrder方法是不行的,因为它相当于调用了this.createVoucherOrder,然而当前类并不是代理对象,这会导致Sping代理失效。

Spring的事务是通过AOP来实现的,只有通过代理对象调用@Transactional注解的对象方法时,事务才会生效,也就是直接调用createVoucherOrder()方法事务才会生效,调用seckillVoucher()方法后间接调用createVoucherOrder()方法事务是不会生效的。因为这次调用并不是通过代理对象来实现的。

image-20240519175418682

所以我们要先获得当前对象的代理对象,然后再去调用这个函数(这个函数也要创建在service接口中,表明它属于代理对象能调用的方法):

synchronized (userId.toString().intern()) {
    IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
    return proxy.createVoucherOrder(voucherId);
}

需要引入依赖:

<dependency>
        <groupId>org.aspectj</groupId>
        <artifactId>aspectjweaver</artifactId>
</dependency>

并且在启动类中需要暴露代理对象:

image-20240519180101176

使用Jmeter测试,同一用户200个线程,最终只插入一条数据。问题解决。

image-20240519222713031

集群下的线程并发安全问题

现在已经通过加锁解决一人一单问题安全,但是这只能解决单机情况的,集群模式依旧不行。

利用idea模拟集群模式,复制一个新的服务,设置端口8082(program arguments)

image-20240519225432012

重启形成2个机子的集群:

image-20240519231204101

修改nginx的conf目录下的nginx.conf文件,配置反向代理、负载均衡:

image-20240519230039737

最后重新加载一下Nginx:nginx.exe -s reload

最后访问网址,并连续刷新2次:

http://localhost:8080/api/voucher/list/1

查看后台可以发现两个启动服务都可以接受到信息,因为api(8080)包括了8081与8082,访问是以轮询的方式进行的。这样就实现了负载均衡。

测试:需要在锁那里打个断点,并且在postman里面分别抢券(都用同一个用户)来进行优惠券抢购,可以发现只用1个用户信息,数据库中却少了2张券,说明又一次发生了并发问题。

我们部署另外一台Tomcat,这是锁的锁监视器其监视的内容和之前锁中的监视器内容是不一样的,那么新Tomcat的线程获取锁就会成功(获取的userId.toString()是不一样的,不理解的可以去看toString方法的源码),并成功的操作数据库,因此才会造成线程并行问题。
如下图,线程1、3发生了线程安全问题。因此我们只能保证单个JVM下的线程安全,却无法保证集群中多个JVM的线程安全,我们需要在集群中加锁,也就是分布式锁,将在后续讲解。

image-20240519231759949

==分布式锁==

基本原理

JVM内的线程之间可以用实现互斥,synchronized锁只能保证单个JVM内部的多个线程之间互斥,不能保证多JVM下多进程的互斥。因为每个JVM都只有一个锁监视器,但是多个JVM就会有多个锁监视器,导致多个线程获取到锁,从而发生线程安全问题。
因此,要实现互斥,可以让多个JVM都共用一个锁监视器,这样让JVM与JVM之间、每个JVM的线程之间都共用这个锁,就不会发生线程安全问题了。

由此引出分布式锁的定义:满足分布式系统或集群模式下多进程可见并且互斥的锁。
需要满足的特点:多进程可见、互斥、高可用、高性能、安全性

不同实现方式对比

MySQL Redis Zookeeper
互斥 本身的互斥锁机制 利用互斥命令setnx 利用节点的唯一性和有序性实现互斥
高可用
高性能 一般 一般
安全性 断开连接,自动释放锁 利用锁超时时间,到时释放 临时节点,断开连接自动释放

基于Redis的分布式锁

实现分布式锁时需要实现的两个基本方法:

  • 获取锁:

    • 互斥:确保只能有一个线程获取锁
    • 非阻塞:尝试一次,成功返回true,失败返回false
  • 释放锁:

    • 手动释放
    • 超时释放:获取锁时添加一个超时时间

我们利用redis 的setNx 方法,当有多个线程进入时,我们就利用该方法,第一个线程进入时,redis 中就有这个key 了,返回了1,如果结果是1,则表示他抢到了锁,那么他去执行业务,然后再删除锁,退出锁逻辑,没有抢到锁的哥们,等待一定时间后重试即可。(解决缓存击穿的互斥锁方案就用了)

image-20240520153020729

实现分布式锁版本1

直接在utils包下创建ILock接口与SimpleRedisLock 类,这个内容和之前的差不多

public class SimpleRedisLock implements ILock{
    public static final String KEY_PREFIX = "lock:";

    private String name;    // 不同业务有不同的锁,业务name即为锁的name
    private StringRedisTemplate stringRedisTemplate;

    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public boolean tryLock(long timeoutSec) {
        //获取线程表示
        long threadId = Thread.currentThread().getId();
        //获取锁
        Boolean success = stringRedisTemplate.opsForValue().
                setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
        //防止拆箱操作,不能直接返回success
        return Boolean.TRUE.equals(success);
    }

    @Override
    public void unlock() {
        stringRedisTemplate.delete(KEY_PREFIX + name);
    }
}

接着修改我们的下单业务的impl,改变之前的加锁逻辑:

//创建锁对象,key需要加上用户id,因为不同的用户无所谓,只有同一个用户才要锁起来,因此要指定好用户id
SimpleRedisLock simpleRedisLock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
boolean isLock = simpleRedisLock.tryLock(1200);
if (!isLock) {
    // 获取锁失败,由于是对一个人限制锁,所以获取失败直接返回
    return Result.fail("不允许重复下单!");
}
try {
    IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
    return proxy.createVoucherOrder(voucherId);
}finally {
    simpleRedisLock.unlock();
}

集群下测试:

image-20240520155946888 image-20240520160019728

Redis分布式锁误删问题

情况:持有锁的线程在锁的内部出现了阻塞,导致他的锁自动释放,这时其他线程,线程2来尝试获得锁,就拿到了这把锁,然后线程2在持有锁执行过程中,线程1反应过来,继续执行,而线程1执行过程中,走到了删除锁逻辑,此时就会把本应该属于线程2的锁进行删除,这就是误删别人锁的情况说明

解决方案:解决方案就是在每个线程释放锁的时候,去判断一下当前这把锁是否属于自己,如果不属于自己,则不进行锁的删除,假设还是上边的情况,线程1卡顿,锁自动释放,线程2进入到锁的内部执行逻辑,此时线程1反应过来,然后删除锁,但是线程1,一看当前这把锁不是属于自己,于是不进行删除锁逻辑,当线程2走到删除锁逻辑时,如果没有卡过自动释放锁的时间点,则判断当前这把锁是属于自己的,于是删除这把锁。

1653385920025

解决

根据上述的分析,我们需要修改一下分布式锁,使得满足:
1、在获取锁时存入线程标识

在这里增加了UUID来作为线程的标识,不再使用线程自己的ID了,这是因为虽然每个JVM的线程都是递增的,每个JVM内部之间的都会维护线程的唯一ID,但是不同的JVM之间还是会产生冲突,因此让JVM自己去维护线程的ID,会导致不同JVM之间的ID冲突。
事实上,也可以用UUID来表示不同的JVM,用线程ID来区分JVM内部的线程,两者拼接在一块。

// 增加线程标识的前缀
private static final String ID_PREFIX = UUID.randomUUID().toString() + "-";

2、在释放锁时限获取锁中的线程标识,判断是否与当前线程标识一致(一致才可释放)

@Override
public boolean tryLock(long timeoutSec) {
    //获取线程表示
    String threadId = ID_PREFIX + Thread.currentThread().getId();
    //获取锁
    Boolean success = stringRedisTemplate.opsForValue().
        setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
    //防止拆箱操作,不能直接返回success
    return Boolean.TRUE.equals(success);
}

@Override
public void unlock() {
    // 获取线程表示
    String threadId = ID_PREFIX + Thread.currentThread().getId();
    // 获取线程中的id,判断是否一致
    String redisId = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
    if (threadId.equals(redisId))
        stringRedisTemplate.delete(KEY_PREFIX + name);
}

分布式锁的原子性问题

上述的方式已经可以解决业务阻塞导致的误删操作,但是还会有一些问题:

如果我们阻塞的不是业务,而是业务执行完了,并且判断锁标识成功,即将释放锁的时候发生的阻塞(这种阻塞不是业务阻塞,而可能是JVM内部的垃圾回收机制异常导致阻塞),这时候还会发生新的问题。
如果被阻塞的时间足够长,导致锁的TTL到期了,一旦释放,其他线程又开始乘虚而入,成功获取锁,执行业务。
这时候,被阻塞的线程恢复正常了,但是因为已经进行锁标识的逻辑判断了,这时候被阻塞的线程就可以完成这个释放锁的操作,再次造成误删问题。

分析一下问题发生的原因,之所以会出现这种情况,主要原因是锁标识的逻辑判断与锁的释放操作,是两个不同的操作,不满足原子性,所以当在两个操作之间发生了阻塞,那么线程并发问题依旧会出现。
所以,我们必须要保证判断锁标识的动作与释放锁的动作必须得保证原子性

Lua脚本解决多条命令原子性问题

想到原子性,我们很容易就想到MySQL中的事务,但是Redis中的事务却不太一样,Redis事务虽然能保障原子性,但是无法保证事务的一致性。Redis事务的操作是一系列的批处理,是在最终一次性执行的,必须要有乐观锁来做判断,会麻烦很多。

Lua语言能够保证原子性,是因为它在执行原子操作时会将其他线程或进程阻塞,直到该操作完成。

Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法大家可以参考网站:https://www.runoob.com/lua/lua-tutorial.html,这里重点介绍Redis提供的调用函数,语法如下:

redis.call('命令名称', 'key', '其它参数', ...)

例如,我们要执行set name jack,则脚本是这样:

# 执行 set name jack
redis.call('set', 'name', 'jack')

例如,我们要先执行set name Rose,再执行get name,则脚本如下:

# 先执行 set name jack
redis.call('set', 'name', 'Rose')
# 再执行 get name
local name = redis.call('get', 'name')
# 返回
return name

写好脚本以后,需要用Redis命令来调用脚本

EVAL script numkeys key… arg…

例如,要执行redis.call(‘set’, ‘name’, ‘jack’)这个脚本。0表示key类型的参数的个数

EVAL “return redis.call(‘set’, ‘name’, ‘jack’)0

如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数 – 1代表key类型的参数有一个,也就是紧接着的name,会放入KEYS[1],而Rose则放入ARGV[1]中

1653392438917

Java调用Lua脚本改造分布式锁

在resources下新建Lua文件:

-- 锁中的线程标识与当前线程一致
if(redis.call('get', KEYS[1]) == ARGV[1]) then
    -- 释放锁
    return redis.call('del', KEYS[1])
end
-- 不一致则什么也不做
return 0

在impl中增加静态变量,防止每次调用unLock函数都要重新调用Lua脚本。修改unLock函数,调用Lua脚本。

image-20240520171239197

==分布式锁-Redisson==

基于setnx的分布式锁存在下面的问题:
1、不可重入:同一个线程无法多次获取同一把锁(当同一个线程内,方法A获取了锁,然后调用方法B,方法B中没办法获取同一把锁)
2、不可重试:获取锁只尝试一次就返回false,没有重试机制
3、超时释放:虽然可以避免死锁,但如果业务耗时很长,也会导致锁释放,会再次发生线程安全问题
4、主从一致性问题:若Redis提供了主从集群,主从同步存在延迟。当主节点宕机时,从节点充当主节点。如果从节点没有同步主节点中的锁数据,即没有锁标识,则会让其他节点拿到锁

Redisson是一个在Redis基础上实现的分布式工具集合,提供了很多分布式服务,包含了各种分布式锁的实现。

Redisson快速入门

引入依赖

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.23.1</version>
</dependency>

配置客户端

@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient(){
        //配置
        Config config = new Config();
        //添加Redis地址,这里添加的是单点的地址,也可以使用config.userClusterServer()来添加集群的地址
        config.useSingleServer().setAddress("redis://ip:6379").setPassword("pwd");
        //创建客户端
        return Redisson.create(config);
    }
}

使用Redisson的分布式锁:

image-20240520185500237

订单业务中的修改:

//创建锁对象,key需要加上用户id,因为不同的用户无所谓,只有同一个用户才要锁起来,因此要指定好用户id
//        SimpleRedisLock simpleRedisLock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
//        boolean isLock = simpleRedisLock.tryLock(1200);

// 获取锁(可重入),指定锁的名称
RLock lock = redissonClient.getLock("lock:order:" + userId);
boolean isLock = lock.tryLock();
…………

Redisson的可重入锁原理

image-20240520192526025

我们需要找到一种数据结构,能够在一个key里面获取多个东西——Hash:

Hash结构(hset)的KEY对应的VALUE包含了field与value,因此我们可以让KEY对应锁名称,让field对应线程标识,让value位置记录锁的重入次数(初始为0)。

因此,发生上述情况的时候,虽然线程的标识是相同的,但我们可以将重入次数+1,代表第二次获取锁,这时候整体的VALUE是不相同的。
需要注意的是,method2执行完毕以后不能直接释放这个key对应的锁,因为这样的话会导致method1没有执行完毕就被删掉了,解决的方法是让重入次数-1,只有所有业务都执行完了(重入次数=0)的时候才能真正释放。
这样我们的流程就会发生变化(哈希结构没有直接的EX来设置有效期):

image-20240520193101664

用Lua脚本来保证代码的原子性,而Lua代码获取锁与释放锁的逻辑已经是保存到RedissonLock类中了,我们只需要直接调用tyrLock与unlock方法就行。

总结:Redisson的可重入原理的核心就是因为我们使用了hash结构,记录了获取锁的线程以及可重用的次数

Redisson的锁重试和WatchDog机制

redisson在尝试获取锁的时候,如果传了时间参数,就不会在获取锁失败时立即返回失败,而是会进行重试。

  • waitTime:最大等待时间,如果使用 tryLock() 的时候,有传参数表明是可重试的锁;反之,不是!

  • leaseTime:超时释放时间,默认是-1,建议不要设定,Redisson看门狗机制可以进行锁续约

锁重试

public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
    return this.tryLock(waitTime, -1L, unit);
}

tryLock() :里面实现了重试机制。通过消息订阅信号量机制,避免了 while(true) 让其一直无效尝试,避免了CPU空转问题

@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    // 转成毫秒,后面都是以毫秒为单位
    long time = unit.toMillis(waitTime);
    // 当前时间
    long current = System.currentTimeMillis();
    // 线程ID-线程标识
    long threadId = Thread.currentThread().getId();

    // 尝试获取锁,返回锁的剩余过期时间(对应获取锁的lua脚本的返回值) tryAcquire() ==========!!!
    Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
        
    // 如果上面尝试获取锁返回的是null,表示成功;如果返回的是时间则表示失败。
    if (ttl == null) {
        return true;
    }

    // 剩余等待时间 = 最大等待时间 -(用现在时间 - 获取锁前的时间)
    time -= System.currentTimeMillis() - current;

    // 剩余等待时间 < 0 失败
    if (time <= 0) {
        acquireFailed(waitTime, unit, threadId);
        return false;
    }

    // 再次获取当前时间
    current = System.currentTimeMillis();
    
    // ==================重试逻辑,但不是简单的直接重试!==================
    // subscribe是订阅的意思,订阅锁的释放事件
    RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    // 等待一段时间,看是否能获取到锁的释放事件
    if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
        // 如果等待超时,则取消订阅并获取锁失败,返回false
        if (!subscribeFuture.cancel(false)) {
            subscribeFuture.onComplete((res, e) -> {
                if (e == null) {   
                    // 取消订阅
                    unsubscribe(subscribeFuture, threadId);
                }
            });
        }
        // 获取锁失败
        acquireFailed(waitTime, unit, threadId);
        return false;
    }
    // =======循环重试=======
    try {
        // 减去已经消耗的时间
        time -= System.currentTimeMillis() - current;
        // 如果还有剩余等待时间
        if (time > 0L) {
            do {
                // 获取当前时间戳
                long currentTime = System.currentTimeMillis();
                // 再次尝试获取锁,返回锁的剩余过期时间
                ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
                // 如果锁成功获取
                if (ttl == null) {
                    return true;
                }
                // 减去已经消耗的时间
                time -= System.currentTimeMillis() - currentTime;
                // 如果等待时间已经用完,则获取锁失败,返回false
                if (time <= 0L) {
                    this.acquireFailed(waitTime, unit, threadId);
                    return false;
                }
                currentTime = System.currentTimeMillis();
                // 根据剩余时间尝试获取锁
                if (ttl >= 0L && ttl < time) {
                    // 这里采用====信号量机制=======,等待释放锁的线程释放锁
                    ((RedissonLockEntry)subscribeFuture.getNow()).
                        getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    // 如果这个ttl时间比当前线程尝试获取锁的时间还长
                    // 那么就直接等待尝试获取锁的时间
                    ((RedissonLockEntry)subscribeFuture.getNow()).
                        getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }
                // 减去已经消耗的时间
                time -= System.currentTimeMillis() - currentTime;
            } while(time > 0L); // 如果还有剩余等待时间,则继续尝试获取锁
            // 如果等待时间已经用完,则获取锁失败,返回false
            this.acquireFailed(waitTime, unit, threadId);
            return false;
        }
        // 如果等待时间已经用完,则获取锁失败,返回false
        this.acquireFailed(waitTime, unit, threadId);
        return false;
    } finally {
        // 释放订阅
        this.unsubscribe(subscribeFuture, threadId);
    }
}

在 tryLock() 中调 tryAcquire() 执行获取锁的操作

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}

/**
 * 尝试以异步方式获取锁的剩余过期时间。
 * @param waitTime 等待时间
 * @param leaseTime 锁的租期时间
 * @param unit 时间单位
 * @param threadId 当前线程ID
 * @return 表示剩余过期时间的Future对象
 */
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    // 锁自动释放时间为默认的-1,所以会走else。如果设定了释放时间,走if
    if (leaseTime != -1) {
        return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }

    // leaseTime我们没有传,这里设定默认值(看门狗)30s
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
            commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
            TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);

    // 在获取剩余过期时间的异步结果完成后,执行回调、回调函数 ttlRemaining:剩余有效期,e:异常
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        // 剩余有效期为null,表示获取锁成功!
        if (ttlRemaining == null) {
            // 锁续约
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    // 把 leaseTime锁释放时间 记录成一个本地的成员变量
    internalLockLeaseTime = unit.toMillis(leaseTime);

    // 获取锁成功返回nil(空),失败返回时间,锁的剩余有效期(pttl是以毫秒为单位)
    return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

超时续约

那么此时还有一个问题,如果当前持有锁的线程业务阻塞了,TTL到期了别其它线程获取到了锁,那么此时就会有安全问题了

而Redisson是通过看门狗来解决这个问题的

因为锁的失效时间是30s,当10s之后,此时这个timeTask 就触发了,他就去进行续约,把当前这把锁续约成30s,如果操作成功,那么此时就会递归调用自己,再重新设置一个timeTask(),于是再过10s后又再设置一个timerTask,完成不停的续约

那么大家可以想一想,假设我们的线程出现了宕机他还会续约吗?当然不会,因为没有人再去调用renewExpiration这个方法,所以等到时间之后自然就释放了。

/**
 * 更新锁的有效期
 * @param threadId 当前线程ID
 */
private void scheduleExpirationRenewal(long threadId) {
    // 这个entry里主要存储了两个东西,一个是更新锁释放时间的定时任务,还有一个就是线程ID
    ExpirationEntry entry = new ExpirationEntry();
    // 将entry添加到ConcurrentHashMap中,如果是第一次添加则会返回null
    // 保证每次重入拿到的是同一个extry
    ExpirationEntry oldEntry = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
    if (oldEntry != null) {
        // 将当前线程ID添加到旧的entry中
        oldEntry.addThreadId(threadId);
    } else { 
        // 如果旧的条目为null,说明是第一次添加该条目
        // 将当前线程ID添加到map中
        entry.addThreadId(threadId);
        // 第一次来,就需要创建的更新释放时间的定时任务
        this.renewExpiration();
    }
}

/**
 * 续约锁的过期时间。
 */
private void renewExpiration() {
    // 获取锁的过期续约条目
    ExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
    // 如果续约条目不为null
    if (ee != null) {
        // 创建一个定时任务,用于===定时执行续约操作===
        Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                // 获取锁的过期续约条目
                ExpirationEntry ent = (ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
                // 如果续约条目不为null
                if (ent != null) {
                    // 获取第一个等待续约的线程ID
                    Long threadId = ent.getFirstThreadId();
                    // 如果线程ID不为null
                    if (threadId != null) {
                        // 异步执行续约操作
                        RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
                        // 在续约操作完成后执行回调
                        future.onComplete((res, e) -> {
                            if (e != null) {
                                // 如果续约操作出现异常,则记录日志
                                RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
                            } else {
                                // 如果更新超时时间成功,继续递归更新超时时间
                                if (res) {
                                    RedissonLock.this.renewExpiration();
                                }
                            }
                        });
                    }
                }
            }
        }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); // 更新间隔为看门狗时间的1/3也就是30秒
        // 将定时任务设置到续约条目中
        ee.setTimeout(task);
    }
}

再来看一下释放锁的源码

/**
 * 异步释放锁。
 * @param threadId 当前线程ID
 * @return 表示释放结果的Future对象
 */
public RFuture<Void> unlockAsync(long threadId) {
    // 创建一个Promise对象,用于表示释放结果
    RPromise<Void> result = new RedissonPromise();
    // 异步执行内部的解锁操作
    RFuture<Boolean> future = this.unlockInnerAsync(threadId);
    // 在解锁操作完成后执行回调
    future.onComplete((opStatus, e) -> {
        // 取消锁的自动更新释放时间
        this.cancelExpirationRenewal(threadId);
        if (e != null) { // 如果解锁操作出现异常
            // 设置Promise为失败状态,并将异常作为失败原因
            result.tryFailure(e);
        } else if (opStatus == null) { // 如果操作状态为null,说明锁未被当前线程持有
            // 设置Promise为失败状态,并抛出IllegalMonitorStateException异常
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
            result.tryFailure(cause);
        } else { // 否则,解锁成功
            // 设置Promise为成功状态
            result.trySuccess(null);
        }
    });
    // 返回表示释放结果的Future对象
    return result;
}

取消自动更新锁的释放时间方法

EXPIRATION_RENEWAL_MAP对应上面添加更新任务,这个Map里存的是一个个ExpirationEntry,ExpirationEntry里主要包含定时更新锁释放时间的任务和线程ID

/**
 * 取消锁的过期续约。
 * @param threadId 要取消续约的线程ID,如果为null,则表示取消所有线程的续约
 */
void cancelExpirationRenewal(Long threadId) {
    // 获取锁的过期续约任务
    ExpirationEntry task = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
    // 如果过期续约任务不为null
    if (task != null) {
        // 如果线程ID不为null,则移除指定线程的续约
        if (threadId != null) {
            task.removeThreadId(threadId);
        }
        // 如果线程ID为null,或者任务已经没有任何线程在续约
        if (threadId == null || task.hasNoThreads()) {
            // 获取任务的定时器任务
            Timeout timeout = task.getTimeout();
            // 如果定时器任务不为null,则取消定时器任务
            if (timeout != null) {
                timeout.cancel();
            }
            // 从过期续约映射中移除该任务
            EXPIRATION_RENEWAL_MAP.remove(this.getEntryName());
        }
    }
}

小结

  • 首先线程来获尝试获取锁(调用Lua脚本),判断Lua脚本的返回值TTL是否为NULL(为剩余释放时间说明获取失败)

  • ttl为空说明获取成功,判断锁的释放时间是否设置,为-1表示未设置,则开启看门狗(看门狗的释放锁时间为30秒)

  • 如果自己设置了锁的释放时间则不会启用开门狗,也就是是不会自动更新释放时间

  • 如果获取TTL不为null说明锁被其他线程给占用了,被占用就去判断当前线程的剩余等待时间是否大于0

  • 剩余等待时间不大于0则说明等待超时直接返回false表示获取锁失败

  • 剩余等待时间大于0则订阅等待锁的释放信号,等待别的线程的剩余释放时间

  • 如果别的线程的释放时间大于等于当前获取锁线程的最大等待时间,则当前线程直接等待最大等待时间

  • 等待完毕后判断等待时间是否超时,是则返回false获取锁失败

  • 没有超时则继续尝试获取锁

image-20240520194740741

Redisson分布式锁原理:
1、可重入:利用hash结构记录线程id和重入次数
2、可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
3、超时续约:利用watchDog,每隔一段时间(releaseTime/3),重置超时时间

Redisson锁的MutiLock原理

为了提高redis的可用性,我们会搭建集群或者主从,现在以主从为例

此时我们去写命令,写在主机上, 主机会将数据同步给从机,但是假设在主机还没有来得及把数据写入到从机去的时候,此时主机宕机,哨兵会发现主机宕机,并且选举一个slave变成master,而此时新的master中实际上并没有锁信息,此时锁信息就已经丢掉了。

1653553998403

为了解决这个问题,redission提出来了MutiLock锁,使用这把锁咱们就不使用主从了,每个节点的地位都是一样的, 这把锁加锁的逻辑需要写入到每一个主丛节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获得锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性。

1653554055048

那么MutiLock 加锁原理是什么呢?

当我们去设置了多个锁时,redission会将多个锁添加到一个集合中,然后用while循环去不停去尝试拿锁,但是会有一个总共的加锁时间,这个时间是用需要加锁的个数 * 1500ms ,假设有3个锁,那么时间就是4500ms,假设在这4500ms内,所有的锁都加锁成功, 那么此时才算是加锁成功,如果在4500ms有线程加锁失败,则会再次去进行重试.

1653553093967

总结

1)不可重入Redis分布式锁:

原理:利用setnx的互斥性;利用ex避免死锁;释放锁时判断线程标示
缺陷:不可重入、无法重试、锁超时失效

2)可重入的Redis分布式锁:

原理:利用hash结构,记录线程标示和重入次数;利用watchDog延续锁时间;利用信号量控制锁重试等待
缺陷:redis宕机引起锁失效问题

3)Redisson的multiLock:

原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功
缺陷:运维成本高、实现复杂

参考

黑马程序员Redis部分资料

Cookie、Session、Token概念、区别、如何实现:https://blog.csdn.net/weixin_44369049/article/details/132062232

CSDN笔记:https://blog.csdn.net/m0_52380556?type=blog

redisson:https://blog.csdn.net/weixin_53946852/article/details/137270519


文章作者: wolf-ll
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 wolf-ll !
  目录