数仓--DW--Hadoop数仓实践Case-05-维度表增加列

业务背景

  • 业务的扩展或变化是不可避免的, 尤其像互联网行业, 需求变更已经成为常态, 唯一不变的就是变化本身, 其中最常碰到的扩展是给一个已经存在的表增加列。
  • 以销售订单为例, 假设因为业务需要, 在操作型源系统的客户表中增加了送货地址的4个字段, 并在销售订单表中增加了销售数量字段。 由于数据源表增加了字段, 数
    据仓库中的表也要随之修改。本节说明如何在客户维度表和销售订单事实表上添加列, 并在新列上应用SCD2, 以及对定时装载脚本所做的修改。
  • 增加列之后的数据库模型如下所示(蓝色部分为增加列):


    销售订单增加列.PNG

数据准备-修改mysql中consumer/sale_order表

  • mysql修改表语句
-- 修改mysql中consumer表
alter table 
    consumer
add
    shipping_address varchar(50) after consumer_province,
add
    shipping_zip_code int after shipping_address,
add 
    shipping_city varchar(30) after shipping_zip_code,
add 
    shipping_province varchar(50) after shipping_city;
-- 修改mysql中sale_order表
alter table
    sale_order
add 
    order_quantity int  after order_mount;
  • hive 修改表
  • 因为hive中表保存的是orcfile,虽然可以采用添加列的方式,但是采用这种方式在hive低版本中极易出现错误,在hive2.x版本会有较好的兼容性。所以我们采用的策略是:先对已经存在的源表进行重命名,然后再创建表,再进行数据的迁移。
  • 注意订单事实表可以保存成parquet格式或者是textfile,这个表不涉及到某些字段的更新,只是会追加数据,所以没必要非要保存成orcfile;
  • hive中需要修改的表为:ods.ods_consumer,source.source_consumer_dim,ods.ods_sale_order,dw.sale_order_fact共四张表,因为牵涉到从mysql中抽取到hive ods层数据,再进行装载到source层数据。
  • 修改hive数据库、数据表语句如下:
-- ****************************************************
-- @Author:  LiYahui
-- @Date:  Created in  2019/04/13 10:32
-- @Description: TODO  修改hive中表,增加相应的列和数据迁移
-- @Version: V1.0 
-- ****************************************************
--  hive 修改ods.ods_consumer表
alter table
    ods.ods_consumer
add  columns
    (consumer_shipping_address varchar(50) comment '顾客送货地址',
    consumer_shipping_zip_code int comment 'shipping_zip_code',
    consumer_shipping_city varchar(30) comment 'shipping_city',
    consumer_shipping_province varchar(50) comment 'shipping_province'
    );
-- hive 修改ods.ods_sale_order表
alter table
    ods.ods_sale_order
add columns (order_quantity int comment 'order_quantity');
-- 修改hive中表名
alter table source.source_consumer_dim rename to source.source_consumer_dim_old;
-- 创建新表
-- 创建客户维度表
create table if not exists source.source_consumer_dim(
consumer_key int  comment "代理键",
consumer_number varchar(50) comment "顾客编号",
consumer_name varchar(50) comment "顾客名称",
consumer_street_address varchar(50) comment "顾客地址",
consumer_zip_code varchar(50) comment "邮政编码",
consumer_city varchar(50) comment "城市",
consumer_province varchar(50) comment "省份",
consumer_shipping_address varchar(50) comment '顾客送货地址',
consumer_shipping_zip_code int comment 'shipping_zip_code',
consumer_shipping_city varchar(30) comment 'shipping_city',
consumer_shipping_province varchar(50) comment 'shipping_province'
consumer_valid_from date comment "有效期开始日期",
consumer_valid_to date comment "有效期结束日期",
consumer_indicator varchar(50) comment "状态指示器",
consumer_version int comment "顾客变化版本号"
)comment "客户维度表"
clustered by (consumer_key) into
8 buckets
stored as
orc tblproperties ('transactional'='true')
;

