[实战系列]SelectDB Cloud Spark Connector 最佳实践

前言

企业正在经历其数据资产的爆炸式增长,这些数据包括批式或流式传输的结构化、半结构化以及非结构化数据,随着海量数据批量导入的场景的增多,企业对于 Data Pipeline 的需求也愈加复杂。新一代云原生实时数仓 SelectDB Cloud 作为一款运行于多云之上的云原生实时数据仓库,致力于通过开箱即用的能力为客户带来简单快速的数仓体验。在生态方面,SelectDB Cloud 提供了丰富的数据连接器插件(Connector)来连接各种来自周边大数据工具的数据源,内置 Kafka、Flink、Spark、DataX 等常见的 Connector。基于此,企业开发者能够更加便捷的将数据移动到 SelectDB Cloud 上,并利用 SelectDB Cloud 从数据资产中获取更高的价值。

SelectDB Cloud 基于 Apache Doris 研发的新一代云原生实时数仓 SelectDB,运行于多家云上,为客户提供极简运维和极致性价比的数仓服务。

本篇将带来 Spark SelectDB Connector 的详细介绍与实践。

什么是 Spark SelectDB Connector?
Spark SelectDB Connector 作为 SelectDB Cloud 上大数据量的导入方式之一,可以利用 Spark天然的分布式计算优势将数据导入到 SelectDB Cloud 中。具体来讲,Spark SelectDB Connector 支持将其他数据源(PostgreSQL, HDFS, S3等)的数据通过 Spark 计算引擎后同步到 SelectDB Cloud 的数据表中。
利用 Spark SelectDB Connector,开发者能够使用 Spark 将上游数据源读取到 DataFrame 中,然后使用 Spark SelectDB Connector 将大规模数据导入到SelectDB Cloud 数据仓库的表中;同时,开发者可以使用 Spark 的 JDBC 的方式来读取 SelectDB Cloud 表中的数据。

基础架构

image.png

在整个架构中,通常 Spark SelectDB Connector 作为外部数据写入到 SelectDB Cloud 的桥梁,优化了传统地使用 JDBC 这种低性能的连接写入方式,以其分布式、高效的特性丰富了整个数据链路。

工作原理
Spark Selectdb Connector 底层实现依赖于 SelectDB Cloud 的 stage 导入方式,当前支持两种Stage 导入方式:

  • 通过创建对象存储上的 stage 来进行批量数据拉取导入,这个主要适合大批量数据导入,使用前提是用户有自己的对象存储及其相关密钥;
  • 基于内置的 stage 的推送导入,这个主要是和小批量推送,使用较简单;

对于第一种导入方式,依赖于用户自己的对象存储,首先需要在 SelectDB Cloud 中创建 External Stage,然后将创建的 External Stage 的访问权限给用户,用户可以将需要导入的数据存储已经配置好的External Stage的存储中,通过 Spark 调用 SelectDB Cloud 的 copy into 接口(/copyinto)将对象存储的数据导入SelectDB Cloud的表中。
对于第二种导入方式,主要依赖于 SelectDB Cloud 提供的内置对象存储,Spark 通过调用SelectDB Cloud的upload接口(/copy/upload)会返回一个重定向的对象存储地址,使用 http 的方式向S3地址发送字节流,待数据上传完成之后在调用 SelectDB Cloud 的 copy into 接口(/copyinto)将对象存储的数据导入 SelectDB Cloud 的表中。

使用教程
下载方式
我们已经预编译了三个包供大家来直接下载,详细版本以及下载地址见下表:

| Connector | Runtime Jar |
| 2.3.4-2.11-1.0 | spark-selectdb-connector-2.3.4_2.11-1.0 |
| 3.1.2-2.12-1.0 | spark-selectdb-connector-3.1.2_2.12-1.0 |
| 3.2.0-2.12-1.0 | spark-selectdb-connector-3.2.0_2.12-1.0 |

注意:

  • Connector的格式为:spark-selectdb-connector-{spark.version}_{scala.version}-${connector.version}-SNAPSHOT.jar;
  • 所有的jar包是通过java 8来编译的;
  • 如有其他版本需求可通过SelectDB官网的联系方式来联系我们;

本地开发
一般我们本地开发通过 maven 引入依赖的方式将 Spark SelectDB Connector 的包引入到我们的项目中,在此之前需要将下载的 jar 通过mvn install的方式安装到本地仓库,在 maven 中使用以下方式添加依赖。

<dependency>
  <groupId>com.selectdb.spark</groupId>
  <artifactId>spark-selectdb-connector-3.1_2.12</artifactId>
  <version>1.0</version>
</dependency>

Spark Standalone & Cluster 方式
如果使用 Spark Standalone 或者 Spark Cluster 的方式运行我们的 Spark 程序,只需要将下载的jar 放到 Spark 安装目录的 jars 目录下即可。

注意:
如果多节点Spark,需要在每个Spark节点的jars目录下放一份Spark SelectDB Connector的jar包。

Spark On Yarn
Yarn集群模式运行的Spark,则将此文件放入预部署包中。例如将 spark-selectdb-connector-2.3.4-2.11-1.0.-SNAPSHOT.jar 上传到 hdfs并在spark.yarn.jars参数上添加 hdfs上的Jar包路径

  • 上传spark-selectdb-connector-2.3.4-2.11-1.0-SNAPSHOT.jar 到hdfs。
