4.3 RDD操作
RDD提供了一个抽象的分布式数据架构,我们不必担心底层数据的分布式特性,而应用逻辑可以表达为一系列转换处理。
通常应用逻辑是以一系列转换(Transformation)和执行(Action)来表达的,前者在RDD之间指定处理的相互依赖关系,后者指定输出的形式。
其中:
□转换:是指该操作从已经存在的数据集上创建一个新的数据集,是数据集的逻辑操作,并没有真正计算。
□执行:是指该方法提交一个与前一个Action之间的所有Transformation组成的Job进行计算,Spark会根据Action将作业切分成多个Job。
比如,Map操作传递数据集中的每一个元素经过一个函数,形成一个新的RDD转换结果,而Reduce操作通过一些函数对RDD的所有元素进行操作,并返回最终结果给Driver程序。
在默认情况下,Spark所有的转换操作都是惰性(Lazy)的,每个被转换得到的RDD不会立即计算出结果,只是记下该转换操作应用的一些基础数据集,可以有多个转换结果。转换只有在遇到一个Action时才会执行,如图4-2所示。
[插图]
图4-2 Spark转换和执行
这种设计使得Spark以更高的效率运行。例如,可以通过将要在Reduce操作中使用的Map转换来创建一个数据集,并且只返回Reduce的结果给驱动程序,而不是整个Map所得的数据集。
每当一个Job计算完成,其内部的所有RDD都会被清除,如果在下一个Job中有用到其他Job中的RDD,会引发该RDD的再次计算,为避免这种情况,我们可以使用Persist(默认是Cache)方法“持久化”一个RDD到内存中。在这种情况下,Spark将会在集群中保留这个RDD,以便其他Job可以更快地访问,另外,Spark也支持持久化RDD到磁盘中,或者复制RDD到各个节点。
下面,通过几行简单的程序,进一步说明RDD的基础知识。
val lines=sc.textFile("data.txt")
val lineLengths=lines.map(s=>s.length)
val totalLength=lineLengths.reduce((a,b)=>a+b)
第一行读取外部文件data.txt返回一个基础的MappedRDD,该MappedRDD并不加载到内存中或被执行操作,lines只是记录转换操作结果的指针。
第二行定义了lineLengths作为一个Map转换的结果,由于惰性机制的存在,lineLengths的值不会立即计算。
最后,运行Reduce,该操作为一个Action。Spark将计算打散成多个任务以便在不同的机器上分别运行,每台机器并行运行Map,并将结果进行Reduce操作,返回结果值Driver程序。
如果需要继续使用lineLengths,可以添加缓存Persist或Cache,该持久化会在执行Reduce之前,第一次计算成功之后,将lineLengths保存在内存中。
4.3.1 转换操作
转换操作是RDD的核心之一,通过转换操作实现不同的RDD结果,作为下一次RDD计算的数据输入,转换操作不会触发Job的提交,仅仅是标记对RDD的操作,形成DAG图,以供Action触发Job提交后调用。
常用的转换操作包括:基础转换操作和键-值转换操作。
1.基础转换操作
表4-2列出了目前支持的基础转换操作,具体内容请参见RDD的API官方文档,以获得更多的细节。
表4-2 基础转换操作
[插图]
(续)
[插图]
2.键-值转换操作
尽管大多数Spark操作都基于包含各种类型对象的RDD,但是一小部分特殊的却只能在键-值对形式的RDD上执行。其中,最普遍的就是分布式“洗牌”(shuffle)操作,比如通过键进行分组或聚合元素。
例如,使用reduceByKey操作对文件中每行出现的文字次数进行计数,各种语言的示例如下。
在Scala中,只要在程序中导入org.apache.spark.SparkContext,就能使用Spark的隐式转换,这些操作就可用于包含二元组对象的RDD(Scala中的内建元组,可通过(a,b)创建),键-值对操作可用PairRDDFunction类,如果导入了转换,该类将自动封装元组RDD。
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
基于counts,可以使用counts.sortByKey()按字母表顺序对这些键-值对排序,然后使用counts.collect(),以对象数组的形式向Driver返回结果。
顺便说一句,进行分组的groupByKey不进行本地合并,而进行聚合的reduceByKey会在本地对每个分区的数据合并后再做Shuffle,效率比groupByKey高得多。下面通过几行基于Scala的代码对键-值转换操作进行说明。
// 初始化List
scala>val data = List(("a",1),("b",1),("c",1),("a",2),("b",2),("c",2))
data: List[(String, Int)] = List((a,1), (b,1), (c,1), (a,2), (b,2), (c,2))
// 并行数组创建RDD
scala>val rdd =sc.parallelize(data)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0]
// 按照key进行reduceByKey操作
scala>val rbk = rdd.reduceByKey(_+_).collect
rbk: Array[(String, Int)] = Array((a,3), (b,3), (c,3))
// 按照key进行groupByKey操作
scala>val gbk = rdd.groupByKey().collect
gbk: Array[(String, Iterable[Int])] = Array((a,CompactBuffer(1, 2)), (b,
CompactBuffer(1, 2)), (c,CompactBuffer(1, 2)))
// 按照key进行sortByKey操作
scala>val sbk = rdd.sortByKey().collect
sbk: Array[(String, Int)] = Array((a,1), (a,2), (b,1), (b,2), (c,1), (c,2))
表4-3列出了常用的健-值转换。
表4-3 常用的键-值转换
[插图]
4.3.2 执行操作
Spark将提交的Action与前一个Action之间的所有Transformation组成的Job进行计算,并根据Action将作业切分成多个Job,指定Transformation的输出结果。
1.常用执行操作
这里以加载Spark自带的本地文件README.md文件进行测试,返回一个MappedRDD文件,进行Filter转换操作和Count执行。
// 读取README.md数据,并转化为RDD
scala>val data = sc.textFile("file:///$SPARK_HOME/README.md")
data: org.apache.spark.rdd.RDD[String] = file:///$SPARK_HOME/README.md MappedRDD[1]
// 执行f ilter操作,提取带有"Spark"的子集
scala>val datafilter = data.filter(line =>line.contains("Spark"))
datafilter: org.apache.spark.rdd.RDD[String] = FilteredRDD[2]
// 执行Action操作,输出结果
scala>val datacount = datafilter.count()
datacount: Long = 21
如果想了解更多,请参考表4-4中列出的常用的执行操作。
表4-4 常用的执行操作
[插图]
通过常用执行操作,Spark可以实现大部分MapReduce流式计算的任务,提升了计算效率,对Transformation操作进行结果值输出。
2.存储执行操作
常用存储操作主要包含的执行如表4-5所示。
表4-5 常用存储操作包含的执行
[插图]
存储执行操作将结果进行保存,以文本、序列化文件、对象文件的方式输出到存储设备进行持久化。
4.3.3 控制操作
控制操作主要包括故障恢复、数据持久性,以及移除数据。其中,缓存操作Cache/Pesist是惰性的,在进行执行操作时才会执行,而Unpesist是即时的,会立即释放内存。checkpoint会直接将RDD持久化到磁盘或HDFS等路径,不同于Cache/Persist的是,被checkpoint的RDD不会因作业的结束而被消除,会一直存在,并可以被后续的作业直接读取并加载。
1. RDD故障恢复
在一个典型的分布式系统中,容错机制主要是采取检查点(checkpoint)机制和数据备份机制。故障恢复是由主动检查,以及不同机器之间的数据复制实现的。由于进行故障恢复需要跨集群网络来复制大量数据,这无疑是相当昂贵的。因此,在Spark中则采取了不同的方法进行故障恢复。
作为一个大型的分布式集群,Spark针对工作负载会做出两种假设:
□处理时间是有限的;
□保持数据持久性是外部数据源的职责,主要是让处理过程中的数据保持稳定。
基于假设,Spark在执行期间发生数据丢失时会选择折中方案,它会重新执行之前的步骤来恢复丢失的数据,但并不是说丢弃之前所有已经完成的工作,而重新开始再来一遍。
假如其中一个RDD坏掉,RDD中有记录之前的依赖关系,且依赖关系中记录算子和分区。此时,仅仅需要再执行一遍父RDD的相应分区。
但是,跨宽依赖的再执行能够涉及多个父RDD,从而引发全部的再执行。为了规避这一点,Spark会保持Map阶段中间数据输出的持久,在机器发生故障的情况下,再执行只需要回溯Mapper持续输出的相应分区,来获取中间数据。
Spark还提供了数据检查点和记录日志,用于持久化中间RDD,这样再执行就不必追溯到最开始的阶段。通过比较恢复延迟和检查点开销进行权衡,Spark会自动化地选择相应的策略进行故障恢复。
2. RDD持久化
Spark的持久化,是指在不同转换操作之间,将过程数据缓存在内存中,实现快速重用,或者故障快速恢复。持久化主要分为两类,主动持久化和自动持久化。
主动持久化,主要目标是RDD重用,从而实现快速处理,是Spark构建迭代算法的关键。例如,持久化一个RDD,每一个节点都将把它的计算分块结果保存在内存中,并在该数据集(或者衍生数据集)进行的后续Action中重用,使得后续Action执行变得更加迅速(通常快10倍)。
可以使用persist()方法标记一个持久化的RDD,一旦被一个执行(action)触发计算,它将会被保留在计算节点的内存中并重用。如果RDD的任一分区丢失,通过使用原先创建的转换操作,它将会被自动重算,不需要全部重算,而只计算丢失的部分。
此外,每一个RDD都可以用不同的保存级别进行保存,从而允许持久化数据集在硬盘或内存作为序列化的Java对象(节省空间),甚至跨节点复制。
持久化的等级选择,是通过将一个StorageLevel对象传递给persist()方法进行确定的,cache()方法调用persist()的默认级别MEMORY_ONLY。表4-6是持久化的等级。
表4-6 持久化的等级
[插图]
相对于MEMORY_ONLY_SER,OFF_HEAP减小了垃圾回收的开销,同时也允许Executor变得更小且可共享内存储备,Executor的崩溃不会导致内存中的缓存丢失。在这种模式下,Tachyon中的内存是不可丢弃的。
自动持久化,是指不需要用户调用persist(),Spark自动地保存一些Shuffle操作(如reduceByKey)的中间结果。这样做是为了避免在Shuffle过程中一个节点崩溃时重新计算所有的输入。
持久化时,一旦设置了就不能改变,想要改变就要先去持久化。推荐用户在重用RDD结果时调用Persist,这样会使持久化变得可控。
Persist持久化RDD,修改了RDD的meta info中的StorageLevel。而检查点在持久化的同时切断Lineage,修改了RDD的meta info中的Lineage。二者均返回经过修改的RDD对象自身,而非新的RDD对象,也均属于Lazy操作。
3. 选择存储等级
Spark的不同存储级别,旨在满足内存使用和CPU效率权衡上的不同需求,建议通过以下步骤进行选择:
□如果你的RDD可以很好地与默认的存储级别(MEMORY_ONLY)契合,那么就不需要做任何修改。这已经是CPU使用效率最高的选项,它使RDD的操作尽可能快。
□如果不能与MEMORY_ONLY很好地契合,建议使用MEMORY_ONLY_SER并选择一个快速序列化的库,使对象在有较高空间使用率的情况下,依然可以较快地被访问。
□尽可能不要存储数据到硬盘上,除非计算数据集的函数,计算量特别大,或者它们过滤了大量的数据。否则,重新计算一个分区的速度与从硬盘中读取的效率差不多。
□如果想拥有快速故障恢复能力,可使用复制存储级别(例如,用Spark来响应Web应用的请求)。所有的存储级别都有通过重新计算丢失数据恢复错误的容错机制,但是复制存储级别可以让你在RDD上持续地运行任务,而不需要等待丢失的分区被重新计算。
□如果想要定义自己的存储级别(如复制因子为3而不是2),可以使用StorageLevel单例对象的apply()方法。
4. 移除数据
RDD可以随意在RAM中进行缓存,因此它提供了更快速的数据访问。目前,缓存的粒度为RDD级别,只能缓存全部的RDD。
Spark自动监视每个节点上使用的缓存,在集群中没有足够的内存时,Spark会根据缓存情况确定一个LRU(Least Recently Used,最近最少使用算法)的数据分区进行删除。
如果想手动删除RDD,而不想等待它从缓存中消失,可以使用RDD的unpersist()方法移除数据,unpersist()方法是立即生效的。