李成笔记网

专注域名、站长SEO知识分享与实战技巧

【redis实战一】扩展SpringCache解决缓存击穿,穿透,雪崩

1、问题描述

我们在使用SpringCache的@Cacheable注解时,发现并没有设置过期时间这个功能。

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface Cacheable {

    // cacheNames的别名。与cacheNames二选一即可
    @AliasFor("cacheNames")
    String[] value() default {};

    // 也就是我们存储到Redis的key的前缀部分。比如user:, 后面部分来源于参数
    @AliasFor("value")
    String[] cacheNames() default {};

    // 同一个缓存名称的不同参数,key是显式指定。如#id, 表示去参数种的id字段。支持SpEL表达式
    String key() default "";

    // 同一个缓存名称的不同参数,keyGenerator是因为无法直接取到参数,参数需要经过一系列较为复杂的处理才能获得。通过KeyGenerator生成
    String keyGenerator() default "";

    // 指定缓存管理器,通常不会指定,使用默认的即可
    String cacheManager() default "";

    // 指定缓存解析器
    String cacheResolver() default "";

    // 存入缓存的条件,支持SpEL表达式,结果为true才会存入缓存
    String condition() default "";

    // 不存入缓存的条件,支持SpEL表达式,结果为true则不会存入缓存
    String unless() default "";

    // 是否同步回填缓存,并发访问@Cacheable时,因为线程安全问题,缓存还没来得及写入Redis, 就已经开始新的访问了,从而导致数据库被N次访问。
    boolean sync() default false;

}

由上面的接口描述可以知道,我们没有设置缓存过期时间的地方。

而我们的缓存基本都是存在过期时间的,否则Redis的数据会越堆越多,并且没有释放,而最终的结果将会是Redis被撑爆。

因此,我们需要让SpringCache支持过期时间的设置,我们可以把这个过期时间放在cacheNames的值后面,并且通过#号设置, 形如下面这种写法

@Cacheable(cacheNames = "test2#3", key = "#id")
public TestEntity getById2(Long id){
    TestEntity testEntity = new TestEntity(new Random().nextLong(), UUID.randomUUID().toString(), new Random().nextInt(20) + 10);
    log.info("模拟查询数据库:{}", testEntity);
    return testEntity;
}

我们将过期时间跟在value这个key的后面,通过#号分割,这样可以方便我们将过期时间分散设置。

2、为什么要将缓存过期时间分散设置?

其实为什么要将缓存过期时间分散设置,就是因为缓存过期时间设置相同存在一个巨大的问题:“缓存集中失效,导致缓存雪崩”。

因为大量的缓存key在同一时间失效,导致大量的请求直接穿透缓存(缓存穿透),命中到数据库。

从而把数据库瞬间压垮,甚至导致服务宕机,从而导致服务雪崩。所以将缓存时间分散设置能够非常有效的避免缓存雪崩。

当然,缓存雪崩的解决方案还有另外一种加锁回填缓存方式,我们会在后面提到并给出实际操作代码,但两者一起用丝毫不会冲突。

3、扩展SpringCache使其支持过期时间设置

我们不仅仅要让SpringCache支持分散时间,后续还会将其封装为一个spring-boot-stater,成为一个公共的模块,让你在以后使用时能够直接复用。

3.1、思路说明

主要就是要重写CacheManager的逻辑,并且重写CacheManager的createRedisCache方法,将@Cacheble的cacheNames或value的值根据#号进行拆分。

然后使用RedisCacheConfiguration的entryTtl设置过期时间即可。

但是我们为了后面封装为stater作为基础,我们这里会自定义RedisConnectionFactory,RedisTemplate和CacheManager.

3.2 自定义Redis属性配置文件

/**
 * 自定义Redis属性文件
 */
@ConfigurationProperties(prefix = "redis", ignoreInvalidFields = true)  // prefix:属性配置前缀, ignoreInvalidFields: 忽略字段的校验
@Data   // 简化Setter和Getter
public class CustomRedisProperties {

    /**
     * 是否开启Redis的配置
     */
    private boolean enable = false;

    /**
     * Redis连接地址,IP:PORT,IP:PORT
     */
    private String host = "";
    /**
     * Redis连接密码
     */
    private String password = "";
    /**
     * 如果是单机版,可以选择哪个数据库
     */
    private Integer database = 0;

