Bluestore源码分析2 NVME Device

NVMEDevice是基于SPDK针对NVME设备的一种BlockDevice实现,模块的主要类图如下:

nvme-device-classes

接下来对主要的流程进行分析

初始化设备 NVMEDevice::open

配置项如:
bluestore_block_path = spdk:55cd2e404bd73932

int NVMEDevice::open(const string& p, int path_fd)
{
  string serial_number;
  int fd = ::open(p.c_str(), O_RDONLY | O_CLOEXEC);
  ...

  char buf[100];
  r = ::read(fd, buf, sizeof(buf));
  ...
  // 读取到设备sn
  serial_number = string(buf, i);

  // 调用NVMEManager进行设备加载
  r = manager.try_get(serial_number, &driver);
  // 将NVMEDevice注册到Driver
  driver->register_device(this);

  block_size = driver->get_block_size();
  size = driver->get_size();
  name = serial_number;

  ...
  return 0;
}

主要的初始化工作在manager.try_get中完成,其中进行NVME设备的发现、注册

int NVMEManager::try_get(const string &sn_tag, SharedDriverData **driver)
{
  // 处理参数bluestore_spdk_coremask指定的core mask
  // 需要至少2个core来运行spdk
  ...
  // 首次init = false
  if (!init) {
    init = true;
    // 启动一个线程,监视probe_queue,当其中有ProbeContext插入时,进行 spdk_nvme_probe 操作来发现设备
    // 省略部分不重要的细节,保留主要代码逻辑
    // probe_cb 用于指示是否连接设备
    // attach_cb 用于自定义连接设备后的操作
    dpdk_thread = std::thread(
      [this, coremask_arg, m_core_arg, mem_size_arg]() {
        ...
        // env 参数初始化
        spdk_env_opts_init(&opts);
        ...
        // 初始化env, DPDK库
        spdk_env_init(&opts);
        ...

        std::unique_lock<std::mutex> l(probe_queue_lock);
        while (true) {
          if (!probe_queue.empty()) {
            ProbeContext* ctxt = probe_queue.front();
            probe_queue.pop_front();

            // 遍历总线NVME设备,并通过uio/vfio将其连接到用户态NVME设备驱动
            // 是否连接设备,取决于probe_cb返回值,返回true的设备才连接
            r = spdk_nvme_probe(NULL, ctxt, probe_cb, attach_cb, NULL);
            ...
            probe_queue_cond.notify_all();
          } else {
            probe_queue_cond.wait(l);
          }
        }
      }
    );
    // 用于probe的dpdk_thread将一直运行,监听probe_queue,重复设备发现过程
    dpdk_thread.detach();
  }

  // 向probe_queue插入本次需要发现设备的ProbeContext
  // 触发dpdk_thread的设备probe
  ProbeContext ctx = {sn_tag, this, nullptr, false};
  {
    std::unique_lock<std::mutex> l(probe_queue_lock);
    probe_queue.push_back(&ctx);
    while (!ctx.done)
      probe_queue_cond.wait(l);
  }
  // 等待本次ProbeContext处理完成以后,获得SharedDriverData实例:ctx.driver
  if (!ctx.driver)
    return -1;

  // 至此,NVMEDevice与SharedDriverData的关联关系建立起来,为接下来的IO做好了准备
  *driver = ctx.driver;

  return 0;
}

在上面的probe流程中,有两个最关键的函数:probe_cb和attach_cb,接下来进行分析
probe_cb
注意参数cb_ctx即为此前spdk_nvme_probe调用时的ProbeContext实例指针

static bool probe_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid, struct spdk_nvme_ctrlr_opts *opts)
{
  NVMEManager::ProbeContext *ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx);
  char serial_number[128];
  struct spdk_pci_addr pci_addr;
  struct spdk_pci_device *pci_dev = NULL;
  int result = 0;
  
  // 非本地NVME设备不做连接,还可能存在远程的基于NVMe-oF的设备(NVMe over Fabrics),Ceph仅使用本地NVME设备
  if (trid->trtype != SPDK_NVME_TRANSPORT_PCIE) {
    dout(0) << __func__ << " only probe local nvme device" << dendl;
    return false;
  }
  
  // 获取设备PCI地址,获取失败的设备不做连接
  result = spdk_pci_addr_parse(&pci_addr, trid->traddr);
  if (result) {
    dout(0) << __func__ << " failed to get pci address from %s, " << trid->traddr << " return value is: %d" << result << dendl;
    return false;
  }
  
  // 获取PCI设备信息,获取失败的设备不连接
  pci_dev = spdk_pci_get_device(&pci_addr);
  if (!pci_dev) {
    dout(0) << __func__ << " failed to get pci device" << dendl; 
    return false;
  }
  
  // 读取设备SN,读取失败的设备不连接
  result = spdk_pci_device_get_serial_number(pci_dev, serial_number, 128);
  if (result < 0) {
    dout(10) << __func__ << " failed to get serial number from %p" << pci_dev << dendl;
    return false;
  }
  
  // 对比配置参数的SN和设备的SN,若不一致,则为其它的非指定NVME设备,不连接
  if (ctx->sn_tag.compare(string(serial_number, 16))) { 
    dout(0) << __func__ << " device serial number (" << ctx->sn_tag << ") not match " << serial_number << dendl;
    return false;
  }
  // 一切正常,且SN匹配的设备,连接
  return true;
}

