第三章 RDD编程

RDD(弹性分布式数据集)是Spark的核心概念,Spark在对数据进行操作时,不外乎创建RDD,转化RDD以及调用RDD操作进行求值。

3.1 RDD基础

Spark中的RDD就是一个不可变的分布式对象集合。每个RDD都被分为多个分区,这些分区运行在集群中不同的节点上。
两种创建RDD的方法:
1)读取一个外部数据集
2)在驱动器程序里分发驱动器程序中的对象集合(并行化)。
创建出来的RDD支持两类操作:转化操作(transformation)和行动操作(action);转化操作会由一个RDD转化为另一个RDD,行动操作会返回一个结果,并且把结果返回到驱动器程序中,或者把结果存储在外部存储系统中。

Spark只会惰性计算这些RDD,他们只有第一次在一个行动操作中用到时才会真正计算。

如果想在多个行动操作中重用一个RDD,可以使用RDD.persist()让Saprk把这个RDD缓存下来。在第一次对持久化的RDD计算之后,Spark会把RDD的内容保存到内存中(以分区方式存储到集群中的各个服务器),在之后的行动操作中,就可以重用这些数据了。

  pythonLines.persist
  pythonLines.count()
  pythonLines.first()

=======================================================
每个Spark程序或shell会话都按如下方式工作
(1)从外部数据创建输入RDD
(2)使用诸如filter()这样的转化操作对RDD进行转化,以定义新的RDD
(3)告诉Spark对需要被重用的中间结果RDD执行persist()操作
(4)使用行动操作来触发一次并行计算,Spark会对计算进行优化后再执行
=======================================================

3.2 创建RDD

创建RDD最简单的方式是把程序中一个已有的集合传给SparkContext的parallelize()方法,除了开发原型和测试,这种方法用的不多,因为需要把整个数据先放在一台机器的内存中。

 lines = sc.parallelize(['pandas','i like pandas'])

3.3 RDD操作

转化操作:返回的是RDD数据
行动操作:返回的是非RDD数据

3.3.1 转化操作

通过转化操作后得到的RDD是惰性求值的,只有在行动操作中用到RDD时才会被计算。许多转化操作都是针对各个元素的,也就是说,这些转化操作每次只会操作RDD中的一个元素。转化操作可以操作任意数量的输入RDD。

通过转化操作,从已有的RDD中派生出新的RDD,Spark会使用谱系图来记录这些不同RDD之间的依赖关系。Spark需要这些信息来按需计算每个RDD,也可以依赖谱系图在持久化RDD丢失部分数据时恢复所丢失的数据。如下图:


RDD谱系图

3.3.2 行动操作

take()函数可以获取部分RDD中的元素,并在本地进行打印等操作。
collect()函数可以用来获取整个RDD中的数据,当程序将RDD筛选到一个很小体量的规模时,如当整个数据集能在单台机器的内存中放得下时,才可以在本地使用collect()函数,collect()函数不能用于大规模数据。

一般RDD不能通过collect()的收集到驱动器程序中,因为结果很大,我们可以将数据写到分布式存储系统(HDFS),可以使用saveAsTextFile()、saveAsSequenceFile()或者其他的行动操作来把RDD的数据内容以各种自带的格式保存起来。

——将中间结果持久化会加快速度

3.3 惰性求值

惰性求值意味着对RDD调用转化操作并不是立即执行,相反,Spark会在内部记录下所要求执行的操作的相关信息,即每个RDD相当于记录下如何计算数据的指令列表(不是一个存着运行程序后的切实结果),因此用sc.textFile()时,数据也并没有读取进来,而是在需要的时候读取(所以可能会被多次操作)。

可以通过调用一个简单的行动操作来对转化操作进行强制执行,如count()

3.4 向Spark传递函数

主要介绍传递Python版本的函数

在Python中以三种方式把函数传递给Spark:1)lambda;2)顶层函数;3)自定义的局部函数。

word = rdd.filter(lambda s: 'error' in s)

def containsError(s):
    return 'error' in s
word = rdd.filter(containsError)

【注意】传递函数时,Python会在不经意间把函数所在的对象也序列化传出去(函数需要的只是处理某个对象的部分内容,但函数会将整个对象传出去),造成的问题是传递的内容过大会造成性能降低或者传递的类里面包含Python不知道如何序列化的传输对象而报错(程序不能运行)
解决方案:先把需要的字段从对象中拿出来放到一个局部变量中,然后传递这个局部变量

  calss WordFunctions(object):
   ...
    def getMatchesNoReference(self,rdd):
    #安全:只把需要的字段提取到局部变量中,避免函数传递整个self
          query = self.query
          return rdd.filter(lambda x :query in x)

