比pgload更快更方便写入大数据量至Greenplum的Greenplum-Spark Connector

前序

Greenplum是目前比较优秀的mpp数据库,其官方推荐了几种将外部数据写入Greenplum方式,包含:通用的Jdbc,pgcopy和pgload以及Pivotal Greenplum-Spark Connector等。

  • Jdbc:Jdbc方式,写大数据量会很慢。
  • pgcopy:其中pgcopy是及其不推荐的一种,因为其写数据必须经过Greenplum的master,因此也只建议小数据量使用。
  • pgload:适合写大数据量数据,能并行写入。但其缺点是需要安装客户端,包括gpfdist等依赖,安装起来很麻烦。需要了解可以参考pgload
  • Greenplum-Spark Connector:基于Spark并行处理,并行写入Greenplum,并提供了并行读取的接口。也是接下来该文重点介绍的部分。

2. Greenplum-Spark Connector读数据架构

一个Spark application,是由Driver和Executor节点构成。当Spark application使用Greenplum-Spark Connector加载Greenplum数据时,其Driver端会通过JDBC的方式请求Greenplum的master节点获取相关的元数据信息。Connector将会根据这些元数据信息去决定Spark的Executor去怎样去并行的读取该表的数据。

Greenplum数据库存储数据是按segment组织的,Greenplum-Spark Connector在加载Greenplum数据时,需要指定Greenplum表的一个字段作为Spark的partition字段,Connector会使用这个字段的值来计算,该Greenplum表的某个segment该被哪一个或多个Spark partition读取。

其读取过程如下:

  1. Spark Driver通过Jdbc的方式连接Greenplum master,并读取指定表的相关元数据信息。然后根据指定的分区字段以及分区个数去决定segment怎么分配。
  2. Spark Executor端会通过Jdbc的方式连接Greenplum master,创建Greenplum外部表。
  3. 然后Spark Executor通过Http方式连接Greenplum的数据节点,获取指定的segment的数据。该获取数据的操作在Spark Executor并行执行。

其示意流程图如下:


Greenplum-Spark Connector

3. Greenplum-Spark Connector写数据流程

  1. GSC在Spark Executor端通过Jetty启动一个Http服务,将该服务封装为支持Greenplum的gpfdist协议。
  2. GSC在Spark Executor端通过Jdbc方式连接Greenplum master,创建Greenplum外部表,该外部表文件地址指向该Executor所启动的gpfdist协议地址。SQL示例如下:
CREATE READABLE EXTERNAL TABLE
"public"."spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42" (LIKE "public"."rank_a1")
LOCATION ('gpfdist://10.0.8.145:44772/spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42')
FORMAT 'CSV'
(DELIMITER AS '|'
 NULL AS '')
ENCODING 'UTF-8'
  1. GSC在Spark Executor端通过Jdbc方式连接Greenplum master,然后执行insert语句至真实的表中,数据来源于这张外部表。SQL示例如下:
INSERT INTO "public"."rank_a1"
SELECT *
FROM "public"."spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42"

至于这张外部表的数据,是否落地当前Executor服务器,不清楚。猜测不会落地,而是直接通过Http直接传递给了Greenplum对应的Segment。

  1. GSC监听onApplicationEnd事件,在Spark application结束后,删除创建的外部表。

4. Greenplum-Spark Connector使用

  1. 下载GSC Jar包。
    下载地址:Pivotal Network
    可直接下载最新版本的GSC即1.6.2,支持Greenplum5.0之后的版本。greenplum-spark_<spark-version>-<gsc-version>.jar,如:
greenplum-spark_2.11-1.6.2.jar

  1. maven中引入:
        <dependency>
            <groupId>io.pivotal.greenplum.spark</groupId>
            <artifactId>greenplum-spark_2.11</artifactId>
            <version>1.6.2</version>
        </dependency>
  1. spark提交引入:
  • spark-shell或spark-submit时候,通过--jars加入greenplum-spark_2.11-1.6.2.jar。
  • 将greenplum-spark_2.11-1.6.2.jar与Spark application包打成 uber jar 提交。

5. Greenplum-Spark Connector参数