attach_cb

static void attach_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid,
                      struct spdk_nvme_ctrlr *ctrlr, const struct spdk_nvme_ctrlr_opts *opts)
{
  struct spdk_pci_addr pci_addr;
  struct spdk_pci_device *pci_dev = NULL;

  spdk_pci_addr_parse(&pci_addr, trid->traddr);

  pci_dev = spdk_pci_get_device(&pci_addr);
  ...
  NVMEManager::ProbeContext *ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx);
  
  // 注册NVME设备控制器
  ctx->manager->register_ctrlr(ctx->sn_tag, ctrlr, pci_dev, &ctx->driver);
}

// 设备控制器注册逻辑
void register_ctrlr(const string &sn_tag, spdk_nvme_ctrlr *c, struct spdk_pci_device *pci_dev,
        SharedDriverData **driver) {
    // 确保manage的互斥锁,因为注册操作非线程安全
    assert(lock.is_locked());
    
    // 获取设备的namespace数量,至少要有1个,若超过1个,也仅使用第一个namespace
    spdk_nvme_ns *ns;
    int num_ns = spdk_nvme_ctrlr_get_num_ns(c);
    assert(num_ns >= 1);
    ...
    ns = spdk_nvme_ctrlr_get_ns(c, 1);
    ...
  
    // 实际上,manager仅管理一个driver,因为现在的版本,OSD只能使用一个NVME设备
    assert(shared_driver_datas.empty());
    // 初始化SharedDriverData并将其加入driver列表,实际上只会有一个driver
    shared_driver_datas.push_back(new SharedDriverData(shared_driver_datas.size()+1, sn_tag, c, ns));
    *driver = shared_driver_datas.back();
}

// SharedDriverData初始化逻辑
SharedDriverData(unsigned _id, const std::string &sn_tag,
                   spdk_nvme_ctrlr *c, spdk_nvme_ns *ns)
      : id(_id),
        sn(sn_tag),
        ctrlr(c),
        ns(ns) {
    int i;
    // 获得size、sector size等基本信息
    sector_size = spdk_nvme_ns_get_sector_size(ns);
    block_size = std::max(CEPH_PAGE_SIZE, sector_size);
    size = ((uint64_t)sector_size) * spdk_nvme_ns_get_num_sectors(ns);
    
    // 之前通过core mask指定了可用的核,这里将在除了主核之外的可用核上,各创建一个SharedDriverQueueData
    RTE_LCORE_FOREACH_SLAVE(i) {
      queues.push_back(new SharedDriverQueueData(this, ctrlr, ns, block_size, sn, sector_size, i, queue_number++));
    }
    // 调用每个SharedDriverQueueData的start()方法
    _aio_start();
  } 

// SharedDriverQueueData的初始化
SharedDriverQueueData(SharedDriverData *driver, spdk_nvme_ctrlr *c, spdk_nvme_ns *ns, uint64_t block_size,
                          const std::string &sn_tag, uint32_t sector_size, uint32_t core, uint32_t queue_id)
      : driver(driver),
        ...
        // 指定queue的执行函数为 _aio_thread()
        run_func([this]() { _aio_thread(); }),
        completed_op_seq(0), queue_op_seq(0) {
    
    // 核心就一行代码,创建spdk nvme qpair
    qpair = spdk_nvme_ctrlr_alloc_io_qpair(ctrlr, SPDK_NVME_QPRIO_URGENT);
    ...
}

// SharedDriverQueueData的start()
void start() {
    // DPDK提供的函数,启动一个线程,执行初始化时指定的函数:_aio_thread(),
    // 并设置线程的CPU亲和性,指定到core_id对应的核
    // 达到在一个逻辑核上启动一个工作线程的目的
    int r = rte_eal_remote_launch(dpdk_thread_adaptor, static_cast<void*>(&run_func),
                                  core_id);
    assert(r == 0);

}

_aio_thread()的流程,结合后续的读写流程来讨论

WRITE