hdfs dfs -mkdir /spark-jars/ hdfs dfs -put /your_local_path/spark-selectdb-connector-2.3.4-2.11-1.0-SNAPSHOT.jar /spark-jars/

  • 在集群中添加spark-selectdb-connector-2.3.4-2.11-1.0-SNAPSHOT.jar 依赖。
spark.yarn.jars=hdfs:///spark-jars/doris-spark-connector-3.1.2-2.12-1.0.0.jar

使用场景

通过 sparksql 的方式写入

val selectdbHttpPort = "127.0.0.1:47968"
val selectdbJdbc = "jdbc:mysql://127.0.0.1:18836/test"
val selectdbUser = "admin"
val selectdbPwd = "selectdb2022"
val selectdbTable = "test.test_order"

CREATE TEMPORARY VIEW test_order
USING selectdb
OPTIONS(
 "table.identifier"="test.test_order",
 "jdbc.url"="${selectdbJdbc}",
 "http.port"="${selectdbHttpPort}",
 "user"="${selectdbUser}",
 "password"="${selectdbPwd}",
 "sink.properties.file.type"="json"
);

insert into test_order select  order_id,order_amount,order_status from tmp_tb ;

通过 DataFrame 方式写入

val spark = SparkSession.builder().master("local[1]").getOrCreate()
val df = spark.createDataFrame(Seq(
  ("1", 100, "待付款"),
  ("2", 200, null),
  ("3", 300, "已收货")
)).toDF("order_id", "order_amount", "order_status")

df.write
  .format("selectdb")
  .option("selectdb.http.port", selectdbHttpPort)
  .option("selectdb.table.identifier", selectdbTable)
  .option("user", selectdbUser)
  .option("password", selectdbPwd)
  .option("sink.batch.size", 4)
  .option("sink.max-retries", 2)
  .option("sink.properties.file.column_separator", "\t")
  .option("sink.properties.file.line_delimiter", "\n")
  .save()

具体案例
本章节我们以一个例子来演示如何使用 Spark SelectDB Connector,演示的环境各版本如下:

| Java | Spark | Scala | SelectDB Cloud |
| 1.8 | 3.1.2 | 2.12 | 2.2.1 |

在开始导入之前,我们需要做几项准备工作:

  1. Spark 环境构建,从官网下载 Spark 安装包,本次演示所用 Spark 安装包 spark-3.1.2-bin-hadoop3.2.tgz;
  2. 构造导入的数据,此次我们只是测试,构造4条数据来完成导入;
  3. Selectdb Cloud 创建仓库以及集群,设置admin 的密码,开通公网连接,将我们 Spark 环境的公网ip配置到ip白名单中;

我们先来看构建我们的 Spark 环境,下载spark-3.1.2-bin-hadoop3.2.tgz安装包,解压安装包;

wget https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
tar xvzf spark-3.1.2-bin-hadoop3.2.tgz

将spark-selectdb-connector-3.1.2_2.12-1.0-SNAPSHOT.jar放到/opt/selectdb/spark-3.1.2-bin-hadoop3.2/jars目录下

image.png

导入的原始数据如下:

1,100,已下单
2,200,已付款
3,300,已发货
4,400,已收货

SelectDB Cloud 中创建数据表:

 CREATE TABLE `spark_selectdb_connector` (
  `order_id` varchar(30) NULL,
  `order_amount` int(11) NULL,
  `order_status` varchar(30) NULL
) ENGINE=OLAP
DUPLICATE KEY(`order_id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`order_id`) BUCKETS 10
PROPERTIES (
"persistent" = "false"
); 

image.png

我们以 spark-shell 的方式将我们的测试数据导入到 SelectDB Cloud 的数据表中:

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val session = SparkSession.builder().master("local[*]").getOrCreate()

val scam = StructType(StructField("order_id",StringType)::StructField("order_amount",IntegerType)::StructField("order_status",StringType)::Nil)
val df = spark.read.schema(scam).csv("file:///opt/selectdb/data/test.txt")

df.write.format("selectdb")
.option("selectdb.http.port", "81.70.4.52:36511")
.option("selectdb.table.identifier", "test.spark_selectdb_connector")
.option("user", "admin")
.option("password", "Admin12345")
.option("sink.batch.size", 4)
.option("sink.max-retries", 2)
.save()

image.png

Spark 任务执行完成后,我们可以通过 mysql-client 连接 Selectdb Cloud,查看我们通过导入的数据。

image.png

至此,我们通过 Spark SelectDB Connector 导入数据的案例就结束了。

总结
本篇我们从Spark SelectDB Connector的原理以及实践等各方面做了详细介绍,大家有以下几种场景需求的情况可以使用这种连接器:

  • 以 Spark 为计算引擎构建的技术架构体系,减少其他组件引入的成本;
  • 大规模数据 ETL 离线写入SelectDB Cloud,利用 Spark 分布式计算的特性,降低doris集群资源消耗成本;

Spark SelectDB Connector 以 Spark 这个大数据计算的优秀组件作为核心,实现了利用 Spark 将外部数据源的大数据量同步到 SelectDB Cloud,便于我们实现大批量数据的快速同步,继而利用 SelectDB Cloud 为基石构建新一代的云原生数据仓库,结合 SelectDB Cloud 强大的分析计算性能,能够为企业带来业务便捷性以及增效将本的目标。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,324评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,356评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,328评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,147评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,160评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,115评论 1 296
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,025评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,867评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,307评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,528评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,688评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,409评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,001评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,657评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,811评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,685评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,573评论 2 353