1、问题描述
我们在使用SpringCache的@Cacheable注解时,发现并没有设置过期时间这个功能。
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface Cacheable {
// cacheNames的别名。与cacheNames二选一即可
@AliasFor("cacheNames")
String[] value() default {};
// 也就是我们存储到Redis的key的前缀部分。比如user:, 后面部分来源于参数
@AliasFor("value")
String[] cacheNames() default {};
// 同一个缓存名称的不同参数,key是显式指定。如#id, 表示去参数种的id字段。支持SpEL表达式
String key() default "";
// 同一个缓存名称的不同参数,keyGenerator是因为无法直接取到参数,参数需要经过一系列较为复杂的处理才能获得。通过KeyGenerator生成
String keyGenerator() default "";
// 指定缓存管理器,通常不会指定,使用默认的即可
String cacheManager() default "";
// 指定缓存解析器
String cacheResolver() default "";
// 存入缓存的条件,支持SpEL表达式,结果为true才会存入缓存
String condition() default "";
// 不存入缓存的条件,支持SpEL表达式,结果为true则不会存入缓存
String unless() default "";
// 是否同步回填缓存,并发访问@Cacheable时,因为线程安全问题,缓存还没来得及写入Redis, 就已经开始新的访问了,从而导致数据库被N次访问。
boolean sync() default false;
}
由上面的接口描述可以知道,我们没有设置缓存过期时间的地方。
而我们的缓存基本都是存在过期时间的,否则Redis的数据会越堆越多,并且没有释放,而最终的结果将会是Redis被撑爆。
因此,我们需要让SpringCache支持过期时间的设置,我们可以把这个过期时间放在cacheNames的值后面,并且通过#号设置, 形如下面这种写法
@Cacheable(cacheNames = "test2#3", key = "#id")
public TestEntity getById2(Long id){
TestEntity testEntity = new TestEntity(new Random().nextLong(), UUID.randomUUID().toString(), new Random().nextInt(20) + 10);
log.info("模拟查询数据库:{}", testEntity);
return testEntity;
}
我们将过期时间跟在value这个key的后面,通过#号分割,这样可以方便我们将过期时间分散设置。
2、为什么要将缓存过期时间分散设置?
其实为什么要将缓存过期时间分散设置,就是因为缓存过期时间设置相同存在一个巨大的问题:“缓存集中失效,导致缓存雪崩”。
因为大量的缓存key在同一时间失效,导致大量的请求直接穿透缓存(缓存穿透),命中到数据库。
从而把数据库瞬间压垮,甚至导致服务宕机,从而导致服务雪崩。所以将缓存时间分散设置能够非常有效的避免缓存雪崩。
当然,缓存雪崩的解决方案还有另外一种加锁回填缓存方式,我们会在后面提到并给出实际操作代码,但两者一起用丝毫不会冲突。
3、扩展SpringCache使其支持过期时间设置
我们不仅仅要让SpringCache支持分散时间,后续还会将其封装为一个spring-boot-stater,成为一个公共的模块,让你在以后使用时能够直接复用。
3.1、思路说明
主要就是要重写CacheManager的逻辑,并且重写CacheManager的createRedisCache方法,将@Cacheble的cacheNames或value的值根据#号进行拆分。
然后使用RedisCacheConfiguration的entryTtl设置过期时间即可。
但是我们为了后面封装为stater作为基础,我们这里会自定义RedisConnectionFactory,RedisTemplate和CacheManager.
3.2 自定义Redis属性配置文件
/**
* 自定义Redis属性文件
*/
@ConfigurationProperties(prefix = "redis", ignoreInvalidFields = true) // prefix:属性配置前缀, ignoreInvalidFields: 忽略字段的校验
@Data // 简化Setter和Getter
public class CustomRedisProperties {
/**
* 是否开启Redis的配置
*/
private boolean enable = false;
/**
* Redis连接地址,IP:PORT,IP:PORT
*/
private String host = "";
/**
* Redis连接密码
*/
private String password = "";
/**
* 如果是单机版,可以选择哪个数据库
*/
private Integer database = 0;
/**
* 最大重定向次数,可选配置
*/
private Integer maxRedirects = null;
/**
* SpringCache前缀, 方便key管理
*/
private String keyPrefix = "myrds";
/**jedis连接池配置*/
private CustomRedisPoolConfig<?> pool = new CustomRedisPoolConfig<>();
@Data
@EqualsAndHashCode(callSuper = false)
public static class CustomRedisPoolConfig<T> extends GenericObjectPoolConfig<T> {
// 定义扩展属性
}
}
我们使用自定义的Redis属性配置文件,同时支持单机和集群两种模式,并且延用了原来的连接池配置。
3.3 自定义Redis配置抽象类
抽象类定义了一些RedisConnectionFactory,RedisTemplate等构建对象的构建方法。
/**
* Redis配置抽象类
*/
@Slf4j
public abstract class AbstractRedisConfig {
/**
* Redis是否是集群的标识
*/
protected AtomicBoolean redisCluster = new AtomicBoolean(false);
/**
* 根据Redis属性文件构造RedisConnectionFactory
* @param redisProperties Redis属性文件
* @return RedisConnectionFactory
*/
protected RedisConnectionFactory getRedisConnectionFactory(CustomRedisProperties redisProperties) {
if (StringUtils.isBlank(redisProperties.getHost())){
throw new RuntimeException("redis host is not null");
}
// 根据逗号切割host列表
Set<String> hosts = org.springframework.util.StringUtils.commaDelimitedListToSet(redisProperties.getHost());
if (CollectionUtils.isEmpty(hosts)){
throw new RuntimeException("redis host address cannot be empty");
}
// 只有一个host, 表示是单机host
if (hosts.size() == 1){
String hostPort = hosts.stream().findFirst().get();
String[] hostArr = hostStr2Arr(hostPort);
return getSingleConnectionFactory(redisProperties, hostArr);
}
// 集群处理
RedisClusterConfiguration configuration = new RedisClusterConfiguration();
List<RedisNode> listNodes = new ArrayList<>();
for (String host : hosts) {
String[] split = hostStr2Arr(host);
RedisNode redisNode = new RedisClusterNode(split[0], Integer.parseInt(split[1]));
listNodes.add(redisNode);
}
return getClusterConnectionFactory(redisProperties, configuration, listNodes);
}
/**
* 构造单机版Redis连接工厂
* @param redisProperties Redis属性文件
* @param hostArr 连接地址列表,单机传一个
* @return ConnectionFactory
*/
protected LettuceConnectionFactory getSingleConnectionFactory(CustomRedisProperties redisProperties, String[] hostArr) {
redisCluster.set(false);
// 构造单机版Redis连接工厂
RedisStandaloneConfiguration singleConf = new RedisStandaloneConfiguration();
singleConf.setHostName(hostArr[0]);
singleConf.setPassword(RedisPassword.of(redisProperties.getPassword()));
singleConf.setPort(Integer.parseInt(hostArr[1]));
singleConf.setDatabase(redisProperties.getDatabase());
// 创建连接池
final LettucePoolingClientConfiguration configuration = LettucePoolingClientConfiguration.builder().poolConfig(redisProperties.getPool()).build();
LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(singleConf, configuration);
log.info("==============创建单机版Redis连接工厂成功==================");
log.info("=============={}==================", redisProperties.getHost());
return connectionFactory;
}
/**
* 构造集群版Redis连接工厂
* @param redisProperties Redis属性文件
* @param listNodes 集群节点列表
* @return ConnectionFactory
*/
protected LettuceConnectionFactory getClusterConnectionFactory(CustomRedisProperties redisProperties, RedisClusterConfiguration configuration, List<RedisNode> listNodes) {
redisCluster.set(true);
// 设置最大重定向次数
configuration.setMaxRedirects(redisProperties.getMaxRedirects());
configuration.setClusterNodes(listNodes);
// 设置密码
configuration.setPassword(RedisPassword.of(redisProperties.getPassword()));
// 构造集群版Redis连接工厂
final LettucePoolingClientConfiguration lettucePoolingClientConfiguration =
LettucePoolingClientConfiguration.builder().poolConfig(redisProperties.getPool()).build();
LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(configuration, lettucePoolingClientConfiguration);
log.info("==============创建集群版Redis连接工厂成功==================");
log.info("=============={}==================", redisProperties.getHost());
return connectionFactory;
}
/**
* host字符串转换为数组
* @param hostPort Redis连接地址和端口
* @return hostPort数组
*/
protected String[] hostStr2Arr(String hostPort) {
String[] hostArr = hostPort.split(":");
if (hostArr.length != 2) {
throw new RuntimeException("host or port err");
}
return hostArr;
}
protected RedisTemplate<String, Object> buildRestTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
// 设置连接工厂
template.setConnectionFactory(redisConnectionFactory);
// String序列化
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer<?> jacksonSerializer = buildRedisJackson();
// key and hashKey 序列化方式
template.setKeySerializer(stringRedisSerializer);
template.setHashKeySerializer(stringRedisSerializer);
// value serializer
template.setDefaultSerializer(jacksonSerializer);
template.setValueSerializer(jacksonSerializer);
template.setHashValueSerializer(jacksonSerializer);
template.afterPropertiesSet();
return template;
}
/**
* 构建JSON序列化器
*/
protected Jackson2JsonRedisSerializer<?> buildRedisJackson() {
// JSON序列化
Jackson2JsonRedisSerializer<?> jacksonSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
// enableDefaultTyping 方法已经过时,使用新的方法activateDefaultTyping
// objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);
jacksonSerializer.setObjectMapper(om);
return jacksonSerializer;
}
/**
* 获取Redis集群状态 是单机模式还是集群模式
*/
public AtomicBoolean getRedisCluster() {
return redisCluster;
}
}
可以看到,我们将一些公共的东西提取了出来,并且将每个方法声明为protect, 方便子类重写各自的方法。
3.4 自定义Redis配置
/**
* 自定义Redis配置文件
*/
@Configuration // 标识为一个配置项,注入Spring容器
@EnableConfigurationProperties(CustomRedisProperties.class) // 启动Redis配置文件
@ConditionalOnProperty(value = "redis.enable", havingValue = "true")
@EnableCaching
@Slf4j
public class CustomRedisConfig extends AbstractRedisConfig{
@Resource
private CustomRedisProperties redisProperties;
/**
* 注册RedisConnectionFactory
*/
@Bean
@ConditionalOnMissingBean(RedisConnectionFactory.class)
public RedisConnectionFactory redisConnectionFactory() {
return getRedisConnectionFactory(redisProperties);
}
/**
* 自定义注册RedisTemplate
* @param redisConnectionFactory 自定义的Redis连接工厂
* @return RedisTemplate
*/
@Bean
@ConditionalOnMissingBean(RedisTemplate.class)
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
return buildRestTemplate(redisConnectionFactory);
}
/**
* 自定义cacheManager
*/
@Bean
@ConditionalOnMissingBean(RedisCacheManager.class)
public RedisCacheManager cacheManager() {
// String序列化
RedisSerializer<String> strSerializer = new StringRedisSerializer();
// json序列化
Jackson2JsonRedisSerializer<?> jsonRedisSerializer = buildRedisJackson();
// set RedisCacheConfiguration
RedisCacheConfiguration config =
RedisCacheConfiguration.defaultCacheConfig()
// 增加自定义的前缀
.computePrefixWith(cacheName -> redisProperties.getKeyPrefix() + ":" + cacheName + ":")
// 设置key value序列化器
.serializeKeysWith(RedisSerializationContext.SerializationPair
.fromSerializer(strSerializer))
.serializeValuesWith(RedisSerializationContext.SerializationPair
.fromSerializer(jsonRedisSerializer))
// 禁用缓存空值
.disableCachingNullValues();
// 创建自定义缓存管理器
return new CustomRedisCacheManager(RedisCacheWriter.nonLockingRedisCacheWriter(redisConnectionFactory()), config);
}
}
3.5、自定义CacheManager(核心)
/**
* 自定义Redis缓存管理器, 增加自定义缓存过期时间
*/
public class CustomRedisCacheManager extends RedisCacheManager {
public CustomRedisCacheManager(RedisCacheWriter cacheWriter, RedisCacheConfiguration defaultCacheConfiguration) {
super(cacheWriter, defaultCacheConfiguration);
}
@Override
protected RedisCache createRedisCache(String name, RedisCacheConfiguration cacheConfig) {
// 根据#号分隔
String[] array = StringUtils.delimitedListToStringArray(name, "#");
name = array[0];
if (array.length > 1) { // 解析TTL
long ttl = Long.parseLong(array[1]);
cacheConfig = cacheConfig.entryTtl(Duration.ofMinutes(ttl)); // 注意单位我此处用的是分钟,而非毫秒
}
return super.createRedisCache(name, cacheConfig);
}
}
可以看到重写了createRedisCache方法,对缓存的key做了一个处理,将key按照#号分开,后面那一段就是过期时间,然后将过期时间设置到对应的key上。
注意:而我们#号后面可以是数字,其实是不是也可以是配置文件的全限定名称。比如:test.name.expire。这东西可以从配置文件读取,放在数据库,或者nacos都是可以的。
这样子就实现了过期时间的配置化处理。
3.6、Redis配置文件
server:
port: 8080
redis:
# Redis开关
enable: true
# Redis地址,格式ip:port,ip:port。集群使用逗号分割
host: 162.14.74.11:6379
# 密码
password:
# 数据库
database: 0
# 最大重试次数
max-redirects: 3
# 使用统一前缀管理
key-prefix: itdl
当我们不配置Redis相关属性时,则不会创建相关的Bean对象。只有开启了开关,才会去创建对象。这样子我们就具备了做成一个stater的基础条件了。
4、测试
4.1、编写Service层级测试接口
/**
* 测试service层次
*/
public interface MyTestService {
// 用于测试缓存和过期时间是否生效
TestEntity getById(Long id);
// 用于测试缓存并发的安全性和过期时间是否生效
TestEntity getById2(Long id);
}
4.2、编写Service层级测试接口实现类
@Service
@Slf4j
public class MyTestServiceImpl implements MyTestService {
@Cacheable(value = "test#3", key = "#id")
public TestEntity getById(Long id){
TestEntity testEntity = new TestEntity(new Random().nextLong(), UUID.randomUUID().toString(), new Random().nextInt(20) + 10);
log.info("模拟查询数据库:{}", testEntity);
return testEntity;
}
@Cacheable(value = "test2#3", key = "#id", sync = true) // sync表示同步回填缓存
public TestEntity getById2(Long id){
TestEntity testEntity = new TestEntity(new Random().nextLong(), UUID.randomUUID().toString(), new Random().nextInt(20) + 10);
log.info("模拟查询数据库:{}", testEntity);
return testEntity;
}
}
可以看到,我写了两个方法,唯一的区别就是value的值和下面方法多了一个sync = true。
效果我们随后使用测试类来进行从测试便知。
4.3、测试缓存即过期时间
@SpringBootTest
public class TestServiceRunner {
@Autowired
private MyTestService myTestService;
/**
* 测试两个请求(同参),只有一个请求到达,另一个走缓存
*/
@Test
public void testMyTestService(){
TestEntity t1 = myTestService.getById(1L);
TestEntity t2 = myTestService.getById(1L);
}
}
理论上来说,我们将有一个itdl:test:1这样的key,存入redis, 过期时间为3分钟。并且第二次请求会命中Redis而不会打印模拟查询数据库。
看似已经没有什么问题了,但是我们目前是串行化调用的缓存,如果并发去调用呢?会不会有问题?
4.4、测试缓存即过期时间(并发调用)
为什么要做这样的测试?
因为这样就可以模拟同一个热key(该key经常被访问,随时都有大量的请求),突然失效了,如果存在并发问题,此时缓存将会被击穿(缓存击穿),大量请求直达数据库。这样子可能数据库直接就嗝屁了。
所以我们必然要经过并发测试的。来吧~~~
我们使用一个大小为8的线程池,使用for循环不断的去执行线程,调用同一个方法。
如果模拟数据库操作只打印了多次,我们就认为一定存在线程安全性问题。如果只打印了一次,那就可能不存在,当然只是可能而已。
@SpringBootTest
public class TestServiceRunner2 {
@Autowired
private MyTestService myTestService;
// 创建一个固定线程池
private ExecutorService executorService = Executors.newFixedThreadPool(8);
/**
* 多线程访问请求,测试切面的线程安全性
*/
@Test
public void testMultiMyTestService() throws InterruptedException {
for (int i = 0; i < 100; i++) {
executorService.submit(() -> {
TestEntity t1 = myTestService.getById(1L);
});
}
// 主线程休息10秒种
Thread.sleep(10000);
}
}
此时三分钟早已溜走,之前的缓存已经过期。我们开始测试并发吧。
看样子确实存在并发问题啊,这可咋整呢,别急,山人自有妙计!
4.5、使用sync=true解决并发问题
@Cacheable(cacheNames = "test2#3", key = "#id", sync = true) // sync表示同步回填缓存
时间过得真快,三分钟又悄悄溜走了。我们来测试一下sync的效果
@SpringBootTest
public class TestServiceRunner3 {
@Autowired
private MyTestService myTestService;
// 创建一个固定线程池
private ExecutorService executorService = Executors.newFixedThreadPool(8);
/**
* 多线程访问请求,测试切面的线程安全性(加上同步回填缓存)
*/
@Test
public void testMultiSyncMyTestService() throws InterruptedException {
for (int i = 0; i < 100; i++) {
executorService.submit(() -> {
TestEntity t1 = myTestService.getById2(1L);
});
}
// 主线程休息10秒种
Thread.sleep(10000);
}
}
可以看到,即使我们使用多线程去调用getById2,仍然只有一个线程执行了数据库查询,其余都是走的缓存。
这其实就是在查询数据库和回填缓存时,加了一把锁。在高并发的情况下,我们查询数据库,和存缓存这个操作加锁排队进行。那么第二个请求一定就是从缓存获取数据。
但是如果查询数据库结果为空呢?在瞬时并发的情况下,虽然加了锁查询数据库,但是每次的查询结果都是空,于是数据不会存储到Redis.
这样每次还是会查询数据库,而每次查询都为空值,结果同样是大量的请求是数据库承受的,高并发下,可能会直接压垮数据库,导致雪崩。
我们做个测试,多线程查询一条为空的数据。
4.6、测试结果为空情况
首先,在MyTestServiceImpl的getById2中,添加id为10的数据返回空值
@Cacheable(cacheNames = "test2#3", key = "#id", sync = true) // sync表示同步回填缓存
public TestEntity getById2(Long id){
if (id == 10){
log.info("id为10没有查询到数据,返回空值");
return null;
}
TestEntity testEntity = new TestEntity(new Random().nextLong(), UUID.randomUUID().toString(), new Random().nextInt(20) + 10);
log.info("模拟查询数据库:{}", testEntity);
return testEntity;
}
然后,使用一个多线程去请求这个为空的数据。
@SpringBootTest
public class TestServiceRunner4 {
@Autowired
private MyTestService myTestService;
// 创建一个固定线程池
private ExecutorService executorService = Executors.newFixedThreadPool(8);
/**
* 多线程访问请求,测试切面的线程安全性(加上同步回填缓存)
*/
@Test
public void testMultiSyncMyTestService() throws InterruptedException {
for (int i = 0; i < 100; i++) {
executorService.submit(() -> {
TestEntity t1 = myTestService.getById2(10L);
});
}
// 主线程休息10秒种
Thread.sleep(10000);
}
}
可以看到,这样的请求在高并发下,很容易就会将数据库给压垮。
结论:高并发情境下,我们务必也要对空值也进行处理。
5、数据库结果为空处理
properties文件增加缓存空值的开关属性
/**
* 是否缓存空值,默认不缓存
*/
private boolean cacheNullValues = false;
修改RedisCacheManager的Bean实例化过程,只有开关关闭时才将缓存空值禁用。
/**
* 自定义cacheManager
*/
@Bean
@ConditionalOnMissingBean(RedisCacheManager.class)
public RedisCacheManager cacheManager() {
// String序列化
RedisSerializer<String> strSerializer = new StringRedisSerializer();
// json序列化
Jackson2JsonRedisSerializer<?> jsonRedisSerializer = buildRedisJackson();
// set RedisCacheConfiguration
RedisCacheConfiguration config =
RedisCacheConfiguration.defaultCacheConfig()
// 增加自定义的前缀
.computePrefixWith(cacheName -> redisProperties.getKeyPrefix() + ":" + cacheName + ":")
// 设置key value序列化器
.serializeKeysWith(RedisSerializationContext.SerializationPair
.fromSerializer(strSerializer))
.serializeValuesWith(RedisSerializationContext.SerializationPair
.fromSerializer(jsonRedisSerializer))
;
if (!redisProperties.isCacheNullValues()){
// 禁用缓存空值
config.disableCachingNullValues();
}
// 创建自定义缓存管理器
return new CustomRedisCacheManager(RedisCacheWriter.nonLockingRedisCacheWriter(redisConnectionFactory()), config);
}
在application.yml种,开启对空值的缓存
redis:
# Redis开关
enable: true
# Redis地址,格式ip:port,ip:port。集群使用逗号分割
host: 162.14.74.11:6379
# 密码
password:
# 数据库
database: 0
# 最大重试次数
max-redirects: 3
# 使用统一前缀管理
key-prefix: itdl
# 缓存空值开关开启
cache-null-values: true
重新测试
可以看出来,将空值缓存之后,我们的数据库压力又变小了,它又安全了,不会被压垮了。
但是,我们缓存了空值之后,假如我们又新增了一条id为10的数据呢? 我们再次查询居然还是空值,这可咋整呢?
- 1、我们本身已经设置了缓存过期时间,如果数据允许有延迟,可以等待缓存的空值过期。或者缩短缓存的过期时间。
- 2、新增数据或修改数据后,根据相关的key主动删除相关缓存
- 3、编写定时任务,定期清空Redis中为SpringCache缓存的空值的数据。或者预留接口去手动删除相关的空值缓存。
- 4、使用一个巨大的bitmap,提前查询已存在的key,编写一个布隆过滤器,提前把空值数据拦截,不经过缓存,不经过数据库。
- 5、扩展DefaultRedisCacheWriter重写put方法,收集空值的key列表
7、扩展DefaultRedisCacheWriter重写put方法
我们先来看一下RedisCacheManager的构造方法
private RedisCacheManager(RedisCacheWriter cacheWriter, RedisCacheConfiguration defaultCacheConfiguration,
boolean allowInFlightCacheCreation) {
Assert.notNull(cacheWriter, "CacheWriter must not be null!");
Assert.notNull(defaultCacheConfiguration, "DefaultCacheConfiguration must not be null!");
this.cacheWriter = cacheWriter;
this.defaultCacheConfig = defaultCacheConfiguration;
this.initialCacheConfiguration = new LinkedHashMap<>();
this.allowInFlightCacheCreation = allowInFlightCacheCreation;
}
可以看到,构造函数有一个重要的参数RedisCacheWriter。该接口就是定义了一些写入Redis的操作。
public interface RedisCacheWriter extends CacheStatisticsProvider {
// 创建一个无锁的RedisCacheWriter
static RedisCacheWriter nonLockingRedisCacheWriter(RedisConnectionFactory connectionFactory) {
return nonLockingRedisCacheWriter(connectionFactory, BatchStrategies.keys());
}
// 实际上就是创建了一个DefaultRedisCacheWriter
static RedisCacheWriter nonLockingRedisCacheWriter(RedisConnectionFactory connectionFactory,
BatchStrategy batchStrategy) {
return new DefaultRedisCacheWriter(connectionFactory, batchStrategy);
}
}
可以看到,实际上就是创建了一个DefaultRedisCacheWriter。所以只要我们构建时传入DefaultRedisCacheWriter或者其扩展类,就可以拦截put方法(缓存入库的方法)。
但是DefaultRedisCacheWriter不是一个public的类,无法继承它进行扩展,所以我直接拷贝了一份,然后在put方法内增加了Redis存储的空值NullValue的判断,即可收集空值集合。
public class CustomRedisCacheWriter implements RedisCacheWriter {
// 其实在Redis中,值为null的数据并不会存储为空字符串或者null, 而是存储的一个NullValue的实例的序列化结果
private static final byte[] BINARY_NULL_VALUE = RedisSerializer.java().serialize(NullValue.INSTANCE);
private final RedisConnectionFactory connectionFactory;
private final Duration sleepTime;
private final CacheStatisticsCollector statistics;
private final BatchStrategy batchStrategy;
// 引入上下文,方便从上下文获取Bean做收集动作
private ApplicationContext applicationContext;
// 构造方法增加ApplicationContext参数
public CustomRedisCacheWriter(RedisConnectionFactory connectionFactory, BatchStrategy batchStrategy, ApplicationContext applicationContext) {
this(connectionFactory, Duration.ZERO, batchStrategy);
this.applicationContext = applicationContext;
}
// put方法,增加空值判断
@Override
public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
Assert.notNull(name, "Name must not be null!");
Assert.notNull(key, "Key must not be null!");
Assert.notNull(value, "Value must not be null!");
// 注意:这里就是结果为空值
if (new String(BINARY_NULL_VALUE, StandardCharsets.UTF_8).equals(new String(value, StandardCharsets.UTF_8))){
// 存储的数据就是NULLValue, 将空值的key收集起来
applicationContext.getBean(CacheNullValuesHandle.class).collectNullValueKeys(new String(key, StandardCharsets.UTF_8));
}
execute(name, connection -> {
if (shouldExpireWithin(ttl)) {
connection.set(key, value, Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS), SetOption.upsert());
} else {
connection.set(key, value);
}
return "OK";
});
statistics.incPuts(name);
}
}
收集空值的处理类
/**
* Redis存储的空值处理
*/
@ConditionalOnProperty(value = "redis.enable", havingValue = "true")
@Component
@Slf4j
public class CacheNullValuesHandle implements DisposableBean {
@Resource
private RedisTemplate<String, Object> redisTemplate;
private static final String cacheNullKeySetKey = ":cacheNullKeySetKey";
@Autowired
private CustomRedisProperties redisProperties;
// 收集的空值集合,放在set里,自动去重
private final Set<String> cacheNullValueKeys = new LinkedHashSet<>();
public synchronized void collectNullValueKeys(String key){
// 如果已经存在了,则直接
if (cacheNullValueKeys.contains(key)){
return;
}
// 空值数量小于最大数量,才存储在内存中
if (cacheNullValueKeys.size() < redisProperties.getCacheNullValueKeySizeInMemory()){
cacheNullValueKeys.add(key);
}else {
// 超过,直接存储在Redis Set中
redisTemplate.opsForSet().add(redisProperties.getKeyPrefix() + cacheNullKeySetKey, key);
}
}
public synchronized Set<String> getCacheNullValueKeys(){
return cacheNullValueKeys;
}
@Override
public void destroy() throws Exception {
// 销毁时,删除自己对应的空值key
try {
log.info("清空空值key列表:{}", String.join(",", cacheNullValueKeys));
// 销毁的时候,那些空值key, 防止过多的空值key占用空间
redisTemplate.delete(cacheNullValueKeys);
// 删除空值key列表集合key
redisTemplate.delete(redisProperties.getKeyPrefix() + cacheNullKeySetKey);
cacheNullValueKeys.clear();
} catch (Exception e) {
e.printStackTrace();
}
}
}
我们将空值key收集起来了,只要服务不挂,那么全量的空值key列表就会存放在Redis Set集合中。如果空值key的缓存时间比较长的话,可以读取集合,然后遍历集合删除即可。通过页面对这些空值key进行管理,或者通过定时任务进行删除。
8、总结
我们给@Cacheable添加了自定义的缓存过期时间设置。可以分散设置从而防止缓存集中时效。
又发现@Cacheable缓存数据库存在线程安全问题,在多线程的场景同一个key将会多次访问到数据库,从而让数据库压力剧增。
然后使用sync=true,加锁排队查询数据库和回填缓存,从而解决并发问题。
随后又测试了查询数据结果为空,每次都会去查询数据库。
而空值的数据通常就是一些非法的参数,查询不到结果。可能是黑客模拟的参数,这些参数缓存在Redis,是极为正确的选择。
如果是我们后续新增或者修改数据的情况,主动删除对应的空值缓存即可。这样子数据就是实时的了。
如果说有些毫无意义的空值存储在Redis, 并且过期时间又比较长,就可以使用定时任务,定时去删除空值。或者使用缓存key管理,手动删除那些对应的key。