    /**
     * 最大重定向次数,可选配置
     */
    private Integer maxRedirects = null;
    /**
     * SpringCache前缀, 方便key管理
     */
    private String keyPrefix = "myrds";

    /**jedis连接池配置*/
    private CustomRedisPoolConfig<?> pool = new CustomRedisPoolConfig<>();

    @Data
    @EqualsAndHashCode(callSuper = false)
    public static class CustomRedisPoolConfig<T> extends GenericObjectPoolConfig<T> {
        // 定义扩展属性
    }
}

我们使用自定义的Redis属性配置文件,同时支持单机和集群两种模式,并且延用了原来的连接池配置。

3.3 自定义Redis配置抽象类

抽象类定义了一些RedisConnectionFactory,RedisTemplate等构建对象的构建方法。

/**
 * Redis配置抽象类
 */
@Slf4j
public abstract class AbstractRedisConfig {
    /**
     * Redis是否是集群的标识
     */
    protected AtomicBoolean redisCluster = new AtomicBoolean(false);

    /**
     * 根据Redis属性文件构造RedisConnectionFactory
     * @param redisProperties Redis属性文件
     * @return RedisConnectionFactory
     */
    protected RedisConnectionFactory getRedisConnectionFactory(CustomRedisProperties redisProperties) {
        if (StringUtils.isBlank(redisProperties.getHost())){
            throw new RuntimeException("redis host is not null");
        }

        // 根据逗号切割host列表
        Set<String> hosts = org.springframework.util.StringUtils.commaDelimitedListToSet(redisProperties.getHost());
        if (CollectionUtils.isEmpty(hosts)){
            throw new RuntimeException("redis host address cannot be empty");
        }

        // 只有一个host, 表示是单机host
        if (hosts.size() == 1){
            String hostPort = hosts.stream().findFirst().get();
            String[] hostArr = hostStr2Arr(hostPort);
            return getSingleConnectionFactory(redisProperties, hostArr);
        }

        // 集群处理
        RedisClusterConfiguration configuration = new RedisClusterConfiguration();
        List<RedisNode> listNodes = new ArrayList<>();
        for (String host : hosts) {
            String[] split = hostStr2Arr(host);
            RedisNode redisNode = new RedisClusterNode(split[0], Integer.parseInt(split[1]));
            listNodes.add(redisNode);
        }
        return getClusterConnectionFactory(redisProperties, configuration, listNodes);
    }

    /**
     * 构造单机版Redis连接工厂
     * @param redisProperties Redis属性文件
     * @param hostArr 连接地址列表,单机传一个
     * @return ConnectionFactory
     */
    protected LettuceConnectionFactory getSingleConnectionFactory(CustomRedisProperties redisProperties, String[] hostArr) {
        redisCluster.set(false);
        // 构造单机版Redis连接工厂
        RedisStandaloneConfiguration singleConf = new RedisStandaloneConfiguration();
        singleConf.setHostName(hostArr[0]);
        singleConf.setPassword(RedisPassword.of(redisProperties.getPassword()));
        singleConf.setPort(Integer.parseInt(hostArr[1]));
        singleConf.setDatabase(redisProperties.getDatabase());

        // 创建连接池
        final LettucePoolingClientConfiguration configuration = LettucePoolingClientConfiguration.builder().poolConfig(redisProperties.getPool()).build();
        LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(singleConf, configuration);
        log.info("==============创建单机版Redis连接工厂成功==================");
        log.info("=============={}==================", redisProperties.getHost());
        return connectionFactory;
    }

    /**
     * 构造集群版Redis连接工厂
     * @param redisProperties Redis属性文件
     * @param listNodes 集群节点列表
     * @return ConnectionFactory
     */
    protected LettuceConnectionFactory getClusterConnectionFactory(CustomRedisProperties redisProperties, RedisClusterConfiguration configuration, List<RedisNode> listNodes) {
        redisCluster.set(true);
        // 设置最大重定向次数
        configuration.setMaxRedirects(redisProperties.getMaxRedirects());
        configuration.setClusterNodes(listNodes);

        // 设置密码
        configuration.setPassword(RedisPassword.of(redisProperties.getPassword()));

        // 构造集群版Redis连接工厂
        final LettucePoolingClientConfiguration lettucePoolingClientConfiguration =
                LettucePoolingClientConfiguration.builder().poolConfig(redisProperties.getPool()).build();

        LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(configuration, lettucePoolingClientConfiguration);

        log.info("==============创建集群版Redis连接工厂成功==================");
        log.info("=============={}==================", redisProperties.getHost());

        return connectionFactory;
    }


