1. 简介
Redis Cluster(Redis集群模式)会将key经过hash后分片存储到不同的节点。每个Redis节点只能操作本节点下的key。故Redis Cluster不支持mget等的批量操作。
Redis Cluster
不支持mget等命令,是因为一个命令含有多个key,而多个key可能在不同的节点,所以Redis Cluster是直接不支持的。
Jedis客户端访问Redis Cluster
时,已经计算出key所在的slot(hash槽),并根据(slot-JedisPool)的缓存可以获取到对应节点的Connection对象。使用对应节点的Connection对象去对应节点查询数据。详细请见——JedisCluster源码分析
但是一个节点上支持发送多条Redis命令(即单节点支持pipeline操作)。
而SpringBoot下若是使用Jedis
客户端,没有实现对Redis Cluster的pipeline操作。
而SpringBoot2.0以后的版本,使用的是lettuce
客户端,Spring实现了对Redis Cluster的pipeline操作。
2. SpringBoot的使用
引入依赖,SpringBoot2.x版本2.1.3
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
使用方式:
@RestController
@Slf4j
public class TestPipelineRedisController {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RequestMapping("/redis")
public String redis1() {
//得到的就是Redis返回的有序的结果,Spring已经帮助序列化。
List<Object> originalDatas = stringRedisTemplate.executePipelined(new RedisCallback() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
connection.set("HH:AA".getBytes(),"我是中国人".getBytes());
connection.get("HH:123".getBytes());
connection.hGetAll("HH:HASH:cc".getBytes());
return null;
}
});
return JSON.toJSONString(originalDatas);
}
}
关注事项:
-
doInRedis()
方法必须返回null,要不会出现异常; -
doInRedis()
方法中,命令并不会被执行,故也不会返回结果; -
originalDatas
的结果已经按照stringRedisTemplate
配置的方式进行对key和value进行反序列化; -
originalDatas
的结果顺序和命令是一致的,若命令出现异常,那么List<Object>对应节点存储的是异常对象,不会出现错位的情况。
SpringBoot的源码实现
public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperations<K, V>, BeanClassLoaderAware {
@Override
public List<Object> executePipelined(RedisCallback<?> action) {
return executePipelined(action, valueSerializer);
}
/*
* (non-Javadoc)
* @see org.springframework.data.redis.core.RedisOperations#executePipelined(org.springframework.data.redis.core.RedisCallback, org.springframework.data.redis.serializer.RedisSerializer)
*/
@Override
public List<Object> executePipelined(RedisCallback<?> action, @Nullable RedisSerializer<?> resultSerializer) {
return execute((RedisCallback<List<Object>>) connection -> {
//将pipeline对象放入到connection对象中
connection.openPipeline();
boolean pipelinedClosed = false;
try {
//用户代码,使用pipeline进行操作,只是将命令保存到buf缓存中
Object result = action.doInRedis(connection);
//【注意】此处就是若用户不返回null,抛出异常的原因
if (result != null) {
throw new InvalidDataAccessApiUsageException(
"Callback cannot return a non-null value as it gets overwritten by the pipeline");
}
//和Redis进行socket通信发送命令获取结果【此时才会得到Redis的结果】
List<Object> closePipeline = connection.closePipeline();
pipelinedClosed = true;
//对返回的结果按照RedisTemple配置的方式进行反序列化
return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer);
} finally {
if (!pipelinedClosed) {
connection.closePipeline();
}
}
});
}
}
注意:代码使用JedisConnection进行的pipeline操作。Jedis对pipeline操作的原理以及源码分析可见:
Jedis源码分析
JedisCluster源码分析