数据湖—Delta Lake -之基础操作

1.数据湖的简单介绍:

1.1 官网

https://delta.io/

看一张官网的图


datalake.png

1.2 特点:

1.不限格式,来之不拒,均可流入
2.集中存储、到处可访问。
3.高性能分析能力 -- 借助于Spark、MR、SparkSQL等高性能分析计算引擎,可以对海量的数据进行分析。
4.原始数据存储
5.数据湖是一个存储企业的各种各样原始数据的大型仓库,其中的数据可供存取、处理、分析及传输。

1.3 数据湖,数据仓库, 数据集市 的对比

比较 数据仓库 数据集市 数据湖
应用范围 全公司 部门或者小组 全公司
数据类型 结构化数据处理 结构化数据处理 任意格式数据处理
存储规模 大量 中等规模(小型数仓) 海量
数据应用 维度建模,指标分析 小范围的数据分析 海量任意格式分析,不限应用类型
新应用开发周期

1.3 写时的模式

数据在写入之前,就需要定义好数据的schema,数据按照schema的定义写入

1.4 读时模式

数据在写入的时候,不需要定义Schema,在需要使用的时候在使用Schema定义它写时模式和读时模式是两种截然不同的数据处理方法。
数据湖就是一种读时模式思想的具体体现
1.相比较写时模式而言,读时模式因为是数据在使用到的时候再定义模型结构(Schema),因此能够提高数据模型定义的灵活性,可以满足不同上层业务的高效率分析需求。
2.因为,对于写时模式而言,如果想要事后更改Schema是有很高的成本的。
3.而读时模式可以在用的时候再定义Schema就很灵活了,同一套数据可以用不同的Schema来定义,来获取不同的效。

1.5 特点:

1.轻松的收集数据(读时模式):数据湖与数据仓库的一大区别就是,Schema On Read,即在使用数据时才需要Schema信息;而数据仓库是Schema On Write,即在存储数据时就需要设计好Schema。这样,由于对数据写入没有限制,数据湖可以更容易的收集数据。
2.不需要关心数据结构:存储数据无限制,任意格式数据均可存储,只要你能分析就能存。
3.全部数据都是共享的(集中存储),多个业务单元或者研究人员可以使用全部的数据,以前由于一些数据分布于不同的系统上,聚合汇总数据是很麻烦的。
4.从数据中发掘更多价值(分析能力):数据仓库和数据市场由于只使用数据中的部分属性,所以只能回答一些事先定义好的问题;而数据湖存储所有最原始、最细节的数据,所以可以回答更多的问题。并且数据湖允许组织中的各种角色通过自助分析工具(MR、Spark、SparkSQL等),对数据进行分析,以及利用AI、机器学习的技术,从数据中发掘更多的价值。
5.具有更好的扩展性和敏捷性:数据湖可以利用分布式文件系统来存储数据,因此具有很高的扩展能力。开源技术的使用还降低了存储成本。数据湖的结构没那么严格,因此天生具有更高的灵活性,从而提高了敏捷性。

1.6 数据湖的要求

1.安全:数据集中存储,就对数据安全有了更高的要求,对权限的管控要求更加严格。
2.可拓展的:随着业务扩张、数据增多,要求数据湖体系可以随需求扩展其能力。
3.可靠的:作为一个集中存储的数据中心,可靠性也很重要,三天两头坏掉那是不可以的。
4.吞吐量:数据湖作为海量数据的存储,对数据的吞吐量要求就必须很高。
5.原有格式存储:数据湖我们定义为 所有数据的原始数据集中存储库,那么存储进入数据湖的数据就是未经修饰的、原始的数据
6.支持多种数据源的输入:不限制数据类型,任意数据可以写入
7.多分析框架的支持:因为数据格式各种各样,并不全是结构化数据,所以,要求支持多种分析框架对数据湖中的数据进行提取、分析。包括但不限于:批处理的、实时的、流的、机器学习的、图形计算的等等。