参数名 参数描述 作用域
url Jdbc连接的url。 读,写
dbschema Greenplum数据库的schema,GSC创建的临时外部表也在该schema下,默认值为public。 读,写
dbtable Greenplum数据库的表名,GSC在读取时,会读取dbschema下的表。GSC在写数据时,如果该表不存在会自动创建。 读,写
driver Jdbc driver全类名,非必填,在GSC Jar包中已经包含了driver包。 读,写
user 用户名 读,写
password 密码 读,写
partitionColumn Greenplum数据表的字段,该字段将作为Spark分区的字段,支持integer, bigint, serial, bigserial4中类型,该字段名需小写。该字段为必填,且必须是Greenplum表建表时 DISTRIBUTED BY (<column>)语句中的字段。
partitions Spark分区数,非必填,其默认值为Greenplum的primary segments数量。
truncate 当在Spark中指定了输出模式为SaveMode.Overwrite时候,写的目标表存在的时候的策略,非必填。默认为false,即GSC将会先删除然后重新创建目标表,然后在写数据。当为true时,GSC将会先truncates目标表,然后在写入数据。
iteratorOptimization 指定写数据时内存模式,非必填。默认指为true,GSC将会使用 Iterator 方式。当为false时,GSC将会在写数据时将数据存储在内存中。
server.port 指定在Spark Worker端启动gpfdist服务的端口号,非必填。默认情况下会使用随机的端口号。 读,写
server.useHostname 指定是否使用Spark Worker节点的host name为gpfdis服务的地址,非必填。默认为false。 读,写
pool.maxSize GSC连接Greenplum的连接池的最大连接数,默认为64。 读,写
pool.timeoutMs 非活动连接被认为是空闲连接的时间,毫秒值。默认为10000(10秒)。 读,写
pool.minIdle GSC连接Greenplum的连接池的最小空闲连接数,默认为0。 读,写

6. 从Greenplum读取数据

  1. DataFrameReader.load()方式:
val gscReadOptionMap = Map(
      "url" -> "jdbc:postgresql://gpdb-master:5432/testdb",
      "user" -> "bill",
      "password" -> "changeme",
      "dbschema" -> "myschema",
      "dbtable" -> "table1",
      "partitionColumn" -> "id"
)

val gpdf = spark.read.format("greenplum")
      .options(gscReadOptionMap)
      .load()
  1. spark.read.greenplum()方式:
val url = "jdbc:postgresql://gpmaster.domain:15432/tutorial"
val tblname = "avgdelay"
val jprops = new Properties()
jprops.put("user", "user2")
jprops.put("password", "changeme")
jprops.put("partitionColumn", "airlineid")
val gpdf = spark.read.greenplum(url, tblname, jprops)

然鹅,这种方式必然需要引入一个隐式转换,官网也没介绍。

7. 写数据至Greenplum

7.1. 写数据示例:

val gscWriteOptionMap = Map(
      "url" -> "jdbc:postgresql://gpdb-master:5432/testdb",
      "user" -> "bill",
      "password" -> "changeme",
      "dbschema" -> "myschema",
      "dbtable" -> "table2",
)

dfToWrite.write.format("greenplum")
      .options(gscWriteOptionMap)
      .save()

在通过GSC写到Greenplum表时,如果表已经存在或表中已经存在数据,可通过DataFrameWriter.mode(SaveMode savemode)方式指定其输出模式。相关模式行为如下:

SaveMode 行为
ErrorIfExists 如果Greenplum数据表已经存在则GSC直接返回错误,该策略为默认策略。
Append 直接将Spark中数据追加至表中。
Ignore 如果Greenplum数据表已经存在,GSC将不会写数据至表中也不会去修改已经存在的数据。
Overwrite 如果Greenplum数据表已经存在,则truncate参数将会生效。默认为false,即GSC将会先删除然后重新创建目标表,然后在写数据。当为true时,GSC将会先truncates目标表,然后在写入数据。

7.2. GSC自动建表:

  1. 创建的Greenplum表将不会有distribution列,如下为GSC生成的建表语句:
CREATE TABLE "public"."rank_a1" 
("id" INTEGER NOT NULL, "rank" TEXT, "year" INTEGER NOT NULL, "gender" INTEGER NOT NULL, "count" INTEGER NOT NULL);
  1. 创建的Greenplum表的字段名将会使用Spark DataFrame中的字段名。
  2. 在GSC自动建表时,将会为字段名加上双引号,这将使Greenplum区分大小写。
  3. 当Spark DataFrame的字段不为nullable时,GSC自动建表的字段将是 NOT NULL。
  4. 将会对应的Spark DataFrame字段类型映射为Greenplum的字段类型。参考,字段类型映射表

7.3. 提前手动建表:

  1. 将Spark DataFrame的字段名的数据写至Greenplum表的对应的字段中。值得注意的是,GSC在做映射的时候,是严格区分大小写的。
  2. 写至Greenplum的字段的数据类型,与对应的Spark DataFrame一致,具体参见字段类型映射
  3. 如果Spark数据中某列包含空数据,需确保对应的Greenplum表的列没有被指定为NOT NULL。
  4. Greenplum表中建表时其字段顺序可以与Spark DataFrame中不一致。但Greenplum表中不能出现不存在在Spark DataFrame中的字段。如下例子:
