DriverProgram 就是程序员所设计的 Spark 程序,在 Spark 程序中必须定义 SparkContext,它是开发 Spark 应用程序的入口。
SparkContext 通过 Cluster Manager 管理整个集群,群集中包含多个 Worker Node,在每个 Workder Node 都有 Executor 负责执行任务。
Cluster Manager 可以在下列模式下运行:
- 本地运行(Local Machine)——这只需要在程序中 import Spark 的链接库就可以实现。在本地运行时,对于只安装在一台计算机上时最方便,适合刚入门时学习、测试使用。
- Spark Standalone Cluster——这是由 Spark 提供的 Cluster 管理模式,如果没有架设 Hadoop Multi Node Cluster,则可以架设 Spark Standalone Cluster,实现多台计算机并行计算。在这个模式下仍然可以直接存取 Local Disk 或是 HDFS。
- Hadoop YARN (Yet Another Resource Negotiator),它是 Hadoop2. X 新架构中更高效率的资源管理核心。Spark 可以在 YARN上运行,让 YARN 帮助它进行多台机器的资源管理。
- 在云端运行——针对更大型规模的计算工作,本地机器或自建集群的计算能力恐怕难以满足。此时可以将 Spark 程序在云平台。上运行,例如 AWS 的 EC2 平台。使用云计算的好处是不需要自己架设的费用,用多少付多少,随时可扩充。
scala 介绍
Spark 支持 Scala、Java 和 Python 等语言,不过 Spark 本身是用 scala 语言开发的,所以在 Spark 应用程序开发中,Scala 被认为是当前和 Spark 兼容最好的语言。Scala 具有以下特点:
- Scala 可编译为 Java bytecode 字节码,也就是说它可以在 JVM (Java Virtual Machine)上运行,具备跨平台能力。
- 现有 Java 的链接库都可以使用,可以继续使用丰富的 Java 开放源码生态系统。
- Scala 是一种函数式语言。在函数式语言中,函数也是值,与整数字符串等处于同一地位。函数可以作为参数传递给其他函数。
- Scala 是一种纯面向对象的语言,所有的东西都是对象,而所有的操作都是方法。
- 因为 Spark 本身是以 Scala 开发的,所以必须先安装 Scala。我们将安装在 master 虚拟机上。
#创建 intRDD
val intRDD= sc. Parallelize (List (3,1,2, 5,5))
#intRDD转化为Array
intRDD.collect()
运算
map运算
Map 运算可以通过传入的函数,将每一个元素经过函数运算产生另外一个 RDD。如下,RDD 通过传入的函数 addOne,将每一个元素加 1 而产生另外一个 RDD。
def addOne (x: Int): Int={ return (x+1)}
intRDD. map (addOne). collect ()
#匿名函数
intRDD.map(x =>x+1).collect()
#匿名函数+匿名参数
intRDD.map(_+1)
filter运算
#让 intRDD 筛选数字小于 3
intRDD. filter (x=> x <3). collect ()
#使用匿名参数让筛选数字小于 3,可使用下划线“_”来取代 x=> x <3
intRDD. filter (_<3). collect ()
#字符串的筛选
strRDD.filter(x =>x.contains("ra")).collect()
distinct运算
#删除重复的元素
intRDD.distinct().collect()
strRDD.distinct().collect()
randomSplit运算
#随机将rdd按比例切分,返回一个数组
val sRDD=intRDD.randomSplit(Array(0.2,0.8))
sRDD(0).collect
sRDD(1).collect
groupBy
groupBy 可以按照传入的匿名函数规则,将数据分为多个 Array。
#使用 groupBy 运算将整个集合分成奇数与偶数
#使用 groupBy 运算时,传入的匿名函数将整个集合按照奇数与偶数分为两个 Array,此运算会返回 Array [(String, Iterable [Int])]。
val gRDD=intRDD.groupBy (
x=> {if (x % 2==0) "even" else "odd"}
). collect
#读取第 1 个是偶数的 Array
gRDD (0)
#读取第 2 个是奇数的 Array
gRDD (1)
多个rdd转换运算
#创建 3 个范例 RDD
val intRDD1 = sc. Parallelize (List (3,1, 2,5,5))
val intRDD2 =sc. Parallelize (List (5, 6))
val intRDD3 = sc. Parallelize (List (2,7))
#union 并集运算
#可以使用下列命令,将 intRDD1、intRDD2、intRDD3 进行并集运算。
intRDD1. union (intRDD2). union (intRDD3), collect ()
#使用++符号进行并集运算
(intRDD1 ++ intRDD2++ intRDD3). collect ()
#intersection 交集运算
#将 intRDD1、intRDD2 进行交集运算
intRDD1. intersection (intRDD2). collect ()
#subtract 差集运算
intRDD1. subtract (intRDD2). collect ()
#cartesian 笛卡尔乘积运算
intRDD1. cartesian (intRDD2). collect ()
spark部署
./bin/spark-submit --class org.apache.spark.examples.SparkPi \ #作业类名
--master yarn \ #spark模式
--deploy-mode cluster \ #spark on yarn 模式
--driver-memory 4g \ #每一个driver的内存
--executor-memory 2g \ #每一个executor的内存
--executor-cores 1 \ #每一个executor占用的core数量
--queue thequeue \ #作业执行的队列
examples/jars/spark-examples*.jar \ #jar包
10 #传入类中所需要的参数
spark 任务划分
一个jar包就是一个Application
一个行动操作就是一个Job, 对应于Hadoop中的一个MapReduce任务
一个Job有很多Stage组成,划分Stage是从后往前划分,遇到宽依赖则将前面的所有转换换分为一个Stage
一个Stage有很多Task组成,一个分区被一个Task所处理,所有分区数也叫并行度。