-- 将旧表的数据导入到新表中
insert into 
    source.source_consumer_dim
select 
    consumer_key,
    fromconsumer_number,
    consumer_name,
    consumer_street_address,
    consumer_zip_code,
    consumer_city,
    consumer_province,
    null,null,null,null,
    consumer_valid_from,
    consumer_valid_to,
    consumer_version
from
    source.source_consumer_dim_old;
-- 删除旧表
drop table source.source_consumer_dim_old;

-- 修改销售订单事实表
alter table dw.sale_order_fact rename to dw.sale_order_fact_old;
-- 创建新表
-- 创建订单事实表
create table if not exists dw.sale_order_fact(
order_sk int comment 'order surrogate key',
customer_sk int comment 'customer surrogate key',
product_sk int comment 'product surrogate key',
order_date_sk string comment 'date surrogate key',
order_amount decimal (10 , 2 ) comment'order amount',
order_quantity int comment 'order_quantity'
)comment "销售订单事实表"
clustered by
(order_sk) into
8 buckets
stored as
orc tblproperties ('transactional'='true')
;
-- 将旧表中数据插入到新表中
insert into dw.sale_order_fact  select *,null from dw.sale_order_fact_old;
-- 删除旧表
drop table dw.sale_order_fact_old;

修改sqoop作业

  • 由于增加了数据列, 销售订单表的增量抽取作业要把销售数量这个新增列的数据抽取过来, 因此需要重建。
  • 修改定期装载脚本 1-sqoop_extract_mysql2hive_daily.sh
  • 需要注意的是:一:获取上次拉取id,二是将新添加的字段一同拉取。
  • 注意:sqoop需要提前配置开启metastore功能,需要在配置文件中进行提前配置,以便于增量导入数据。也可以将sqoop的metastore数据保存在mysql中。要不然无法通过命令读取sqoop metastore
#!/bin/bash
# @Author:  LiYahui
# @Date:  Created in  2019/04/12 11:20
# @Description: TODO  修改sqoop增量抽取作业
# @Version: V1.0
#!/bin/bash
# 读取sqoop的元数据信息,前提是已经在sqoop-site.xml配置开启了sqoop元数据
last_value=`sqoop job --show myjob_incremental_import --meta-connect jdbc:hsqldb:hsql://liyahui-02:16000/sqoop | grep incremental.last.value | awk '{print $3}'`
# 删除元数据中的sqoop job
sqoop job --delete myjob_incremental_import --meta-connect jdbc:hsqldb:hsql://liyahui-02:16000/sqoop
#重新创建sqoop-job
sqoop job --meta-connect jdbc:hsqldb:hsql://liyahui-02:16000/sqoop   \
--create myjob_incremental_import \
-- import \
--connect "jdbc:mysql://liyahui-02:3306/dw_source_data?useSSL=false&user=root&password=liyahui" \
--table sale_order \
--target-dir  /user/root/dw/ods/ods_sale_order  \
-m 4 \
--fields-terminated-by "\t" \
--incremental append \
--check-column order_number \
--last-value ${last-value}

修改定期装载ETL hivesql

  • 修改数据库模式后, 还要修改已经使用的定期装载HiveQL脚本, 增加对新增数据列的处理。需要对"2-update_source_consumer_dim_scd2_scd1.hql" 和"5-update_dw_sale_order_fact.hql"

part-01 修改source_consumer_dim SCD2过期时间

-- **********************************************
-- 装载客户维度,处理consumer_street_address /已删除记录/consumer_shipping_address SCD2类型
-- 2-update_source_consumer_dim_scd2_scd1.hql    part-01部分
-- *********************************************
-- 设置SCD的生效时间和结束时间
--结束日期
set hivevar:cur_date = current_date();
-- 结束日期
set hivevar:pre_date = date_add(${hivevar:cur_date},-1); 
-- 设置最大有效日期
set hivevar:max_date = cast('9999-12-31' as date); 
set hivevar:expired_indicator=cast('Expired' as string);
--更新 设置已删除记录和 consumer_street_address 改变的 列上 SCD2过期日期 、consumer_indicator的值为Expired
update
        source.source_consumer_dim