Bluestore的写流程中,在_txc_add_transaction过程中会调用BlockDeviceaio_write,
aio_write过程主要完成IOContext的装配工作
在Bluestore的_txc_state_proc状态机中,对于SimpleWrite会在STATE_PREPARE阶段调用BlockDeviceaio_submit提交IOContext
对于defferedWrite,会在kv_finalize_thread中调用BlockDeviceaio_submit提交IOContext
接下来分析NVMEDeviceaio_writeaio_submit过程

int NVMEDevice::aio_write(
    uint64_t off, 
    bufferlist &bl, 
    IOContext *ioc,
    bool buffered)
{
  uint64_t len = bl.length();
  
  // 确保IO的合法性,offset和length必须与block_size对齐,且不会越出设备大小边界
  assert(off % block_size == 0);
  assert(len % block_size == 0);
  assert(len > 0);
  assert(off < size);
  assert(off + len <= size);

  // 初始化NVMEDevice特有的Task
  Task *t = new Task(this, IOCommand::WRITE_COMMAND, off, len);
  t->write_bl = std::move(bl);
  
  if (buffered) {
    // buffered write,默认关闭,可通过参数打开
    // 直接将Task提交给SharedDriverQueueData,将被它的_aio_thread()处理
    if(queue_id == -1)
      queue_id = ceph_gettid();
    driver->get_queue(queue_id)->queue_task(t);
  } else {
    // 默认的非Buffered write,此处仅进行IOContext组装,并不提交IO
    // 经由后续的aio_submit来提交Task
    t->ctx = ioc; 
    Task *first = static_cast<Task*>(ioc->nvme_task_first);
    Task *last = static_cast<Task*>(ioc->nvme_task_last);
    if (last)
      last->next = t; 
    if (!first)
      ioc->nvme_task_first = t; 
    ioc->nvme_task_last = t; 
    ++ioc->num_pending;
  }
  return 0;
}

void NVMEDevice::aio_submit(IOContext *ioc)
{
  int pending = ioc->num_pending.load();
  Task *t = static_cast<Task*>(ioc->nvme_task_first);

  // num_pending在aio_write时自增,ioc->nvme_task_first也被设置为Task类型实例指针
  // 对于默认的非buffered write来说,此处的条件总是满足
  if (pending && t) {
    ioc->num_running += pending;
    ioc->num_pending -= pending;
    // 确认只有本线程在处理这个IOContext,不应存在多个线程处理同一个IOContext的情况
    assert(ioc->num_pending.load() == 0);

    // 提交Task到SharedDriverQueueData,将被它的_aio_thread()处理
    if(queue_id == -1)
      queue_id = ceph_gettid();
    driver->get_queue(queue_id)->queue_task(t, pending);
    ioc->nvme_task_first = ioc->nvme_task_last = nullptr;
  }
}

经过aio_submit之后,Task被提交到SharedDriverQueueDatatask_queue队列中
需要注意的是,Task通过其next指针将多个Task串联起来,入队列的只是head Task

READ

BlueStore会调用BlockDevicereadaio_read两个方法来读取数据
相同点在于:

  1. 它们都会产生一个Task,类型为READ_COMMAND;
  2. 都会分配一个页对齐的buffer用于接收读取到的数据

区别在于:

  • read会直接提交Task到SharedDriverQueueDatatask_queue,然后同步等待直至Task被处理完成,获得读取到的数据同步返回
  • aio_read仅完成Task组装和IOContext组装,与aio_write一样,需要经过aio_submit来提交Task,为异步读取数据

_aio_thread()

由上面的分析可知,aio_writeaio_read操作最后都会将Task提交到SharedDriverQueueDatatask_queue
_aio_thread()的处理逻辑如下:

