Delta Lake 平台化实践(离线篇)

Delta Lake 是什么?简单的说就是为大数据场景添加了事务功能,并且支持了 update/delete/merge into 等功能, Delta Lake 初探

本文是在 Delta Lake 0.4 与 Spark 2.4 集成、平台化过程中的一些实践与思考

SQL 支持

DML

背景

delta lake 0.4 只支持以 api 的方式使用 Delete/Update/Merge Into 等 DML,对习惯了使用 sql 的终端用户会增加其学习使用成本。

解决方式

使用 spark sql extension 以插件化的方式扩展 sql parser ,增加 DML 语法的支持。在 spark 推出 sql extension 功能前,也可以用通过 aspectj 通过拦截 sql 的方式实现增加自定义语法的功能。

  1. 在自定义扩展 g4 文件中相应的 antlr4 DML 语法,部分参考了 databricks 商业版的语法
   statement
       : DELETE FROM table=qualifiedName tableAlias
                 (WHERE where=booleanExpression)?                              #deleteFromTable
       | UPDATE table=qualifiedName tableAlias upset=setClause
                 (WHERE where=booleanExpression)?                              #updateTable
       | VACUUM table=qualifiedName
           (RETAIN number HOURS)? (DRY RUN)?                                   #vacuumTable
       | (DESC | DESCRIBE) HISTORY table=qualifiedName
           (LIMIT limit=INTEGER_VALUE)?                                        #describeDeltaHistory
       | MERGE INTO target=qualifiedName targetAlias=tableAlias
               USING (source=qualifiedName |
                 '(' sourceQuery=query')') sourceAlias=tableAlias
               ON mergeCondition=booleanExpression
               matchedClause*
               notMatchedClause*                                               #mergeIntoTable
  1. 实现对应的 visit,将 sql 翻译为 delta api,以最简单的 delete 为例
     override def visitDeleteFromTable(ctx: DeleteFromTableContext): AnyRef = withOrigin(ctx) {
       DeleteTableCommand(
         visitTableIdentifier(ctx.table),
         Option(getText(ctx.where)))
     }
     
     case class DeleteTableCommand(table: TableIdentifier,
                                 where: Option[String]) extends RunnableCommand {
     override def run(sparkSession: SparkSession): Seq[Row] = {
       DeltaUtils.deltaTableCheck(sparkSession, table, "DELETE")
       val deltaTable = DeltaUtils.getDeltaTable(sparkSession, table)
       if (where.isEmpty) {
         deltaTable.delete()
       } else {
         deltaTable.delete(where.get)
       }
       Seq.empty[Row]
     }
   }
  1. 启动 Spark 时加载打包的 extension jar ,初始化 SparkSession 时指定 Extension 类。
val spark = SparkSession.builder
    .enableHiveSupport()
    .config("spark.sql.extensions", "cn.tongdun.spark.sql.TDExtensions")

tip
spark 3 之前不支持配置多个 extension ,如果遇到使用多个 extension 的情况,可以将多个 extension 在一个 extension 代码中进行注入。

以同时增加 tispark extension 和 自定义 extension 为例

override def apply(extensions: SparkSessionExtensions): Unit = {
        extensions.injectParser(TiParser(getOrCreateTiContext))
    extensions.injectResolutionRule(TiDDLRule(getOrCreateTiContext))
    extensions.injectResolutionRule(TiResolutionRule(getOrCreateTiContext))
    extensions.injectPlannerStrategy(TiStrategy(getOrCreateTiContext))
    extensions.injectParser { (session, parser) => new TDSparkSqlParser(session, parser)}
}

Query

识别 delta table 有三种实现方式

  1. 使用相应表名前缀/后缀作为标识
  2. 在 table properties 中增加相应的参数进行识别
  3. 判断表是否存在_delta_log

我们一开始是使用 delta_ 的前缀作为 delta 表名标识,这样实现最为简单,但是如果用户将 hive(parquet) 表转为 hive(delta) ,要是表名发生变化则需要修改相关代码,所以后面改为在table propertie 中增加相应的参数进行识别。
也可以通过判断是否存在 _delta_log 文件识别,该方式需要在建表时写入带有 schema 信息的空数据。

Query 通过对sql执行进行拦截,判断 Statement 为 SELECT 类型,然后将 delta 表的查询翻译成对应的 api 进行查询。

if (statementType == SELECT) {
        TableData tableData = (TableData) statementData.getStatement();
    sql = DatasourceAdapter.selectAdapter(tableData, sparkSession, sql);
}

Insert

Insert 需要考虑 INSERT_VALUES / INSERT_SELECT 还有分区表/非分区表以及写入方式的一些情况。

sql 类型判断

if (INSERT_SELECT == statementType) {
        isDeltaTable = DatasourceAdapter.deltaInsertSelectAdapter(sparkSession, statementData);
} else if (INSERT_VALUES == statementType) {
        isDeltaTable = DatasourceAdapter.deltaInsertValuesAdapter(sparkSession, statementData);
}

INSERT_INTO 需要从 catalog 中获取对应的 schema 信息,并将 values 转化为 dataFrame

val rows = statementData.getValues.asScala.map(_.asScala.toSeq).map { x => Row(x: _*) }
import spark.implicits._
val schemaStr = spark.catalog.listColumns(dbName, tableName)
    .map(col => col.name + " " + col.dataType)
    .collect().mkString(",")
