一、PySpark是什么
Python PySpark是Spark官方提供的一个Python类库,其中内置了完全的Spark API,使得Python用户在导入这个类库后,可以使用自己熟悉的Python语言来编写Spark应用程序,并最终将程序提交到Spark集群运行。
PySpark是基于Python语言开发的类库,仅支持在单机环境下供Python用户开发调试使用,需要将程序提交到Spark集群上才能使用Spark集群分布式的能力处理大规模的数据处理任务。
二、为什么要使用PySpark
在原先的文章中,我们介绍过Spark支持多语言开发应用程序,比如Scala、Python、Java、R、SQL等,数据处理与分析方向有很多人都习惯和擅长使用Python,Spark官方为了方便这些用户使用Spark,因此选择支持Python语言,而PySpark就是官方为了让Python用户更方便地使用Spark而开发出来的类库。Python用户不需要编写复杂的底层逻辑,只需要调用PySpark的API即可。
三、如何使用PySpark
3.1 PySpark开发环境搭建
本文基于Windows 64位操作系统进行PySpark的安装演示。
预先条件:安装好JDK运行环境。
1 Python环境搭建
现在主流的方式都是通过Anaconda来管理自己的Python环境了,我们从官网或者国内清华的源下载下来安装包,这里选择使用最新的23年3月更新的版本。
在自己认为合适的位置安装Anaconda即可,假设这里安装的位置是D盘。安装完成后需要配置Path的如下环境变量:
D:\anaconda3
D:\anaconda3\Scripts
D:\anaconda3\Library\mingw-w64\bin
D:\anaconda3\Library\usr\bin
D:\anaconda3\Library\bin
配置完成后,我们在命令行输入测试命令:
PS C:\Users\zhangxun> conda info
active environment : None
user config file : C:\Users\zhangxun\.condarc
populated config files : C:\Users\zhangxun\.condarc
conda version : 23.1.0
conda-build version : 3.23.3
python version : 3.10.9.final.0
virtual packages : __archspec=1=x86_64
__cuda=11.6=0
__win=0=0
base environment : D:\anaconda3 (writable)
conda av data dir : D:\anaconda3\etc\conda
conda av metadata url : None
channel URLs : https://repo.anaconda.com/pkgs/main/win-64
https://repo.anaconda.com/pkgs/main/noarch
https://repo.anaconda.com/pkgs/r/win-64
https://repo.anaconda.com/pkgs/r/noarch
https://repo.anaconda.com/pkgs/msys2/win-64
https://repo.anaconda.com/pkgs/msys2/noarch
package cache : D:\anaconda3\pkgs
C:\Users\zhangxun\.conda\pkgs
C:\Users\zhangxun\AppData\Local\conda\conda\pkgs
envs directories : D:\anaconda3\envs
C:\Users\zhangxun\.conda\envs
C:\Users\zhangxun\AppData\Local\conda\conda\envs
platform : win-64
user-agent : conda/23.1.0 requests/2.28.1 CPython/3.10.9 Windows/10 Windows/10.0.19044
administrator : False
netrc file : None
offline mode : False
如此,说明Anacond安装成功了,默认使用的是Python3.10版本,但是我们还需要更换下第三方包的下载路径,不然官方的地址下载太慢了:
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/
conda config --set show_channel_urls yes
我们可以再次conda info
查看下配置的channelURLs
来查看是否设置成功,至此Anaconda的安装就完成了。
在win10环境下,如果想要在powershell中切换conda虚拟环境可能会有问题,我们需要以管理员身份打开powershell,然后执行:
conda init powershell
然后关闭重新打开powershell就可以了,这个会影响下面第四步骤PySpark的设置,所以此处需要设置一下。
2 Spark环境搭建
我们从官网或者国内镜像下载最新版本的Spark安装包,这里下载的是spark-3.3.2-bin-hadoop3.tgz
,然后将其解压缩到合适的位置,这里比如是D:\spark
。
然后我们需要新增一个环境变量:
SPARK_HOME=D:\spark\spark-3.3.2-bin-hadoop3
再将其添加到Path环境变量中:
%SPARK_HOME%\bin
然后我们在命令行尝试运行spark\bin
下面的pyspark:
(base)PS C:\Users\zhangxun> pyspark
Python 3.6.13 |Anaconda, Inc.| (default, Mar 16 2021, 11:37:27) [MSC v.1916 64 bit (AMD64)] on win32
Warning:
This Python interpreter is in a conda environment, but the environment has
not been activated. Libraries may fail to load. To activate this environment
please see https://conda.io/activation
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.3.2
/_/
Using Python version 3.6.13 (default, Mar 16 2021 11:37:27)
Spark context Web UI available at ......
Spark context available as 'sc' (master = local[*], app id = local-1680416157575).
SparkSession available as 'spark'.
>>>
至此Spark的安装完成了,注意命令行一开始有个(base)
,表示是conda的base虚拟环境,我们后面第四步讲解pyspark安装时会使用新的虚拟环境,所以此处spark的warning是正常的,不必特殊处理。
3 Hadoop环境搭建
由于Spark的运行需要Hadoop作为底层支持,所以我们还需要从官网或者国内镜像下载最新的Hadoop安装包,这里现在的是hadoop-3.3.4.tar.gz
,然后将其加压缩到合适的目录,假设这里为D:\hadoop
。
然后我们需要配置Hadoop的环境变量,新建:
HADOOP_HOME=D:\hadoop\hadoop-3.3.4
然后将其添加到Path环境变量中:
%HADOOP_HOME%\bin
%HADOOP_HOME%\sbin
然后我们进入到D:\hadoop\hadoop-3.3.4\etc\hadoop
目录下,修改hadoop-env.cmd
,设置Java Home:
set JAVA_HOME=C:\PROGRA~1\Java\jdk1.8.0_341
我本机的Java环境在C:\Program Files\Java
,因为Program Files
中间有空格,按照这样填写的话Hadoop启动会报错,无法找到Java的地址,因此我们使用PROGRA~1
来代替Program Files
就能解决这个问题。
然后我们从github的winutils仓库中下载hadoop-3.0.0的压缩包,将其中的内容全部覆盖复制到D:\hadoop\hadoop-3.3.4\bin
下面,然后将hadoop.dll
拷贝到C:\Windows\System32
下面,然后我们就可以验证Hadoop了:
(base)PS C:\Users\zhangxun> hadoop version
Hadoop 3.3.4
Source code repository https://github.com/apache/hadoop.git -r a585a73c3e02ac62350c136643a5e7f6095a3dbb
Compiled by stevel on 2022-07-29T12:32Z
Compiled with protoc 3.7.1
From source with checksum fb9dd8918a7b8a5b430d61af858f6ec
This command was run using /D:/hadoop/hadoop-3.3.4/share/hadoop/common/hadoop-common-3.3.4.jar
至此,Hadoop配置完成。
4 PySpark环境配置
我们在使用Python PySpark的时候,建议不要使用最新版本的Python,比如上面我们安装Anaconda时默认自带的就是Python3.10.9,因为不确定目前的PySpark类库是否已经更新支持当前最新版本的Python版本,所以我们最好选择Python3.6,我们使用Anaconda创建一个新的Python环境:
# 查看当前conda有几个虚拟环境(*代表当前使用的虚拟环境)
conda env list
base * D:\anaconda3
# 创建一个名称为pyspark的虚拟环境,使用Python3.6版本
conda create -n pyspark python=3.6
# 切换并激活使用pyspark这个虚拟环境
conda activate pyspark
# 再次查看当前conda有几个虚拟环境(*代表当前使用的虚拟环境)
conda env list
base D:\anaconda3
pyspark * D:\anaconda3\envs\pyspark
然后我们需要在当前pyspark这个虚拟环境下进行一些依赖的安装:
pip install pyspark
pip install py4j
pip install psutil
pip install jieba
注意这些依赖都是在pyspark这个虚拟环境中的,所以日后我们使用python pyspark开发spark程序的时候,都要在这个虚拟环境下执行程序。在当前虚拟环境下再次执行spark\bin
下面的pyspark命令行就不会再有warning告警了,此时表示我们的pyspark就配置成功了。
(pyspark) PS C:\Users\zhangxun> pyspark
Python 3.6.13 |Anaconda, Inc.| (default, Mar 16 2021, 11:37:27) [MSC v.1916 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.3.2
/_/
Using Python version 3.6.13 (default, Mar 16 2021 11:37:27)
Spark context Web UI available at ......
Spark context available as 'sc' (master = local[*], app id = local-1680418916213).
SparkSession available as 'spark'.
>>>
5 HelloWorld测试
我们在合适的地方新建一个文件夹,然后用vscode打开这个文件夹,后面的示例代码程序都在这个文件夹里面完成了。
新建test0.py内容如下:
import os
import sys
print("hello python!")
然后打开vscode的命令行,默认是base虚拟环境,我们切换到pyspark虚拟环境,然后执行该程序:
(pyspark) C:\Users\zhangxun\pyspark-demo>python test0.py
hello python!
如此证明python程序可以正常运行。
新建test1.py内容如下:
from pyspark import SparkConf
from pyspark import SparkContext
if __name__ == '__main__':
conf=SparkConf().setAppName("test1").setMaster("local")
sc=SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3,4,5,6], 3)
def add(data):
return data * 10
print(rdd.map(add).collect())
执行该程序:
(pyspark) C:\Users\zhangxun\pyspark-demo>python test1.py
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/02 15:14:47 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
[10, 20, 30, 40, 50, 60]
如此证明pyspark程序可以提交到本地的spark环境正常运行。
3.2 PySpark程序运行机制分析
在没有学习PySpark执行原理之前,很多人可能会认为PySpark编写的程序会被翻译为JVM能识别的字节码,然后提交给Spark运行,其实不是这样的。
我们在前面学习Spark基础的时候就知道,其工作原理如下:
Driver负责总体的调度,Executor负责具体Task的运行,它们都是运行在JVM进程之中的,而这些JVM进程则是可以部署在多种的资源管理系统中的,比如Yarn、Mesos或者是K8s等;用户提交的Spark程序交给Driver进行管理,Driver将程序分解为一个个的Task交给Executor执行。
为了不影响现有Spark的工作架构,Spark在外围包装了一层Python的API,借助Py4j实现Python和Java的交互,进而实现通过Python代码来编写Spark应用程序并提交给Spark集群运行。
上图中白色块为新增的Python进程:
- 在Driver端,Python通过Py4j来调用Java方法,将用户使用Python写的程序映射到JVM中,比如,用户在PySpark中实例化一个Python的SparkContext对象,最终会在JVM中实例化Scala的SparkContext对象。
- 在Executor端,都启动一个Python守护进程,当Task收到任务请求后,交给底层的Python进程去执行。
所以,Pyspark的运行机制和我们预想的并不一样,这种方式可以不破坏现有的Spark执行架构,同时也方便多种语言支持的扩展,但是也很明显,使用PySpark运行Spark任务肯定比使用Java或者Scala要有一些额外的性能损耗。
四、参考文章
pyspark在windows的安装和使用(超详细) - 腾讯云开发者社区-腾讯云 (tencent.com)
Windows 安装配置 PySpark 开发环境(详细步骤+原理分析) - 腾讯云开发者社区-腾讯云 (tencent.com)