本地缓存:mybatis实现:装饰器模式实践
- PerpetualCache:永久缓存:通过HashMap实现最大容量为Integer.MaxValue 的不过期缓存
- LruCache:固定缓存大小,实现最近最少使用的Key过期:通过LinkedHashMap实现LRU算法的缓存
- 覆写removeEldestEntry方法(达到最大容量返回true)
- accessOrder 设为true
public void setSize(final int size) {
keyMap = new LinkedHashMap<Object, Object>(size, .75F, true) {
private static final long serialVersionUID = 4267176411845948333L;
@Override
protected boolean removeEldestEntry(Map.Entry<Object, Object> eldest) {
boolean tooBig = size() > size;
if (tooBig) {
eldestKey = eldest.getKey();
}
return tooBig;
}
};
}
- FifoCache:固定缓存大小,实现按添加顺序,最老的key过期:利用LinkedList 实现FIFO算法的缓存,使用Deque 的addLast removeFirst 方法
private void cycleKeyList(Object key) {
keyList.addLast(key);
if (keyList.size() > size) {
Object oldestKey = keyList.removeFirst();
delegate.removeObject(oldestKey);
}
}
- BlockingCache:通过ReentrantLock + ConcurrentHashMap 实现阻塞式缓存,锁粒度为单个缓存Key,以达到单机多线程环境下,只触发一次缓存数据load(业务逻辑为,get加锁,先get为null,从db load出来后put进缓存,并释放锁)
public class BlockingCache implements Cache {
private long timeout;
private final Cache delegate;
private final ConcurrentHashMap<Object, ReentrantLock> locks;
}
- TransactionalCache:实现类似不可重复读的事务缓存
集中式缓存:Memcached
业务逻辑:查询缓存,若未命中,触发加载db数据进缓存
//查询缓存
Object value = cache.get(key);
if (value != null) return value;
//查询db数据放入缓存中再返回。
value = loadingFromDb(key);
cache.put(key, value)
return value;
考虑有一下几个问题:
- 缓存穿透:若一直在查询一条不存在的数据,每次访问缓存拿到的结果都是null,那么每次都会触发一次db加载数据的操作:改进,维护一个黑名单,直接返回空或者默认数据。
- 缓存雪崩:若某一时间点,有大量请求需要查询缓存,这时缓存没数据或者缓存集中过期,会触发大量从数据库 加载数据的请求,给数据库造成很大压力,严重时可能产生数据库雪崩:
- 改进思路:
- 首先要避免缓存集中过期的情况,设置过期时间,给个浮动范围,让其可以在几分钟内平滑过期;
- 大量请求进来,还是可能会出现缓存没值的场景,触发大量db loading,分析可知,只要有一次db loading成功,刷新过缓存,后续请求进来后就可以直接走缓存了,加个ReentrantLock,然后tryLock等待个几秒钟就可以解决这个问题,实现预热缓存
//查询缓存
Object value = cache.get(key);
if (value != null) return value;
//多线程环境下加锁防止加载逻辑被触发多次,或者加分布式锁
reentrantLock.tryLock(ms);
//查询db数据放入缓存中再返回。
value = loadingFromDb(key);
cache.put(key, value);
//解锁
reentrantLock.unlock();
return value;
- 缓存一致性问题:存在场景需要缓存多个不同Key的订单数据,但由于订单数据会在多个环节被修改,一旦数据被修改了,就会出现数据不一致,即缓存的是脏数据
- 改进一(偏向时效性):利用MQ来保证一致性,约定一个topic,在数据被修改时,发消息触发刷新逻辑从数据库加载最新改动的数据进缓存。
- 改进二(业务容忍一定的延迟):为缓存记录设定过期时间,在缓存过期后到来的请求,会自动触发缓存刷新逻辑,这里面带来个小问题,一般对热点数据的访问请求都会持续的进行。在热点数据过期后,再请求,会进入缓存刷新逻辑,表现出来会有周期性的接口请求慢的场景,对用户体验不友好。对此在进行优化,设计二阶段缓存实现+异步刷新逻辑:即通过给缓存对象设定 软过期和硬过期时间(软过期<硬过期),每次查询缓存对象时,判断下软过期是否小于当前时间,如果是,先返回缓存对象,提交一个Runable,通过线程池触发异步刷新缓存的逻辑:考虑到微服务下有多个实例部署,加一个redis锁,同一时刻,保证数据加载逻辑只会被触发一次
/**
* 异步缓存重刷
*
* @param cachedObject 缓存数据实体
* @param realKey 真实的缓存KEY
* @param logicalExpireSeconds 逻辑失效
* @param physicalExpireSeconds 物理失效
* @param cacheNullable 是否允许cacheNull
* @param func 异步reload业务回调
* @param clazz 数据返回类型
* @param <T> 数据加载回调返回类型
*/
private <T> T asyncReloadAndGetCachedResult(CacheObject<T> cachedObject, String realKey, int logicalExpireSeconds,
int physicalExpireSeconds, boolean cacheNullable, Func<T> func,
Class<T> clazz, Lock lock) {
T result = null;
if (cachedObject != null) {
if (cachedObject.value != null) {
if (clazz == null || clazz.isAssignableFrom(cachedObject.value.getClass())) {
//clazz没设定,类型转换异常只能再业务层发现了
result = cachedObject.value;
if (cachedObject.isLogicExpire()) {
//软过期,触发异步重刷机制,返回缓存中数据
asyncReload(realKey, logicalExpireSeconds, physicalExpireSeconds, cacheNullable, func, clazz,
lock);
}
} else {
//如果缓存中的数据会导致类型转换异常
LOGGER.error("cached result class type is not match! expect:{}, actual:{}", clazz,
cachedObject.value.getClass());
result = syncReloadAndGet(realKey, logicalExpireSeconds, physicalExpireSeconds, cacheNullable,
func, clazz);
}
} else {
if (cacheNullable) {
if (cachedObject.isLogicExpire()) {
//软过期,触发异步重刷机制,返回缓存中数据
asyncReload(realKey, logicalExpireSeconds, physicalExpireSeconds, true, func, clazz, lock);
}
} else {
//不允许cache缓存却拿到了null(可能修改了nullable参数),这里强制进行一次缓存刷新,避免业务层发生错误
result = syncReloadAndGet(realKey, logicalExpireSeconds, physicalExpireSeconds, false,
func, clazz);
}
}
}
return result;
}
/**
* 异步缓存重刷
*
* @param realKey 真实的缓存KEY
* @param logicalExpireSeconds 逻辑失效
* @param physicalExpireSeconds 物理失效
* @param cacheNullable 是否允许cacheNull
* @param func 异步reload业务回调
* @param tClass 缓存的对象类型
* @param <T> 数据加载回调返回类型
*/
private <T> void asyncReload(String realKey, int logicalExpireSeconds, int physicalExpireSeconds,
boolean cacheNullable, Func<T> func, Class<T> tClass, Lock lock) {
try {
//if (!LOCAL_QUEUED_WORKING_TASK_SET.contains(realKey)) {
JedisLock jedisLock = cacheRefreshLockFactory.buildLock(realKey);
if (jedisLock.tryLock()) {
try {
//更新待处理任务列表,避免重入
//LOCAL_QUEUED_WORKING_TASK_SET.add(realKey);
CACHE_REFRESH_EXECUTOR.execute(new CacheRefreshTask(realKey, () -> {
try {
if (lock != null) {
//阻塞等待主线程完成剩余操作
lock.lock();
}
try {
T value = func.invoke();
if (cacheNullable) {
refreshCache(realKey, value, logicalExpireSeconds, physicalExpireSeconds, tClass);
} else {
if (value != null) {
refreshCache(realKey, value, logicalExpireSeconds, physicalExpireSeconds,
tClass);
}
}
} finally {
if (lock != null) {
lock.unlock();
}
}
} catch (Exception ex) {
LOGGER.error("reload cache of {} failed!", realKey, ex);
} finally {
//移除任务
//LOCAL_QUEUED_WORKING_TASK_SET.remove(realKey);
}
}));
} finally {
//do nothing
}
} else {
//若任务正在处理中,不做任何处理。
//此处不能更新LOCAL_QUEUED_WORKING_TASK_SET,因为任务不再当前实例上得到运行,任务不会得到更新。
}
//} else {
// LOGGER.warn("task is in queue waiting for being executed!");
//}
} catch (Exception e) {
LOGGER.error("asyncReload failed! realKey:{}", realKey, e);
}
}
分布式缓存
参考:https://www.cnblogs.com/moonandstar08/p/5405991.html
当缓存的数据量超过单机缓存的上线,就需要引入多台缓存服务器 组成缓存集群,对缓存请求做负载均衡,每台机器负责 1/n的 请求,常用算法有:轮循算法(Round Robin)、哈希算法(HASH)、最少连接算法(Least Connection)、响应速度算法(Response Time)、加权法(Weighted ),但考虑到缓存服务的特殊性,他需要在机器上存储数据,常见的这些算法(除hash算法外)会造成缓存数据冗余,并且命中率不高,造成集群利用率降低,而且随着系统访问量上升,压力越来越大,实际压力超过服务器的能承受的上限,出现节点宕机的情况,后续会通过加节点机器的形式来缓解压力,在分布式缓存中,经常会出现增删机器的情况,引发缓存的重分布,在重分布过程中,大量请求到达db,极端情况下引起数据库宕机
哈希算法:i = hash(key) mod N,对将要存入的key做hash计算,然后对可用的机器总数取模,得出的编号i,就是实际缓存需要存放的目标机器编号,在增删节点机器的情况下,N发生变化,导致现有缓存失效,引发大量的缓存重分布,造成db压力过大,极端情况下引起数据库宕机
一致性哈希算法:Consistent Hashing,思路是将所有缓存机器编号,计算hash值,放入一个拥有2的31次方的hash环中,对将要存入的缓存key做hash计算,得出hash值,取hash环按顺时针寻找,找到的第一个可用的hash值,它所对应的机器,就是目标缓存机器。
分析:
- 场景1:假如集群中原先有N个节点,第N个节点宕机,重新发布业务系统后,那按照一致性哈希算法,原先分步在第N个节点上的数据(hash(cacheKey) > hash(N-1))会重分步到第一个节点上,前N-1个节点的缓存数据依然有效可用,
- 假如集群中原先有N个节点,现增加一个节点,重新发布业务系统后,那按照一致性哈希算法,原先分步在第N个节点上的一部分数据(hash(N)< hash(cacheKey)< hash(N + 1))会重分布到新增节点上
结论:由此可见,一致性哈希算法,在增减集群节点时,只会引起一部分缓存数据重分布,相比普通的hash算法,减少了缓存失效的范围,减少了系统颠簸
缺点:一致性哈希算法的使用,因为集群机器的hash值在hash环中的位置问题,可能会引发数据倾斜,出现集群中某些节点量很大,其他节点存储量很小,可以引入虚拟节点(即为集群中每台机器计算多次hash值)hash环中放置虚拟节点,多个虚拟节点和机器之间维护一份映射,存储数据时,通过虚拟节点找到机器,通过这总方式,让集群机器在hash环中的分布更分散,已达到均匀分布的效率。
举例:dubbo消费方对提供方的调用,采用一致性hash算法来做的话,会为每个提供方Invoker生成160份虚拟节点,以此让请求的分发更加均匀。
实现:TreeMap(存储hash环) + hashMap(维护虚拟节点和机器的映射),参考dubbo的一致性hash算法实现
private static final class ConsistentHashSelector<T> {
private final TreeMap<Long, Invoker<T>> virtualInvokers;
private final int replicaNumber;
private final int identityHashCode;
private final int[] argumentIndex;
//初始化hash环
public ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
this.identityHashCode = System.identityHashCode(invokers);
URL url = invokers.get(0).getUrl();
this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i ++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
for (Invoker<T> invoker : invokers) {
for (int i = 0; i < replicaNumber / 4; i++) {
byte[] digest = md5(invoker.getUrl().toFullString() + i);
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
}
//顺时针查找满足条件的机器
private Invoker<T> sekectForKey(long hash) {
Invoker<T> invoker;
Long key = hash;
if (!virtualInvokers.containsKey(key)) {
SortedMap<Long, Invoker<T>> tailMap = virtualInvokers.tailMap(key);
if (tailMap.isEmpty()) {
key = virtualInvokers.firstKey();
} else {
key = tailMap.firstKey();
}
}
invoker = virtualInvokers.get(key);
return invoker;
}
}