Key member variables
public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage {
// Where entry data is actually stored
private final EntryLogger entryLogger;
// -----------------
// Index related
// -----------------
// Records fenced state, existence, masterKey, etc. per ledger
private final LedgerMetadataIndex ledgerIndex;
// Index of entry locations
private final EntryLocationIndex entryLocationIndex;
// Transient ledger info cache
private final ConcurrentLongHashMap<TransientLedgerInfo> transientLedgerInfoCache;
// -----------------
// Write related
// -----------------
// Two memtables are used for writes and swapped with each other;
// this lock protects the swap
private final StampedLock writeCacheRotationLock = new StampedLock();
// Write cache where all new entries are inserted into
protected volatile WriteCache writeCache;
// Write cache that is used to swap with writeCache during flushes
protected volatile WriteCache writeCacheBeingFlushed;
// Cache where we insert entries for speculative reading
private final ReadCache readCache;
// Checkpoint related
private final CheckpointSource checkpointSource;
private Checkpoint lastCheckpoint = Checkpoint.MIN;
}
Main responsibilities
- Read and write ledger entries, and maintain the location index for entries
- Store ledger-related metadata
- Support checkpoints
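To make these responsibilities concrete, here is a minimal, hypothetical usage sketch. The method signatures match the code quoted below, but the wiring is made up: a real instance is constructed by the bookie from its configuration.

import io.netty.buffer.ByteBuf;
import org.apache.bookkeeper.bookie.BookieException;

import java.io.IOException;

// Hypothetical caller; assumes `storage` is an already-initialized instance.
class DbLedgerStorageUsageSketch {
    ByteBuf writeAndReadBack(SingleDirectoryDbLedgerStorage storage,
                             long ledgerId, ByteBuf entry)
            throws IOException, BookieException {
        long entryId = storage.addEntry(entry);     // lands in the write cache
        storage.flush();                            // checkpoint: cache -> entry log + RocksDB index
        return storage.getEntry(ledgerId, entryId); // served from a cache or the entry log
    }
}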
Writing an entry
Writes go directly into the WriteCache. A StampedLock protects the swap of the two caches; StampedLock is a read/write lock with optimistic reads, which allows higher read concurrency.
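For reference, the general StampedLock optimistic-read template looks like this (a self-contained JDK example, not BookKeeper code):

import java.util.concurrent.locks.StampedLock;

class OptimisticReadTemplate {
    private final StampedLock lock = new StampedLock();
    private int state;

    int read() {
        long stamp = lock.tryOptimisticRead(); // no blocking, just a version stamp
        int value = state;                     // read the shared state optimistically
        if (!lock.validate(stamp)) {           // a writer ran concurrently: fall back
            stamp = lock.readLock();           // to a pessimistic read lock
            try {
                value = state;
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return value;
    }

    void write(int newValue) {
        long stamp = lock.writeLock();         // invalidates outstanding optimistic stamps
        try {
            state = newValue;
        } finally {
            lock.unlockWrite(stamp);
        }
    }
}

addEntry below instantiates exactly this template, with the cache swap as the writer side: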
public long addEntry(ByteBuf entry) throws IOException, BookieException {
long startTime = MathUtils.nowInNano();
long ledgerId = entry.getLong(entry.readerIndex());
long entryId = entry.getLong(entry.readerIndex() + 8);
long lac = entry.getLong(entry.readerIndex() + 16);
// This is the standard StampedLock optimistic-read template;
// the conflicting exclusive operation is the cache swap.
// First we try to do an optimistic locking to get access to the current write cache.
// This is based on the fact that the write cache is only being rotated (swapped) every 1 minute. During the
// rest of the time, we can have multiple threads using the optimistic lock here without interfering.
// Optimistic read lock
long stamp = writeCacheRotationLock.tryOptimisticRead();
boolean inserted = false;
inserted = writeCache.put(ledgerId, entryId, entry);
// If a cache swap happened while we were inserting, insert again
if (!writeCacheRotationLock.validate(stamp)) {
// The write cache was rotated while we were inserting. We need to acquire the proper read lock and repeat
// the operation because we might have inserted in a write cache that was already being flushed and cleared,
// without being sure about this last entry being flushed or not.
// We may have inserted into the cache that was just swapped out.
// If inserted was true, the entry could have gone into a cache that was already
// being flushed and cleared, with no guarantee that this entry made it to disk,
// so we re-insert under the read lock (re-adding the same ledgerId/entryId is harmless).
// If it was false, there is no impact either way.
stamp = writeCacheRotationLock.readLock();
try {
inserted = writeCache.put(ledgerId, entryId, entry);
} finally {
writeCacheRotationLock.unlockRead(stamp);
}
}
// If the insert into the writeCache failed, trigger a flush of the write cache.
// Reaching this point presumably means both caches are full.
if (!inserted) {
triggerFlushAndAddEntry(ledgerId, entryId, entry);
}
// After successfully inserting the entry, update the cached LAC and notify the watchers
updateCachedLacIfNeeded(ledgerId, lac);
return entryId;
}
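The three getLong calls at the top of addEntry imply the buffer layout of an entry: ledgerId at offset 0, entryId at offset 8, the LAC at offset 16, then the payload. A hypothetical helper that builds such a buffer with Netty:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

class EntryBufferSketch {
    // Layout assumed by addEntry: [ledgerId: 8B][entryId: 8B][lac: 8B][payload...]
    static ByteBuf buildEntry(long ledgerId, long entryId, long lac, byte[] payload) {
        ByteBuf entry = Unpooled.buffer(24 + payload.length);
        entry.writeLong(ledgerId); // offset 0
        entry.writeLong(entryId);  // offset 8
        entry.writeLong(lac);      // offset 16
        entry.writeBytes(payload);
        return entry;
    }
}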
When the writeCache is full: triggering a flush
The logic here is fairly simple: keep retrying the insert into the writeCache in a loop, and if the timeout expires, break out and fail the write. If no flush has been triggered yet, a flush task is submitted.
private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry)
throws IOException, BookieException {
// Record the throttled write in the metrics
dbLedgerStorageStats.getThrottledWriteRequests().inc();
...
// Maximum time to wait for the write; keep retrying until the deadline
while (System.nanoTime() < absoluteTimeoutNanos) {
// Write cache is full, we need to trigger a flush so that it gets rotated
// If the flush has already been triggered or flush has already switched the
// cache, we don't need to trigger another flush
// Submit a flush task, unless one is already pending
if (!isFlushOngoing.get() && hasFlushBeenTriggered.compareAndSet(false, true)) {
// Trigger an early flush in background
log.info("Write cache is full, triggering flush");
executor.execute(() -> {
try {
flush();
} catch (IOException e) {
log.error("Error during flush", e);
}
});
}
long stamp = writeCacheRotationLock.readLock();
try {
if (writeCache.put(ledgerId, entryId, entry)) {
// We succeeded in putting the entry in the write cache
return;
}
} finally {
writeCacheRotationLock.unlockRead(stamp);
}
// Wait some time and try again
try {
Thread.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted when adding entry " + ledgerId + "@" + entryId);
}
}
// Timeout expired and we weren't able to insert in write cache
dbLedgerStorageStats.getRejectedWriteRequests().inc();
throw new OperationRejectedException();
}
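The hasFlushBeenTriggered / isFlushOngoing pair implements a "trigger at most once" guard around the flush task. Stripped of the storage details, the pattern is just this (a standalone sketch):

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

class FlushTriggerSketch {
    private final AtomicBoolean hasFlushBeenTriggered = new AtomicBoolean(false);

    // Many writer threads may call this concurrently; only the first
    // successful compareAndSet actually submits the flush task.
    void maybeTriggerFlush(Executor executor, Runnable flush) {
        if (hasFlushBeenTriggered.compareAndSet(false, true)) {
            executor.execute(flush);
        }
    }

    // Reset after the caches have been swapped, so the next time the
    // cache fills up a new flush can be triggered.
    void onCachesSwapped() {
        hasFlushBeenTriggered.set(false);
    }
}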
The flush flow
The flush flow is actually what drives the checkpoint logic. The main steps:
- Swap the two writeCaches; the cache that was receiving writes becomes the flush batch
- Iterate over that writeCache and write its contents into the EntryLogger
- Sync the EntryLogger so everything written in the previous step reaches disk
- Update the entryLocationIndex and flush that index to RocksDB
public void flush() throws IOException {
// The checkpoint source here is the journal
Checkpoint cp = checkpointSource.newCheckpoint();
checkpoint(cp);
checkpointSource.checkpointComplete(cp, true);
}
public void checkpoint(Checkpoint checkpoint) throws IOException {
// The checkpoint source here is the journal
Checkpoint thisCheckpoint = checkpointSource.newCheckpoint();
// Check whether a checkpoint covering this point has already been taken
if (lastCheckpoint.compareTo(checkpoint) > 0) {
return;
}
long startTime = MathUtils.nowInNano();
// Only a single flush operation can happen at a time
flushMutex.lock();
try {
// Swap the write cache so that writes can continue to happen while the flush is
// ongoing
// The logic is simple: swap the current writeCache with the standby writeCache.
// This takes the StampedLock's write lock.
swapWriteCache();
long sizeToFlush = writeCacheBeingFlushed.size();
// Write all the pending entries into the entry logger and collect the offset
// position for each entry
// Flush the cache contents to their actual storage location.
// Build a RocksDB batch.
Batch batch = entryLocationIndex.newBatch();
writeCacheBeingFlushed.forEach((ledgerId, entryId, entry) -> {
try {
// Write the entry into the entryLogger;
// this returns the entry's offset (location).
long location = entryLogger.addEntry(ledgerId, entry, true);
// This packs the three longs into a key/value pair ((ledgerId, entryId) -> location) and adds it to the RocksDB batch
entryLocationIndex.addLocation(batch, ledgerId, entryId, location);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
// Not expanded here: this flushes and fsyncs the entryLogger data written above to disk.
entryLogger.flush();
// Trigger the RocksDB batch flush.
// This write is synchronous.
long batchFlushStartTime = System.nanoTime();
batch.flush();
batch.close();
// Flush the ledgerIndex.
// Its contents rarely change, since it holds metadata.
ledgerIndex.flush();
// Schedule the cleanup logic
cleanupExecutor.execute(() -> {
// There can only be one single cleanup task running because the cleanupExecutor
// is single-threaded
try {
if (log.isDebugEnabled()) {
log.debug("Removing deleted ledgers from db indexes");
}
entryLocationIndex.removeOffsetFromDeletedLedgers();
ledgerIndex.removeDeletedLedgers();
} catch (Throwable t) {
log.warn("Failed to cleanup db indexes", t);
}
});
// Record the checkpoint
lastCheckpoint = thisCheckpoint;
// Clear this cache: discard all the entries from the write cache, since they're now persisted
writeCacheBeingFlushed.clear();
} catch (IOException e) {
// Leave IOException as it is
throw e;
} catch (RuntimeException e) {
// Wrap unchecked exceptions
throw new IOException(e);
} finally {
try {
isFlushOngoing.set(false);
} finally {
flushMutex.unlock();
}
}
}
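swapWriteCache itself is not quoted in this article. Judging from the comments above, it takes the StampedLock's write lock (which invalidates all optimistic read stamps) and swaps the two cache references; a sketch of that shape, using the fields from the class above (the exact upstream implementation may differ):

private void swapWriteCache() {
    long stamp = writeCacheRotationLock.writeLock(); // blocks readers, invalidates optimistic stamps
    try {
        // New writes now go to the (empty) standby cache, and the
        // previously active cache becomes the flush batch.
        WriteCache tmp = writeCacheBeingFlushed;
        writeCacheBeingFlushed = writeCache;
        writeCache = tmp;

        // Let the next "cache full" writer trigger a fresh flush.
        hasFlushBeenTriggered.set(false);
        isFlushOngoing.set(true);
    } finally {
        writeCacheRotationLock.unlockWrite(stamp);
    }
}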
That completes the write path.
Reading an entry
A read consults three places:
- the writeCache, both the one being flushed and the one currently taking writes
- the readCache, which holds read-ahead entries
- the entryLogger, i.e. reading files whose contents are already on disk
After a successful read from disk it tries to top up the read-ahead buffer.
What happens if a read is triggered while a flush is in progress? The flush flow above clears the flushed writeCache only after all of its contents have been persisted, so even with concurrent reads, a read that ends up falling through to the file can always find the entry. One remaining question is ordering: it is not clear whether requests with the same ledgerId and entryId but different contents can occur; if they can, this feels like it could be a problem.
public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
long startTime = MathUtils.nowInNano();
// Handle a read of the LAC (LAST_ADD_CONFIRMED)
if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
return getLastEntry(ledgerId);
}
// We need to try to read from both write caches, since recent entries could be found in either of the two. The
// write caches are already thread safe on their own, here we just need to make sure we get references to both
// of them. Using an optimistic lock since the read lock is always free, unless we're swapping the caches.
long stamp = writeCacheRotationLock.tryOptimisticRead();
WriteCache localWriteCache = writeCache;
WriteCache localWriteCacheBeingFlushed = writeCacheBeingFlushed;
if (!writeCacheRotationLock.validate(stamp)) {
// Fallback to regular read lock approach
stamp = writeCacheRotationLock.readLock();
try {
localWriteCache = writeCache;
localWriteCacheBeingFlushed = writeCacheBeingFlushed;
} finally {
writeCacheRotationLock.unlockRead(stamp);
}
}
// First try to read from the write cache of recent entries
ByteBuf entry = localWriteCache.get(ledgerId, entryId);
if (entry != null) {
recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime);
recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
return entry;
}
// If there's a flush going on, the entry might be in the flush buffer
entry = localWriteCacheBeingFlushed.get(ledgerId, entryId);
if (entry != null) {
recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime);
recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
return entry;
}
// Try reading from read-ahead cache
entry = readCache.get(ledgerId, entryId);
if (entry != null) {
recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime);
recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
return entry;
}
// Read from main storage
long entryLocation;
try {
entryLocation = entryLocationIndex.getLocation(ledgerId, entryId);
if (entryLocation == 0) {
throw new NoEntryException(ledgerId, entryId);
}
entry = entryLogger.readEntry(ledgerId, entryId, entryLocation);
} catch (NoEntryException e) {
recordFailedEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
throw e;
}
readCache.put(ledgerId, entryId, entry);
// Try to read more entries
long nextEntryLocation = entryLocation + 4 /* size header */ + entry.readableBytes();
fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation);
recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheMissStats(), startTime);
recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
return entry;
}
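fillReadAheadCache is not expanded here. Conceptually it keeps reading entries sequentially from the entry log, starting at the given location, and inserts them into the readCache until it hits a batch limit or runs out of entries. A simplified sketch (readAheadCacheBatchSize is an assumed config field; upstream also enforces a byte limit and entry-log boundaries):

// Best-effort read-ahead: failures just stop the loop, since this is only an optimization.
private void fillReadAheadCache(long ledgerId, long entryId, long location) {
    try {
        int count = 0;
        while (count < readAheadCacheBatchSize) {
            ByteBuf entry = entryLogger.readEntry(ledgerId, entryId, location);
            readCache.put(ledgerId, entryId, entry); // the read cache copies the data
            // Advance past this entry: 4-byte size header + payload
            location += 4 /* size header */ + entry.readableBytes();
            entryId++;
            count++;
            entry.release();
        }
    } catch (Exception e) {
        // End of log, a different ledger's entry, or a read error: stop reading ahead
        if (log.isDebugEnabled()) {
            log.debug("Read-ahead stopped at {}@{}", ledgerId, entryId, e);
        }
    }
}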