Apache Bookkeeper —— SingleDirectoryDBLedgerStorage

主要成员变量

public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage {

    // 实际存储数据的位置
    private final EntryLogger entryLogger;

    // -----------------
    // index 相关
    // -----------------
 
    // 记录fence,exist,masterKey等信息
    private final LedgerMetadataIndex ledgerIndex;
    // 关于位置的index
    private final EntryLocationIndex entryLocationIndex;
  
    // 临时的ledgerCache
    private final ConcurrentLongHashMap<TransientLedgerInfo> transientLedgerInfoCache;
  
    // -----------------
    // 写入相关
    // -----------------
  
  
    // 用来写入的memtable,2个互相swap
    private final StampedLock writeCacheRotationLock = new StampedLock();
    // Write cache where all new entries are inserted into
    protected volatile WriteCache writeCache;
    // Write cache that is used to swap with writeCache during flushes
    protected volatile WriteCache writeCacheBeingFlushed;
  

    // Cache where we insert entries for speculative reading
    private final ReadCache readCache;

  
    // checkpoint 相关
    private final CheckpointSource checkpointSource;
    private Checkpoint lastCheckpoint = Checkpoint.MIN;
}

主要作用

  1. 可以读写ledger,维护ledger的位置(index)
  2. 保存ledger相关的metadata
  3. 支持checkpoint

写入Entry

写入会直接写入到WriteCache里面,这里面使用了StampLock,将swap cache的操作进行了保护,StampLock是一个乐观读的读写锁,并发更高。

public long addEntry(ByteBuf entry) throws IOException, BookieException {
        long startTime = MathUtils.nowInNano();

        long ledgerId = entry.getLong(entry.readerIndex());
        long entryId = entry.getLong(entry.readerIndex() + 8);
        long lac = entry.getLong(entry.readerIndex() + 16);

        // 这里的模板是StampLock乐观读取的通用模板
        // 相对的互斥操作实际上是swap cache的操作
  
        // First we try to do an optimistic locking to get access to the current write cache.
        // This is based on the fact that the write cache is only being rotated (swapped) every 1 minute. During the
        // rest of the time, we can have multiple thread using the optimistic lock here without interfering.
       
        // 乐观读锁
        long stamp = writeCacheRotationLock.tryOptimisticRead();
        boolean inserted = false;

        inserted = writeCache.put(ledgerId, entryId, entry);
        // 如果插入过程中发生了cache swap 则再次插入
        if (!writeCacheRotationLock.validate(stamp)) {
            // The write cache was rotated while we were inserting. We need to acquire the proper read lock and repeat
            // the operation because we might have inserted in a write cache that was already being flushed and cleared,
            // without being sure about this last entry being flushed or not.
          
            // 说明插入到被swap的那个cache里面了
            // 如果insert是true TODO
            // 如果是false的话没啥影响
            stamp = writeCacheRotationLock.readLock();
            try {
                inserted = writeCache.put(ledgerId, entryId, entry);
            } finally {
                writeCacheRotationLock.unlockRead(stamp);
            }
        }

        // 如果这里写入到writeCache失败的话,触发Flush WriteCache
        // 走到这里说明可能2个buffer都满了?
        if (!inserted) {
            triggerFlushAndAddEntry(ledgerId, entryId, entry);
        }

        // 更新LAC的缓存
        // after successfully insert the entry, update LAC and notify the watchers
        updateCachedLacIfNeeded(ledgerId, lac);
  
        return entryId;
}

writeCache满了,触发flush的流程

这里的逻辑比较容易,一直不断循环插入到writeCache 里面,如果超时的话就跳出循环标记,这个写入失败。

如果没有触发flush动作的话,会提交一个flush task。

