【Spark】DataSource API

什么是Spark Datasource API

Spark Datasource API 是一套连接外部数据源和Spark引擎的框架
它主要是给Spark框架提供一种快速读取外界数据的能力,它可以方便地把不同的数据格式通过DataSource API注册成Spark的表,然后通过Spark SQL直接读取。它可以充分利用Spark分布式的优点进行并发读取,而且SparkSQL本身有一个很好的Catalyst优化引擎,能够极大的加快任务的执行。
Spark Datasource API 同时提供了一套优化机制,如将列剪枝和过滤操作下推至数据源侧,减少数据读取数量,提高数据处理效率。

我们先看一下Spark DataSource API 典型的工作方式:

sparkSession //SparkSession
      .read 
      .format("csv") //驱动类,类似JDBC的driver class
      .option(Map(....)) //你需要额外传递给驱动类的参数
      .load("hdfs:///...") //数据文件路径

或者

CREATE TEMPORARY TABLE table_test USING druid OPTIONS (timeColumn "time",datasource "spark-druid-test ",zkHost "...")
CREATE TABLE spark_druid_test USING druid OPTIONS 
(timeColumn "time",
datasource "spark-demo",
zkHost "node1:2015,node2:2015,node3:2015")

目前Spark DataSource的来源主要有三个:

  • Spark 原生支持的DataSource,如常用的csv,orc,parquet;
  • Spark Packages 网站中纳入的第三方包;
  • 随其他项目一起发布的内嵌DataSource,如ES-Hadoop等。

Spark Datasource API 工作流程

读流程
sparkSession.read 返回DataFrameReader,它是DataSource 读数据的入口,DataFrameReader中提供了读取多种Spark内置DataSource 的方法,包括参数配置接口。

sparkSession通过调用不同DataSource的接口,实现DataSource的参数配置,最终通过load方法真正建立Spark与DataSource的连接。

load函数最重要的功能就是将baseRelation转换成DataFrame,该功能是通过sparkSession的
def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame
接口实现的,其中的参数baseRelation通过DataSource类的resolveRelation方法提供。

resolveRelation中使用反射创建出对应DataSource实例,协同用户指定的userSpecifiedSchema进行匹配,匹配成功返回对应的baseRelation:

  • 如果是基于文件的,返回HadoopFsRelation实例
  • 非文件的,返回如KafkaRelation或者JDBCRelation

上面提到baseRelationToDataFrame接受baseRelation参数返回DataFrame,是通过Dataset.ofRows(sparkSession,logicalPlan)方法实现的,其中的参数logicPlan是由LogicalRelation(baseRelation)得到。

写流程
dataSet.write 返回DataFrameWriter类型对象, 它是DataSource写数据的入口,与读机制类似,DataFrameWriter提供了DataSource的接口和参数配置方法,底层落到save方法上,运行runCommand执行写入过程,runCommand所需的LogicalPlan由对应的DataSource.planForWriting()提供。

如何扩展Spark Datasource API

所有DataSource的扩展都是基于spark\sql\core\src\main\scala\org\apache\spark\sql\sources\interfaces.scala提供的接口来实现:

  • interface

一般来讲,自定义数据源需要实现以下接口和功能:

  • BaseRelation:代表了一个抽象的数据源,描述了数据源和Spark SQL交互
  • 数据扫描接口(根据需要实现):
    • TableScan:全表数据扫描
    • PrunedScan:返回指定列数据,其他的列数据源不用返回
    • PrunedFilteredScan:指定列的同时,附加一些过滤条件,只返回满足过滤条件的数据
  • RelationProvider: 根据用户提供的参数返回一个BaseRelation
  • 数据源RDD: 将DataSource的数据读取后装配成RDD

以JDBC为例看一下DataSource扩展的流程:

(1) 【JDBCRelation】

  • JDBCRelation

JDBCRelation实现了BaseRelation、PrunedFilteredScan和InsertableRelation接口,在Spark层面描述了JDBC DataSource,以及数据读取(buildScan)和写入(insert)逻辑,它的全部工作就是重写以上三个接口的方法,方法清单:

  • BaseRelation:sqlContext、needConversion、schema、unhandledFilters

  • PrunedFilteredScan:上文提到了,这个是提供列裁剪和过滤的读取接口,只需要实现一个方法buildScan就好了,buildScan通过调用JDBCRDD.scanTable将从数据库中读出的数据装配成RDD。

  • InsertableRelation:实现写入接口insert,将DataFrame写入DataSource,调用的是DataFrameWriter的jdbc方法。

  • buildScan & insert

