笔记汇总

Hive Join

  1. common join
    如果不指定MapJoin或者不符合MapJoin的条件,那么Hive解析器会将Join操作转换成Common Join,即:在Reduce阶段完成join.整个过程包含Map、Shuffle、Reduce阶段。
  • Map阶段:读取源表的数据,Map输出时候以Join on条件中的列为key,如果Join有多个关联键,则以这些关联键的组合作为key;Map输出的value为join之后所关心的列即(select或者where中需要用到的);同时在value中还会包含表的Tag信息,用于标明此value对应哪个表;
  • shuffle阶段:根据key的值进行hash,并将key/value按照hash值推送至不同的reduce中,这样确保两个表中相同的key位于同一个reduce中。
  • reduce阶段:通过Tag来判断每一个value是来自table1还是table2,在内部分成2组,做集合笛卡尔乘积。
  • 缺点:shuffle的网络传输和排序性能很低,reduce 端对2个集合做乘积计算,很耗内存,容易导致OOM。
  1. map join
    在map 端进行join,其原理是broadcast join,即把小表作为一个完整的驱动表来进行join操作。通常情况下,要连接的各个表里面的数据会分布在不同的Map中进行处理。即同一个Key对应的Value可能存在不同的Map中。这样就必须等到 Reduce中去连接。要使MapJoin能够顺利进行,那就必须满足这样的条件:除了一份表的数据分布在不同的Map中外,其他连接的表的数据必须在每 个Map中有完整的拷贝。MAPJION会把小表全部读入内存中,在map阶段直接拿另外一个表的数据和内存中表数据做匹配,由于在map是进行了join操作,省去了reduce运行的效率也会高很多。
  • 首先会启动一个Local Task(在客户端本地执行的Task),负责扫描小表b的数据,将其转换成一个HashTable的数据结构,并写入本地的文件中,之后将该文件加载到DistributeCache中(使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的參数是文件的URI)。
  • 接下来是Task B,该任务是一个没有Reduce的MR,启动MapTasks扫描大表a,在Map阶段,根据a的每一条记录去和DistributeCache中b表对应的HashTable关联,并直接输出结果。

Spark Join

spark提供了三种join实现:hash join,sort merge join以及broadcast join。

  1. hash join:
    通过分区的形式将大批量的数据通过hash划分成n份较小的数据集进行并行计算。
  • 对两张表分别按照join keys进行重分区,即shuffle,目的是为了让有相同join keys值的记录分到对应的分区中
  • 对对应分区中的数据进行join,此处先将小表分区构造为一张hash表,然后根据大表分区中记录的join keys值拿出来进行匹配
  • 总结:要将来自buildIter的记录放到hash表中,那么每个分区来自buildIter的记录不能太大,否则就存不下,默认情况下hash join的实现是关闭状态。buildIter总体估计大小以及分区后的大小要超过spark.sql.autoBroadcastJoinThreshold设定的值,即不满足broadcast join条件

2、sort join:
hash join对于实现大小表比较合适,但是两个表都非常大时,对内存计算造成很大的压力。

  • 实现方式:不需要将一侧数据全部加载后再进行hash join,但需要在join前将数据排序
  • 在shuffle read阶段,分别对streamIter和buildIter进行merge sort,在遍历streamIter时,对于每条记录,都采用顺序查找的方式从buildIter查找对应的记录,由于两个表都是排序的,每次处理完streamIter的一条记录后,对于streamIter的下一条记录,只需从buildIter中上一次查找结束的位置开始查找,所以说每次在buildIter中查找不必重头开始。

3、broadCast join:
Broadcast不会内存溢出,因为数据保存级别StoreageLevel是MEMORY_AND_DISK模式

  • 设计思想:避免大量的shuffle。若buildIter是一个非常小的表,其实没必要做shuffle了,直接将buildIter广播到每个计算节点,然后将buildIter放到hash表中。
  • 步骤:
    a. broadcast阶段:将小表广播分发到大表所在的所有主机。分发方式可以有driver分发。
    b. 在每个executor上执行单机版hash join,小表映射,大表试探。