    /**
     * host字符串转换为数组
     * @param hostPort Redis连接地址和端口
     * @return hostPort数组
     */
    protected String[] hostStr2Arr(String hostPort) {
        String[] hostArr = hostPort.split(":");
        if (hostArr.length != 2) {
            throw new RuntimeException("host or port err");
        }
        return hostArr;
    }


    protected RedisTemplate<String, Object> buildRestTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        // 设置连接工厂
        template.setConnectionFactory(redisConnectionFactory);

        // String序列化
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer<?> jacksonSerializer = buildRedisJackson();

        // key and hashKey 序列化方式
        template.setKeySerializer(stringRedisSerializer);
        template.setHashKeySerializer(stringRedisSerializer);

        // value serializer
        template.setDefaultSerializer(jacksonSerializer);
        template.setValueSerializer(jacksonSerializer);
        template.setHashValueSerializer(jacksonSerializer);

        template.afterPropertiesSet();
        return template;
    }

    /**
     * 构建JSON序列化器
     */
    protected Jackson2JsonRedisSerializer<?> buildRedisJackson() {
        // JSON序列化
        Jackson2JsonRedisSerializer<?> jacksonSerializer = new Jackson2JsonRedisSerializer<>(Object.class);

        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // enableDefaultTyping 方法已经过时,使用新的方法activateDefaultTyping
        // objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);
        jacksonSerializer.setObjectMapper(om);
        return jacksonSerializer;
    }

    /**
     * 获取Redis集群状态 是单机模式还是集群模式
     */
    public AtomicBoolean getRedisCluster() {
        return redisCluster;
    }
}

可以看到,我们将一些公共的东西提取了出来,并且将每个方法声明为protect, 方便子类重写各自的方法。

3.4 自定义Redis配置

/**
 * 自定义Redis配置文件
 */
@Configuration  // 标识为一个配置项,注入Spring容器
@EnableConfigurationProperties(CustomRedisProperties.class) // 启动Redis配置文件
@ConditionalOnProperty(value = "redis.enable", havingValue = "true")
@EnableCaching
@Slf4j
public class CustomRedisConfig extends AbstractRedisConfig{
    @Resource
    private CustomRedisProperties redisProperties;

    /**
     * 注册RedisConnectionFactory
     */
    @Bean
    @ConditionalOnMissingBean(RedisConnectionFactory.class)
    public RedisConnectionFactory redisConnectionFactory() {
        return getRedisConnectionFactory(redisProperties);
    }

    /**
     * 自定义注册RedisTemplate
     * @param redisConnectionFactory 自定义的Redis连接工厂
     * @return RedisTemplate
     */
    @Bean
    @ConditionalOnMissingBean(RedisTemplate.class)
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        return buildRestTemplate(redisConnectionFactory);
    }

    /**
     * 自定义cacheManager
     */
    @Bean
    @ConditionalOnMissingBean(RedisCacheManager.class)
    public RedisCacheManager cacheManager() {
        // String序列化
        RedisSerializer<String> strSerializer = new StringRedisSerializer();
        // json序列化
        Jackson2JsonRedisSerializer<?> jsonRedisSerializer = buildRedisJackson();

        // set RedisCacheConfiguration
        RedisCacheConfiguration config =
                RedisCacheConfiguration.defaultCacheConfig()
                        // 增加自定义的前缀
                        .computePrefixWith(cacheName -> redisProperties.getKeyPrefix() + ":" + cacheName + ":")
                        // 设置key value序列化器
                        .serializeKeysWith(RedisSerializationContext.SerializationPair
                                .fromSerializer(strSerializer))
                        .serializeValuesWith(RedisSerializationContext.SerializationPair
                                .fromSerializer(jsonRedisSerializer))
                        // 禁用缓存空值
                        .disableCachingNullValues();

        // 创建自定义缓存管理器
        return new CustomRedisCacheManager(RedisCacheWriter.nonLockingRedisCacheWriter(redisConnectionFactory()), config);
    }
}

3.5、自定义CacheManager(核心)

/**
 * 自定义Redis缓存管理器, 增加自定义缓存过期时间
 */