set consumer_valid_to=${hivevar:pre_date} ,
    consumer_indicator=${hivevar:expired_indicator}
where 
    source_consumer_dim.consumer_key
in
(
select a.consumer_key 
from
-- 选择过期时间为"9999-12-31"的表内容,作为表a
    (select source_consumer_dim.consumer_key,consumer_number,consumer_street_address,consumer_shipping_address
        from 
            source.source_consumer_dim
        where 
            consumer_valid_to=${hivevar:max_date}
    ) a 
    -- join得出地址变化的列
    left join ods.ods_consumer b
    on
        a.consumer_number=b.customer_number
    -- 过滤出删除的列和地址改变的列
    where 
            b.customer_number is null 
        or
          -- <=>处理null值
            (!(a.consumer_street_address <=> b.customer_street_address)
              or
            !(a.consumer_shipping_address<=>b.customer_shipping_address))
 );
  • 语句说明:
  • 同客户地址一样, 新增的送货地址列也是用SCD2新增历史版本。 与建立的数仓--DW--Hadoop数仓实践Case-04-数据定期装载定期装载脚本中相同部分比较, 会发现这里使用了一个新的关系操作符“<=>”, 这是因为原来的脚本中少判断了一种情况。 在源系统库中, 客户地址和送货地址列都是允许为空的, 这样的设计是出于灵活性和容错性的考虑。 我们以送货地址为例进行讨论。
  • 使用“t1.shipping_address <> t2.shipping_address”条件判断送货地址是否更改, 根据不等号两边的值是否为空, 会出现以下三种情况:
    (1) t1.shipping_address和t2.shipping_address都不为空。 这种情况下如果两者相等则返回false, 说明地址没有变化; 否则返回true, 说明地址改变了, 逻辑正确。
    (2) t1.shipping_address和t2.shipping_address都为空。 两者的比较会演变成null<>null, 根据Hive对“<>”操作符的定义, 会返回NULL。 因为查询语句中只会返回判断条件为true的记录, 所以不会返回数据行, 这符合我们的逻辑, 说明地址没有改变。
    (3) t1.shipping_address和t2.shipping_address只有一个为空。 就是说地址列从NULL变成非NULL, 或者从非NULL变成NULL, 这种情况明显应该新增一个版本, 但根据“<>”的定义, 此时返回值是NULL, 查询不会返回行, 不符合我们的需求。
  • 现在使用“!(a.shipping_address <=> b.shipping_address)”作为判断条件, 我们先看一下Hive里是怎么定义“<=>”操作符的: A <=> B — Returns same result with EQUAL(=)operator for non-null operands, but returns TRUE if both are NULL, FALSE if one of the them is NULL。 从这个定义可知, 当A和B都为NULL时返回TRUE, 其中一个为NULL时返回FALSE, 其他情况与等号返回相同的结果。 下面再来看这三种情况:
    (1) t1.shipping_address和t2.shipping_address都不为空。 这种情况下如果两者相等则返回!(true), 即false, 说明地址没有变化, 否则返回!(false), 即true, 说明地址改变了, 符合我们的逻辑。
    (2) t1.shipping_address和t2.shipping_address都为空。 两者的比较会演变成!(null<=>null), 根据“<=>”的定义, 会返回!(true), 即返回false。 因为查询语句中只会返回判断条件为true的记录, 所以查询不会返回行, 这符合我们的逻辑, 说明地址没有改变。
    (3) t1.shipping_address和t2.shipping_address只有一个为空。 根据“<=>”的定义, 此时会返回!(false), 即true, 查询会返回行, 符合我们的需求。
  • 空值的逻辑判断有其特殊性, 为了避免不必要的麻烦, 数据库设计时应该尽量将字段设计成非空, 必要时用默认值代替NULL, 并将此作为一个基本的设计原则。

