按照上步搭建好的redis哨兵集群配合Jedis客户端进行Springboot整合
1、配置
# redis 主从哨兵配置
spring:
redis:
database: 0
password:
pool:
max-active: 8
max-wait: -1 # 连接池最大阻塞等待时间(使用负值表示没有限制)
max-idle: 8
min-idle: 0
timeout: 10000
# 主节点哨兵名
sentinel:
master: mymaster
# comma-separated list of host:port pairs 哨兵节点配置列表
nodes: 127.0.0.1:26379,127.0.0.1:26380,127.0.0.1:26381
2、新建redis 配置类
package com.example.demo.redis;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* redis 配置类
* @author fansongsong
* @since 2020/09/10
*/
@Configuration
@EnableCaching
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
// 使用Jackson2JsonRedisSerialize 替换默认的jdkSerializeable序列化
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// key采用String的序列化方式
template.setKeySerializer(stringRedisSerializer);
// hash的key也采用String的序列化方式
template.setHashKeySerializer(stringRedisSerializer);
// value序列化方式采用jackson
template.setValueSerializer(jackson2JsonRedisSerializer);
// hash的value序列化方式采用jackson
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}
3、编写redis服务类
简单列举get/set方法
/**
* redis 服务类
* @author fansongsong
* @since 2020/09/10
*/
@Component
@Slf4j
public class RedisService {
@Resource
private RedisTemplate<Object, Object> redisTemplate;
/**
* 普通缓存获取
*
* @param key 键
* @return 值
*/
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}
/**
* 普通缓存放入
*
* @param key 键
* @param value 值
* @return true成功 false失败
*/
public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
log.error("exception when set key {}. ", key, e);
return false;
}
}
}
3、验证数据同步
编写测试类方法
执行方法
执行成功查看其它从服数据是否同步(目前主服务:6379,从1服务: 6380,从2服务: 6381)
数据同步成功!
4、Redis客户端是如何进行主从切换的
上述配置只指定了哨兵节点的地址与master的名称 = mymaster,但Redis客户端最终访问操作的是master节点,那么Redis客户端是如何获取master节点的地址,并在发生故障转移时,如何自动切换master地址的呢?以Jedis连接池为例,通过源码来了解内部实现原理。
在 JedisSentinelPool 类的构造函数中,对连接池做了初始化,如下
public JedisSentinelPool(String masterName, Set<String> sentinels,
final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout,
final String password, final int database, final String clientName) {
this.poolConfig = poolConfig;
this.connectionTimeout = connectionTimeout;
this.soTimeout = soTimeout;
this.password = password;
this.database = database;
this.clientName = clientName;
HostAndPort master = initSentinels(sentinels, masterName);
initPool(master);
}
private HostAndPort initSentinels(Set<String> sentinels, final String masterName) {
for (String sentinel : sentinels) {
final HostAndPort hap = HostAndPort.parseString(sentinel);
log.fine("Connecting to Sentinel " + hap);
Jedis jedis = null;
try {
jedis = new Jedis(hap.getHost(), hap.getPort());
List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);
// connected to sentinel...
sentinelAvailable = true;
if (masterAddr == null || masterAddr.size() != 2) {
log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: " + hap
+ ".");
continue;
}
master = toHostAndPort(masterAddr);
log.fine("Found Redis master at " + master);
break;
} catch (JedisException e) {
// resolves #1036, it should handle JedisException there's another chance
// of raising JedisDataException
log.warning("Cannot get master address from sentinel running @ " + hap + ". Reason: " + e
+ ". Trying next one.");
} finally {
if (jedis != null) {
jedis.close();
}
}
}
//省略了非关键代码
for (String sentinel : sentinels) {
final HostAndPort hap = HostAndPort.parseString(sentinel);
MasterListener masterListener = new MasterListener(masterName, hap.getHost(), hap.getPort());
// whether MasterListener threads are alive or not, process can be stopped
masterListener.setDaemon(true);
masterListeners.add(masterListener);
masterListener.start();
}
return master;
}
initSentinels() 方法中主要干了两件事:
遍历哨兵节点,通过get-master-addr-by-name命令获取master节点的地址信息,找到了就退出循环。get-master-addr-by-name命令执行结果如下所示
[root@dev-server-1 master-slave]# redis-cli -p 26379
127.0.0.1:26379> sentinel get-master-addr-by-name mymaster
1) "192.168.40.201"
2) "7001"
127.0.0.1:26379>
2.对每一个哨兵节点通过一个 MasterListener 进行监听(Redis的发布订阅功能),订阅哨兵节点+switch-master频道,当发生故障转移时,客户端能收到哨兵的通知,通过重新初始化连接池,完成主节点的切换。
MasterListener.run方法中监听哨兵部分代码如下
j.subscribe(new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
log.fine("Sentinel " + host + ":" + port + " published: " + message + ".");
String[] switchMasterMsg = message.split(" ");
if (switchMasterMsg.length > 3) {
if (masterName.equals(switchMasterMsg[0])) {
initPool(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4])));
} else {
log.fine("Ignoring message on +switch-master for master name "
+ switchMasterMsg[0] + ", our master name is " + masterName);
}
} else {
log.severe("Invalid message received on Sentinel " + host + ":" + port
+ " on channel +switch-master: " + message);
}
}
}, "+switch-master");
initPool() 方法如下:如果发现新的master节点与当前的master不同,则重新初始化。
private void initPool(HostAndPort master) {
if (!master.equals(currentHostMaster)) {
currentHostMaster = master;
if (factory == null) {
factory = new JedisFactory(master.getHost(), master.getPort(), connectionTimeout,
soTimeout, password, database, clientName, false, null, null, null);
initPool(poolConfig, factory);
} else {
factory.setHostAndPort(currentHostMaster);
// although we clear the pool, we still have to check the
// returned object
// in getResource, this call only clears idle instances, not
// borrowed instances
internalPool.clear();
}
log.info("Created JedisPool to master at " + master);
}
}
通过以上两步,Jedis客户端在只知道哨兵地址的情况下便能获得master节点的地址信息,并且当发生故障转移时能自动切换到新的master节点地址。