void SharedDriverQueueData::_aio_thread()
{
  // 准备数据buffer缓冲区
  if (data_buf_mempool.empty()) {
    for (uint16_t i = 0; i < data_buffer_default_num; i++) {
      void *b = spdk_zmalloc(data_buffer_size, CEPH_PAGE_SIZE, NULL);
      if (!b) {
        derr << __func__ << " failed to create memory pool for nvme data buffer" << dendl;
        assert(b);
      }
      data_buf_mempool.push_back(b);
    }
  }

  Task *t = nullptr;
  int r = 0;
  uint64_t lba_off, lba_count;

  ceph::coarse_real_clock::time_point cur, start
    = ceph::coarse_real_clock::now();
  
  // 开始线程的处理逻辑循环
  while (true) {
    bool inflight = queue_op_seq.load() - completed_op_seq.load();
 again:
    
    // 当存在在途未完成的请求时,进行一次completion收割操作,若没有完成的请求,则线程自旋等待
    // _mm_pause() 由DPDK库的librte_env实现
    // 若有完成的IO,则会依次调用其注册的回调函数:io_complete
    // 回调操作由本线程在此处完成
    if (inflight) {
      if (!spdk_nvme_qpair_process_completions(qpair, g_conf->bluestore_spdk_max_io_completion)) {
        dout(30) << __func__ << " idle, have a pause" << dendl;
        _mm_pause();
      }
    }
    
    // 由于t还未被初始化,首次将跳过此for循环,之后t会被赋值,下一轮循环就可能进入for循环中
    for (; t; t = t->next) {
      t->queue = this;
      lba_off = t->offset / sector_size;
      lba_count = t->len / sector_size;
      switch (t->command) {
        case IOCommand::WRITE_COMMAND:
        {
          // 分配并拷贝task的数据区域内存,若分配失败则goto again重试
          r = alloc_buf_from_pool(t, true);
          ...
          //提交写请求到qpair,参数含义如下
          // ns      提交 I/O.的namespace
          // qpair   提交 I/O 的qpair
          // lba_off 写请求起始LBA号.
          // lba_count   写请求的LBA数量
          // io_complete 写请求完成时的回调函数
          // t  io_complete回调时传入的参数.
          // io_flags        I/O flag
          // data_buf_reset_sgl  重置数据区域的回调函数.
          // data_buf_next_sge   遍历数据各内存区域的回调函数
          r = spdk_nvme_ns_cmd_writev(
              ns, qpair, lba_off, lba_count, io_complete, t, 0,
              data_buf_reset_sgl, data_buf_next_sge);
          ...
          break;
        }
        case IOCommand::READ_COMMAND:
        {
          // 分配task的数据区域内存,不做拷贝,若分配失败则goto again重试
          r = alloc_buf_from_pool(t, false);
          ...
          //提交读请求到qpair,其参数含义与写一致
          r = spdk_nvme_ns_cmd_readv(
              ns, qpair, lba_off, lba_count, io_complete, t, 0,
              data_buf_reset_sgl, data_buf_next_sge);
          ...
          break;
        }
        case IOCommand::FLUSH_COMMAND:
        {
          // 暂无使用FLUSH_COMMAND的场景
          ...
          break;
        }
      }
    }

    if (!queue_empty.load()) {
      // queue非空时,取出队首的Task,下一轮循环时,由上面的for循环处理
      Mutex::Locker l(queue_lock);
      if (!task_queue.empty()) {
        t = task_queue.front();
        task_queue.pop();
        logger->set(l_bluestore_nvmedevice_queue_ops, task_queue.size());
      }
      if (!t)
        queue_empty = true;
    } else {
      // 队列为空时,唤醒上层因flush阻塞的线程,因为没有在途IO,说明所有数据已经安全落盘,flush可以安全返回
      if (flush_waiters.load()) {
        Mutex::Locker l(flush_lock);
        if (*flush_waiter_seqs.begin() <= completed_op_seq.load())
          flush_cond.Signal();
      }

      if (!inflight) {
        ...
        Mutex::Locker l(queue_lock);
        if (queue_empty.load()) {
          // 运行到此处,说明本轮循环开始时没有在途IO,到本轮循环结束时也没有在途IO
          // 满足线程安全退出的条件
          // 检查是否在外层设置了aio_stop标示,若设置了,则本线程退出,停止IO处理
          if (aio_stop)
            break;
          queue_cond.Wait(queue_lock);
        }
      }
    }
  }
  ...
}

FLUSH

int NVMEDevice::flush()
{
  ...
  SharedDriverQueueData *queue = driver->get_queue(queue_id);
  assert(queue != NULL);
  queue->flush_wait();
  ...
}
// SharedDriverQueueData的flush_wait()实现
 void flush_wait() { 
    uint64_t cur_seq = queue_op_seq.load();
    uint64_t left = cur_seq - completed_op_seq.load();

    if (cur_seq > completed_op_seq) {
      // 存在在途IO,则等待,在_aio_thread()中处理完在途IO后会唤醒本flush线程,
      // 唤醒后再次检查queue_op_seq和completed_op_seq,确认没有在途IO后,flush才能返回
      Mutex::Locker l(flush_lock);
      ++flush_waiters;
      flush_waiter_seqs.insert(cur_seq);
      while (cur_seq > completed_op_seq.load()) {
       flush_cond.Wait(flush_lock);
      }
      flush_waiter_seqs.erase(cur_seq);
      --flush_waiters; 
    }      
  }  
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,509评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,806评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,875评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,441评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,488评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,365评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,190评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,062评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,500评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,706评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,834评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,559评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,167评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,779评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,912评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,958评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,779评论 2 354