PySpark 是 Spark 为 Python 开发者提供的 API。
创建RDD
在PySpark中,基于Scala的创建RDD的方法有两种:
第一种是通过元组创建:
import pyspark
data1 = sc.parallelize(("a", 2))
data2 = sc.makeRDD((1 to 6, 2)) //仅基于scala
第二种是通过读取外部文件:
rdd = sc.textFile('/FileStore/tables/sf_data.csv')
textFile方法中第二个参数可选,用于指定分区个数。默认情况下每一个分块创建一个分区
Transformations
- map(): 它把一个函数作为它的参数,并把这个函数作用在原RDD的每个元素上,从而创建一个新的RDD实例。
df_crimes = crime_data_lines.map(lambda row: int(row[10]))
- flatmap():它把一个函数作为它的参数,这个函数处理原RDD中每个元素返回一个序列,扁平化这个序列的集合得到一个数据集,flatMap方法返回的RDD就代表这个数据集。
df_crimes = crime_data_lines.flatmap(lambda row: (row[10], int(row[10] + 1)))
map()方法:
flatmap()方法:
- filter():它把一个布尔函数作为它的参数,并把这个函数作用原RDD的每个元素上,从而创建一个新的RDD实例。一个布尔函数只有一个参数作为输入,返回true或false。filter方法返回一个新的RDD实例,这个RDD实例代表的数据集由布尔函数返回true的元素构成。新的RDD实例代表的数据集是原RDD的子集。
crimes = df_crimes.filter(lambda x: x != header)
- distinct(): 它的作用是找到不重复的元素
crimes.values().distinct().count()
- values(),keys():返回只由原RDD中的值/键构成的RDD
crimes.keys()
crimes.values()
Actions
1.collect():返回一个数组,这个数组由原RDD中的元素构成。在使用这个方法的时候需要小心,因为它把worker节点的数据移给了驱动程序driver。如果操作一个有大数据集的RDD,它有可能导致驱动程序崩溃。
2.count():返回RDD中的元素的个数
3.take():输入参数为一个整数N,返回一个由原RDD中前N个元素构成的RDD。
4.top():返回一个由原RDD中前N小的元素构成的RDD。
5.reduce():对原RDD的元素做汇总操作,汇总的时候满足结合律和交换律的二元操作符。
rdd1.map(lambda row: row[1]).reduce(lambda x, y: x + y)
data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 1)
works = data_reduce.reduce(lambda x, y: x / y)
以上为pyspark中常用基于rdd操作