一个早上只做了一点微小的工作,很忏愧。但是发现Spark这玩意还是蛮有意思的。下面给大家介绍一下如何用python跑一遍Wordcount的词频统计的示例程序。
在operator模块中导入add类from pyspark import SparkContext, SparkConf from operator import add#应用程序名#初始化一个SparkContext,现在sc就是一个SparkContext的实例化对象,然后方可创建RDD。
appName = "WordCount"conf = SparkConf().setAppName(appName).setMaster("local")sc = SparkContext(conf=conf)# inputFiles表示输入文件路径
stopWordFile表示停词文件路径
outputFile表示输出文件路径inputFiles = "/home/hadoop/software/spark-2.0.0-bin-hadoop2.6/examples/src/main/resources/wordcount/*"stopWordFile = "/home/hadoop/software/spark-2.0.0-bin-hadoop2.6/examples/src/main/resources/wordcount/stopword.txt"outputFile = "/tmp/result"#处理非单词符号targetList = list('\t().,?[]!;|') + ['--']#用空格替换这些标点符号,同时将替换后的行拆分成单词.在flatMap中使用replaceAndSplit函数def replaceAndSplit(s): for c in targetList: s = s.replace(c, " ") return s.split()inputRDD = sc.textFile(inputFiles)stopRDD = sc.textFile(stopWordFile)stopList = stopRDD.map(lambda x: x.strip()).collect()inputRDDv1 = inputRDD.flatMap(replaceAndSplit)inputRDDv2 = inputRDDv1.filter(lambda x: x not in stopList)inputRDDv3 = inputRDDv2.map(lambda x: (x,1))inputRDDv4 = inputRDDv3.reduceByKey(add)inputRDDv5 = inputRDDv4.map(lambda x: (x[1], x[0]))inputRDDv6 = inputRDDv5.sortByKey(ascending=False)inputRDDv7 = inputRDDv6.map(lambda x: (x[1], x[0])).keys()top100 = inputRDDv7.take(100)result = sc.parallelize(top100)result.saveAsTextFile(outputFile)
背景知识
1.任何Spark程序的编写都是从SparkContext(或用Java编写时的JavaSparkContext)开始的,SparkContext的初始化需要一个SparkConf对象,Sparkconf包括了Spark集群配置的各种参数(比如主节点的URL)。初始化后,就可以用SparkContext对象所包含的各种方法来创建,操作分布式数据集和共享变量。2.涉及的函数 - Python split()方法:通过指定分隔符对字符串进行切片,如果参数num 有指定值,则仅分隔 num 个子字符串。 - Python strip() 方法:用于移除字符串头尾指定的字符(默认为空格)。 - Python lambda()方法:用来创建匿名函数,lambda的主体是一个表达式,用来封转有限的逻辑进去。 - Python内建的filter()函数 : 用于过滤序列,filter()也接收一个函数和一个序列. - map( )方法:接收一个函数,应用到RDD中的每个元素,然后为每一条输入返回一个对象。根据提供的函数对指定序列做映射。 - flatMap( )方法:接收一个函数replaceAndSplit,应用到RDD中的每个元素,返回一个包含可迭代的类型(如list等)的RDD,可以理解为先Map(),后flat(). - > map函数会对每一条输入进行指定的操作,然后为每一条输入返回一个对象;而flatMap函数则是两个操作的集合——正是“先映射后扁平化”:> 操作1:同map函数一样:对每一条输入进行指定的操作,然后为每一条输入返回一个对象操作2:最后将所有对象合并为一个对象 - Spark sortByKey函数 : 作用于Key-Value形式的RDD,并对Key进行排序。它是在org.apache.spark.rdd.OrderedRDDFunctions中实现的. - take(): Spark的RDD的action操作take()用于提取数据 - parallelize() : 创建一个并行集合,例如sc.parallelize(0 until numMappers, numMappers) 创建并行集合的一个重要参数,是slices的数目(例子中是numMappers),它指定了将数据集切分为几份. - Spark主要提供了两种函数:parallelize和makeRDD:1)parallelize的声明:def parallelize[T: ClassTag]( seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
2)makeRDD的声明:def makeRDD[T: ClassTag]( seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]
3)区别:A)makeRDD函数比parallelize函数多提供了数据的位置信息。B)两者的返回值都是ParallelCollectionRDD,但parallelize函数可以自己指定分区的数量,而makeRDD函数固定为seq参数的size大小。