part-02 处理SCD2新增行

  • 开发脚本如下:
-- **********************************************
-- 装载客户维度,处理consumer_street_address  SCD2类型   新增行
-- 2-update_source_consumer_dim_scd2_scd1.hql    part-02部分
-- *********************************************
-- 设置SCD的生效时间和结束时间
--结束日期
set hivevar:cur_date = current_date();
-- 结束日期
set hivevar:pre_date = date_add(${hivevar:cur_date},-1); 
-- 设置最大有效日期
set hivevar:max_date = cast('9999-12-31' as date); 
set hivevar:expired_indicator=cast('Expired' as string);
-- 处理source_consumer_dim的列customer_street_addresses上的新增行 SCD2方式
insert into
    source.source_consumer_dim
select 
-- 生成新的代理键
        row_number() over(order by t1.consumer_number)+t2.sk_max,
        t1.consumer_number,t1.consumer_name,t1.consumer_street_address,t1.consumer_zip_code,
        t1.consumer_city,t1.consumer_province,
t1.consumer_shipping_address,t1.consumer_zip_code,t1.consumer_shipping_city,
t1.consumer_shipping_province,
t1.consumer_valid_from,t1.consumer_valid_to,
        "Current" as consumer_indicator,t1.consumer_version
from
    (
    select 
            b.customer_number as consumer_number,
            b.customer_name as consumer_name,
            b.customer_street_address as consumer_street_address,
            b.customer_zip_code as consumer_zip_code,
            b.customer_city as consumer_city,
            b.customer_state as consumer_province,
            b.consumer_shipping_address,
            b.consumer_zip_code,
            b.consumer_shipping_city,
            b.consumer_shipping_province,
            a.consumer_version+1 as consumer_version,
            ${hivevar:pre_date} as consumer_valid_from,
            ${hivevar:max_date} as  consumer_valid_to
    from 
        source.source_consumer_dim  a
    inner join
        ods.ods_consumer b
    on
        a.consumer_number=b.customer_number and a.consumer_valid_to=${hivevar:pre_date}
    left join
        source.source_consumer_dim  c
    on
        a.consumer_number=c.consumer_number and c.consumer_valid_to=${hivevar:max_date}
    where 
         (!(a.consumer_street_address <=> b.customer_street_address)
              or
            !(a.consumer_shipping_address<=>b.customer_shipping_address))
       and 
        c.consumer_key is null
    ) t1
cross join
    (select coalesce(max(consumer_key),0) as sk_max from source.source_consumer_dim) t2;

part-03 处理SCD1

  • hive 脚本如下
-- **********************************************
-- 装载客户维度,处理consumer_name   SCD1类型  
-- 2-update_source_consumer_dim_scd2_scd1.hql    part-03部分
-- *********************************************
--处理consumer_name 的SCD1,修改所有记录中的consumer_name
drop table if exists source.source_consumer_tmp;
-- 创建临时过渡表
create table source.source_consumer_tmp
as
select 
        a.consumer_key,a.consumer_number,
        b.customer_name,b.customer_street_address,b.customer_zip_code,
      a.consumer_city,a.consumer_province,
a.consumer_shiipping_address,
a.consumer_shipping_zip_code,
a.consumer_shipping_city,
a.consumer_shipping_province,
a.consumer_valid_from,
a.consumer_valid_to,
a.consumer_indicator,
a.consumer_version
from 
    source.source_consumer_dim a,ods.ods_consumer b
where 
    a.consumer_number=b.customer_number and !(a.consumer_name <=> b.customer_name);
--删除 source.source_consumer_dim中consumer_name修改的列
delete 
from 
    source.source_consumer_dim
where 
    consumer_key
