SPARK概述
1. 概述
Spark是一个用来实现快速而通用的集群计算的平台。
Spark的核心是一个 对 很多计算任务组成的,运行在多个工作机器或是一个计算集群上的应用进行调度,分发以及监控的计算引擎。
Spark SQL是
相较与MapReduce的优势:
- 基于内存;
- 模糊了Map和Reduce的界限,减少了磁盘读写;
spark独立应用:python api
SparkConf配置应用,SparkContext对象用于访问spark。一个对下个代表一个连接,shell启动时,已有一个自带的SparkContext对象,叫做sc。
独立应用中如下配置:SparkConf传递集群URL和应用名两个参数即可。如需停止,sc.stop()或sys.rxit()即可。
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
3. RDD编程
3.1 RDD基础
弹性分布式数据集(Resilient Distributed Dataset, RDD)是分布式的元素集合。
3.1 创建
RDD的两种创建方式:读取外部数据集,或在驱动程序中对一个集合进行并行化。
3.3 RDD操作
RDD操作分为转化操作和行动操作。
惰性求值:转化操作在行动操作前是不会执行的。这样用户可以编写一系列简单操作,再执行。
3.4 向Spark传递函数
# lambda
word = rdd.filter(lambda line: "python" in line)
# 传递顶层函数或自定义局部函数
def containsPython(line):
ruturn "python" in line
word.filter(containsPython)
3.5 常见的转化操作和行动操作
3.6 持久化
避免对一个RDD多次计算,可以对其进行持久化,RDD的节点会保存他们所求出的分区数据。
python中总是会序列化
persist选项:
MEMORY_ONLY, DISK_ONLY, MEMORY_AND_DISK
cache是默认储存级别的persist。
unpersist将持久化的RDD从缓存中移除。
4. 键值对操作
4.1 定义
pair RDD:键值对类型的RDD,可以并行操作各个键或跨节点重新进行数据分组的操作接口。
4.2 创建
python 直接map元素返回成二元tuple即可。
4.3 Pair RDD的转化操作
4.4 数据分区
分布式计算中,通信代价巨大,控制数据分布以减小网络传输来提高整体性能。
partitionBy对需要反复使用用于连接的RDD进行分区,并对结果持久化后,至少一个RDD不会发生数据混洗。
sortByKey和groupByKey会分别生成范围分区的RDD和哈希的RDD。
而Map这种可能修改每天记录键的操作,可能导致新的RDD失去父RDD的分区信息。如果不需要改动键,尽量使用mapValues或flatMapValues
获取分区:rdd.partitioner
自定义分区方式(python)
把一个特定的哈希函数作为一个额外参数传递给partitionBy函数即可。
下例为基于域名进行hash分区。
from urllib.parse import urlparse
def has_domain(url):
return hash(urlparse(url).netloc)
rdd.partitionBy(20, has_domain)
5. 数据读取与保存
5.4 结构化数据
SPARK SQL用于操作结构化数据。
5.4.1 Hive
使用hive数据,需要将hiv-site.xml文件复制到Spark的./conf/目录下。然后创建HiveContext对象(spark sql的入口), 然后使用Hive SQL查询表。
from pyspark.sql import HiveContext
hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("SELECT name, age FROM users")
firsRow = rows.first()
6. 累加器和广播变量
accumulator and broadcast variable