join长尾
背景
sql在join执行阶段会将join key相同的数据分发到同一个执行instance上处理。如果某个key上的数据量比较多,会导致该instance执行时间比其它instance执行时间长。其表现为:执行日志中该join task的大部分instance都已执行完成,但少数几个instance一直处于执行中,这种现象称之为长尾。
长尾类别&优化方法
小表长尾
join倾斜时,如果某路输入比较小,可以采用mapjoin避免倾斜。mapjoin的原理是将join操作提前到map端执行,这样可以避免因为分发key不均匀导致数据倾斜。但是mapjoin的使用有限制,必须是join中的从表比较小才可用。所谓从表,即left outer join 中的右表,或者right outer join中的左表。
热点值长尾
如果是因为热点值导致长尾,并且join的输入比较大无法用mapjoin,可以先将热点key取出,对于主表数据用热点key切分成热点数据和非热点数据两部分分别处理,最后合并。
举例说明,
有两张表,日志表log即用户点击的日志,含有商品ID字段:p_id;
商品表product,含有商品名称p_nam,商品ID:p_id
需要计算所有商品的pv
--取热点值,取商品pv大于10000的商品到临时表
INSERT TABLE topk_product;
SELECT DISTINCT
p_id
FROM
(
SELECT p_id, COUNT(1) AS pv FROM log GROUP BY p_id
)
a
WHERE
pv > 10000;
--取出非热点值和商品 join 得到非热点商品的pv
SELECT
b.p_id,
b.p_name,
c.pv
FROM
(
SELECT p_id, p_name FROM product
)
b
JOIN
(
SELECT
m.*
FROM
(
SELECT p_id, COUNT(1) AS pv FROM log
)
m
LEFT JOIN
(
SELECT p_id FROM topk_product
)
n
ON
m.p_id = n.p_id
AND n.p_id id NULL--注意这里
)
c ON b.p_id = c.p_id
--取出热点值和商品 join 得到热点商品的pv
SELECT
b.p_id,
b.p_name,
c.pv
FROM
(
SELECT
a.*
FROM
(
SELECT p_id, p_name FROM product
)
a
JOIN
(
SELECT p_id FROM topk_product
)
b
ON
a.p_id = b.p_id
)
b
JOIN
(
SELECT
m.*
FROM
(
SELECT p_id, COUNT(1) AS pv FROM log
)
m
JOIN
(
SELECT p_id FROM topk_product
)
n
ON
m.p_id = n.p_id
)
c ON b.p_id = c.p_id
--最后用union all 热点和非热点数据即可
空值长尾
join时,假设左表存在大量空值,空值聚集在一个reduce上,由于左表存在大量的记录,无法用mapjoin。此时可以使用coalesce(left_table.key,rand()*9999)将key为空的情况下赋予随机值,来避免空值集中造成长尾。
或者这样写也可:coalesce(site_id,'') /left outer join xxx where coalesce(xxxxxx,'null')!='null'
map长尾
map端读取数据时,由于文件大小分布不均匀,一些map任务读取并处理的数据特别多,一些map任务处理的数据特别少,造成map端长尾。这种倾斜没有特别好的方法,只能调节splitsize来增加mapper数量,让数据分片更小,以期望获得更为均匀的分配。
reduce长尾
由于distinct操作的存在,数据无法在map端的shuffle阶段根据group by 先做一次聚合操作,减少传输的数据量,而是将所有的数据都传输到reduce端,当key的数据分布不均匀时,就会导致reduce端长尾,特别当多个distinct同时出现在一段sql代码中时,数据就会被分发多次,不仅会造成数据膨胀N倍,也会把长尾现象放大N倍。
--只有一个distinct的情况
--原sql
SELECT
d1,
d2,
COUNT(DISTINCT
CASE
WHEN a IS NOT NULL
THEN b
END) AS b_distinct_cnt
FROM
xxx
GROUP BY
d1,
d2
--修改后的sql;建立临时表,先count在sum
CREATE TABLE tmpl AS
SELECT
d1,
d2,
b,
COUNT(
CASE
WHEN a IS NOT NULL
THEN b
END) AS b_cnt
FROM
xxx
GROUP BY
d1,
d2,
b
SELECT
d1,
d2,
SUM(
CASE
WHEN b_cnt > 0
THEN 1
ELSE 0
END) AS b_distinct_cnt
FROM
tmpl
GROUP BY
d1,
d2
--多个distinct的情况
--原始sql
SELECT
d1,
d2,
COUNT(DISTINCT
CASE
WHEN a IS NOT NULL
THEN b
END) AS b_distinct_cnt,
COUNT(DISTINCT
CASE
WHEN e IS NOT NULL
THEN c
END) AS c_distinct_cnt
FROM
xxx
GROUP BY
d1,
d2
--修改后的sql
CREATE TABLE tmpl AS
SELECT
d1,
d2,
b,
COUNT(
CASE
WHEN a IS NOT NULL
ELSE b
END) AS b_cnt
FROM
xxx
GROUP BY
d1,
d2,
b
CREATE TABLE tmpl_1 AS
SELECT
d1,
d2,
SUM(
CASE
WHEN b_cnt > 0
THEN 1
ELSE 0
END) AS b_distinct_cnt
FROM
tmpl
GROUP BY
d1,
d2
CREATE TABLE tmp2 AS
SELECT
d1,
d2,
c,
COUNT(
CASE
WHEN e IS NOT NULL
ELSE c
END) AS c_cnt
FROM
xxx
GROUP BY
d1,
d2,
c
CREATE TABLE tmp2_1 AS
SELECT
d1,
d2,
SUM(
CASE
WHEN c_cnt > 0
THEN 1
ELSE 0
END) AS c_distinct_cnt
FROM
tmp2
GROUP BY
d1,
d2
SELECT
t1.d1,t1.d2,
t1.b_distinct_cnt,
t2.c_distinct_cnt
from tmpl_1 t1
LEFT join tmp2_1 t2
on t1.d1=t2.d1
and t1.d2=t2.d2