pyspark_2_入门篇(编写我们的第一个程序WordCount)

跟着Leo学习PySpark

chapter2——编写我们的第一个程序WordCount

上一章我们大致讲了一下pyspark的基本理论和重要概念,如果想系统化且更深入地理解spark中的概念,还请移步官方文档,这一章,将用一个我们耳熟能详的WordCount小例子,零距离感受下pyspark的简单使用

from pyspark import SparkContext, SparkConf

# 编写Spark程序做的第一件事是创建一个SparkContext对象,该对象告诉Spark如何访问集群。
# 要创建SparkContext,首先需要创建一个SparkConf对象,该对象包含有关您的应用程序的信息。

# conf = SparkConf().setAppName(appName).setMaster(master)
# sc = SparkContext(conf=conf)

conf = SparkConf().setAppName("leo-study-spark").setMaster("local")
sc = SparkContext(conf=conf)

# 1. appName参数是您的应用程序在群集UI上显示的名称。
# 2. master是一个Spark,Mesos或YARN群集URL,或一个特殊的“local”字符串,以本地模式运行。
# 3. 对于本地测试和单元测试,您可以传递“ local”以在内部运行Spark

# spark程序围绕RDD的概念展开,创建RDD的方式有两种:并行化驱动程序中的现有集合,或外部存储系统

# 1. 所谓并行化驱动程序中的现有集合,说白了,就类似于本地的一个数组变量
# 2. 外部存储系统可以是本地文件系统、HDFS
# 方式一:从本地数组变量中创建一个RDD

data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
print type(rdd)

# rdd被创建后,就可以并行化处理,列如我们可以调用map做一步转换操作,
# 然后调用reduce聚合计算我们的数据集,最后使用print打印输出。
result = rdd.map(lambda x: x+1).reduce(lambda a, b: a + b)
print result

# 可以看到输出的结果是20
# 程序首先进行了一个map转换操作,即对数据集中的每一个元素都加上1
# 其次,又对这个RDD进行了reduce的累加操作,最后输出元素累加后的结果
<class 'pyspark.rdd.RDD'>
20
# 方式二:从本地文件系统中创建一个RDD,并演示我们今天的第一个入门级小程序,WordCount
# test.txt 文本内容如下:
# I love china
# china is my home
# I love yyf
# yyf is a beautiful girl

file_path = "/Users/mac/software/conda-demo/test_data/test.txt"

# 从文件系统中创建RDD,调用sc.textFile(filePath)

rdd = sc.textFile(file_path)

# textFile("/my/directory"), textFile("/my/directory/*.txt"), textFile("/my/directory/*.gz")

print 'textFile 将文件映射成RDD,RDD中的每一个元素是文件中的一行内容'
print rdd.collect()
print '--------------------------------------------------'

# flatMap 其实是先做了map操作,然后在此基础上又做了一层合并操作,大家可以看到

rdd = rdd.flatMap(lambda x: x.split(" "))

print rdd.collect()
print 'flatMap 先做了map操作,把RDD每一行内容按空格分隔,映射成为一个字符串数组,再做了合并操作'
print '--------------------------------------------------'

rdd = rdd.map(lambda x:(x, 1))

print rdd.collect()
print "map 操作对RDD数据集中的每一个单词进行计数"
print '--------------------------------------------------'

rdd = rdd.reduceByKey(lambda x, y: x + y)
print 'reduceByKey 对数据集中每一个元组结构的第一个元素,分组后进行累加计算,统计出文章中每个单词出现的频次,然后把结果输出'

print rdd.collect()

