PPT的内容是去年在部门内部做的关于Presto入门分享,主要涉及的内容如下图所示:
由于简书不支持嵌入iframe,所以具体的内容放在腾讯文档上面了:
Presto中有很多的基本概念,只有理解好这些基本概念,我们才能更好的理解Presto,下面是我梳理的各种概念的脑图:
接下来的部分是关于当时分享后的一些Q&A:
1.Presto的task是如何在Worker节点间分配的,按什么策略进行分配的?
Presto中task
可以分为两种,一种是leaf stages
的task,另一种是intermediate stages
的task。
(1)leaf stages的task一般都是表扫描,读取数据,过滤、解压、解码等操作,这些操作可以进行高度的并行化。所以在这一阶段源数据会被划分成尽可能多的splits
,因此每一个Worker上都可能会启动一个task来处理分配到此Worker上的splits(而每个split的大小,Worker上可被处理的split个数都是由相应的参数来控制的)。
(2)对于intermediate stages的中的每一个stage
的task数,又是基于多种配置来确定的,如connector
的配置,执行计划的属性,中间数据的位置以及集群部署配置等信息来确定的。
(3)集群常用的配置信息如下所示:
- 内存管理相关的参数:
user memory
一般是输入文件以及执行程序时所使用的内存,system memory
一般是伴随着程序执行而使用到的内存,如shuffle buffers等。
参数 | 示意 |
---|---|
query.max-memory-per-node | 每个Worker上一个query可用的最大user memory,默认JVM max memory * 0.1 |
query.max-total-memory-per-node | 每个Worker上一个query可用的最大user memory+system memory,默认JVM max memory * 0.3 |
query.max-memory | 整个集群中一个query可用的最大user memory,默认20GB |
query.max-total-memory | 整个集群中一个query可用的最大user memory+system memory,默认query.max-memory * 2 |
- 节点管理相关的参数:
参数 | 示意 |
---|---|
node-scheduler.max-splits-per-node | 每个Worker可被执行的最大splits数,默认100 |
node-scheduler.max-pending-splits-per-task | 每个Worker上,每个query的stage的task的等待队列中可容纳的最大splits数,默认10 |
- hive-connector相关参数:
参数 | 示意 |
---|---|
hive.max-split-size | hive-split的最大size,默认64M |
总的来说,一个stage可以被划分成多个task,每一个task只会运行在一个Worker上(比如我们线上Presto集群有20个节点,那么一个query当中的一个stage最多可以有20个task在并行执行)。而每个stage具体分配多少个task是和相应的配置信息息息相关的。
2.Presto处理压缩文件时内存如何分配,如何处理压缩文件,内存大小如何做预估?
Presto是通过插件的方式来支持不同类型的文件格式,如ORC、Parquet以及RCFile等。对于不同格式的文件会有对应reader
,比如对于Parquet文件就对应着ParquetReader,ParquetReader主要的作用是通过hive-connector获得源数据的信息(包括目标文件在hdfs的block列表,以及地址等信息),并将数据转换成Presto中对应的数据对象(如split
,page
,block
等)。接着会根据列不同的数据类型(int、boolean、timestamp等)来读取相应column下的数据。在读取数据的时候,如果数据是压缩数据,首先会对数据进行解压后再做处理。读取压缩数据并解压的代码如下所示:
对于parquet文件,其PageHeader类中有记录当前page的压缩和未压缩文件大小,所以在解压的时候,程序知道需要向
MemoryPool
申请多少内存。Presto的内存管理涉及的内容比较多,我这里就简单做一下介绍,Presto在启动的时候会新建一个MemoryPool
,所有的任务都是通过该MemoryPool去申请内存,而Presto在执行任务的每一个阶段都会有相应的MemoryContext
记录该阶段所使用的的内存详情。而Presto可以通过各种参数(如上面提到过的几个内存管理相关的参数)来限制一个query
在集群中可用的最大内存以及一个query在每一个Worker上可用的内存,只要当前MemoryPool还有可用的内存并且申请的内存在参数限制的范围内,相应的query就能够获得执行内存。如果query未申请到相应的内存会有相应的query kill策略进行处理。
3.task挂了,是不是所有的stage都要重新执行,以及Presto是如何做容错的?
目前,Presto可以通过低级别的重试从临时的错误中恢复(比如,出现请求错误时,会发送多次请求来获得结果)。至于整个任务的失败,需要依靠client端的重新发起查询请求。目前,在facebook的生产环境中是通过额外的机制保证某种特定场景下的高可用(如standby coordinators、run multiple active cluster以及通过监控系统来确定失败的节点并将节点移出集群再重新加入集群等)。Presto后续会添加对于长时间查询任务的容错支持,如添加checkpointing以及执行计划子树的重试(不过,就目前更新的版本来看还未添加该特性)。
4.sql的执行计划以及结果等是否缓存?相同的表不同where条件是否有做缓存?
该部分未找到可以参考的资料,但是从如下三方面来看,数据应该是未做缓存的:
(1)从源码来看在Coordinator中不管是生成语法树、逻辑执行计划以及物理执行计划时,都未看到有做缓存处理;
(2)而在Worker上,每一个task都有与之对应的taskid,stage间的不同的task 在进行shuffle的时候都是通过唯一的taskid来进行数据传输的,而每次任务请求的taskid各不相同。并且shuffle请求的client端在接收到了shuffle的server端发送的结果数据后,会向server端发送token确认信息,server端确认client端接收到数据之后会删除结果数据,因此Worker端也未缓存查询的结果以及中间结果数据。
(3)在Presto集群比较空闲的情况下,一条sql在Presto中执行多次,查询时间以及资源利用情况基本差不多,未见明显差异。
参考资料:
[1] Presto: SQL on Everything https://prestosql.io/Presto_SQL_on_Everything.pdf
[2] Presto配置信息 https://prestodb.io/docs/current/admin/properties.html
[3] Presto 0.214版源码