一.常用参数
开启中间结果压缩 对于输入数据量有少许减少,但是cpu开销增大,对于单stage任务总体不理想
set hive.exec.compress.intermediate=true;
set mapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec
开启最终输出压缩
set hive.exec.compress.output=true;
mapred.compress.map.output map的输出是否压缩
mapred.map.output.compression.codec map的输出压缩方式
mapred.output.compress reduce的输出是否压缩
mapred.output.compression.codecreduce的输出压缩方式
set mapred.output.compression.codec=org.apache.hadoop.io.compress.GZipCodec
减少MAP数量
设置MAP的分割文件大小 set mapred.max.split.size=512000000
增加MR中MARTASK的可使用内存内存 set mapreduce.map.memory.mb; 以解决MAP时的OOM
map执行时间:map任务启动和初始化的时间+逻辑处理的时间。
所以 如果小文件过多,可以执行set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;在MAP开始前进行小文件合并
调整reduce参数的值
set mapred.reduce.tasks = 15;
set hive.exec.reducers.bytes.per.reducer =
一般根据输入文件的总大小,用它的estimation函数来自动计算reduce的个数:reduce个数 = InputFileSize / bytes per reducer
设置并行度
set hive.exec.paralle=true
set hive.exec.parallel.thread.number=16;
设置mapjoin
set hive.auto.convert.join = true;
set hive.mapjoin.smalltable.filesize = 2500000 ;//刷入内存表的大小(字节)
设置严格模式(一般设置成全局级,而非会话级)
set hive.marped.mode=strict
防止用户执行那些可能意想不到的不好的影响的查询 分区表必须指定分区
开启动态分区排序优化 set hive.optimize.sort.dynamic.partition
开启map任务的mapreduce会引入reduce过程,这样动态分区的那个字段比如日期在传到reducer时会被排序。由于分区字段是排序的,因此每个reducer只需要保持一个文件写入器(file writer)随时处于打开状态(不然每个动态分区都需要打开一个写入器),在收到来自特定分区的所有行后,关闭记录写入器(record writer),从而减小内存压力
二、日常sql编写技巧
1.将大表放后头
Hive假定查询中最后的一个表是大表。它会将其它表缓存起来,然后扫描最后那个表。 因此通常需要将小表放前面,或者标记哪张表是大表:/*streamtable(table_name) */
2.使用相同的连接键
当对3个或者更多个表进行join连接时,如果每个on子句都使用相同的连接键的话,那么只会产生一个MapReduce job。
3.尽量尽早地过滤数据
减少每个阶段的数据量,对于分区表要加分区,同时只选择需要使用到的字段。
4.尽量原子化操作
尽量避免一个SQL包含复杂逻辑,可以使用中间表来完成复杂的逻辑
5.limit 语句快速出结果
尤其是spark sql
数据倾斜
原因
1)、key分布不均匀
2)、业务数据本身的特性
3)、建表时考虑不周
4)、某些SQL语句本身就有数据倾斜
解决方案:
场景1:join时小表放在前面,并开启mapjoin set hive.auto.convert.join = true;
或使用 set hive.optimize.skewinfo=table_B:(selleer_id) [ ( "0") ("1") ) ] 说明B表的值集中在0和1上
set hive.optimize.skewjoin = true;
场景2:借助随机函数 对关联不到的 如0值或空值进行过滤或随机函数替换处理,避免倾斜
场景3:方案1:通过 union all将数据进行分离,聚集值单独处理
方案2:在MAP端 预聚合 set hive.map.aggr=true
这个设置可以将顶层的聚合操作放在Map阶段执行,从而减轻清洗阶段数据传输和Reduce阶段的执行时间,提升总体性能。确点是更废内存
方案3: 1.hive.groupby.skewindata=true (set hive.groupby.mapaggr.checkinterval=100000;这个是group的键对应的记录条数超过这个值则会进行优化 注意:该参数仅支持单列
hive.groupby.skewindata=true 控制生成两个MR Job。
第一个MRJob 中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的GroupBy Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MRJob再根据预处理的数据结果按照GroupBy Key分布到Reduce中(这个过程可以保证相同的GroupBy Key被分布到同一个Reduce中
场景4: select count(distinct udid) from T 改写成 select count(1) from( select distinct udid from T) t;
区别:增加一个JOB,同时后者在执行子查询的时候,可以有多个reduce参与计算,而前者只有一个reduce
大表JOIN大表时性能问题
1.分桶
建表时采用表分桶的设计,同时join时可以尝试使用bucket map join;基本处理方法是将两个表在join key上做hash bucket,将较小表(sale_history)的bucket设置为较大表(call_result)的数倍。这样数据就会按照join key做hash bucket。这样做的话,小表依然会复制到各个节点上,map join的时候,小表的每一组bucket加载成hashtable,与对应的大表bucket做局部join。
如果两表的join key 都具有唯一性(是主键关联),还可以进一步做sort merge bucket map join ;做法是两表都做bucket的基础上,每个bucket内部还要进行排序,这样做得好处就是在两边的bucket要做局部join的时候,用类似merge sort算法中的merge操作一样把两个bucket顺序遍历一下即可。
1.1 条件
1) set hive.optimize.bucketmapjoin = true;
2) 一个表的bucket数是另一个表bucket数的整数倍
3) bucket列 == join列
4) 必须是应用在map join的场景中
1.2 注意
1)如果表不是bucket的,只是做普通join
2.业务场景处理:
看懂实现逻辑即可,核心逻辑,无外乎是维护一个大客户表,对数据进行放大,结合随机函数,生成新的关联键,确保只关联到其中的一条即可,将数据的集中度进行打散
1).通用方案
此方案的思路是建立一个numbers表,其值只有一列int 行,比如从1到10(具体值可根据倾斜程度确定),然后放大B表10倍,再取模join。代码如下:
select
m.buyer_id,
sum(pay_cnt_90day) as pay_cnt_90day,
sum(case when m.sale_level = 0 then pay_cnt_90day end) as pay_cnt_90day_s0,
sum(case when m.sale_level = 1 then pay_cnt_90day end) as pay_cnt_90day_s1,
sum(case when m.sale_level = 2 then pay_cnt_90day end) as pay_cnt_90day_s2,
sum(case when m.sale_level = 3 then pay_cnt_90day end) as pay_cnt_90day_s3,
sum(case when m.sale_level = 4 then pay_cnt_90day end) as pay_cnt_90day_s4,
sum(case when m.sale_level = 5 then pay_cnt_90day end) as pay_cnt_90day_s5
from (
select a.buer_id, a.seller_id, b.sale_level, a.pay_cnt_90day
from ( select buyer_id, seller_id, pay_cnt_90day from table_A) a
join
(
select /*+mapjoin(members)*/
seller_id, sale_level ,member
from table_B
join members
) b
on a.seller_id = b.seller_id
and mod(a.pay_cnt_90day,10)+1 = b.number
) m
group by m.buyer_id
此思路的核心在于,既然按照seller_id分发会倾斜,那么再人工增加一列进行分发,这样之前倾斜的值的倾斜程度会减少到原来的1/10,可以通过配置numbers表改放大倍数来降低倾斜程度,
但这样做的一个弊端是B表也会膨胀N倍。
2).专用方案
通用方案的思路把B表的每条数据都放大了相同的倍数,实际上这是不需要的,只需要把大卖家放大倍数即可:需要首先知道大卖家的名单,即先建立一个临时表动态存放每天最新的大卖家(
比如dim_big_seller),同时此表的大卖家要膨胀预先设定的倍数(1000倍)。
在A表和B表分别新建一个join列,其逻辑为:如果是大卖家,那么concat一个随机分配正整数(0到预定义的倍数之间,本例为0~1000);如果不是,保持不变。具体代码如下:
select
m.buyer_id,
sum(pay_cnt_90day) as pay_cnt_90day,
sum(case when m.sale_level = 0 then pay_cnt_90day end) as pay_cnt_90day_s0,
sum(case when m.sale_level = 1 then pay_cnt_90day end) as pay_cnt_90day_s1,
sum(case when m.sale_level = 2 then pay_cnt_90day end) as pay_cnt_90day_s2,
sum(case when m.sale_level = 3 then pay_cnt_90day end) as pay_cnt_90day_s3,
sum(case when m.sale_level = 4 then pay_cnt_90day end) as pay_cnt_90day_s4,
sum(case when m.sale_level = 5 then pay_cnt_90day end) as pay_cnt_90day_s5
from (
select a.buer_id, a.seller_id, b.sale_level, a.pay_cnt_90day
from (
select /*+mapjoin(big)*/
buyer_id, seller_id, pay_cnt_90day,
if(big.seller_id is not null, concat( table_A.seller_id, 'rnd', cast( rand() * 1000 as bigint ), table_A.seller_id) as seller_id_joinkey
from table_A
left outer join
--big表seller_id有重复,请注意一定要group by 后再join,保证table_A的行数保持不变
(select seller_id from dim_big_seller group by seller_id)big
on table_A.seller_id = big.seller_id
) a
join
(
select /*+mapjoin(big)*/
seller_id, sale_level ,
--big表的seller_id_joinkey生成逻辑和上面的生成逻辑一样
coalesce(seller_id_joinkey,table_B.seller_id) as seller_id_joinkey
from table_B
left out join
--table_B表join大卖家表后大卖家行数扩大1000倍,其它卖家行数保持不变
(select seller_id, seller_id_joinkey from dim_big_seller) big
on table_B.seller_id= big.seller_id
) b
on a.seller_id_joinkey= b.seller_id_joinkey
) m
group by m.buyer_id
相比通用方案,专用方案的运行效率明细好了许多,因为只是将B表中大卖家的行数放大了1000倍,其它卖家的行数保持不变,但同时代码复杂了很多,而且必须首先建立大数据表。
3).动态一分为二
实际上方案2和3都用了一分为二的思想,但是都不彻底,对于mapjoin不能解决的问题,终极解决方案是动态一分为二,即对倾斜的键值和不倾斜的键值分开处理,不倾斜的正常join即可,
倾斜的把他们找出来做mapjoin,最后union all其结果即可。
但是此种解决方案比较麻烦,代码复杂而且需要一个临时表存放倾斜的键值。代码如下:
--由于数据倾斜,先找出90天买家超过10000的卖家
insert overwrite table temp_table_B
select
m.seller_id, n.sale_level
from (
select seller_id
from (
select seller_id,count(buyer_id) as byr_cnt
from table_A
group by seller_id
) a
where a.byr_cnt >10000
) m
left join
(
select seller_id, sale_level from table_B
) n
on m.seller_id = n.seller_id;
--对于90天买家超过10000的卖家直接mapjoin,对其它卖家直接正常join即可。
大表关联案例 引用自 https://www.cnblogs.com/shaosks/p/9491905.html