[TOC]
Redis Sentinel
Redis Sentinel 是用于监控Redis集群中Master状态的工具 , Sentinel 只在 Server 端做主从切换 , 客户端需要额外的开发 , 所以我们的介绍 , 会分为Redis服务端 , 和 客户端两部分.
Sentinel的作用 :
1.Master的状态检测
2.如果Master出现异常 , 则会进行Master-Slave切换, 将其中一个Slave作为Master , 而将之前的Master实例切换为Slave .
3.Master-Slave切换后 , master_redis.conf , slave_redis.conf 和 sentinel.conf的内容都会发生改变 , 即 master_redis.conf中会多出一行slaveof的配置 , sentinel.conf的监控目标会随着变更
服务端工作方式:
Sentinel 对于不可用有两种定义
主观不可用(SDOWN) : SDOWN 是单个Sentinel实例检测到redis实例的状态 , 自己主观上判断为不可用
客观不可用(ODOWN) : ODOWN 需要Sentinel集群内一定数量(配置文件)的实例达成一致 , 才会认为Master节点不可用 , 然后执行failover策略 .
1.每个Sentinel实例以每秒一次的频率向自己所监控的Master,Slave以及其他Sentinel实例发送一个PING命令
2.如果一个实例距离最后一次有效回复PING命令的时间超过 down-after-millisenconds配置所指定的值 , 则这个实例会被Sentinel标记为主观不可用(SDOWN) .
3.如果一个Master实例被标记为主观不可用 , 则Sentinel 集群会进行投票 , 通过SENTINEL is_master_down_by_addr 命令 来获得其他Sentinel对Master的检测结果 , 如果超过指定数量的Sentinel认为该实例不可用 , 则Master会被标记为客观不可用(ODOWN) .
4.从SDOWN切换到ODOWN状态 , 不需要使用一致性算法 , 只使用gossip协议.
5.ODOWN状态只适用于Master节点 , Slave节点和Sentinel节点不会存在ODOWN状态 , 也不需要进行投票
客户端工作方式:
对于客户端的工作方式 , 我们以分析Java版本客户端 Jedis的源码为主。
构造器 :
public JedisSentinelPool(String masterName, Set<String> sentinels,
final GenericObjectPoolConfig poolConfig,
final int connectionTimeout, final int soTimeout,
final String password,
final int database,
final String clientName) {
this.poolConfig = poolConfig;
this.connectionTimeout = connectionTimeout;
this.soTimeout = soTimeout;
this.password = password;
this.database = database;
this.clientName = clientName;
//初始化 Sentinels
HostAndPort master = initSentinels(sentinels, masterName);
// 初始化连接池
initPool(master);
}
初始化方法 (initSentinels) :
private HostAndPort initSentinels(Set<String> sentinels, final String masterName) {
HostAndPort master = null;
boolean sentinelAvailable = false;
log.info("Trying to find master from available Sentinels...");
// 遍历Sentinel集群
for (String sentinel : sentinels) {
// 解析 Sentinel 地址
final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));
log.fine("Connecting to Sentinel " + hap);
Jedis jedis = null;
try {
// 创建Sentinel 连接
jedis = new Jedis(hap.getHost(), hap.getPort());
// 根据MasterName获取Master地址 , 返回一个集合 , 下标 0 是地址 , 下标 1 是端口
List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);
// connected to sentinel...
sentinelAvailable = true;
if (masterAddr == null || masterAddr.size() != 2) {
log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: " + hap + ".");
continue;
}
// 实例化地址
master = toHostAndPort(masterAddr);
log.fine("Found Redis master at " + master);
// 如果在任何一个Sentinel中找到了master , 跳出循环
break;
} catch (JedisException e) {
// resolves #1036, it should handle JedisException there's another chance
// of raising JedisDataException
log.warning("Cannot get master address from sentinel running @ " + hap + ". Reason: " + e + ". Trying next one.");
} finally {
if (jedis != null) {
jedis.close();
}
}
}
if (master == null) {
if (sentinelAvailable) {
// can connect to sentinel, but master name seems to not
// monitored
throw new JedisException("Can connect to sentinel, but " + masterName + " seems to be not monitored...");
} else {
throw new JedisConnectionException("All sentinels down, cannot determine where is " + masterName + " master is running...");
}
}
log.info("Redis master running at " + master + ", starting Sentinel listeners...");
// 遍历Sentinel集群地址 , 针对每一个实例 , 启动一个监听器
for (String sentinel : sentinels) {
final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));
MasterListener masterListener = new MasterListener(masterName, hap.getHost(), hap.getPort());
// whether MasterListener threads are alive or not, process can be stopped
masterListener.setDaemon(true);
masterListeners.add(masterListener);
masterListener.start();
}
return master;
}
在initSentinels方法中 , 遍历Sentinel集群 , 并通过与Jedis绑定的客户端 , 发送一个 get-master-addr-by-name命令 ,来询问master节点的地址 。 直到找到master节点的地址 , 或者确认不存在指定名字的master节点。
在这段代码的最后 , 我们可以看到针对每一个Sentinel实例 都启动了一个监听器 , 我们来分析一下 MasterListener做了什么。
MasterListener
protected class MasterListener extends Thread {
protected String masterName;
protected String host;
protected int port;
protected long subscribeRetryWaitTimeMillis = 5000;
// 因监听器可能被多个线程访问 , 所以jedis对象被修饰为**可见的** ,
// 即一个线程修改了Jedis实例 , 其他的线程也可以得到最新的实例
protected volatile Jedis j;
protected AtomicBoolean running = new AtomicBoolean(false);
protected MasterListener() { }
public MasterListener(String masterName, String host, int port) {
super(String.format("MasterListener-%s-[%s:%d]", masterName, host, port));
this.masterName = masterName;
this.host = host;
this.port = port;
}
public MasterListener(String masterName, String host, int port, long subscribeRetryWaitTimeMillis) {
this(masterName, host, port);
this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;
}
public void run() {
running.set(true);
while (running.get()) {
j = new Jedis(host, port);
try {
// double check that it is not being shutdown
if (!running.get()) {
break;
}
// 订阅 channelName 为 "+switch-master" 的消息
j.subscribe(new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
log.fine("Sentinel " + host + ":" + port + " published: " + message + ".");
String[] switchMasterMsg = message.split(" ");
if (switchMasterMsg.length > 3) {
if (masterName.equals(switchMasterMsg[0])) {
// 初始连接池
initPool(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4])));
} else {
log.fine("Ignoring message on +switch-master for master name "+ switchMasterMsg[0] + ", our master name is " + masterName);
}
} else {
log.severe("Invalid message received on Sentinel " + host + ":" + port + " on channel +switch-master: " + message);
}
}
//channelName
}, "+switch-master");
} catch (JedisConnectionException e) {
if (running.get()) {
log.log(Level.SEVERE, "Lost connection to Sentinel at " + host + ":" + port + ". Sleeping 5000ms and retrying.",e);
try {
Thread.sleep(subscribeRetryWaitTimeMillis);
} catch (InterruptedException e1) {
log.log(Level.SEVERE, "Sleep interrupted: ", e1);
}
} else {
log.fine("Unsubscribing from Sentinel at " + host + ":" + port);
}
} finally {
j.close();
}
}
}
protected volatile Jedis j;
protected AtomicBoolean running = new AtomicBoolean(false);
从成员变量中我们可以看到 为了保障线程安全 , 代码中使用了volatile(可见性关键字) , 和布尔的原子变量 , 我会在另外的文章里 , 描述他们在使用上的区别 。
在 MasterListener 这个监听器里 , 我们可以看到这里订阅了一个名为 "+switch-master" 的事件 , 当得到这个事件的时候,调用 initPool 方法 , 用来更新Master节点的地址 , 并且初始化连接池 。