(2) 【JdbcRelationProvider】

  • JdbcRelationProvider

JdbcRelationProvider实现了CreatableRelationProvider、RelationProvider、DataSourceRegister。重写了shortName和两个createRelation方法:

  • DataSourceRegister:shortName方法比较简单,就是为DataSource提供一个别名,这样用户在使用实现的DataSource API的时候,提供这个别名就可以了。
  • RelationProvider:重写createRelation方法,根据用户提供的参数创建baseRelation
  • CreatableRelationProvider:重写createRelation方法,基于给定的DataFrame和用户参数返回baseRelation,它描述了当数据已存在情况下的createRelation行为。支持写入模式如append、overwrite。

(3) 【JDBCRDD】
一个JDBCRDD代表了关系数据库中的一张表,在Spark的Driver和Executor端都必须能够通过JDBC访问这张表,其中Driver获取schema信息,Executor获取数据。
JDBCRDD重写了RDD的getPartitions和compute方法,其中compute方法就是从关系表里读出数据,使用JdbcUtils.resultSetToSparkInternalRows( )将数据转换成SparkInternalRow格式。
JDBCRDD的伴生类中还有两个非常重要的方法:resolveTable和scanTable。这两个方法功能都比较清楚,前者是将表的schema信息以Spark 内部StructType的形式返回,后者其实是使用对应的参数创建了一个JDBCRDD的对象,对象中以RDD[InternalRow]形式映射了当前读取的关系表数据。这两个方法分别被JDBCRelation中重写的方法-schemabuildScan调用。

DataSource API 流程调用链

下图是DataSource的读写流程调用链,以JDBC为例:

  • read
  • write

File Source 实现原理初探

Spark中内置的基于文件的数据源有以下几种:

  • text
  • csv
  • json
  • parquet
  • orc

它们都扩展了DataSource中的FileFormat特质,那么什么是FileFormat呢?

FileFormat有读、写两方面的功能:

  • 读:将文件中的数据读取成为Spark内部的InternalRow格式
  • 写:将Spark内部的InternalRow格式以对应的格式写入文件

该特质有几个主要的接口:

  • inferSchema(自动推测模式),返回类型为Option[StructType]
    当option中设置inferSchema为true情况下,无需用户编码显示指定模式,而是由系统自动推断模式。但是当该文件格式不支持模式推测或者传入的文件路径非法时,该方法返回None,此时需要用户显示指定schema。基本思路就是将传入的文件路径使用baseRelationToDataFrame方法转换成为DataFrame,然后取一行进行格式推测。
  • prepareWrite,返回类型OutputWriterFactory:
    这里通过参数spark.sql.sources.outputCommitterClass可以配置用户自定义的output committer。
  • supportBatch,是否支持批量列的读入和写出
  • isSplitable,单个文件是否能被切分
  • buildReader,返回一个能够将单个文件读成Iterator[InternalRow]的方法

DataSource 在匹配类型时,会通过反射得到DataSource类型(FileFormat),返回HadoopFsRelation的BaseRelation,后续通过DataFrameReader的load接口获取DataFrame。

写过程调用DataFrameWriter save方法,构造DataSource实例,通过className确定DataSource类型,然后调用dataSource.write(mode, df)写操作。

接下来看一下datasource的写方法:write方法通过providingClass.newInstance()实例化分别匹配CreatableRelationProvider和FileFormat,如果是CreatableRelationProvider(其实针对内置的datasource,就是JDBCRelationProvider),调用dataSource.createRelation,如果是FileFormat类型,构造InsertIntoHadoopFsRelationCommand,其实该实例是一个RunnableCommand,得到该实例后,就通过调用sparkSession.sessionState.executePlan(plan).toRdd来执行RunnableCommand

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,776评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,527评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,361评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,430评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,511评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,544评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,561评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,315评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,763评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,070评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,235评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,911评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,554评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,173评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,424评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,106评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,103评论 2 352