FE
与 BE
交互
在 FE
端与 BE
端均存在一个任务 Queue
,如上图所示,从当前版本来看 thrift
实现的与 BE
之间的交互主要是用 AgentClient
来承载,而查看代码仅用于 snapshot
的管理过程,其他的并非通过该接口实现。
注
这里存在疑问 gRpc
和 thrift
接口混着用的目的,不太明确,有了解的小伙伴如果方便可以告知下我哟, 现谢谢咯
SQL
执行过程
如 Doris 源码分析 (二) 代码结构分析
中的主架构图所示,大多数是来自客户端请求,通过 QeServer
中封装的 MysqlServer
来接受 Mysql
客户端的请求,具体流程如下:
注
Fe
端主要以 java
为主,如下:
-
ReadListener
接受处理数据请求传递给ConnectProcessor.processOnce()
函数 - 通过
ConnectProcessor.dispatch()
函数将命令进行任务区分,以query
为例 - 通过
ConnectProcessor.handleQuery()
函数将originStmt
中含有的sql
转为StatementBase
实例列表 - 遍历
StatementBase
实例列表,将其逐一构建StmtExecutor
实例并执行 - 在
StmtExecutor.execute()
函数中将StatementBase
实例解析生成执行计划(详细参见:StmtExecutor.handleQueryStmt()
实现) - 根据
planner
等信息构建Coordinator
实例并执行执行计划 - 在
Coordinator.exec()
函数中,可以看到,将planner
进行分布式拆解,并生成为PlanFragment
然后通过调用Coordinator.sendFragment()
将所有的PlanFragment
任务进行下发到BE
端,同时包含状态的维护过程。 - 在
Coordinator.sendFragment()
中对每个PlanFragment
通过toThrift()
转化为TExecPlanFragmentParams
然后封装为BackendExecState
实例,然后调用BackendExecState.execRemoteFragmentAsync()
通过BackendServiceProxy.execPlanFragmentAsync
向BRpcService
后端服务发起execPlanFragment
请求(后端使用PInternalServiceImpl
来接受处理请求内容)
注
在 BackendServiceProxy
中主要是与 Be
端进行交互部分,接下拉是 Be
端处理过程,具体如下:
- 在
PInternalServiceImpl
实现中_exec_env->fragment_mgr()->exec_plan_fragment(t_request)
可以看到将所有请求直接交付给FragmentMgr
实例来完成对应的操作 - 在
FragmentMgr
中,先判断是否为事务操作, 并将任务传递给下游继续处理 (接下来,以非事务逻辑为例)
1. 事务操作转换为 `StreamLoadContext` 键后交由 `exec_env->load_stream_mgr()->put(stream_load_cxt->id, pipe)` 进行管理,然后由 `_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_cxt, pipe)` 进行执行
2. 非事务操作,直接调用 `exec_plan_fragment()` 函数执行即可
- 在
exec_plan_fragment
将TExecPlanFragmentParams
数据封装为FragmentExecState
实例,将其转化为PlanFragmentExecutor
参数(在
PlanFragmentExecutor::prepare()
函数中2通过ExecNode::create_tree()
创建执行计划树ExecNode
),然后将其放入到线程池中进行执行即可:
// 从代码可看出执行过程由 FragmentMgr::_exec_actual() 函数来完成
// 其中主要是构建 PlanFragmentExecutor 实例并管理所有的 PlanFragment 执行状态
_thread_pool->submit_func(
std::bind<void>(&FragmentMgr::_exec_actual, this, exec_state, cb))
- 在
FragmentMgr::_exec_actual()
函数中直接会调用FragmentExecState.execute()
将PlanFragmentExecutor.open()
函数进行调用, 最终调用ExecNode.get_next()
函数来获取执行计划结束之后的数据内容
注
Be
端的执行计划树节点如下(参见 ExecNode::create_node()
函数实现):
Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs, ExecNode** node) {
std::stringstream error_msg;
VLOG_CRITICAL << "tnode:\n" << apache::thrift::ThriftDebugString(tnode);
switch (tnode.node_type) {
case TPlanNodeType::CSV_SCAN_NODE:
*node = pool->add(new CsvScanNode(pool, tnode, descs));
return Status::OK();
case TPlanNodeType::MYSQL_SCAN_NODE:
#ifdef DORIS_WITH_MYSQL
*node = pool->add(new MysqlScanNode(pool, tnode, descs));
return Status::OK();
#else
return Status::InternalError(
"Don't support MySQL table, you should rebuild Doris with WITH_MYSQL option ON");
#endif
case TPlanNodeType::ODBC_SCAN_NODE:
*node = pool->add(new OdbcScanNode(pool, tnode, descs));
return Status::OK();
case TPlanNodeType::ES_SCAN_NODE:
*node = pool->add(new EsScanNode(pool, tnode, descs));
return Status::OK();
case TPlanNodeType::ES_HTTP_SCAN_NODE:
*node = pool->add(new EsHttpScanNode(pool, tnode, descs));
return Status::OK();
case TPlanNodeType::SCHEMA_SCAN_NODE:
*node = pool->add(new SchemaScanNode(pool, tnode, descs));
return Status::OK();
case TPlanNodeType::OLAP_SCAN_NODE:
if (state->enable_vectorized_exec()) {
} else {
*node = pool->add(new OlapScanNode(pool, tnode, descs));
}
return Status::OK();
case TPlanNodeType::AGGREGATION_NODE:
if (state->enable_vectorized_exec()) {
} else {
if (config::enable_partitioned_aggregation) {
*node = pool->add(new PartitionedAggregationNode(pool, tnode, descs));
} else {
*node = pool->add(new AggregationNode(pool, tnode, descs));
}
}
return Status::OK();
case TPlanNodeType::HASH_JOIN_NODE:
*node = pool->add(new HashJoinNode(pool, tnode, descs));
return Status::OK();
case TPlanNodeType::CROSS_JOIN_NODE:
if (state->enable_vectorized_exec()) {
} else {
*node = pool->add(new CrossJoinNode(pool, tnode, descs));
}
return Status::OK();
case TPlanNodeType::MERGE_JOIN_NODE:
*node = pool->add(new MergeJoinNode(pool, tnode, descs));
return Status::OK();
case TPlanNodeType::EMPTY_SET_NODE:
*node = pool->add(new EmptySetNode(pool, tnode, descs));
return Status::OK();
case TPlanNodeType::EXCHANGE_NODE:
if (state->enable_vectorized_exec()) {
} else {
*node = pool->add(new ExchangeNode(pool, tnode, descs));
}
return Status::OK();
case TPlanNodeType::SELECT_NODE:
*node = pool->add(new SelectNode(pool, tnode, descs));
return Status::OK();
case TPlanNodeType::OLAP_REWRITE_NODE:
*node = pool->add(new OlapRewriteNode(pool, tnode, descs));
return Status::OK();
case TPlanNodeType::SORT_NODE:
if (state->enable_vectorized_exec()) {
} else {
if (tnode.sort_node.use_top_n) {
*node = pool->add(new TopNNode(pool, tnode, descs));
} else {
*node = pool->add(new SpillSortNode(pool, tnode, descs));
}
}
return Status::OK();
case TPlanNodeType::ANALYTIC_EVAL_NODE:
*node = pool->add(new AnalyticEvalNode(pool, tnode, descs));
break;
case TPlanNodeType::MERGE_NODE:
*node = pool->add(new MergeNode(pool, tnode, descs));
return Status::OK();
case TPlanNodeType::UNION_NODE:
if (state->enable_vectorized_exec()) {
} else {
*node = pool->add(new UnionNode(pool, tnode, descs));
}
return Status::OK();
case TPlanNodeType::INTERSECT_NODE:
*node = pool->add(new IntersectNode(pool, tnode, descs));
return Status::OK();
case TPlanNodeType::EXCEPT_NODE:
*node = pool->add(new ExceptNode(pool, tnode, descs));
return Status::OK();
case TPlanNodeType::BROKER_SCAN_NODE:
*node = pool->add(new BrokerScanNode(pool, tnode, descs));
return Status::OK();
case TPlanNodeType::REPEAT_NODE:
*node = pool->add(new RepeatNode(pool, tnode, descs));
return Status::OK();
case TPlanNodeType::ASSERT_NUM_ROWS_NODE:
*node = pool->add(new AssertNumRowsNode(pool, tnode, descs));
return Status::OK();
default:
map<int, const char*>::const_iterator i =
_TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
const char* str = "unknown node type";
if (i != _TPlanNodeType_VALUES_TO_NAMES.end()) {
str = i->second;
}
error_msg << str << " not implemented";
return Status::InternalError(error_msg.str());
}
return Status::OK();
}