private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry)
            throws IOException, BookieException {
        // metric 打点
        dbLedgerStorageStats.getThrottledWriteRequests().inc();
        ...
        // 最大等待写入时间,超时之前不断重试
        while (System.nanoTime() < absoluteTimeoutNanos) {
            // Write cache is full, we need to trigger a flush so that it gets rotated
            // If the flush has already been triggered or flush has already switched the
            // cache, we don't need to trigger another flush
          
            // 提交一个flush任务,如果之前有了就不提交了
            if (!isFlushOngoing.get() && hasFlushBeenTriggered.compareAndSet(false, true)) {
                // Trigger an early flush in background
                log.info("Write cache is full, triggering flush");
                executor.execute(() -> {
                        try {
                            flush();
                        } catch (IOException e) {
                            log.error("Error during flush", e);
                        }
                    });
            }

            long stamp = writeCacheRotationLock.readLock();
            try {
                if (writeCache.put(ledgerId, entryId, entry)) {
                    // We succeeded in putting the entry in write cache in the
                    return;
                }
            } finally {
                writeCacheRotationLock.unlockRead(stamp);
            }

            // Wait some time and try again
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted when adding entry " + ledgerId + "@" + entryId);
            }
        }

        // Timeout expired and we weren't able to insert in write cache
        dbLedgerStorageStats.getRejectedWriteRequests().inc();
        throw new OperationRejectedException();
}

flush 流程

实际上flush流程是触发checkpoint的逻辑,

主要动作

  • 交换2个writeCache,正在写入的cache会被交换成flush的batch

  • 遍历writeCache,将内容写到EntryLogger里面

  • sync EntryLogger将上一步写入的内容落盘

  • 更新ledgerLocationIndex,同时flush这个index到rocksDb里面

public void flush() throws IOException {
    // journal
    Checkpoint cp = checkpointSource.newCheckpoint();
    checkpoint(cp);
    checkpointSource.checkpointComplete(cp, true);
}