1.7数据湖的原则

    1. 分离数据 和 业务
    2.存储和计算的分离(可选,比较适用云平台)
    3.Lambda架构 VS Kappa架构 VS IOTA架构 - 
    4.管理服务的重要性和选择合适的工具
        4.1安全 (Kerberos)
        4.2权限(Ranger)

2.Data Lake 的基本操作

2.1 Data Lake 的特点

1.  ACID 事务控制 :Delta Lake将ACID事务带入您的数据湖。它提供了可序列化性,最强的隔离级别。
2.  可伸缩的元数据处理: Delta Lake可以轻松处理具有数十亿个分区和文件的PB级表
4.  数据版本控制 : Delta Lake提供了数据快照,使开发人员可以访问和还原到较早版本的数据以进行审核,回滚或重现实验。
5.  开放的数据格式 :Delta Lake中的所有数据均以Apache Parquet格式存储,从而使Delta Lake能够利用Parquet固有的高效压缩和编码方案。
6.  统一的批处理和流处理的source 和 sink : Delta Lake中的表既是批处理表,又是流计算的source 和 sink。
7.  Schema执行: Delta Lake提供了指定和执行模式的功能。这有助于确保数据类型正确并且存在必需的列,从而防止不良数据导致数据损坏.
8.  Schema演化: 大数据在不断变化。 Delta Lake使您可以更改可自动应用的表模式,而无需繁琐的DDL
9.  审核历史记录 :Delta Lake事务日志记录有关数据所做的每项更改的详细信息,从而提供对更改的完整审核跟踪
10. 更新和删除 : Delta Lake支持Scala / Java API进行合并,更新和删除数据集。
10.100%和 Apache Spark 的API兼容    : 和spark 完全兼容。

2.2 Data lake 的操作: Spark Scala Shell -- 要求只是使用的Spark版本:>=2.4.2

bin/spark-shell --packages io.delta:delta-core_2.11:0.5.0

操作如图:

[root@master01 spark-2.4.7-bin-hadoop2.7]# bin/spark-shell --packages io.delta:delta-core_2.11:0.5.0
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/opt/module/spark-2.4.7-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
io.delta#delta-core_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-811cb329-3b4b-4a62-ab7a-d2287a1901dc;1.0
    confs: [default]
    found io.delta#delta-core_2.11;0.5.0 in central
    found org.antlr#antlr4;4.7 in central
    found org.antlr#antlr4-runtime;4.7 in central
    found org.antlr#antlr-runtime;3.5.2 in central
    found org.antlr#ST4;4.0.8 in central
    found org.abego.treelayout#org.abego.treelayout.core;1.0.3 in central
    found org.glassfish#javax.json;1.0.4 in central
    found com.ibm.icu#icu4j;58.2 in central
:: resolution report :: resolve 376ms :: artifacts dl 6ms
    :: modules in use:
    com.ibm.icu#icu4j;58.2 from central in [default]
    io.delta#delta-core_2.11;0.5.0 from central in [default]
    org.abego.treelayout#org.abego.treelayout.core;1.0.3 from central in [default]
    org.antlr#ST4;4.0.8 from central in [default]
    org.antlr#antlr-runtime;3.5.2 from central in [default]
    org.antlr#antlr4;4.7 from central in [default]
    org.antlr#antlr4-runtime;4.7 from central in [default]
    org.glassfish#javax.json;1.0.4 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   8   |   0   |   0   |   0   ||   8   |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-811cb329-3b4b-4a62-ab7a-d2287a1901dc
    confs: [default]
    0 artifacts copied, 8 already retrieved (0kB/9ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/06/09 23:27:26 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Spark context Web UI available at http://master01.pxx.com:4041
Spark context available as 'sc' (master = local[*], app id = local-1623252446880).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.7
      /_/
         
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_251)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val data = spark.range(0, 5)

2.2 官网命令:

bin/spark-shell --packages io.delta:delta-core_2.12:1.0.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
其实可以自行bin/spark-shell --packages io.delta:delta-core_2.12:1.0.0

2.3 按照官网命令走

1.创建表, 并且读取表

scala> val data = spark.range(0, 5)
data: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> data.write.format("delta").save("/tmp/delta-table02")                                                                         
scala> spark.read.format("delta").load("/tmp/delta-table02").toDF.show()
+---+
| id|
+---+
|  2|
|  0|
|  4|
|  3|
|  1|
+---+
scala> 
  1. 更新操作
scala> val data01 = spark.range(5,10)
data01: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> data01.write.format("delta").mode("overwrite").save("/tmp/delta-table02")                                                                             
scala> spark.read.format("delta").load("/tmp/delta-table02").toDF.show()
+---+
| id|
+---+
|  8|
|  7|
|  5|
|  6|
|  9|
+---+
scala> 

3.Delta Lake提供了编程api,用于有条件地更新、删除和合并(upsert)数据到表中

scala> import io.delta.tables._
import io.delta.tables._

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> val deltaTable = DeltaTable.forPath("/tmp/delta-table02")
deltaTable: io.delta.tables.DeltaTable = io.delta.tables.DeltaTable@5e88e2e7

// 通过将每个偶数值加100来更新每个偶数值
scala> deltaTable.update(condition=expr("id % 2 ==0"), set = Map("id"->expr("id+100")))
                                                                                
scala> spark.read.format("delta").load("/tmp/delta-table02").toDF.show()
+---+                                                                           
| id|
+---+
|106|
|  7|
|  5|
|108|
|  9|
+---+

// 删除偶数
scala> deltaTable.delete(condition = expr("id % 2 ==0"))
                                                                                
scala> spark.read.format("delta").load("/tmp/delta-table02").toDF.show()
+---+                                                                           
| id|
+---+
|  7|
|  5|
|  9|
+---+


scala> val newData = spark.range(0,20).toDF
newData: org.apache.spark.sql.DataFrame = [id: bigint]
// 合并新数据
 deltaTable.as("oldData").merge(newData.as("newData"),"oldData.id=newData.id").whenMatched.update(Map("id" -> col("newData.id"))).whenNotMatched.insert(Map("id" ->col("newData.id"))).excute()
                                                                                                                                                                                             ^

scala> deltaTable.as("oldData").merge(newData.as("newData"),"oldData.id=newData.id").whenMatched.update(Map("id" -> col("newData.id"))).whenNotMatched.insert(Map("id" ->col("newData.id"))).execute()
[Stage 86:===================================>                 (135 + 51) / 200]21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 75.08% for 9 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 67.58% for 10 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 61.43% for 11 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 56.31% for 12 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 51.98% for 13 writers
[Stage 86:===============================================>     (180 + 20) / 200]21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 48.27% for 14 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 45.05% for 15 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 42.24% for 16 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 39.75% for 17 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 37.54% for 18 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 35.57% for 19 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 33.79% for 20 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 35.57% for 19 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 37.54% for 18 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 39.75% for 17 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 42.24% for 16 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 45.05% for 15 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 48.27% for 14 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 51.98% for 13 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 56.31% for 12 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 61.43% for 11 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 67.58% for 10 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 75.08% for 9 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
                                                                                
scala> deltaTable.toDF.show()
+---+                                                                           
| id|
+---+
|  0|
|  2|
|  6|
|  1|
| 10|
| 11|
| 15|
| 12|
|  4|
| 19|
| 14|
|  5|
|  9|
| 13|
|  8|
| 18|
| 16|
|  7|
|  3|
| 17|
+---+
scala> 

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,509评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,806评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,875评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,441评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,488评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,365评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,190评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,062评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,500评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,706评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,834评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,559评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,167评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,779评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,912评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,958评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,779评论 2 354

推荐阅读更多精彩内容