public class CustomRedisCacheManager extends RedisCacheManager {
    public CustomRedisCacheManager(RedisCacheWriter cacheWriter, RedisCacheConfiguration defaultCacheConfiguration) {
        super(cacheWriter, defaultCacheConfiguration);
    }

    @Override
    protected RedisCache createRedisCache(String name, RedisCacheConfiguration cacheConfig) {
        // 根据#号分隔
        String[] array = StringUtils.delimitedListToStringArray(name, "#");
        name = array[0];
        if (array.length > 1) { // 解析TTL
            long ttl = Long.parseLong(array[1]);
            cacheConfig = cacheConfig.entryTtl(Duration.ofMinutes(ttl)); // 注意单位我此处用的是分钟,而非毫秒
        }
        return super.createRedisCache(name, cacheConfig);
    }
}

可以看到重写了createRedisCache方法,对缓存的key做了一个处理,将key按照#号分开,后面那一段就是过期时间,然后将过期时间设置到对应的key上。

注意:而我们#号后面可以是数字,其实是不是也可以是配置文件的全限定名称。比如:test.name.expire。这东西可以从配置文件读取,放在数据库,或者nacos都是可以的。

这样子就实现了过期时间的配置化处理。

3.6、Redis配置文件

server:
  port: 8080

redis:
  # Redis开关
  enable: true
  # Redis地址,格式ip:port,ip:port。集群使用逗号分割
  host: 162.14.74.11:6379
  # 密码
  password:
  # 数据库
  database: 0
  # 最大重试次数
  max-redirects: 3
  # 使用统一前缀管理
  key-prefix: itdl

当我们不配置Redis相关属性时,则不会创建相关的Bean对象。只有开启了开关,才会去创建对象。这样子我们就具备了做成一个stater的基础条件了。

4、测试

4.1、编写Service层级测试接口

/**
 * 测试service层次
 */
public interface MyTestService {

    // 用于测试缓存和过期时间是否生效
    TestEntity getById(Long id);

    // 用于测试缓存并发的安全性和过期时间是否生效
    TestEntity getById2(Long id);
}

4.2、编写Service层级测试接口实现类

@Service
@Slf4j
public class MyTestServiceImpl implements MyTestService {

    @Cacheable(value = "test#3", key = "#id")
    public TestEntity getById(Long id){
        TestEntity testEntity = new TestEntity(new Random().nextLong(), UUID.randomUUID().toString(), new Random().nextInt(20) + 10);
        log.info("模拟查询数据库:{}", testEntity);
        return testEntity;
    }

    @Cacheable(value = "test2#3", key = "#id", sync = true) // sync表示同步回填缓存
    public TestEntity getById2(Long id){
        TestEntity testEntity = new TestEntity(new Random().nextLong(), UUID.randomUUID().toString(), new Random().nextInt(20) + 10);
        log.info("模拟查询数据库:{}", testEntity);
        return testEntity;
    }
}

可以看到,我写了两个方法,唯一的区别就是value的值和下面方法多了一个sync = true。

效果我们随后使用测试类来进行从测试便知。

4.3、测试缓存即过期时间

@SpringBootTest
public class TestServiceRunner {

    @Autowired
    private MyTestService myTestService;

    /**
     * 测试两个请求(同参),只有一个请求到达,另一个走缓存
     */
    @Test
    public void testMyTestService(){
        TestEntity t1 = myTestService.getById(1L);
        TestEntity t2 = myTestService.getById(1L);
    }
}  

理论上来说,我们将有一个itdl:test:1这样的key,存入redis, 过期时间为3分钟。并且第二次请求会命中Redis而不会打印模拟查询数据库。

看似已经没有什么问题了,但是我们目前是串行化调用的缓存,如果并发去调用呢?会不会有问题?

4.4、测试缓存即过期时间(并发调用)

为什么要做这样的测试?

因为这样就可以模拟同一个热key(该key经常被访问,随时都有大量的请求),突然失效了,如果存在并发问题,此时缓存将会被击穿(缓存击穿),大量请求直达数据库。这样子可能数据库直接就嗝屁了。

所以我们必然要经过并发测试的。来吧~~~

我们使用一个大小为8的线程池,使用for循环不断的去执行线程,调用同一个方法。

如果模拟数据库操作只打印了多次,我们就认为一定存在线程安全性问题。如果只打印了一次,那就可能不存在,当然只是可能而已。