in (select consumer_key from source.source_consumer_tmp);
-- 将source.source_consumer_tmp中数据插入到source.source_consumer_dim中;
insert into 
source.source_consumer_dim
select * from source.source_consumer_tmp;
  • 语句说明:consumer_name 列上的scd1处理只是在select语句中增加了送货地址的四列, 并出于同样的原因使用了“<=>”关系操作符。

part-04 处理ods.ods_consumer新增行

-- **********************************************
-- 装载客户维度,处理ods.ods_consumer 新增行
-- 2-update_source_consumer_dim_scd2_scd1.hql    part-04部分
-- *********************************************
-- 设置SCD的生效时间和结束时间
--结束日期
set hivevar:cur_date = current_date();
-- 结束日期
set hivevar:pre_date = date_add(${hivevar:cur_date},-1); 
-- 设置最大有效日期
set hivevar:max_date = cast('9999-12-31' as date); 
set hivevar:expired_indicator=cast('Expired' as string);
-- 处理新增的customer数据
insert into 
    source.source_consumer_dim
select
        row_number() over(order by t1.consumer_number)+t2.sk_max,
        t1.consumer_number,t1.consumer_name,t1.consumer_street_address,t1.consumer_zip_code,
        t1.consumer_city,t1.consumer_province,
t1.consumer_shipping_address,t1.consumer_shipping_zip_code,
t1.consumer_shipping_city,t1.consumer_shipping_province,
        ${hivevar:pre_date},
        ${hivevar:max_date},
        'Current',
        1
from
    (
    select 
            a.customer_number as consumer_number,
            a.customer_name as consumer_name,
            a.customer_street_address as consumer_street_address,
            a.customer_zip_code as consumer_zip_code,
            a.customer_city as consumer_city,
            a.customer_state as consumer_province,
            a.customer_shipping_address as consumer_shipping_address,
            a.customer_shipping_zip_code as consumer_shipping_code,
            a.customer_shipping_city as consumer_shipping_city,
            a.customer_shipping_province as consumer_shipping_province
    from
        ods.ods_consumer a
    left join 
        source.source_consumer_dim b
    on
        a.customer_number=b.consumer_number
    where 
        b.consumer_key is null
    ) t1
cross join
    (select coalesce(max(consumer_key),0) as sk_max from source.source_consumer_dim) t2
;

合并

  • 将part-01 至part-04按照顺序合并即可。

修改dw.sale_order_fact

  • 修改5-update_dw_sale_order_fact.hql,添加order_quantity字段,较简单
  • 修改脚本如下:
-- ****************************************************
-- @Author:  LiYahui
-- @Date:  Created in  2019/04/13 13:30
-- @Description: TODO  定期装载dw.sale_order_fact
-- @Version: V1.0 
-- ****************************************************
-- 装载订单事实表
insert into 
    dw.sale_order_fact
select 
        order_key,
        consumer_key,
        product_key,
        day_key,
        order_amount,
      order_quantity
from
    ods.ods_sale_order a,
    source.source_order_dim b,
    source.source_consumer_dim c,
    source.source_product_dim d,
    source.source_date_dim e,
    ods.ods_cdc_time f
where
        a.order_number=b.order_key
    and
        a.customer_number=c.consumer_number
    and
        a.order_date >=c.consumer_valid_from
    and
        a.entry_date < c.consumer_valid_to
    and
        a.product_code=d.product_code
    and
        a.order_date > d.product_valid_from
    and
        a.order_date < d.product_valid_to
    and
        to_date(a.order_date) = e.day_key
    and
        a.entry_date >= f.last_load
    and 
        a.entry_date < f.current_load
;

总结

  • 关于sqoop的配置,需要提前配置sqoop的元数据信息,才能通过sqoop job的形式查询到增量导入的最大id。
  • 将修改过的脚本重新整合,进行修改。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,542评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,596评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,021评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,682评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,792评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,985评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,107评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,845评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,299评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,612评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,747评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,441评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,072评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,828评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,069评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,545评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,658评论 2 350

推荐阅读更多精彩内容