先来看一篇分享:
什么是Spark中的RDD以及我们为什么需要它?
Spark一般已经过度使用Hadoop(MapReduce),因为它在迭代处理算法中更快地执行方面提供了好处。
Apache Spark已经过度采用了Hadoop(MapReduce),因为它在迭代处理算法(如机器学习)中的更快执行方面提供了多种好处。
在这篇文章中,我们将尝试理解是什么让Spark RDD在批量分析中如此有用。
为何选择RDD?
当涉及迭代分布式计算,即在诸如Logistic回归,K均值聚类,页面排名算法的计算中处理多个作业上的数据时,在多个作业之间重用或共享数据是相当常见的,或者您可能想要多个对共享数据集的即席查询。
在现有的分布式计算系统(例如MapReduce)中存在数据重用或数据共享的潜在问题,也就是说,您需要将数据存储在某些中间稳定的分布式存储(如HDFS或Amazon S3)中。这使得作业的整体计算变慢,因为它涉及过程中的多个IO操作,复制和序列化。
MapReduce中的迭代处理
RDD尝试通过启用容错分布式内存计算来解决这些问题。
Spark中的迭代处理
现在,让我们了解RDD究竟是什么以及它如何实现容错 -
RDD - 弹性分布式数据集
RDD是不可变和分区的记录集合,只能通过粗粒度操作(如map,filter,group by等)创建。通过粗粒度操作,这意味着操作应用于数据集中的所有元素。RDD只能通过从稳定存储(如HDFS)读取数据或通过现有RDD上的转换来创建。
现在,如何帮助容错?
由于RDD是通过一组转换创建的,因此它会记录这些转换,而不是实际数据。生成一个RDD的转换图称为Lineage Graph。
firstRDD = spark。textFile(“hdfs:// ...”)
secondRDD = firstRDD。filter(someFunction);
thirdRDD = secondRDD。map(someFunction);
例如 -
Spark RDD Lineage Graph
如果我们丢失了RDD的一些分区,我们可以在lineage中重放该分区上的转换来实现相同的计算,而不是跨多个节点进行数据复制。这个特性是RDD的最大好处,因为它节省了很多努力在数据管理和复制中,从而实现更快的计算。
上面的解释总结有两点:
1:重用和共享数据
2:容错
第一点我并不同意:如果是在多个作业间共享数据的话,无论是MR还是spark都会用磁盘作为重用的介质
第二点才是RDD出现的主要原因:第一如果你在MR中任务在某个地方出错,只能是全部重头算一遍,我们是想如果要避免每一次出错重算的话,我们是不是要缓存出错之前的数据集,另外还需要从出错点重试的机制,那么这一点在MR计算模型中是没有实现,当然你可以自己重写MR框架来实现这个模式,实现这个模式需要除了需要考虑之前的那两点,还需要考虑到出错之前的数据集如果数据缓存达到存储瓶颈时的要落盘,缓存数据需要重新就算时要从该数据集的上游去重新计算,也就是需要存储该数据集的上游信息,然后这个是分布式计算,是不是还要存储该数据集的分区情况,等等这些,其实就是对应了spark里面的RDD的设计思想,所以说spark中之所以有RDD这个概念,主要原因是为了实现容错的快速处理。
为了验证上面的说的,我们看看RDD的内部结构和出错的时的处理机制
RDD内部结构(RDD是一个抽象类,封装了通用的属性和方法)
比较重要的属性:dependencies和partition,其中dependencies记录的该RDD的依赖关系,partition记录了RDD的分区信息
比较重要的方法:cache和persisit,这个两个方法决定了RDD里面data缓存的级别,默认RDD是不缓存的,如果需要重用那么可以使用这个两个方法缓存(内存和磁盘可选)
预留问题:
1:没有缓存的RDD的数据是在什么时候被回收的?
2:task失败是如何使用血缘机制重新计算的?