@SpringBootTest
public class TestServiceRunner2 {

    @Autowired
    private MyTestService myTestService;

    // 创建一个固定线程池
    private ExecutorService executorService = Executors.newFixedThreadPool(8);

    /**
     * 多线程访问请求,测试切面的线程安全性
     */
    @Test
    public void testMultiMyTestService() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            executorService.submit(() -> {
                TestEntity t1 = myTestService.getById(1L);
            });
        }

        // 主线程休息10秒种
        Thread.sleep(10000);
    }

}

此时三分钟早已溜走,之前的缓存已经过期。我们开始测试并发吧。

看样子确实存在并发问题啊,这可咋整呢,别急,山人自有妙计!

4.5、使用sync=true解决并发问题

@Cacheable(cacheNames = "test2#3", key = "#id", sync = true) // sync表示同步回填缓存

时间过得真快,三分钟又悄悄溜走了。我们来测试一下sync的效果

@SpringBootTest
public class TestServiceRunner3 {

    @Autowired
    private MyTestService myTestService;

    // 创建一个固定线程池
    private ExecutorService executorService = Executors.newFixedThreadPool(8);

    /**
     * 多线程访问请求,测试切面的线程安全性(加上同步回填缓存)
     */
    @Test
    public void testMultiSyncMyTestService() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            executorService.submit(() -> {
                TestEntity t1 = myTestService.getById2(1L);
            });
        }

        // 主线程休息10秒种
        Thread.sleep(10000);
    }
}

可以看到,即使我们使用多线程去调用getById2,仍然只有一个线程执行了数据库查询,其余都是走的缓存。

这其实就是在查询数据库和回填缓存时,加了一把锁。在高并发的情况下,我们查询数据库,和存缓存这个操作加锁排队进行。那么第二个请求一定就是从缓存获取数据。

但是如果查询数据库结果为空呢?在瞬时并发的情况下,虽然加了锁查询数据库,但是每次的查询结果都是空,于是数据不会存储到Redis.

这样每次还是会查询数据库,而每次查询都为空值,结果同样是大量的请求是数据库承受的,高并发下,可能会直接压垮数据库,导致雪崩。

我们做个测试,多线程查询一条为空的数据。

4.6、测试结果为空情况

首先,在MyTestServiceImpl的getById2中,添加id为10的数据返回空值

@Cacheable(cacheNames = "test2#3", key = "#id", sync = true) // sync表示同步回填缓存
public TestEntity getById2(Long id){
    if (id == 10){
        log.info("id为10没有查询到数据,返回空值");
        return null;
    }
    TestEntity testEntity = new TestEntity(new Random().nextLong(), UUID.randomUUID().toString(), new Random().nextInt(20) + 10);
    log.info("模拟查询数据库:{}", testEntity);
    return testEntity;
}

然后,使用一个多线程去请求这个为空的数据。

@SpringBootTest
public class TestServiceRunner4 {

    @Autowired
    private MyTestService myTestService;

    // 创建一个固定线程池
    private ExecutorService executorService = Executors.newFixedThreadPool(8);

    /**
     * 多线程访问请求,测试切面的线程安全性(加上同步回填缓存)
     */
    @Test
    public void testMultiSyncMyTestService() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            executorService.submit(() -> {
                TestEntity t1 = myTestService.getById2(10L);
            });
        }

        // 主线程休息10秒种
        Thread.sleep(10000);
    }
}

可以看到,这样的请求在高并发下,很容易就会将数据库给压垮。

结论:高并发情境下,我们务必也要对空值也进行处理。

5、数据库结果为空处理

properties文件增加缓存空值的开关属性

/**
 * 是否缓存空值,默认不缓存
 */
private boolean cacheNullValues = false;

修改RedisCacheManager的Bean实例化过程,只有开关关闭时才将缓存空值禁用。

/**
 * 自定义cacheManager
 */
