业务背景
- 业务的扩展或变化是不可避免的, 尤其像互联网行业, 需求变更已经成为常态, 唯一不变的就是变化本身, 其中最常碰到的扩展是给一个已经存在的表增加列。
- 以销售订单为例, 假设因为业务需要, 在操作型源系统的客户表中增加了送货地址的4个字段, 并在销售订单表中增加了销售数量字段。 由于数据源表增加了字段, 数
据仓库中的表也要随之修改。本节说明如何在客户维度表和销售订单事实表上添加列, 并在新列上应用SCD2, 以及对定时装载脚本所做的修改。 -
增加列之后的数据库模型如下所示(蓝色部分为增加列):
数据准备-修改mysql中consumer/sale_order表
- mysql修改表语句
-- 修改mysql中consumer表
alter table
consumer
add
shipping_address varchar(50) after consumer_province,
add
shipping_zip_code int after shipping_address,
add
shipping_city varchar(30) after shipping_zip_code,
add
shipping_province varchar(50) after shipping_city;
-- 修改mysql中sale_order表
alter table
sale_order
add
order_quantity int after order_mount;
- hive 修改表
- 因为hive中表保存的是orcfile,虽然可以采用添加列的方式,但是采用这种方式在hive低版本中极易出现错误,在hive2.x版本会有较好的兼容性。所以我们采用的策略是:先对已经存在的源表进行重命名,然后再创建表,再进行数据的迁移。
- 注意订单事实表可以保存成parquet格式或者是textfile,这个表不涉及到某些字段的更新,只是会追加数据,所以没必要非要保存成orcfile;
- hive中需要修改的表为:ods.ods_consumer,source.source_consumer_dim,ods.ods_sale_order,dw.sale_order_fact共四张表,因为牵涉到从mysql中抽取到hive ods层数据,再进行装载到source层数据。
- 修改hive数据库、数据表语句如下:
-- ****************************************************
-- @Author: LiYahui
-- @Date: Created in 2019/04/13 10:32
-- @Description: TODO 修改hive中表,增加相应的列和数据迁移
-- @Version: V1.0
-- ****************************************************
-- hive 修改ods.ods_consumer表
alter table
ods.ods_consumer
add columns
(consumer_shipping_address varchar(50) comment '顾客送货地址',
consumer_shipping_zip_code int comment 'shipping_zip_code',
consumer_shipping_city varchar(30) comment 'shipping_city',
consumer_shipping_province varchar(50) comment 'shipping_province'
);
-- hive 修改ods.ods_sale_order表
alter table
ods.ods_sale_order
add columns (order_quantity int comment 'order_quantity');
-- 修改hive中表名
alter table source.source_consumer_dim rename to source.source_consumer_dim_old;
-- 创建新表
-- 创建客户维度表
create table if not exists source.source_consumer_dim(
consumer_key int comment "代理键",
consumer_number varchar(50) comment "顾客编号",
consumer_name varchar(50) comment "顾客名称",
consumer_street_address varchar(50) comment "顾客地址",
consumer_zip_code varchar(50) comment "邮政编码",
consumer_city varchar(50) comment "城市",
consumer_province varchar(50) comment "省份",
consumer_shipping_address varchar(50) comment '顾客送货地址',
consumer_shipping_zip_code int comment 'shipping_zip_code',
consumer_shipping_city varchar(30) comment 'shipping_city',
consumer_shipping_province varchar(50) comment 'shipping_province'
consumer_valid_from date comment "有效期开始日期",
consumer_valid_to date comment "有效期结束日期",
consumer_indicator varchar(50) comment "状态指示器",
consumer_version int comment "顾客变化版本号"
)comment "客户维度表"
clustered by (consumer_key) into
8 buckets
stored as
orc tblproperties ('transactional'='true')
;
-- 将旧表的数据导入到新表中
insert into
source.source_consumer_dim
select
consumer_key,
fromconsumer_number,
consumer_name,
consumer_street_address,
consumer_zip_code,
consumer_city,
consumer_province,
null,null,null,null,
consumer_valid_from,
consumer_valid_to,
consumer_version
from
source.source_consumer_dim_old;
-- 删除旧表
drop table source.source_consumer_dim_old;
-- 修改销售订单事实表
alter table dw.sale_order_fact rename to dw.sale_order_fact_old;
-- 创建新表
-- 创建订单事实表
create table if not exists dw.sale_order_fact(
order_sk int comment 'order surrogate key',
customer_sk int comment 'customer surrogate key',
product_sk int comment 'product surrogate key',
order_date_sk string comment 'date surrogate key',
order_amount decimal (10 , 2 ) comment'order amount',
order_quantity int comment 'order_quantity'
)comment "销售订单事实表"
clustered by
(order_sk) into
8 buckets
stored as
orc tblproperties ('transactional'='true')
;
-- 将旧表中数据插入到新表中
insert into dw.sale_order_fact select *,null from dw.sale_order_fact_old;
-- 删除旧表
drop table dw.sale_order_fact_old;
修改sqoop作业
- 由于增加了数据列, 销售订单表的增量抽取作业要把销售数量这个新增列的数据抽取过来, 因此需要重建。
- 修改定期装载脚本 1-sqoop_extract_mysql2hive_daily.sh
- 需要注意的是:一:获取上次拉取id,二是将新添加的字段一同拉取。
- 注意:sqoop需要提前配置开启metastore功能,需要在配置文件中进行提前配置,以便于增量导入数据。也可以将sqoop的metastore数据保存在mysql中。要不然无法通过命令读取sqoop metastore
#!/bin/bash
# @Author: LiYahui
# @Date: Created in 2019/04/12 11:20
# @Description: TODO 修改sqoop增量抽取作业
# @Version: V1.0
#!/bin/bash
# 读取sqoop的元数据信息,前提是已经在sqoop-site.xml配置开启了sqoop元数据
last_value=`sqoop job --show myjob_incremental_import --meta-connect jdbc:hsqldb:hsql://liyahui-02:16000/sqoop | grep incremental.last.value | awk '{print $3}'`
# 删除元数据中的sqoop job
sqoop job --delete myjob_incremental_import --meta-connect jdbc:hsqldb:hsql://liyahui-02:16000/sqoop
#重新创建sqoop-job
sqoop job --meta-connect jdbc:hsqldb:hsql://liyahui-02:16000/sqoop \
--create myjob_incremental_import \
-- import \
--connect "jdbc:mysql://liyahui-02:3306/dw_source_data?useSSL=false&user=root&password=liyahui" \
--table sale_order \
--target-dir /user/root/dw/ods/ods_sale_order \
-m 4 \
--fields-terminated-by "\t" \
--incremental append \
--check-column order_number \
--last-value ${last-value}
修改定期装载ETL hivesql
- 修改数据库模式后, 还要修改已经使用的定期装载HiveQL脚本, 增加对新增数据列的处理。需要对"2-update_source_consumer_dim_scd2_scd1.hql" 和"5-update_dw_sale_order_fact.hql"
part-01 修改source_consumer_dim SCD2过期时间
-- **********************************************
-- 装载客户维度,处理consumer_street_address /已删除记录/consumer_shipping_address SCD2类型
-- 2-update_source_consumer_dim_scd2_scd1.hql part-01部分
-- *********************************************
-- 设置SCD的生效时间和结束时间
--结束日期
set hivevar:cur_date = current_date();
-- 结束日期
set hivevar:pre_date = date_add(${hivevar:cur_date},-1);
-- 设置最大有效日期
set hivevar:max_date = cast('9999-12-31' as date);
set hivevar:expired_indicator=cast('Expired' as string);
--更新 设置已删除记录和 consumer_street_address 改变的 列上 SCD2过期日期 、consumer_indicator的值为Expired
update
source.source_consumer_dim
set consumer_valid_to=${hivevar:pre_date} ,
consumer_indicator=${hivevar:expired_indicator}
where
source_consumer_dim.consumer_key
in
(
select a.consumer_key
from
-- 选择过期时间为"9999-12-31"的表内容,作为表a
(select source_consumer_dim.consumer_key,consumer_number,consumer_street_address,consumer_shipping_address
from
source.source_consumer_dim
where
consumer_valid_to=${hivevar:max_date}
) a
-- join得出地址变化的列
left join ods.ods_consumer b
on
a.consumer_number=b.customer_number
-- 过滤出删除的列和地址改变的列
where
b.customer_number is null
or
-- <=>处理null值
(!(a.consumer_street_address <=> b.customer_street_address)
or
!(a.consumer_shipping_address<=>b.customer_shipping_address))
);
- 语句说明:
- 同客户地址一样, 新增的送货地址列也是用SCD2新增历史版本。 与建立的数仓--DW--Hadoop数仓实践Case-04-数据定期装载定期装载脚本中相同部分比较, 会发现这里使用了一个新的关系操作符“<=>”, 这是因为原来的脚本中少判断了一种情况。 在源系统库中, 客户地址和送货地址列都是允许为空的, 这样的设计是出于灵活性和容错性的考虑。 我们以送货地址为例进行讨论。
- 使用“t1.shipping_address <> t2.shipping_address”条件判断送货地址是否更改, 根据不等号两边的值是否为空, 会出现以下三种情况:
(1) t1.shipping_address和t2.shipping_address都不为空。 这种情况下如果两者相等则返回false, 说明地址没有变化; 否则返回true, 说明地址改变了, 逻辑正确。
(2) t1.shipping_address和t2.shipping_address都为空。 两者的比较会演变成null<>null, 根据Hive对“<>”操作符的定义, 会返回NULL。 因为查询语句中只会返回判断条件为true的记录, 所以不会返回数据行, 这符合我们的逻辑, 说明地址没有改变。
(3) t1.shipping_address和t2.shipping_address只有一个为空。 就是说地址列从NULL变成非NULL, 或者从非NULL变成NULL, 这种情况明显应该新增一个版本, 但根据“<>”的定义, 此时返回值是NULL, 查询不会返回行, 不符合我们的需求。 - 现在使用“!(a.shipping_address <=> b.shipping_address)”作为判断条件, 我们先看一下Hive里是怎么定义“<=>”操作符的: A <=> B — Returns same result with EQUAL(=)operator for non-null operands, but returns TRUE if both are NULL, FALSE if one of the them is NULL。 从这个定义可知, 当A和B都为NULL时返回TRUE, 其中一个为NULL时返回FALSE, 其他情况与等号返回相同的结果。 下面再来看这三种情况:
(1) t1.shipping_address和t2.shipping_address都不为空。 这种情况下如果两者相等则返回!(true), 即false, 说明地址没有变化, 否则返回!(false), 即true, 说明地址改变了, 符合我们的逻辑。
(2) t1.shipping_address和t2.shipping_address都为空。 两者的比较会演变成!(null<=>null), 根据“<=>”的定义, 会返回!(true), 即返回false。 因为查询语句中只会返回判断条件为true的记录, 所以查询不会返回行, 这符合我们的逻辑, 说明地址没有改变。
(3) t1.shipping_address和t2.shipping_address只有一个为空。 根据“<=>”的定义, 此时会返回!(false), 即true, 查询会返回行, 符合我们的需求。 - 空值的逻辑判断有其特殊性, 为了避免不必要的麻烦, 数据库设计时应该尽量将字段设计成非空, 必要时用默认值代替NULL, 并将此作为一个基本的设计原则。
part-02 处理SCD2新增行
- 开发脚本如下:
-- **********************************************
-- 装载客户维度,处理consumer_street_address SCD2类型 新增行
-- 2-update_source_consumer_dim_scd2_scd1.hql part-02部分
-- *********************************************
-- 设置SCD的生效时间和结束时间
--结束日期
set hivevar:cur_date = current_date();
-- 结束日期
set hivevar:pre_date = date_add(${hivevar:cur_date},-1);
-- 设置最大有效日期
set hivevar:max_date = cast('9999-12-31' as date);
set hivevar:expired_indicator=cast('Expired' as string);
-- 处理source_consumer_dim的列customer_street_addresses上的新增行 SCD2方式
insert into
source.source_consumer_dim
select
-- 生成新的代理键
row_number() over(order by t1.consumer_number)+t2.sk_max,
t1.consumer_number,t1.consumer_name,t1.consumer_street_address,t1.consumer_zip_code,
t1.consumer_city,t1.consumer_province,
t1.consumer_shipping_address,t1.consumer_zip_code,t1.consumer_shipping_city,
t1.consumer_shipping_province,
t1.consumer_valid_from,t1.consumer_valid_to,
"Current" as consumer_indicator,t1.consumer_version
from
(
select
b.customer_number as consumer_number,
b.customer_name as consumer_name,
b.customer_street_address as consumer_street_address,
b.customer_zip_code as consumer_zip_code,
b.customer_city as consumer_city,
b.customer_state as consumer_province,
b.consumer_shipping_address,
b.consumer_zip_code,
b.consumer_shipping_city,
b.consumer_shipping_province,
a.consumer_version+1 as consumer_version,
${hivevar:pre_date} as consumer_valid_from,
${hivevar:max_date} as consumer_valid_to
from
source.source_consumer_dim a
inner join
ods.ods_consumer b
on
a.consumer_number=b.customer_number and a.consumer_valid_to=${hivevar:pre_date}
left join
source.source_consumer_dim c
on
a.consumer_number=c.consumer_number and c.consumer_valid_to=${hivevar:max_date}
where
(!(a.consumer_street_address <=> b.customer_street_address)
or
!(a.consumer_shipping_address<=>b.customer_shipping_address))
and
c.consumer_key is null
) t1
cross join
(select coalesce(max(consumer_key),0) as sk_max from source.source_consumer_dim) t2;
part-03 处理SCD1
- hive 脚本如下
-- **********************************************
-- 装载客户维度,处理consumer_name SCD1类型
-- 2-update_source_consumer_dim_scd2_scd1.hql part-03部分
-- *********************************************
--处理consumer_name 的SCD1,修改所有记录中的consumer_name
drop table if exists source.source_consumer_tmp;
-- 创建临时过渡表
create table source.source_consumer_tmp
as
select
a.consumer_key,a.consumer_number,
b.customer_name,b.customer_street_address,b.customer_zip_code,
a.consumer_city,a.consumer_province,
a.consumer_shiipping_address,
a.consumer_shipping_zip_code,
a.consumer_shipping_city,
a.consumer_shipping_province,
a.consumer_valid_from,
a.consumer_valid_to,
a.consumer_indicator,
a.consumer_version
from
source.source_consumer_dim a,ods.ods_consumer b
where
a.consumer_number=b.customer_number and !(a.consumer_name <=> b.customer_name);
--删除 source.source_consumer_dim中consumer_name修改的列
delete
from
source.source_consumer_dim
where
consumer_key
in (select consumer_key from source.source_consumer_tmp);
-- 将source.source_consumer_tmp中数据插入到source.source_consumer_dim中;
insert into
source.source_consumer_dim
select * from source.source_consumer_tmp;
- 语句说明:consumer_name 列上的scd1处理只是在select语句中增加了送货地址的四列, 并出于同样的原因使用了“<=>”关系操作符。
part-04 处理ods.ods_consumer新增行
-- **********************************************
-- 装载客户维度,处理ods.ods_consumer 新增行
-- 2-update_source_consumer_dim_scd2_scd1.hql part-04部分
-- *********************************************
-- 设置SCD的生效时间和结束时间
--结束日期
set hivevar:cur_date = current_date();
-- 结束日期
set hivevar:pre_date = date_add(${hivevar:cur_date},-1);
-- 设置最大有效日期
set hivevar:max_date = cast('9999-12-31' as date);
set hivevar:expired_indicator=cast('Expired' as string);
-- 处理新增的customer数据
insert into
source.source_consumer_dim
select
row_number() over(order by t1.consumer_number)+t2.sk_max,
t1.consumer_number,t1.consumer_name,t1.consumer_street_address,t1.consumer_zip_code,
t1.consumer_city,t1.consumer_province,
t1.consumer_shipping_address,t1.consumer_shipping_zip_code,
t1.consumer_shipping_city,t1.consumer_shipping_province,
${hivevar:pre_date},
${hivevar:max_date},
'Current',
1
from
(
select
a.customer_number as consumer_number,
a.customer_name as consumer_name,
a.customer_street_address as consumer_street_address,
a.customer_zip_code as consumer_zip_code,
a.customer_city as consumer_city,
a.customer_state as consumer_province,
a.customer_shipping_address as consumer_shipping_address,
a.customer_shipping_zip_code as consumer_shipping_code,
a.customer_shipping_city as consumer_shipping_city,
a.customer_shipping_province as consumer_shipping_province
from
ods.ods_consumer a
left join
source.source_consumer_dim b
on
a.customer_number=b.consumer_number
where
b.consumer_key is null
) t1
cross join
(select coalesce(max(consumer_key),0) as sk_max from source.source_consumer_dim) t2
;
合并
- 将part-01 至part-04按照顺序合并即可。
修改dw.sale_order_fact
- 修改5-update_dw_sale_order_fact.hql,添加order_quantity字段,较简单
- 修改脚本如下:
-- ****************************************************
-- @Author: LiYahui
-- @Date: Created in 2019/04/13 13:30
-- @Description: TODO 定期装载dw.sale_order_fact
-- @Version: V1.0
-- ****************************************************
-- 装载订单事实表
insert into
dw.sale_order_fact
select
order_key,
consumer_key,
product_key,
day_key,
order_amount,
order_quantity
from
ods.ods_sale_order a,
source.source_order_dim b,
source.source_consumer_dim c,
source.source_product_dim d,
source.source_date_dim e,
ods.ods_cdc_time f
where
a.order_number=b.order_key
and
a.customer_number=c.consumer_number
and
a.order_date >=c.consumer_valid_from
and
a.entry_date < c.consumer_valid_to
and
a.product_code=d.product_code
and
a.order_date > d.product_valid_from
and
a.order_date < d.product_valid_to
and
to_date(a.order_date) = e.day_key
and
a.entry_date >= f.last_load
and
a.entry_date < f.current_load
;
总结
- 关于sqoop的配置,需要提前配置sqoop的元数据信息,才能通过sqoop job的形式查询到增量导入的最大id。
- 将修改过的脚本重新整合,进行修改。