val schema = StructType.fromDDL(schemaStr)
val df = spark.createDataFrame(spark.sparkContext.makeRDD[Row](rows), schema)

INSERT_SELECT 则直接访问被解析过的 Delta Query 子句。

partition

由于 delta api 的限制,不支持静态分区,在 table 中解析到对应的动态分区名,使用 partitionBy 写入即可。

至此,已经实现使用 apache spark 2.4 使用 sql 直接操作 delta table 表。

平台化工作

与 hive metastore 的集成,表数据管理 等平台化的一些工作。

浏览 delta 数据

用户在平台上点击浏览数据,如果通过 delta api ,启动 spark job 的方式从 HDFS 读取数据,依赖重,延时高,用户体验差。

基于之前在 parquet 格式上的一些工作,浏览操作可以简化为找出 delta 事务日志中还存活 (add - remove) 的 parquet 文件进行读取,这样就避免了启动 spark 的过程,大多数情况能做到毫秒级返回数据。

需要注意的是,_delta_log 文件只存在父目录,浏览某个分区的数据同样需要浏览父目录获取相应分区内的存活文件。

// DeltaHelper.load 方法会从 _delta_log 目录中找到存活 parquet 文件,然后使用 ParquetFileReader 读取
List<Path> inputFiles;
if (DeltaHelper.isDeltaTable(dir, conf)) {
    inputFiles = DeltaHelper.load(dir, conf);
} else {
    inputFiles = getInputFilesFromDirectory(projectCode, dir);
}

从 delta 0.5 开始,浏览数据的功能可以通过 manifest 文件进行更简单的实现。参考
Delta Presto Integration & Manifests 机制

元数据兼容

将原生 delta lake 基于 path 的工作方式与 hive metastore 进行兼容。

数据写入/删除

  • 数据动态分区插入 - 统计写入的分区信息(我们是通过修改了 spark write 部分的代码得到的写入分区信息),如果分区不存在则自动增加分区 add partition。还有一种更简单的做法是直接使用 msck repair table ,但是这种方式在分区较多的情况下,性能会非常糟糕。

  • 删除分区 - 在界面上操作对某个分区进行删除时,后台调用 delta 删除api,并更新相关 partition 信息。

统计信息更新

元数据中表/分区记录数,大小等元数据的更新支持。

碎片文件整理

非 delta lake 表

  • 小文件整理方式可以参考 Spark 小文件合并优化实践,这种方式采用的是在数据生成后对其进行校验,如果发现碎片文件则进行合并。

  • 小文件整理使用的是同步模式,可能会影响到下游任务的启动时间。

基于 delta lake 的小文件整理要分为两块,存活数据和标记删除的数据

  1. 标记删除的数据

    被 delta 删除的数据,底层 parquet 文件依旧存在,只是在 delta_log 中做了标记,读取时跳过了该文件。

    可以使用 delta 自带的 vacuum 功能删除一定时间之前标记删除的数据。

  2. 存活数据

    不断写入的小文件可以基于 delta 的特性,可以实现一个 compaction 功能,然后后台不断的做异步合并,不影响数据的使用方。

结语

一些限制

由于 delta api 的限制,目前 delta delete / update 不支持子句,可以使用 merge into 语法实现相同功能。

merge 使用场景

upsert
有 a1,a2 两张表,如果 a.1eventId = a2.eventId ,则 a2.data 会覆盖 a1.data,否则将 a2 表中相应的数据插入到 a1 表

MERGE INTO bigdata.table1 a1
USING bigdata.table2 a2
ON a1.eventId = a2.eventId
WHEN MATCHED THEN
  UPDATE SET a1.data = a2.data
WHEN NOT MATCHED
  THEN INSERT (date, eventId, data) VALUES (a2.date, a2.eventId, a2.data)

ETL 避免数据重复场景
如果 uniqueid 只存在于 a2 表,则插入 a2 表中的相应记录

MERGE INTO logs a1
USING updates a2
ON a1.uniqueId = a2.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

维度表更新场景

  • 如果 a1 和 a2 表的合作方相同,且 a2 中的 deleted 为 true ,则删除 a1 表相应记录
  • 如果 a1 和 a2 表的合作方相同,且 a2 中的 deleted 为 false ,则将 a2 表相应记录的 value 更新到 a1 表中
  • 如果没有匹配到相应合作方,且 a2 中 deleted 为 fasle ,则将 a2 表相应记录插入到 a1 表
MERGE INTO logs a1
USING updates a2
ON a1.partnerCode = a2.partnerCode
WHEN MATCHED AND a2.deleted = true THEN DELETE
WHEN MATCHED THEN UPDATE SET a1.value = a2.newValue
WHEN NOT MATCHED AND a2.deleted = false THEN INSERT (partnerCode, value) VALUES (partnerCode, newValue)

历史数据清理场景
如果 a1 和 a2 表的合作方相同,则删除 a1 表中 ds < 20190101 的所有数据

MERGE INTO logs a1
USING updates a2
ON a1.partnerCode = a2.partnerCode
WHEN MATCHED AND a1.ds < '20190101' THEN
  DELETE
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,922评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,591评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,546评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,467评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,553评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,580评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,588评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,334评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,780评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,092评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,270评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,925评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,573评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,194评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,437评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,154评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352