@Bean
@ConditionalOnMissingBean(RedisCacheManager.class)
public RedisCacheManager cacheManager() {
    // String序列化
    RedisSerializer<String> strSerializer = new StringRedisSerializer();
    // json序列化
    Jackson2JsonRedisSerializer<?> jsonRedisSerializer = buildRedisJackson();

    // set RedisCacheConfiguration
    RedisCacheConfiguration config =
            RedisCacheConfiguration.defaultCacheConfig()
                    // 增加自定义的前缀
                    .computePrefixWith(cacheName -> redisProperties.getKeyPrefix() + ":" + cacheName + ":")
                    // 设置key value序列化器
                    .serializeKeysWith(RedisSerializationContext.SerializationPair
                            .fromSerializer(strSerializer))
                    .serializeValuesWith(RedisSerializationContext.SerializationPair
                            .fromSerializer(jsonRedisSerializer))
                    ;

    if (!redisProperties.isCacheNullValues()){
        // 禁用缓存空值
        config.disableCachingNullValues();
    }

    // 创建自定义缓存管理器
    return new CustomRedisCacheManager(RedisCacheWriter.nonLockingRedisCacheWriter(redisConnectionFactory()), config);
}

在application.yml种,开启对空值的缓存

redis:
  # Redis开关
  enable: true
  # Redis地址,格式ip:port,ip:port。集群使用逗号分割
  host: 162.14.74.11:6379
  # 密码
  password:
  # 数据库
  database: 0
  # 最大重试次数
  max-redirects: 3
  # 使用统一前缀管理
  key-prefix: itdl
  # 缓存空值开关开启
  cache-null-values: true

重新测试

可以看出来,将空值缓存之后,我们的数据库压力又变小了,它又安全了,不会被压垮了。

但是,我们缓存了空值之后,假如我们又新增了一条id为10的数据呢? 我们再次查询居然还是空值,这可咋整呢?

  • 1、我们本身已经设置了缓存过期时间,如果数据允许有延迟,可以等待缓存的空值过期。或者缩短缓存的过期时间
  • 2、新增数据或修改数据后,根据相关的key主动删除相关缓存
  • 3、编写定时任务,定期清空Redis中为SpringCache缓存的空值的数据。或者预留接口去手动删除相关的空值缓存。
  • 4、使用一个巨大的bitmap,提前查询已存在的key,编写一个布隆过滤器,提前把空值数据拦截,不经过缓存,不经过数据库。
  • 5、扩展DefaultRedisCacheWriter重写put方法,收集空值的key列表

7、扩展DefaultRedisCacheWriter重写put方法

我们先来看一下RedisCacheManager的构造方法

private RedisCacheManager(RedisCacheWriter cacheWriter, RedisCacheConfiguration defaultCacheConfiguration,
        boolean allowInFlightCacheCreation) {

    Assert.notNull(cacheWriter, "CacheWriter must not be null!");
    Assert.notNull(defaultCacheConfiguration, "DefaultCacheConfiguration must not be null!");

    this.cacheWriter = cacheWriter;
    this.defaultCacheConfig = defaultCacheConfiguration;
    this.initialCacheConfiguration = new LinkedHashMap<>();
    this.allowInFlightCacheCreation = allowInFlightCacheCreation;
}

可以看到,构造函数有一个重要的参数RedisCacheWriter。该接口就是定义了一些写入Redis的操作。

public interface RedisCacheWriter extends CacheStatisticsProvider {
    // 创建一个无锁的RedisCacheWriter
    static RedisCacheWriter nonLockingRedisCacheWriter(RedisConnectionFactory connectionFactory) {
        return nonLockingRedisCacheWriter(connectionFactory, BatchStrategies.keys());
    }
    // 实际上就是创建了一个DefaultRedisCacheWriter
    static RedisCacheWriter nonLockingRedisCacheWriter(RedisConnectionFactory connectionFactory,
                                                       BatchStrategy batchStrategy) {

        return new DefaultRedisCacheWriter(connectionFactory, batchStrategy);
    }
}	

可以看到,实际上就是创建了一个DefaultRedisCacheWriter。所以只要我们构建时传入DefaultRedisCacheWriter或者其扩展类,就可以拦截put方法(缓存入库的方法)。

但是DefaultRedisCacheWriter不是一个public的类,无法继承它进行扩展,所以我直接拷贝了一份,然后在put方法内增加了Redis存储的空值NullValue的判断,即可收集空值集合。

public class CustomRedisCacheWriter implements RedisCacheWriter {
    // 其实在Redis中,值为null的数据并不会存储为空字符串或者null, 而是存储的一个NullValue的实例的序列化结果
    private static final byte[] BINARY_NULL_VALUE = RedisSerializer.java().serialize(NullValue.INSTANCE);
    private final RedisConnectionFactory connectionFactory;
    private final Duration sleepTime;
    private final CacheStatisticsCollector statistics;
    private final BatchStrategy batchStrategy;
    // 引入上下文,方便从上下文获取Bean做收集动作
    private ApplicationContext applicationContext;

