HIVE,增量更新数据

生产环境应用场景描述

每天有很多事实表需要增量同步,在HIVE没有开启事务模式的条件下,需要全表重新写入HDFS中,这在需要巨大的IO时间开销,每天的增量数据占总数据的比列很小,这种方式显得非常低效。
现在的想法是,考虑利用 INSERT OVER TABLE 语句对分区表进行指定分区覆盖插入,来实现增量更新的效果。
1,首先创建测试数据集
  • a、全量数据集
CREATE TABLE `ods.employees_all`( `employee_id` double,
`first_name` string,
`last_name` string,
`email` string,
`phone_number` string,
`hire_date` timestamp,
`job_id` string,
`salary` double,
`commission_pct` double,
`manager_id` double,
`department_id` int)
partitioned by (ds string) 
stored as orcfile;

INSERT INTO ods.employees_all (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(1, 'Eleni', 'Zlotkey', 'EZLOTKEY', '011.44.1344.429018', '2001-01-29 00:00:00', 'SA_MAN', 10500.0, 0.2, 100.0, 80.0, '2001');
INSERT INTO ods.employees_all (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(2, 'Mattea', 'Marvins', 'MMARVINS', '011.44.1346.329268', '2001-01-24 00:00:00', 'SA_REP', 7200.0, 0.1, 147.0, 80.0, '2001');
INSERT INTO ods.employees_all (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(3, 'David', 'Lee', 'DLEE', '011.44.1346.529268', '2002-02-23 00:00:00', 'SA_REP', 6800.0, 0.1, 147.0, 80.0, '2002');
INSERT INTO ods.employees_all (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(4, 'Sundar', 'Ande', 'SANDE', '011.44.1346.629268', '2002-03-24 00:00:00', 'SA_REP', 6400.0, 0.1, 147.0, 80.0, '2002');
INSERT INTO ods.employees_all (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(5, 'Amit', 'Banda', 'ABANDA', '011.44.1346.729268', '2003-04-21 00:00:00', 'SA_REP', 6200.0, 0.1, 147.0, 80.0, '2003');
INSERT INTO ods.employees_all (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(6, 'shabi', 'shabi', 'shabi', '590.423.4569', '2003-06-25 00:00:00.0', 'IT_PROG', 4800.0, 0.0, 103.0, 60.0, '2003');
INSERT INTO ods.employees_all (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(7, 'chunhuo', 'chunhuo', 'chunhuo', '590.423.4560', '2004-02-05 00:00:00.0', 'IT_PROG', 4800.0, 0.0, 103.0, 60.0, '2004');
INSERT INTO ods.employees_all (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(8, 'Payam', 'Kaufling', 'PKAUFLIN', '650.123.3234', '2004-05-01 00:00:00.0', 'ST_MAN', 7900.0, 0.0, 100.0, 50.0, '2004');
INSERT INTO ods.employees_all (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(9, 'David', 'Bernstein', 'DBERNSTE', '011.44.1344.345268', '2005-03-24 00:00:00.0', 'SA_REP', 9500.0, 0.25, 145.0, 80.0, '2005');
INSERT INTO ods.employees_all (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(10, 'Ellen', 'Abel', 'EABEL', '011.44.1644.429267', '2005-05-11 00:00:00.0', 'SA_REP', 11000.0, 0.3, 149.0, 80.0, '2005');
INSERT INTO ods.employees_all (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(11, 'Britney', 'Everett', 'BEVERETT', '650.501.2876', '2006-03-03 00:00:00.0', 'SH_CLERK', 3900.0, 0.0, 123.0, 50.0, '2006');
INSERT INTO ods.employees_all (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(12, 'Samuel', 'McCain', 'SMCCAIN', '650.501.3876', '2006-07-01 00:00:00.0', 'SH_CLERK', 3200.0, 0.0, 123.0, 50.0, '2006');

  • b、增量数据集
CREATE TABLE `ods.employees_all`( `employee_id` double,
`first_name` string,
`last_name` string,
`email` string,
`phone_number` string,
`hire_date` timestamp,
`job_id` string,
`salary` double,
`commission_pct` double,
`manager_id` double,
`department_id` int)
partitioned by (ds string) 
stored as orcfile;

INSERT INTO ods.employees_tmp (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(4, 'Sundar_update', 'Sundar_update', 'Sundar_update', '011.44.1346.629268', '2002-03-24 00:00:00', 'SA_REP', 6400.0, 0.1, 147.0, 80.0, '2002');
INSERT INTO ods.employees_tmp (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(6, 'shabi_update', 'shabi_update', 'shabi_update', '590.423.4569', '2003-06-25 00:00:00.0', 'IT_PROG', 4800.0, 0.0, 103.0, 60.0, '2003');
INSERT INTO ods.employees_tmp (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(8, 'Payam_update', 'Payam_update', 'Payam_update', '650.123.3234', '2004-05-01 00:00:00.0', 'ST_MAN', 7900.0, 0.0, 100.0, 50.0, '2004');
INSERT INTO ods.employees_tmp (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(13, 'David_incremental', 'David_incremental', 'David_incremental', '011.44.1344.345268', '2007-03-24 00:00:00.0', 'SA_REP', 9500.0, 0.25, 145.0, 80.0, '2007');
INSERT INTO ods.employees_tmp (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(14, 'Ellen_incremental', 'Ellen_incremental', 'Ellen_incremental', '011.44.1644.429267', '2007-05-11 00:00:00.0', 'SA_REP', 11000.0, 0.3, 149.0, 80.0, '2007');
INSERT INTO ods.employees_tmp (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(15, 'Britney_incremental', 'Britney_incremental', 'Britney_incremental', '650.501.2876', '2008-03-03 00:00:00.0', 'SH_CLERK', 3900.0, 0.0, 123.0, 50.0, '2008');
INSERT INTO ods.employees_tmp (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(16, 'Samuel_incremental', 'Samuel_incremental', 'Samuel_incremental', '650.501.3876', '2008-07-01 00:00:00.0', 'SH_CLERK', 3200.0, 0.0, 123.0, 50.0, '2008');
2,预览测试数据
  • 全量


    全量数据
  • 增量


    增量数据
3,查看全量表HDFS分区的目录情况
全量表HDFS分区目录
4,用INSER OVERWRITE TABLE语句定向更新目标分区

通过预览表数据,我知道全量表当前有12条数据,6个分区;增量表有7条数据,5个分区,其中2007和2008是新增分区,2002,2003,2004是更新分区。更新语句如下:

INSERT OVERWRITE TABLE ods.employees_all PARTITION(ds) 
SELECT t1.* FROM (
SELECT a.* 
FROM ods.employees_all a 
LEFT join ods.employees_tmp b ON a.employee_id = b.employee_id 
WHERE b.employee_id IS NULL 
  AND EXISTS (SELECT 1 FROM ods.employees_tmp c WHERE a.ds=c.ds )
UNION ALL 
SELECT * FROM ods.employees_tmp 
) t1

其中表t1的结果集为:


t1表数据集,各数据行变化或来源图中已做详细标注

这里需要解释一下的被动更新行,他的意思是这些行本身并不在新增数据集中,但因为其分区与新增数据集中的某些行的分区相同,因此也被命中以便覆盖全量数据集中的目标分区。

由t1数据集中有共有10条数据,其中新增4条,更新3条,被动更新3条,因此如果INSERT OVERWRITE TABLE语句执行成功后,ods.employees_all中应该有16条数据。

以下是更新后的全量数据集:


更新后的全量数据集

数据如预期的完全一致,说明INSERT OVERWRITE TABLE语句确实是分区表做增量更新的最优选择,这种更新方式逻辑清晰简单,实现方式优雅,绝对是不二之选。

5,更新后全量表HDFS分区的目录情况
更新后全量表HDFS分区的目录情况

可以看到,在HDFS目录中,语句执行成功后,自动创建了新增的4条记录所对应的分区。

6,总结

之前一直苦于在HIVE不开启事务的模式下,怎么做增量更新。
刚接触hive,思维还停留在传统的关系型数据库中,对不能DELETE,UPDATE操作的数仓方式非常不适应,对这种无法更新数据行的数仓深感蛋疼(认识还处于低级水平所致)。经过这番探索后发现,其实hive远比自己想的要强大的多,一般常规性的问题,前人早已给出解决方案,自己的困惑完全是来自于低级的无知。

上面的这个例子中,如果有数据库日志更新表或原表中有一个可用的update_time时间戳,t1数据集其实可以在sqoop中的query选项使用,导入HDFS后生成一个临时表,然后直接用这个临时表insert overwrite table到目标全量表。

完。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,454评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,553评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,921评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,648评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,770评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,950评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,090评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,817评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,275评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,592评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,724评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,409评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,052评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,815评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,043评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,503评论 2 361
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,627评论 2 350

推荐阅读更多精彩内容