RDD(弹性分布式数据集)是Spark的核心概念,Spark在对数据进行操作时,不外乎创建RDD,转化RDD以及调用RDD操作进行求值。
3.1 RDD基础
Spark中的RDD就是一个不可变的分布式对象集合。每个RDD都被分为多个分区,这些分区运行在集群中不同的节点上。
两种创建RDD的方法:
1)读取一个外部数据集
2)在驱动器程序里分发驱动器程序中的对象集合(并行化)。
创建出来的RDD支持两类操作:转化操作(transformation)和行动操作(action);转化操作会由一个RDD转化为另一个RDD,行动操作会返回一个结果,并且把结果返回到驱动器程序中,或者把结果存储在外部存储系统中。
Spark只会惰性计算这些RDD,他们只有第一次在一个行动操作中用到时才会真正计算。
如果想在多个行动操作中重用一个RDD,可以使用RDD.persist()让Saprk把这个RDD缓存下来。在第一次对持久化的RDD计算之后,Spark会把RDD的内容保存到内存中(以分区方式存储到集群中的各个服务器),在之后的行动操作中,就可以重用这些数据了。
pythonLines.persist
pythonLines.count()
pythonLines.first()
=======================================================
每个Spark程序或shell会话都按如下方式工作
(1)从外部数据创建输入RDD
(2)使用诸如filter()这样的转化操作对RDD进行转化,以定义新的RDD
(3)告诉Spark对需要被重用的中间结果RDD执行persist()操作
(4)使用行动操作来触发一次并行计算,Spark会对计算进行优化后再执行
=======================================================
3.2 创建RDD
创建RDD最简单的方式是把程序中一个已有的集合传给SparkContext的parallelize()方法,除了开发原型和测试,这种方法用的不多,因为需要把整个数据先放在一台机器的内存中。
lines = sc.parallelize(['pandas','i like pandas'])
3.3 RDD操作
转化操作:返回的是RDD数据
行动操作:返回的是非RDD数据
3.3.1 转化操作
通过转化操作后得到的RDD是惰性求值的,只有在行动操作中用到RDD时才会被计算。许多转化操作都是针对各个元素的,也就是说,这些转化操作每次只会操作RDD中的一个元素。转化操作可以操作任意数量的输入RDD。
通过转化操作,从已有的RDD中派生出新的RDD,Spark会使用谱系图来记录这些不同RDD之间的依赖关系。Spark需要这些信息来按需计算每个RDD,也可以依赖谱系图在持久化RDD丢失部分数据时恢复所丢失的数据。如下图:
3.3.2 行动操作
take()函数可以获取部分RDD中的元素,并在本地进行打印等操作。
collect()函数可以用来获取整个RDD中的数据,当程序将RDD筛选到一个很小体量的规模时,如当整个数据集能在单台机器的内存中放得下时,才可以在本地使用collect()函数,collect()函数不能用于大规模数据。
一般RDD不能通过collect()的收集到驱动器程序中,因为结果很大,我们可以将数据写到分布式存储系统(HDFS),可以使用saveAsTextFile()、saveAsSequenceFile()或者其他的行动操作来把RDD的数据内容以各种自带的格式保存起来。
——将中间结果持久化会加快速度
3.3 惰性求值
惰性求值意味着对RDD调用转化操作并不是立即执行,相反,Spark会在内部记录下所要求执行的操作的相关信息,即每个RDD相当于记录下如何计算数据的指令列表(不是一个存着运行程序后的切实结果),因此用sc.textFile()时,数据也并没有读取进来,而是在需要的时候读取(所以可能会被多次操作)。
可以通过调用一个简单的行动操作来对转化操作进行强制执行,如count()
3.4 向Spark传递函数
主要介绍传递Python版本的函数
在Python中以三种方式把函数传递给Spark:1)lambda;2)顶层函数;3)自定义的局部函数。
word = rdd.filter(lambda s: 'error' in s)
def containsError(s):
return 'error' in s
word = rdd.filter(containsError)
【注意】传递函数时,Python会在不经意间把函数所在的对象也序列化传出去(函数需要的只是处理某个对象的部分内容,但函数会将整个对象传出去),造成的问题是传递的内容过大会造成性能降低或者传递的类里面包含Python不知道如何序列化的传输对象而报错(程序不能运行)
解决方案:先把需要的字段从对象中拿出来放到一个局部变量中,然后传递这个局部变量
calss WordFunctions(object):
...
def getMatchesNoReference(self,rdd):
#安全:只把需要的字段提取到局部变量中,避免函数传递整个self
query = self.query
return rdd.filter(lambda x :query in x)
3.5 常见的转化操作和行动操作
一般通用的操作和针对特殊RDD数据的操作
3.5.1 基本RDD
通用的操作
1 针对各个元素的转化操作
map()转化操作,接收一个函数,把这个函数用于RDD中的每个元素,将函数的返回结果作为结果RDD中对应元素的值。
filter()转化操作。接收一个函数,并将RDD中满足该函数的元素放入新的RDD中返回。
map()函数的返回值类型不需要和输入类型一样
flatMap()接收一个函数,实现对每个输入元素生成多个输出“元素”,不过返回的不是一个元素,而是一个返回值序列的迭代器。最后输出的RDD是一个包含各个迭代器可访问的所有元素的RDD。
lines = sc.parallelize(['hello world','hi'])
words = lines.map(lambda line:line.split())
for i in words.collect():
print(i)
#结果
['hello', 'world']
['hi']
使用flatMap:
lines = sc.parallelize(['hello world','hi'])
words = lines.flatMap(lambda line:line.split())
for i in words.collect():
print(i)
#结果
hello
world
hi
flatMap()相当如将返回的迭代器“拍扁”,这样得到一个由各个列表中的元素组成的RDD,而不是一个由列表组成的RDD。
2 伪集合操作
RDD类似于集合,可以支持并(union)、交(intersection)、移除(subtract)和去重等操作,要求必须是同一种类型的数据,但RDD不是集合,因为允许重复元素,去重操作RDD.distinct()开销很大,需要将所有数据通过网络进行混洗(shuffle)。
RDD1.union(RDD2)返回包含两个RDD中所有元素的RDD,允许重复元素
RDD1.intersection(RDD2)只返回两个RDD都有的元素,返回的是无重复的元素集合,所以运行缓慢
RDD1.subtract(RDD2)返回一个由只存在于RDD1而不存在RDD2中的所有元素的RDD,需要混洗
RDD1.cartesian(RDD2)计算两个RDD的笛卡尔积,返回所有可能的(a,b)对,其中,,在求大规模的笛卡尔积时开销巨大。
3 行动操作
reduce()操作两个RDD的元素类型的数据并返回一个同样类型的新元素,常用于聚合操作。
sum = rdd.reduce(lambda x,y:x+y)
fold()接收一个与reduce()接收的函数签名相同的函数,再加上一个“初始值”来作为每个分区第一次调用时的结果。“初始值”是你提供的操作的单位元素,函数调用这个初始值多次计算不会改变结果(如+运算的0,*运算的1,字符串拼接的空列表)
结合使用map函数,便可以实现reduce对二元组的形式操作
aggregate()使用时,需要提供我们期待返回的类型的初始值。然后通过一个函数把RDD中的元素合并起来放入累加器。考虑到每个节点是在本地进行累加的,最终,还需要提供第二个函数(reduce())来将累加器两两合并。
sumCount = nums.aggregate((0,0),
#acc startValue is (0,0),value is a item in nums,get a tuple(sum(nums),len(nums))
(lambda acc,value:(acc[0]+value,acc[1]+1)),
#acc1 is the cumsum result of all before value[0],acc2 is the next parttition's result
(lambda acc1,acc2:((acc1[0]+acc2[0]),acc1[1]+acc2[1])))
return sumCount[0]/folat(sumCount[1])
collect()函数可以将(整个RDD)数据返回驱动器程序中,由于需要将数据复制到驱动器进程中,collect()函数要求所有数据都必须能一同放入单台机器的内存中。
take(n)函数返回RDD中的n个元素,并且尝试访问尽量少的分区(顺序与预期不一致)
top(n)返回RDD中的前n个元素(使用数据默认的顺序)
takeSample(withReplacement,num,seed)函数可以从数据中获得一个采样,并指定是否替换
foreach()行动操作对RDD中的每个元素进行操作,而不需要把任何结果发回本地(如把数据传输到一个网络服务器,或者存在数据库中)
3.5.2 在不同RDD类型间转换
在Python中,所有函数都实现在基本的RDD类中,但如果操作对应的RDD数据类型不正确,就会导致运行错误。
3.6 持久化(缓存)
在程序的运行过程中,为了便面多次重复操作(转化&行动),可以将中间结果持续化(persist),当Spark对一个RDD持久化时,计算出RDD的节点会分别保存他们所求的分区数据(当丢失后也可以方便找回)。在Python中,我们会始终序列化要持久化存储的数据,所以持久化级别默认值就是以序列化后的对象存储在JVM堆空间中。