    // 构造方法增加ApplicationContext参数
    public CustomRedisCacheWriter(RedisConnectionFactory connectionFactory, BatchStrategy batchStrategy, ApplicationContext applicationContext) {
        this(connectionFactory, Duration.ZERO, batchStrategy);
        this.applicationContext = applicationContext;
    }

    // put方法,增加空值判断
    @Override
    public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {

        Assert.notNull(name, "Name must not be null!");
        Assert.notNull(key, "Key must not be null!");
        Assert.notNull(value, "Value must not be null!");

        // 注意:这里就是结果为空值
        if (new String(BINARY_NULL_VALUE, StandardCharsets.UTF_8).equals(new String(value, StandardCharsets.UTF_8))){
            // 存储的数据就是NULLValue, 将空值的key收集起来
            applicationContext.getBean(CacheNullValuesHandle.class).collectNullValueKeys(new String(key, StandardCharsets.UTF_8));
        }
        execute(name, connection -> {

            if (shouldExpireWithin(ttl)) {
                connection.set(key, value, Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS), SetOption.upsert());
            } else {
                connection.set(key, value);
            }

            return "OK";
        });

        statistics.incPuts(name);
    }
}	

收集空值的处理类

/**
 * Redis存储的空值处理
 */
@ConditionalOnProperty(value = "redis.enable", havingValue = "true")
@Component
@Slf4j
public class CacheNullValuesHandle implements DisposableBean {

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    private static final String cacheNullKeySetKey = ":cacheNullKeySetKey";

    @Autowired
    private CustomRedisProperties redisProperties;

    // 收集的空值集合,放在set里,自动去重
    private final Set<String> cacheNullValueKeys = new LinkedHashSet<>();

    public synchronized void collectNullValueKeys(String key){
        // 如果已经存在了,则直接
        if (cacheNullValueKeys.contains(key)){
            return;
        }

        // 空值数量小于最大数量,才存储在内存中
        if (cacheNullValueKeys.size() < redisProperties.getCacheNullValueKeySizeInMemory()){
            cacheNullValueKeys.add(key);
        }else {
            // 超过,直接存储在Redis Set中
            redisTemplate.opsForSet().add(redisProperties.getKeyPrefix() + cacheNullKeySetKey, key);
        }
    }

    public synchronized Set<String> getCacheNullValueKeys(){
        return cacheNullValueKeys;
    }

    @Override
    public void destroy() throws Exception {
        // 销毁时,删除自己对应的空值key
        try {
            log.info("清空空值key列表:{}", String.join(",", cacheNullValueKeys));
            // 销毁的时候,那些空值key, 防止过多的空值key占用空间
            redisTemplate.delete(cacheNullValueKeys);
            // 删除空值key列表集合key
            redisTemplate.delete(redisProperties.getKeyPrefix() + cacheNullKeySetKey);
            cacheNullValueKeys.clear();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

我们将空值key收集起来了,只要服务不挂,那么全量的空值key列表就会存放在Redis Set集合中。如果空值key的缓存时间比较长的话,可以读取集合,然后遍历集合删除即可。通过页面对这些空值key进行管理,或者通过定时任务进行删除。

8、总结

我们给@Cacheable添加了自定义的缓存过期时间设置。可以分散设置从而防止缓存集中时效。

又发现@Cacheable缓存数据库存在线程安全问题,在多线程的场景同一个key将会多次访问到数据库,从而让数据库压力剧增。

然后使用sync=true,加锁排队查询数据库和回填缓存,从而解决并发问题。

随后又测试了查询数据结果为空,每次都会去查询数据库。

而空值的数据通常就是一些非法的参数,查询不到结果。可能是黑客模拟的参数,这些参数缓存在Redis,是极为正确的选择。

如果是我们后续新增或者修改数据的情况,主动删除对应的空值缓存即可。这样子数据就是实时的了。

如果说有些毫无意义的空值存储在Redis, 并且过期时间又比较长,就可以使用定时任务,定时去删除空值。或者使用缓存key管理,手动删除那些对应的key。



发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言