// Greenplum 中的字段
CREATE TABLE public.rank_a1 (
    id int4 NOT NULL,
    "rank" text NULL,
    "year" int4 NOT NULL,
    gender int4 NOT NULL,
    count int4 NOT NULL
)
DISTRIBUTED BY (id);

// Spark DataFrame中的字段
var df = Seq((2, "a|b", 2, 2, 2),(3, "a|b", 3, 3, 3)).toDF("id", "rank", "year", "gender")

// 在写数据至public.rank_a1表时,将会报错如下
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.
Old column names (5): _1, _2, _3, _4, _5
New column names (4): id, rank, year, gender
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.sql.Dataset.toDF(Dataset.scala:435)
    at org.apache.spark.sql.DatasetHolder.toDF(DatasetHolder.scala:44)
    at com.lt.spark.greenplum.GreenplumWrite$.main(GreenplumWrite.scala:14)
    at com.lt.spark.greenplum.GreenplumWrite.main(GreenplumWrite.scala)

  1. 确保指定的用户对于该表有读写的权限,自动建表,需要有建表的权限。

8. Troubleshooting

8.1. 端口相关问题

错误信息 原因 解决办法
java.lang.RuntimeException:<port-number> is not a valid port number. 通过server.port所指定的端口无效,比如1024以内,为系统使用端口 指定端口在[1024-65535]之间
java.lang.RuntimeException:Unable to start GpfdistService on any of ports=<list-of-port-numbers> 通过server.port指定的端口已经被占用 从新指定一个未被占用的端口,或不指定该参数

8.2. Greenplum连接数问题

当连接Greenplum的连接数接近Greenplum数据库配置的最大连接数(max_connections)时。Spark application将会抛出 connection limit exceeded 错误。

排查过程:

  1. 查询Greenplum数据的最大连接数:
postgres=# show max_connections;
 max_connections
-----------------
 250
(1 row)
  1. 查询当前连接Greenplum数据库的连接数:
postgres=# SELECT count(*) FROM pg_stat_activity;
  1. 查询指定的用户连接Greenplum数据的连接数:
postgres=# SELECT count(*) FROM pg_stat_activity WHERE datname='tutorial';
postgres=# SELECT count(*) FROM pg_stat_activity WHERE usename='user1';
  1. 查询Greenplum数据库空闲和活动的连接数:
postgres=# SELECT count(*) FROM pg_stat_activity WHERE current_query='<IDLE>';
postgres=# SELECT count(*) FROM pg_stat_activity WHERE current_query!='<IDLE>';
  1. 查询连接Greenplum数据库名,用户名,客户端地址,客户端ip,当前查询语句:
postgres=# SELECT datname, usename, client_addr, client_port, current_query FROM pg_stat_activity;

如果确认是Spark application使用连接数过多,则配置JDBC Connection Pooling相关参数,减少连接数。

8.3. Greenplum Database Data Length Errors

在使用Greenplum 4.x或5.x的时候,可能会报出“data line too long”错误。这是因为在Greenplum数据库中参数项“gp_max_csv_line_length”默认值是1M。需要登陆Greenplum master修改这个参数值,示例如下,通过gpconfig修改该参数的值为5M:

gpadmin@gpmaster$ gpconfig -c gp_max_csv_line_length -v 5242880
gpadmin@gpmaster$ gpstop -u

9. 类型映射表

9.1. Greenplum to Spark

Greenplum Data Type Spark Data Type
bigint LongType
bigSerial LongType
boolean BooleanType
char StringType
date DateType
decimal DecimalType
float4 FloatType
float8 DoubleType
int IntegerType
serial IntegerType
smallInt ShortType
text StringType
time TimeStampType
timestamp TimeStampType
timestamptz TimeStampType
timetz TimeStampType
varchar StringType

9.2. Spark to Greenplum

Spark Data Type Greenplum Data Type
BinaryType bytea
BooleanType boolean
DateType date
DecimalType numeric
DoubleType float8
FloatType float4
IntegerType int
LongType bigint
ShortType smallInt
StringType text
TimeStampType timestamp

10. 参考

  1. Greenplum-Spark Connector官方文档
  2. Greenplum建表语句文档
  3. Greenplum参数配置官方文档

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,717评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,501评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,311评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,417评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,500评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,538评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,557评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,310评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,759评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,065评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,233评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,909评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,548评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,172评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,420评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,103评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,098评论 2 352

推荐阅读更多精彩内容