NVMEDevice
是基于SPDK针对NVME设备的一种BlockDevice
实现,模块的主要类图如下:
接下来对主要的流程进行分析
初始化设备 NVMEDevice::open
配置项如:
bluestore_block_path = spdk:55cd2e404bd73932
int NVMEDevice::open(const string& p, int path_fd)
{
string serial_number;
int fd = ::open(p.c_str(), O_RDONLY | O_CLOEXEC);
...
char buf[100];
r = ::read(fd, buf, sizeof(buf));
...
// 读取到设备sn
serial_number = string(buf, i);
// 调用NVMEManager进行设备加载
r = manager.try_get(serial_number, &driver);
// 将NVMEDevice注册到Driver
driver->register_device(this);
block_size = driver->get_block_size();
size = driver->get_size();
name = serial_number;
...
return 0;
}
主要的初始化工作在manager.try_get
中完成,其中进行NVME设备的发现、注册
int NVMEManager::try_get(const string &sn_tag, SharedDriverData **driver)
{
// 处理参数bluestore_spdk_coremask指定的core mask
// 需要至少2个core来运行spdk
...
// 首次init = false
if (!init) {
init = true;
// 启动一个线程,监视probe_queue,当其中有ProbeContext插入时,进行 spdk_nvme_probe 操作来发现设备
// 省略部分不重要的细节,保留主要代码逻辑
// probe_cb 用于指示是否连接设备
// attach_cb 用于自定义连接设备后的操作
dpdk_thread = std::thread(
[this, coremask_arg, m_core_arg, mem_size_arg]() {
...
// env 参数初始化
spdk_env_opts_init(&opts);
...
// 初始化env, DPDK库
spdk_env_init(&opts);
...
std::unique_lock<std::mutex> l(probe_queue_lock);
while (true) {
if (!probe_queue.empty()) {
ProbeContext* ctxt = probe_queue.front();
probe_queue.pop_front();
// 遍历总线NVME设备,并通过uio/vfio将其连接到用户态NVME设备驱动
// 是否连接设备,取决于probe_cb返回值,返回true的设备才连接
r = spdk_nvme_probe(NULL, ctxt, probe_cb, attach_cb, NULL);
...
probe_queue_cond.notify_all();
} else {
probe_queue_cond.wait(l);
}
}
}
);
// 用于probe的dpdk_thread将一直运行,监听probe_queue,重复设备发现过程
dpdk_thread.detach();
}
// 向probe_queue插入本次需要发现设备的ProbeContext
// 触发dpdk_thread的设备probe
ProbeContext ctx = {sn_tag, this, nullptr, false};
{
std::unique_lock<std::mutex> l(probe_queue_lock);
probe_queue.push_back(&ctx);
while (!ctx.done)
probe_queue_cond.wait(l);
}
// 等待本次ProbeContext处理完成以后,获得SharedDriverData实例:ctx.driver
if (!ctx.driver)
return -1;
// 至此,NVMEDevice与SharedDriverData的关联关系建立起来,为接下来的IO做好了准备
*driver = ctx.driver;
return 0;
}
在上面的probe流程中,有两个最关键的函数:probe_cb和attach_cb,接下来进行分析
probe_cb
注意参数cb_ctx
即为此前spdk_nvme_probe
调用时的ProbeContext实例指针
static bool probe_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid, struct spdk_nvme_ctrlr_opts *opts)
{
NVMEManager::ProbeContext *ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx);
char serial_number[128];
struct spdk_pci_addr pci_addr;
struct spdk_pci_device *pci_dev = NULL;
int result = 0;
// 非本地NVME设备不做连接,还可能存在远程的基于NVMe-oF的设备(NVMe over Fabrics),Ceph仅使用本地NVME设备
if (trid->trtype != SPDK_NVME_TRANSPORT_PCIE) {
dout(0) << __func__ << " only probe local nvme device" << dendl;
return false;
}
// 获取设备PCI地址,获取失败的设备不做连接
result = spdk_pci_addr_parse(&pci_addr, trid->traddr);
if (result) {
dout(0) << __func__ << " failed to get pci address from %s, " << trid->traddr << " return value is: %d" << result << dendl;
return false;
}
// 获取PCI设备信息,获取失败的设备不连接
pci_dev = spdk_pci_get_device(&pci_addr);
if (!pci_dev) {
dout(0) << __func__ << " failed to get pci device" << dendl;
return false;
}
// 读取设备SN,读取失败的设备不连接
result = spdk_pci_device_get_serial_number(pci_dev, serial_number, 128);
if (result < 0) {
dout(10) << __func__ << " failed to get serial number from %p" << pci_dev << dendl;
return false;
}
// 对比配置参数的SN和设备的SN,若不一致,则为其它的非指定NVME设备,不连接
if (ctx->sn_tag.compare(string(serial_number, 16))) {
dout(0) << __func__ << " device serial number (" << ctx->sn_tag << ") not match " << serial_number << dendl;
return false;
}
// 一切正常,且SN匹配的设备,连接
return true;
}
attach_cb
static void attach_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid,
struct spdk_nvme_ctrlr *ctrlr, const struct spdk_nvme_ctrlr_opts *opts)
{
struct spdk_pci_addr pci_addr;
struct spdk_pci_device *pci_dev = NULL;
spdk_pci_addr_parse(&pci_addr, trid->traddr);
pci_dev = spdk_pci_get_device(&pci_addr);
...
NVMEManager::ProbeContext *ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx);
// 注册NVME设备控制器
ctx->manager->register_ctrlr(ctx->sn_tag, ctrlr, pci_dev, &ctx->driver);
}
// 设备控制器注册逻辑
void register_ctrlr(const string &sn_tag, spdk_nvme_ctrlr *c, struct spdk_pci_device *pci_dev,
SharedDriverData **driver) {
// 确保manage的互斥锁,因为注册操作非线程安全
assert(lock.is_locked());
// 获取设备的namespace数量,至少要有1个,若超过1个,也仅使用第一个namespace
spdk_nvme_ns *ns;
int num_ns = spdk_nvme_ctrlr_get_num_ns(c);
assert(num_ns >= 1);
...
ns = spdk_nvme_ctrlr_get_ns(c, 1);
...
// 实际上,manager仅管理一个driver,因为现在的版本,OSD只能使用一个NVME设备
assert(shared_driver_datas.empty());
// 初始化SharedDriverData并将其加入driver列表,实际上只会有一个driver
shared_driver_datas.push_back(new SharedDriverData(shared_driver_datas.size()+1, sn_tag, c, ns));
*driver = shared_driver_datas.back();
}
// SharedDriverData初始化逻辑
SharedDriverData(unsigned _id, const std::string &sn_tag,
spdk_nvme_ctrlr *c, spdk_nvme_ns *ns)
: id(_id),
sn(sn_tag),
ctrlr(c),
ns(ns) {
int i;
// 获得size、sector size等基本信息
sector_size = spdk_nvme_ns_get_sector_size(ns);
block_size = std::max(CEPH_PAGE_SIZE, sector_size);
size = ((uint64_t)sector_size) * spdk_nvme_ns_get_num_sectors(ns);
// 之前通过core mask指定了可用的核,这里将在除了主核之外的可用核上,各创建一个SharedDriverQueueData
RTE_LCORE_FOREACH_SLAVE(i) {
queues.push_back(new SharedDriverQueueData(this, ctrlr, ns, block_size, sn, sector_size, i, queue_number++));
}
// 调用每个SharedDriverQueueData的start()方法
_aio_start();
}
// SharedDriverQueueData的初始化
SharedDriverQueueData(SharedDriverData *driver, spdk_nvme_ctrlr *c, spdk_nvme_ns *ns, uint64_t block_size,
const std::string &sn_tag, uint32_t sector_size, uint32_t core, uint32_t queue_id)
: driver(driver),
...
// 指定queue的执行函数为 _aio_thread()
run_func([this]() { _aio_thread(); }),
completed_op_seq(0), queue_op_seq(0) {
// 核心就一行代码,创建spdk nvme qpair
qpair = spdk_nvme_ctrlr_alloc_io_qpair(ctrlr, SPDK_NVME_QPRIO_URGENT);
...
}
// SharedDriverQueueData的start()
void start() {
// DPDK提供的函数,启动一个线程,执行初始化时指定的函数:_aio_thread(),
// 并设置线程的CPU亲和性,指定到core_id对应的核
// 达到在一个逻辑核上启动一个工作线程的目的
int r = rte_eal_remote_launch(dpdk_thread_adaptor, static_cast<void*>(&run_func),
core_id);
assert(r == 0);
}
_aio_thread()
的流程,结合后续的读写流程来讨论
WRITE
Bluestore的写流程中,在_txc_add_transaction
过程中会调用BlockDevice
的aio_write
,
aio_write
过程主要完成IOContext的装配工作
在Bluestore的_txc_state_proc
状态机中,对于SimpleWrite会在STATE_PREPARE
阶段调用BlockDevice
的aio_submit
提交IOContext
对于defferedWrite,会在kv_finalize_thread
中调用BlockDevice
的aio_submit
提交IOContext
接下来分析NVMEDevice
的aio_write
和aio_submit
过程
int NVMEDevice::aio_write(
uint64_t off,
bufferlist &bl,
IOContext *ioc,
bool buffered)
{
uint64_t len = bl.length();
// 确保IO的合法性,offset和length必须与block_size对齐,且不会越出设备大小边界
assert(off % block_size == 0);
assert(len % block_size == 0);
assert(len > 0);
assert(off < size);
assert(off + len <= size);
// 初始化NVMEDevice特有的Task
Task *t = new Task(this, IOCommand::WRITE_COMMAND, off, len);
t->write_bl = std::move(bl);
if (buffered) {
// buffered write,默认关闭,可通过参数打开
// 直接将Task提交给SharedDriverQueueData,将被它的_aio_thread()处理
if(queue_id == -1)
queue_id = ceph_gettid();
driver->get_queue(queue_id)->queue_task(t);
} else {
// 默认的非Buffered write,此处仅进行IOContext组装,并不提交IO
// 经由后续的aio_submit来提交Task
t->ctx = ioc;
Task *first = static_cast<Task*>(ioc->nvme_task_first);
Task *last = static_cast<Task*>(ioc->nvme_task_last);
if (last)
last->next = t;
if (!first)
ioc->nvme_task_first = t;
ioc->nvme_task_last = t;
++ioc->num_pending;
}
return 0;
}
void NVMEDevice::aio_submit(IOContext *ioc)
{
int pending = ioc->num_pending.load();
Task *t = static_cast<Task*>(ioc->nvme_task_first);
// num_pending在aio_write时自增,ioc->nvme_task_first也被设置为Task类型实例指针
// 对于默认的非buffered write来说,此处的条件总是满足
if (pending && t) {
ioc->num_running += pending;
ioc->num_pending -= pending;
// 确认只有本线程在处理这个IOContext,不应存在多个线程处理同一个IOContext的情况
assert(ioc->num_pending.load() == 0);
// 提交Task到SharedDriverQueueData,将被它的_aio_thread()处理
if(queue_id == -1)
queue_id = ceph_gettid();
driver->get_queue(queue_id)->queue_task(t, pending);
ioc->nvme_task_first = ioc->nvme_task_last = nullptr;
}
}
经过aio_submit
之后,Task被提交到SharedDriverQueueData
的task_queue
队列中
需要注意的是,Task通过其next指针将多个Task串联起来,入队列的只是head Task
READ
BlueStore
会调用BlockDevice
的read
和aio_read
两个方法来读取数据
相同点在于:
- 它们都会产生一个Task,类型为
READ_COMMAND
; - 都会分配一个页对齐的buffer用于接收读取到的数据
区别在于:
-
read
会直接提交Task到SharedDriverQueueData
的task_queue
,然后同步等待直至Task被处理完成,获得读取到的数据同步返回 -
aio_read
仅完成Task组装和IOContext组装,与aio_write
一样,需要经过aio_submit
来提交Task,为异步读取数据
_aio_thread()
由上面的分析可知,aio_write
和aio_read
操作最后都会将Task提交到SharedDriverQueueData
的task_queue
_aio_thread()
的处理逻辑如下:
void SharedDriverQueueData::_aio_thread()
{
// 准备数据buffer缓冲区
if (data_buf_mempool.empty()) {
for (uint16_t i = 0; i < data_buffer_default_num; i++) {
void *b = spdk_zmalloc(data_buffer_size, CEPH_PAGE_SIZE, NULL);
if (!b) {
derr << __func__ << " failed to create memory pool for nvme data buffer" << dendl;
assert(b);
}
data_buf_mempool.push_back(b);
}
}
Task *t = nullptr;
int r = 0;
uint64_t lba_off, lba_count;
ceph::coarse_real_clock::time_point cur, start
= ceph::coarse_real_clock::now();
// 开始线程的处理逻辑循环
while (true) {
bool inflight = queue_op_seq.load() - completed_op_seq.load();
again:
// 当存在在途未完成的请求时,进行一次completion收割操作,若没有完成的请求,则线程自旋等待
// _mm_pause() 由DPDK库的librte_env实现
// 若有完成的IO,则会依次调用其注册的回调函数:io_complete
// 回调操作由本线程在此处完成
if (inflight) {
if (!spdk_nvme_qpair_process_completions(qpair, g_conf->bluestore_spdk_max_io_completion)) {
dout(30) << __func__ << " idle, have a pause" << dendl;
_mm_pause();
}
}
// 由于t还未被初始化,首次将跳过此for循环,之后t会被赋值,下一轮循环就可能进入for循环中
for (; t; t = t->next) {
t->queue = this;
lba_off = t->offset / sector_size;
lba_count = t->len / sector_size;
switch (t->command) {
case IOCommand::WRITE_COMMAND:
{
// 分配并拷贝task的数据区域内存,若分配失败则goto again重试
r = alloc_buf_from_pool(t, true);
...
//提交写请求到qpair,参数含义如下
// ns 提交 I/O.的namespace
// qpair 提交 I/O 的qpair
// lba_off 写请求起始LBA号.
// lba_count 写请求的LBA数量
// io_complete 写请求完成时的回调函数
// t io_complete回调时传入的参数.
// io_flags I/O flag
// data_buf_reset_sgl 重置数据区域的回调函数.
// data_buf_next_sge 遍历数据各内存区域的回调函数
r = spdk_nvme_ns_cmd_writev(
ns, qpair, lba_off, lba_count, io_complete, t, 0,
data_buf_reset_sgl, data_buf_next_sge);
...
break;
}
case IOCommand::READ_COMMAND:
{
// 分配task的数据区域内存,不做拷贝,若分配失败则goto again重试
r = alloc_buf_from_pool(t, false);
...
//提交读请求到qpair,其参数含义与写一致
r = spdk_nvme_ns_cmd_readv(
ns, qpair, lba_off, lba_count, io_complete, t, 0,
data_buf_reset_sgl, data_buf_next_sge);
...
break;
}
case IOCommand::FLUSH_COMMAND:
{
// 暂无使用FLUSH_COMMAND的场景
...
break;
}
}
}
if (!queue_empty.load()) {
// queue非空时,取出队首的Task,下一轮循环时,由上面的for循环处理
Mutex::Locker l(queue_lock);
if (!task_queue.empty()) {
t = task_queue.front();
task_queue.pop();
logger->set(l_bluestore_nvmedevice_queue_ops, task_queue.size());
}
if (!t)
queue_empty = true;
} else {
// 队列为空时,唤醒上层因flush阻塞的线程,因为没有在途IO,说明所有数据已经安全落盘,flush可以安全返回
if (flush_waiters.load()) {
Mutex::Locker l(flush_lock);
if (*flush_waiter_seqs.begin() <= completed_op_seq.load())
flush_cond.Signal();
}
if (!inflight) {
...
Mutex::Locker l(queue_lock);
if (queue_empty.load()) {
// 运行到此处,说明本轮循环开始时没有在途IO,到本轮循环结束时也没有在途IO
// 满足线程安全退出的条件
// 检查是否在外层设置了aio_stop标示,若设置了,则本线程退出,停止IO处理
if (aio_stop)
break;
queue_cond.Wait(queue_lock);
}
}
}
}
...
}
FLUSH
int NVMEDevice::flush()
{
...
SharedDriverQueueData *queue = driver->get_queue(queue_id);
assert(queue != NULL);
queue->flush_wait();
...
}
// SharedDriverQueueData的flush_wait()实现
void flush_wait() {
uint64_t cur_seq = queue_op_seq.load();
uint64_t left = cur_seq - completed_op_seq.load();
if (cur_seq > completed_op_seq) {
// 存在在途IO,则等待,在_aio_thread()中处理完在途IO后会唤醒本flush线程,
// 唤醒后再次检查queue_op_seq和completed_op_seq,确认没有在途IO后,flush才能返回
Mutex::Locker l(flush_lock);
++flush_waiters;
flush_waiter_seqs.insert(cur_seq);
while (cur_seq > completed_op_seq.load()) {
flush_cond.Wait(flush_lock);
}
flush_waiter_seqs.erase(cur_seq);
--flush_waiters;
}
}