Full Outer Join

  • 对于fullouter,IterA和IterB同时为streamedIter和hashedIter,即先IterA=streamedIter,IterB=hashedIter进行leftouter,然后再用先IterB=streamedIter,IterA=hashedIter进行leftouter,再把两次结果进行合并
  • 对于FullOuterJoin,如果采用HashJoin方式来实现,代价较大,需要建立双向的Hash表,而基于SortJoin,它的代价与其他几种Join相差不大,因此`FullOuter默认都是基于SortJon来实现。

Spark RDD

弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。

  • 它是一组分区,分区是Spark中数据集最小的单位。也就是说Spark当中数据是以分区为单位存储的,不同的分区被存储在不同的节点上。这也是分布式计算的基础。
  • 它是一个应用在这个分区上的计算任务。在Spark当中数据和执行的操作是分开的,并且Spark基于懒加载的机制,也就是在真正触发计算的行动操作出现前,Spark会存储起来哪对哪些数据执行哪些计算。数据和计算之间的映射关系就存储在RDD中。
  • RDD之间的依赖关系,RDD之间存在转化关系,一个RDD可以通过转化操作转化成其他RDD,这些转化操作都会被记录下来。当部分数据丢失的时候,Spark可以通过记录的依赖关系重新计算丢失部分的数据,而不是重新计算所有数据。
  • 一个分区的方法,也就是计算分区的函数。Spark中支持基于hash的hash分区和基于范围的range分区。
  • 一个列表,存储每个分区的存储位置

Spark RDD缓存

Spark RDD 是惰性求值的,而有时候希望能多次使用同一个 RDD。如果简单地对 RDD 调用行动操作,Spark 每次都会重算 RDD 及它的依赖,这样就会带来太大的消耗。为了避免多次计算同一个 RDD,可以让 Spark 对数据进行持久化。
Spark 可以使用 persist 和 cache 方法将任意 RDD 缓存到内存、磁盘文件系统中。缓存是容错的,如果一个 RDD 分片丢失,则可以通过构建它的转换来自动重构。被缓存的 RDD 被使用时,存取速度会被大大加速。一般情况下,Executor 内存的 60% 会分配给 cache,剩下的 40% 用来执行任务。
cache 是 persist 的特例,将该 RDD 缓存到内存中。persist 可以让用户根据需求指定一个持久化级别。

  • MEMORY_ONLY : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够,部分数据分区将不再缓存,在每次需要用到这些数据时重新进行计算。这是默认的级别。
  • MEMORY_AND_DISK : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够,将未缓存的数据分区存储到磁盘,在需要使用这些分区时从磁盘读取。
  • MEMORY_ONLY_SER : 将 RDD 以序列化的 Java 对象的形式进行存储(每个分区为一个 byte 数组)。这种方式会比反序列化对象的方式节省很多空间,尤其是在使用 fast serializer时会节省更多的空间,但是在读取时会增加 CPU 的计算负担。
  • MEMORY_AND_DISK_SER : 类似于 MEMORY_ONLY_SER ,但是溢出的分区会存储到磁盘,而不是在用到它们时重新计算。
  • DISK_ONLY : 只在磁盘上缓存 RDD。
  • MEMORY_ONLY_2,MEMORY_AND_DISK_2,等等 : 与上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本。
  • OFF_HEAP(实验中): 类似于 MEMORY_ONLY_SER ,但是将数据存储在 off-heap memory,这需要启动 off-heap 内存。

谓词下推

谓词下推的基本思想:将过滤表达式尽可能移动至靠近数据源的位置,以使真正执行时能直接跳过无关的数据。
hive官网上给出了outer join的谓词下推规则

  • Join(只包括left join ,right join,full join)中的谓词如果是保留表的,则不会下推
  • Join(只包括left join ,right join,full join)之后的谓词如果是Null Supplying tables的,则不会下推

Spark DAG&Stage

https://www.studytime.xin/article/spark-knowledge-rdd-stage.html

Spark Shuffle机制及调优

https://blog.csdn.net/qichangjian/article/details/88039576

MR&Spark Shuffle详解

https://mp.weixin.qq.com/s?__biz=MzUxOTU5Mjk2OA==&mid=2247485991&idx=1&sn=79c9370801739813b4a624ae6fa55d6c&chksm=f9f60740ce818e56a18f8782d21d376d027928e434f065ac2c251df09d2d4283710679364639&scene=21#wechat_redirect

Spark为什么快

Spark SQL 比 Hadoop Hive 快,是有一定条件的,而且不是 Spark SQL 的引擎比 Hive 的引擎快,相反,Hive 的 HQL 引擎还比 Spark SQL 的引擎更快。其实,关键还是在于 Spark 本身快。

  • 消除了冗余的 HDFS 读写: Hadoop 每次 shuffle 操作后,必须写到磁盘,而 Spark 在 shuffle 后不一定落盘,可以 cache 到内存中,以便迭代时使用。如果操作复杂,很多的 shufle 操作,那么 Hadoop 的读写 IO 时间会大大增加,也是 Hive 更慢的主要原因了。
  • 消除了冗余的 MapReduce 阶段: Hadoop 的 shuffle 操作一定连着完整的 MapReduce 操作,冗余繁琐。而 Spark 基于 RDD 提供了丰富的算子操作,且 reduce 操作产生 shuffle 数据,可以缓存在内存中。+
    JVM 的优化: Hadoop 每次 MapReduce 操作,启动一个 Task 便会启动一次 JVM,基于进程的操作。而 Spark 每次 MapReduce 操作是基于线程的,只在启动 Executor 是启动一次 JVM,内存的 Task 操作是在线程复用的。每次启动 JVM 的时间可能就需要几秒甚至十几秒,那么当 Task 多了,这个时间 Hadoop 不知道比 Spark 慢了多少。
    记住一种反例 考虑一种极端查询:
Select month_id, sum(sales) from T group by month_id;

这个查询只有一次 shuffle 操作,此时,也许 Hive HQL 的运行时间也许比 Spark 还快,反正 shuffle 完了都会落一次盘,或者都不落盘。
结论 :Spark 快不是绝对的,但是绝大多数,Spark 都比 Hadoop 计算要快。这主要得益于其对 mapreduce 操作的优化以及对 JVM 使用的优化

Spark reduce by key& group by key

  • 相同点
  1. 都作用于RDD[k,v]
  2. 都根据Key来分组聚合
  3. 默认分区数量是不变的,但都可以通过参数指定分区数量
  • 不同点
  1. group by key默认没有聚合函数,得到的返回值是RDD[k,Iterable[V]]
  2. reduce by key必须传聚合函数,得到的返回值是RDD[k,聚合后的V]
    3.groupbykey.map()=reducebykey
  • 最重要的区别
    他们都是要经过shuffle的,groupByKey在方法shuffle之间不会合并原样进行shuffle,。reduceByKey进行shuffle之前会先做合并,这样就减少了shuffle的io传送,所以效率高一点

Spark读取文件,分片

  1. 调用textFile方法,需要传入文件路径,分区数
  2. 调用hadoopFile方法,获取Hadoop configuration并广播,设置读取的文件路径,实例化Hadoop RDD
  3. HadoopRDD的getPartitions()方法,设置分片,分片规则如下:
    Math.max(minSize,Math.min(goalSize,blockSize))
  • goalSize:是根据用户期望的分区数算出来的,每个分区的大小,总文件大小/用户期望分区数
  • minSize :InputSplit的最小值,由配置参数mapred.min.split.size(在/conf/mapred-site.xml文件中配置)确定,默认是1(字节)
  • blockSize :文件在HDFS中存储的block大小(在/conf/hdfs-site.xml文件中配置),不同文件可能不同,默认是64MB或者128MB。

Spark统一内存模型

1.Spark统一内存模型
相关内存设置参数:
spark.executor.memory=8G;
spark.executor.memoryOverhead=6G;
spark.memory.fraction=0.6;
其中spark.memory.fraction不能设置太高,需要为otherememory留一些富裕内存因为spark内存统计信息收集是由延迟的,如果该值太大,且spill较重的情况下,会导致内存释放不及时而oom。
jvm堆内的内存分为四个部分:
Unified Memory:统一内存,包含Storage内存和Execution内存,由spark.memory.fraction控制(spark2.0+默认为0.6,占可用内存(系统内存减去预留内存)的60%,spark1.6默认0.75)
Storage内存:主要用于rdd的缓存(比如Broadcast的数据),缓存数据,由spark.storage.storageFraction控制(默认0.5,占统一内存的50%)
Execution内存:用于缓存在执行shuffle过程中产生的中间数据(由1-spark.storage.storageFraction控制),用户spark的计算,shuffle,sort,aggregation这些计算会用到的内存
Storage内存和Execution内存之间存在动态占用机制,若己方不足对方空余则可占用对方,Execution内存被对方占用后可强制回收
Other Memory:其它,默认占可用内存的40%,用于spark内部的一些元数据,用户的数据结构,防止在稀疏和异常大的记录的情况下出现对内存估计不足导致oom时的内存缓冲
reservedMemory:预留内存300M,用于保障spark的正常运行

execution内存和storage内存动态占用机制的理解:
1.不适用缓存(storage)的应用程序可以将整个空间用于执行(execution),从而避免不必要的磁盘溢写
2.storage曾经想execution借用了空间,它缓存的数据可能非常的多,然后execution又不需要那么大的空间,假设现在storage占了80%,execution占了20%,然后当execution空间不足时,execution会向内存管理器发信号把storage曾经占用的超过50%数据的那部分强制挤掉(注意:drop后数据会不会丢失主要是看你在程序设置的storage_level来决定你是Drop到哪里,可能是
drop到磁盘)
3.execution空间不足的情况下,除了选择向storage借用空间以外,也可以把一部分数据spill到磁盘,但很多时候基于性能调优的考虑不想把数据溢写到磁盘,会优先选择向storage借空间。如果此时storage实际占用不足50%,则会借空间给execution。但当storage发现自己空间不足时(指不能放下一个完整的block),只能等execution释放空间。

Flink相关问题

https://www.cnblogs.com/qiu-hua/p/13767131.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,076评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,658评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,732评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,493评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,591评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,598评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,601评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,348评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,797评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,114评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,278评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,953评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,585评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,202评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,180评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,139评论 2 352

推荐阅读更多精彩内容