public void checkpoint(Checkpoint checkpoint) throws IOException {
        // journal
        Checkpoint thisCheckpoint = checkpointSource.newCheckpoint();
  
        // 这里检查是否在这个点之前做过checkpoint了
        if (lastCheckpoint.compareTo(checkpoint) > 0) {
            return;
        }

        long startTime = MathUtils.nowInNano();

        // Only a single flush operation can happen at a time
        flushMutex.lock();

        try {
            // Swap the write cache so that writes can continue to happen while the flush is
            // ongoing
            // 这里逻辑比较容易,交换当前的writeCache和后备的writeCache
            // 获取的是StampLock的writeLock
            swapWriteCache();

            long sizeToFlush = writeCacheBeingFlushed.size();
            

            // Write all the pending entries into the entry logger and collect the offset
            // position for each entry
            
            // 刷cache到实际的保存位置上、
          
            // 构建一个rocksDb的batch
            Batch batch = entryLocationIndex.newBatch();
            writeCacheBeingFlushed.forEach((ledgerId, entryId, entry) -> {
                try {
                    // 把写入的entry刷到entryLogger里面
                    // 这里返回的这个entry的offset
                    long location = entryLogger.addEntry(ledgerId, entry, true);
                  
                    // 这里的逻辑实际上就是把3个long 拆分成k/v 写入到RocksDb的batch 里面
                    entryLocationIndex.addLocation(batch, ledgerId, entryId, location);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });

            // 这里不展开说了,实际上会把刚才写入的entryLogger进行flush && fsync 到磁盘上。
            entryLogger.flush();

            // 这里触发RocksDb的batch flush
            // 这个写入是sync的
            long batchFlushStarTime = System.nanoTime();
            batch.flush();
            batch.close();
            
          
            // flush ledgerIndex
            // 这里的内容变化比较少,因为记录的是metadata
            ledgerIndex.flush();

            // 调度一个cleanUp的逻辑
            cleanupExecutor.execute(() -> {
                // There can only be one single cleanup task running because the cleanupExecutor
                // is single-threaded
                try {
                    if (log.isDebugEnabled()) {
                        log.debug("Removing deleted ledgers from db indexes");
                    }

                    entryLocationIndex.removeOffsetFromDeletedLedgers();
                    ledgerIndex.removeDeletedLedgers();
                } catch (Throwable t) {
                    log.warn("Failed to cleanup db indexes", t);
                }
            });
            
            // 保存checkpoint 
            lastCheckpoint = thisCheckpoint;

            // 清空这个cache
          
            // Discard all the entry from the write cache, since they're now persisted
            writeCacheBeingFlushed.clear();

            
        } catch (IOException e) {
            // Leave IOExecption as it is
            throw e;
        } catch (RuntimeException e) {
            // Wrap unchecked exceptions
            throw new IOException(e);
        } finally {
            try {
                isFlushOngoing.set(false);
            } finally {
                flushMutex.unlock();
            }
        }
}

这样写入就完成了

读取Entry

这里会从3个位置开始读取

  1. writeCache,包括正在刷新的和正在写入的

  2. readCache,预读的缓存

  3. entryLogger,读文件,这部分已经落盘了

读取成功之后会尝试增加预读的buffer


如果正在flush这个时候有触发读取会怎么样?

上面的flush流程是在所有内容已经落盘之后才把刷新的writeCache 清空的

即使有并发读,如果最后还是落到了读文件这一步,那怎么都能读到


还有个问题就是这个先后顺序,不确定是否有相同ledgerId,entry,但是内容不同的请求出现。

这样的话感觉可能有问题

public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
        long startTime = MathUtils.nowInNano();
        
        // 读LAC的情况
        if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
            return getLastEntry(ledgerId);
        }

        // We need to try to read from both write caches, since recent entries could be found in either of the two. The
        // write caches are already thread safe on their own, here we just need to make sure we get references to both
        // of them. Using an optimistic lock since the read lock is always free, unless we're swapping the caches.
        long stamp = writeCacheRotationLock.tryOptimisticRead();
        WriteCache localWriteCache = writeCache;
        WriteCache localWriteCacheBeingFlushed = writeCacheBeingFlushed;
        if (!writeCacheRotationLock.validate(stamp)) {
            // Fallback to regular read lock approach
            stamp = writeCacheRotationLock.readLock();
            try {
                localWriteCache = writeCache;
                localWriteCacheBeingFlushed = writeCacheBeingFlushed;
            } finally {
                writeCacheRotationLock.unlockRead(stamp);
            }
        }

        // First try to read from the write cache of recent entries
        ByteBuf entry = localWriteCache.get(ledgerId, entryId);
        if (entry != null) {
            recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime);
            recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
            return entry;
        }

        // If there's a flush going on, the entry might be in the flush buffer
        entry = localWriteCacheBeingFlushed.get(ledgerId, entryId);
        if (entry != null) {
            recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime);
            recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
            return entry;
        }

        // Try reading from read-ahead cache
        entry = readCache.get(ledgerId, entryId);
        if (entry != null) {
            recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime);
            recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
            return entry;
        }

        // Read from main storage
        long entryLocation;
        try {
            entryLocation = entryLocationIndex.getLocation(ledgerId, entryId);
            if (entryLocation == 0) {
                throw new NoEntryException(ledgerId, entryId);
            }
            entry = entryLogger.readEntry(ledgerId, entryId, entryLocation);
        } catch (NoEntryException e) {
            recordFailedEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
            throw e;
        }

        readCache.put(ledgerId, entryId, entry);

        // Try to read more entries
        long nextEntryLocation = entryLocation + 4 /* size header */ + entry.readableBytes();
        fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation);

        recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheMissStats(), startTime);
        recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
        return entry;
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,376评论 6 491
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,126评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,966评论 0 347
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,432评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,519评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,792评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,933评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,701评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,143评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,488评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,626评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,292评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,896评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,742评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,977评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,324评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,494评论 2 348

推荐阅读更多精彩内容