什么是Spark Datasource API
Spark Datasource API 是一套连接外部数据源和Spark引擎的框架
它主要是给Spark框架提供一种快速读取外界数据的能力,它可以方便地把不同的数据格式通过DataSource API注册成Spark的表,然后通过Spark SQL直接读取。它可以充分利用Spark分布式的优点进行并发读取,而且SparkSQL本身有一个很好的Catalyst优化引擎,能够极大的加快任务的执行。
Spark Datasource API 同时提供了一套优化机制,如将列剪枝和过滤操作下推至数据源侧,减少数据读取数量,提高数据处理效率。
我们先看一下Spark DataSource API 典型的工作方式:
sparkSession //SparkSession
.read
.format("csv") //驱动类,类似JDBC的driver class
.option(Map(....)) //你需要额外传递给驱动类的参数
.load("hdfs:///...") //数据文件路径
或者
CREATE TEMPORARY TABLE table_test USING druid OPTIONS (timeColumn "time",datasource "spark-druid-test ",zkHost "...")
CREATE TABLE spark_druid_test USING druid OPTIONS
(timeColumn "time",
datasource "spark-demo",
zkHost "node1:2015,node2:2015,node3:2015")
目前Spark DataSource的来源主要有三个:
- Spark 原生支持的DataSource,如常用的csv,orc,parquet;
- Spark Packages 网站中纳入的第三方包;
- 随其他项目一起发布的内嵌DataSource,如ES-Hadoop等。
Spark Datasource API 工作流程
读流程:
sparkSession.read 返回DataFrameReader,它是DataSource 读数据的入口,DataFrameReader中提供了读取多种Spark内置DataSource 的方法,包括参数配置接口。
sparkSession通过调用不同DataSource的接口,实现DataSource的参数配置,最终通过load方法真正建立Spark与DataSource的连接。
load函数最重要的功能就是将baseRelation转换成DataFrame,该功能是通过sparkSession的
def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame
接口实现的,其中的参数baseRelation
通过DataSource类的resolveRelation
方法提供。
resolveRelation中使用反射创建出对应DataSource实例,协同用户指定的userSpecifiedSchema进行匹配,匹配成功返回对应的baseRelation:
- 如果是基于文件的,返回HadoopFsRelation实例
- 非文件的,返回如KafkaRelation或者JDBCRelation
上面提到baseRelationToDataFrame接受baseRelation参数返回DataFrame,是通过Dataset.ofRows(sparkSession,logicalPlan)方法实现的,其中的参数logicPlan是由LogicalRelation(baseRelation)得到。
写流程
dataSet.write 返回DataFrameWriter类型对象, 它是DataSource写数据的入口,与读机制类似,DataFrameWriter提供了DataSource的接口和参数配置方法,底层落到save方法上,运行runCommand执行写入过程,runCommand所需的LogicalPlan由对应的DataSource.planForWriting()提供。
如何扩展Spark Datasource API
所有DataSource的扩展都是基于spark\sql\core\src\main\scala\org\apache\spark\sql\sources\interfaces.scala提供的接口来实现:
一般来讲,自定义数据源需要实现以下接口和功能:
- BaseRelation:代表了一个抽象的数据源,描述了数据源和Spark SQL交互
- 数据扫描接口(根据需要实现):
- TableScan:全表数据扫描
- PrunedScan:返回指定列数据,其他的列数据源不用返回
- PrunedFilteredScan:指定列的同时,附加一些过滤条件,只返回满足过滤条件的数据
- RelationProvider: 根据用户提供的参数返回一个BaseRelation
- 数据源RDD: 将DataSource的数据读取后装配成RDD
以JDBC为例看一下DataSource扩展的流程:
(1) 【JDBCRelation】
JDBCRelation实现了BaseRelation、PrunedFilteredScan和InsertableRelation接口,在Spark层面描述了JDBC DataSource,以及数据读取(buildScan)和写入(insert)逻辑,它的全部工作就是重写以上三个接口的方法,方法清单:
BaseRelation:sqlContext、needConversion、schema、unhandledFilters
PrunedFilteredScan:上文提到了,这个是提供列裁剪和过滤的读取接口,只需要实现一个方法buildScan就好了,buildScan通过调用JDBCRDD.scanTable将从数据库中读出的数据装配成RDD。
InsertableRelation:实现写入接口insert,将DataFrame写入DataSource,调用的是DataFrameWriter的jdbc方法。
(2) 【JdbcRelationProvider】
JdbcRelationProvider实现了CreatableRelationProvider、RelationProvider、DataSourceRegister。重写了shortName和两个createRelation方法:
- DataSourceRegister:shortName方法比较简单,就是为DataSource提供一个别名,这样用户在使用实现的DataSource API的时候,提供这个别名就可以了。
- RelationProvider:重写createRelation方法,根据用户提供的参数创建baseRelation
- CreatableRelationProvider:重写createRelation方法,基于给定的DataFrame和用户参数返回baseRelation,它描述了当数据已存在情况下的createRelation行为。支持写入模式如append、overwrite。
(3) 【JDBCRDD】
一个JDBCRDD代表了关系数据库中的一张表,在Spark的Driver和Executor端都必须能够通过JDBC访问这张表,其中Driver获取schema信息,Executor获取数据。
JDBCRDD重写了RDD的getPartitions和compute方法,其中compute方法就是从关系表里读出数据,使用JdbcUtils.resultSetToSparkInternalRows( )将数据转换成SparkInternalRow格式。
JDBCRDD的伴生类中还有两个非常重要的方法:resolveTable和scanTable。这两个方法功能都比较清楚,前者是将表的schema信息以Spark 内部StructType的形式返回,后者其实是使用对应的参数创建了一个JDBCRDD的对象,对象中以RDD[InternalRow]形式映射了当前读取的关系表数据。这两个方法分别被JDBCRelation
中重写的方法-schema
和buildScan
调用。
DataSource API 流程调用链
下图是DataSource的读写流程调用链,以JDBC为例:
File Source 实现原理初探
Spark中内置的基于文件的数据源有以下几种:
- text
- csv
- json
- parquet
- orc
它们都扩展了DataSource中的FileFormat特质,那么什么是FileFormat呢?
FileFormat有读、写两方面的功能:
- 读:将文件中的数据读取成为Spark内部的InternalRow格式
- 写:将Spark内部的InternalRow格式以对应的格式写入文件
该特质有几个主要的接口:
- inferSchema(自动推测模式),返回类型为
Option[StructType]
:
当option中设置inferSchema为true情况下,无需用户编码显示指定模式,而是由系统自动推断模式。但是当该文件格式不支持模式推测或者传入的文件路径非法时,该方法返回None,此时需要用户显示指定schema。基本思路就是将传入的文件路径使用baseRelationToDataFrame方法转换成为DataFrame,然后取一行进行格式推测。 - prepareWrite,返回类型OutputWriterFactory:
这里通过参数spark.sql.sources.outputCommitterClass可以配置用户自定义的output committer。 - supportBatch,是否支持批量列的读入和写出
- isSplitable,单个文件是否能被切分
- buildReader,返回一个能够将单个文件读成Iterator[InternalRow]的方法
DataSource 在匹配类型时,会通过反射得到DataSource类型(FileFormat),返回HadoopFsRelation的BaseRelation,后续通过DataFrameReader的load接口获取DataFrame。
写过程调用DataFrameWriter save方法,构造DataSource实例,通过className确定DataSource类型,然后调用dataSource.write(mode, df)写操作。
接下来看一下datasource的写方法:write方法通过providingClass.newInstance()
实例化分别匹配CreatableRelationProvider和FileFormat,如果是CreatableRelationProvider
(其实针对内置的datasource,就是JDBCRelationProvider),调用dataSource.createRelation,如果是FileFormat
类型,构造InsertIntoHadoopFsRelationCommand
,其实该实例是一个RunnableCommand
,得到该实例后,就通过调用sparkSession.sessionState.executePlan(plan).toRdd
来执行RunnableCommand
。