由于cluster模式下,key的存放需要先使用JedisClusterCRC16计算出solt,在定位solt所在的节点,所以直接使用pipeline会报错
Pipeline is currently not supported for JedisClusterConnection.
思路
1.将需要操作的key计算出对应的solt,得到hostAndPort,分组存放在一个map中。(Map<JedisPool, List<String>> node2keys = new HashMap<>())
2.通过得到的JedisPool,分开使用jedis的pipeline执行
3.将返回数据组装返回
code
1.先得到一个JedisCluster
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;
import java.util.HashSet;
import java.util.Set;
@Configuration
public class JedisClusterConfig {
@Value("${spring.redis.cluster.nodes}")
private String nodes;
@Value("${spring.redis.password}")
private String password;
@Value("${spring.redis.pool.max-idle}")
private int maxIdle;
@Value("${spring.redis.pool.max-wait}")
private long maxWait;
@Bean("jedisCluster")
public JedisCluster jedisCluster() {
String[] redisNodes = nodes.split(",");
Set<HostAndPort> nodes = new HashSet<>();
for (String node : redisNodes) {
String[] ipPortPair = node.split(":");
nodes.add(new HostAndPort(ipPortPair[0].trim(), Integer.valueOf(ipPortPair[1].trim())));
}
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxIdle(maxIdle);
config.setMaxWaitMillis(maxWait);
JedisCluster cluster = new JedisCluster(nodes, 8000, 5000, 3, password, config);
return cluster;
}
}
2.实现key的分组
import com.sqyc.carnet.util.ChineseUtil;
import com.sqyc.carnet.util.Constants;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;
import redis.clients.jedis.*;
import redis.clients.util.JedisClusterCRC16;
import java.util.*;
/**
* 基于JedisCluster实现管道的使用
* 核心对象:JedisClusterInfoCache和JedisSlotBasedConnectionHandler
*/
@Slf4j
public class JedisClusterPipeline {
/**
* Redis集群缓存信息对象 Jedis提供
*/
private JedisClusterInfoCache clusterInfoCache;
/**
* Redis链接处理对象 继承于JedisClusterConnectionHandler,对其提供友好的调用方法 Jedis提供
*/
private JedisSlotBasedConnectionHandler connectionHandler;
/**
* Redis集群操作对象 Jedis提供
*/
private JedisCluster jedisCluster;
/**
* 构造方法
* 通过JedisCluster获取JedisClusterInfoCache和JedisSlotBasedConnectionHandler
*
* @param jedisCluster
*/
public JedisClusterPipeline(JedisCluster jedisCluster) {
this.jedisCluster = jedisCluster;
MetaObject metaObject = SystemMetaObject.forObject(jedisCluster);
clusterInfoCache = (JedisClusterInfoCache) metaObject.getValue("connectionHandler.cache");
connectionHandler = (JedisSlotBasedConnectionHandler) metaObject.getValue("connectionHandler");
}
/**
* 批量get key
*
* @param keys
*/
public Map<String, Object> clusterPipelineGet(List<String> keys) {
//要返回的结果
Map<String, Object> result = new HashMap<>();
//节点对应keys分组
Map<JedisPool, List<String>> node2keys = new HashMap<>();
for (String key : keys) {
// 计算key对应的slot
int slot = JedisClusterCRC16.getSlot(key);
// 根据slot获取对应的节点信息,将同一节点的key收在一组
JedisPool jedisPool = this.clusterInfoCache.getSlotPool(slot);
if (null == jedisPool) {
/** 刷新缓存的SlotPool */
this.connectionHandler.renewSlotCache();
jedisPool = this.clusterInfoCache.getSlotPool(slot);
if (jedisPool == null) {
log.error("clusterPipelineGet , No reachable node in cluster for slot---{}", slot);
continue;
}
}
if (node2keys.containsKey(jedisPool)) {
node2keys.get(jedisPool).add(key);
} else {
List<String> list = new ArrayList<>();
list.add(key);
node2keys.put(jedisPool, list);
}
}
if (node2keys.isEmpty()) {
log.error("clusterPipelineGet , node2keys is empty");
return result;
}
// 分组执行
for (Map.Entry<JedisPool, List<String>> group : node2keys.entrySet()) {
Jedis jedis = group.getKey().getResource();
Pipeline pipeline = jedis.pipelined();
// 执行本组keys
List<String> values = group.getValue();
for (String carNo : values) {
pipeline.get(carNo);
}
List<Object> pipelineReturns = pipeline.syncAndReturnAll();
//组装redis返回结果,以免请求的key和返回的values顺序错乱
for (int i = 0; i < values.size(); i++) {
result.put(values.get(i), pipelineReturns.get(i));
}
jedis.close();
}
return result;
}
}
调用示例
@Override
public String queryMobileCarStatusBatch(List<String> carNos) {
JedisClusterPipeline jedisClusterPipeline = new JedisClusterPipeline(jedisCluster);
try {
Map<String, Object> result = jedisClusterPipeline.clusterPipelineGet(carNos);
return createReturnValue(RequsetReturnValueUtils.REQUSET_RETURN_CODE_NORMAL, "成功", result);
} catch (Throwable t){
log.error("批量查询统计失败,carNos= {},error=={}",carNos,t.getMessage());
}
return createReturnValue(RequsetReturnValueUtils.REQUSET_RETURN_CODE_FAIL, "失败", "");
}