textFile 将文件映射成RDD,RDD中的每一个元素是文件中的一行内容
[u'I love china', u'china is my home', u'I love yyf', u'yyf is a beautiful girl']
--------------------------------------------------
[u'I', u'love', u'china', u'china', u'is', u'my', u'home', u'I', u'love', u'yyf', u'yyf', u'is', u'a', u'beautiful', u'girl']
flatMap 先做了map操作,把RDD每一行内容按空格分隔,映射成为一个字符串数组,再做了合并操作
--------------------------------------------------
[(u'I', 1), (u'love', 1), (u'china', 1), (u'china', 1), (u'is', 1), (u'my', 1), (u'home', 1), (u'I', 1), (u'love', 1), (u'yyf', 1), (u'yyf', 1), (u'is', 1), (u'a', 1), (u'beautiful', 1), (u'girl', 1)]
map 操作对RDD数据集中的每一个单词进行计数
--------------------------------------------------
reduceByKey 对数据集中每一个元组结构的第一个元素,分组后进行累加计算,统计出文章中每个单词出现的频次,然后把结果输出
[(u'a', 1), (u'beautiful', 1), (u'love', 2), (u'I', 2), (u'is', 2), (u'yyf', 2), (u'china', 2), (u'home', 1), (u'girl', 1), (u'my', 1)]
# 同样演示基于本地集合的WordCount程序,依旧是同样的输入与输出
data = ["I love china",
"china is my home",
"I love yyf",
"yyf is a beautiful girl"]

rdd = sc.parallelize(data)
print rdd.collect()

rdd = rdd.flatMap(lambda x: x.split(' '))

print rdd.collect()

rdd = rdd.map(lambda x: (x, 1))

print rdd.collect()

rdd = rdd.reduceByKey(lambda x, y: x + y)

print rdd.collect()
['I love china', 'china is my home', 'I love yyf', 'yyf is a beautiful girl']
['I', 'love', 'china', 'china', 'is', 'my', 'home', 'I', 'love', 'yyf', 'yyf', 'is', 'a', 'beautiful', 'girl']
[('I', 1), ('love', 1), ('china', 1), ('china', 1), ('is', 1), ('my', 1), ('home', 1), ('I', 1), ('love', 1), ('yyf', 1), ('yyf', 1), ('is', 1), ('a', 1), ('beautiful', 1), ('girl', 1)]
[('a', 1), ('beautiful', 1), ('love', 2), ('I', 2), ('is', 2), ('yyf', 2), ('china', 2), ('home', 1), ('girl', 1), ('my', 1)]
#  除了上述方式,还有很多创建RDD的方式,从HDFS文件系统中创建RDD,只用把file_path路径换成我们的HDFS路径即可
#到这里,我们可以知晓,spark程序说白了就是对一个超大数据集(一台机器跑不动),转换成可并行计算的RDD数据集,然后被分发到不同的计算节点
#去执行,经过一系列的转换操作,最终触发Action操作,要么把计算结果收集,要么输出到其他存储介质中
#  显式地声明SparkContext的做法,在目前版本的Spark(2.x)中是不被推荐的,pache Spark 2.0引入了SparkSession,
#为用户提供了一个统一的切入点来使用Spark的各项功能,并且允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序。最重要的是,
#它减少了用户需要了解的一些概念,使得我们可以很容易地与Spark交互,而且,在之后的课程中也将采用这样的方式去声明spark操作对象。
#  
#  下面将演示通过SparkSession来实现同样的WordCount功能,而不需要显式地创建SparkConf,SparkContext,
#因为这些对象已经封装在SparkSession中。
#
#  使用生成器的设计模式(builder design pattern),如果我们没有创建SparkSession对象,
#则会实例化出一个新的SparkSession对象及其相关的上下文。
#  
#  一样的输入与输出,大家可以自己尝试一步一步调试,再次感受spark程序的运行规律。

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("leo-study-spark").getOrCreate()

rdd = spark.read.text('/Users/mac/software/conda-demo/test_data/test.txt').rdd.map(lambda x: x[0])
rdd = rdd.flatMap(lambda x: x.split(' ')).map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)

for (word, count) in rdd.collect():
    print "%s: %d" %(word, count)
    
spark.stop()
a: 1
beautiful: 1
love: 2
I: 2
is: 2
yyf: 2
china: 2
home: 1
girl: 1
my: 1
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,509评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,806评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,875评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,441评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,488评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,365评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,190评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,062评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,500评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,706评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,834评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,559评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,167评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,779评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,912评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,958评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,779评论 2 354

推荐阅读更多精彩内容