3.5 常见的转化操作和行动操作

一般通用的操作和针对特殊RDD数据的操作

3.5.1 基本RDD

通用的操作

1 针对各个元素的转化操作

map()转化操作,接收一个函数,把这个函数用于RDD中的每个元素,将函数的返回结果作为结果RDD中对应元素的值。
filter()转化操作。接收一个函数,并将RDD中满足该函数的元素放入新的RDD中返回。

map和filter区别举例

map()函数的返回值类型不需要和输入类型一样
flatMap()接收一个函数,实现对每个输入元素生成多个输出“元素”,不过返回的不是一个元素,而是一个返回值序列的迭代器。最后输出的RDD是一个包含各个迭代器可访问的所有元素的RDD。

lines = sc.parallelize(['hello world','hi'])
words = lines.map(lambda line:line.split())
for i in words.collect():
     print(i)
#结果
['hello', 'world']
['hi']

使用flatMap:

lines = sc.parallelize(['hello world','hi'])
words = lines.flatMap(lambda line:line.split())
for i in words.collect():
     print(i)
#结果
hello
world
hi

flatMap()相当如将返回的迭代器“拍扁”,这样得到一个由各个列表中的元素组成的RDD,而不是一个由列表组成的RDD。

2 伪集合操作

RDD类似于集合,可以支持并(union)、交(intersection)、移除(subtract)和去重等操作,要求必须是同一种类型的数据,但RDD不是集合,因为允许重复元素,去重操作RDD.distinct()开销很大,需要将所有数据通过网络进行混洗(shuffle)。
RDD1.union(RDD2)返回包含两个RDD中所有元素的RDD,允许重复元素
RDD1.intersection(RDD2)只返回两个RDD都有的元素,返回的是无重复的元素集合,所以运行缓慢
RDD1.subtract(RDD2)返回一个由只存在于RDD1而不存在RDD2中的所有元素的RDD,需要混洗
RDD1.cartesian(RDD2)计算两个RDD的笛卡尔积,返回所有可能的(a,b)对,其中a \in RDD1,b \in RDD2,在求大规模的笛卡尔积时开销巨大。

3 行动操作

reduce()操作两个RDD的元素类型的数据并返回一个同样类型的新元素,常用于聚合操作。

 sum = rdd.reduce(lambda x,y:x+y)

fold()接收一个与reduce()接收的函数签名相同的函数,再加上一个“初始值”来作为每个分区第一次调用时的结果。“初始值”是你提供的操作的单位元素,函数调用这个初始值多次计算不会改变结果(如+运算的0,*运算的1,字符串拼接的空列表)

结合使用map函数,便可以实现reduce对二元组的形式操作

aggregate()使用时,需要提供我们期待返回的类型的初始值。然后通过一个函数把RDD中的元素合并起来放入累加器。考虑到每个节点是在本地进行累加的,最终,还需要提供第二个函数(reduce())来将累加器两两合并。

  sumCount = nums.aggregate((0,0),
                      #acc startValue is (0,0),value is a item in nums,get a tuple(sum(nums),len(nums))
                      (lambda acc,value:(acc[0]+value,acc[1]+1)),
                      #acc1 is the cumsum result of all before value[0],acc2 is the next parttition's result
                      (lambda acc1,acc2:((acc1[0]+acc2[0]),acc1[1]+acc2[1])))
  return sumCount[0]/folat(sumCount[1])

collect()函数可以将(整个RDD)数据返回驱动器程序中,由于需要将数据复制到驱动器进程中,collect()函数要求所有数据都必须能一同放入单台机器的内存中。
take(n)函数返回RDD中的n个元素,并且尝试访问尽量少的分区(顺序与预期不一致)
top(n)返回RDD中的前n个元素(使用数据默认的顺序)
takeSample(withReplacement,num,seed)函数可以从数据中获得一个采样,并指定是否替换

foreach()行动操作对RDD中的每个元素进行操作,而不需要把任何结果发回本地(如把数据传输到一个网络服务器,或者存在数据库中)

3.5.2 在不同RDD类型间转换

在Python中,所有函数都实现在基本的RDD类中,但如果操作对应的RDD数据类型不正确,就会导致运行错误。

3.6 持久化(缓存)

在程序的运行过程中,为了便面多次重复操作(转化&行动),可以将中间结果持续化(persist),当Spark对一个RDD持久化时,计算出RDD的节点会分别保存他们所求的分区数据(当丢失后也可以方便找回)。在Python中,我们会始终序列化要持久化存储的数据,所以持久化级别默认值就是以序列化后的对象存储在JVM堆空间中。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容