Because the product information table and the supermarket branch information table are small and essentially static, they can be loaded into the data warehouse with a full refresh. The product sales records coming from each supermarket branch, on the other hand, are large and gain newly inserted rows every day, so an incremental load is the better choice when bringing them into the warehouse.
In this example the warehouse is stored on HDFS and managed with Hive, and the ETL steps are written in HiveQL. Figure 3-19 shows the relationships among the tables at each layer.
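To make the two loading strategies concrete, here is a minimal HiveQL sketch against the ODS tables created below; the stg_market_info and stg_sale_info staging tables and the date bound are hypothetical, for illustration only:
-- Full load: small, rarely-changing data is rewritten wholesale on each run
insert overwrite table ods_market_info partition (dt = '2021-12-21')
select market_id, market_address, start_time, end_time, market_name, create_time, update_time
from stg_market_info;   -- hypothetical staging table
-- Incremental load: append only the records that are new since the last run
insert into table ods_sale_info partition (dt = '2021-12-21')
select order_id, order_status, market_id, product_num, product_id, create_time, update_time
from stg_sale_info      -- hypothetical staging table
where create_time >= '2021-12-21';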
First, create the tables for the data access (ODS) layer in Hive:
# Switch to the hadoop user
su hadoop
# Start the Hive CLI
hive
-- Create the supermarket branch information table
DROP TABLE IF EXISTS ods_market_info;
create table ods_market_info(
market_id string comment 'supermarket branch ID',
market_address string comment 'supermarket branch address',
start_time string comment 'validity start time',
end_time string comment 'validity end time',
market_name string comment 'supermarket branch name',
create_time string comment 'creation time',
update_time string comment 'update time'
)
partitioned by(dt string)
row format delimited fields terminated by '\t';
-- Create the product information table
DROP TABLE IF EXISTS ods_product_info;
CREATE TABLE ods_product_info(
product_id int comment 'product ID',
type_name string comment 'category name',
supplier_phone string comment 'supplier phone number',
supplier_address string comment 'supplier address',
product_price string comment 'product price',
product_desc string comment 'product description',
start_time string comment 'validity start time',
end_time string comment 'validity end time',
product_name string comment 'product name',
create_time string comment 'creation time',
update_time string comment 'update time'
) comment 'product information table'
partitioned by(dt string)
row format delimited fields terminated by '\t';
-- Create the order record table
DROP TABLE IF EXISTS ods_sale_info;
CREATE TABLE ods_sale_info(
order_id string comment 'order ID',
order_status string comment 'order status',
market_id string comment 'supermarket branch ID',
product_num int comment 'product quantity',
product_id int comment 'product ID',
create_time string comment 'creation time',
update_time string comment 'update time'
) comment 'order record table'
PARTITIONED BY (dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
# Verify the newly created tables
hive> show tables;
OK
course
ods_market_info
ods_product_info
ods_sale_info
stu
stu1
Time taken: 0.026 seconds, Fetched: 6 row(s)
Next, batch-generate data for the MySQL tables using stored procedures.
Create the source business tables in MySQL: product_info (product information), market_info (supermarket branch information), and sale_info (order records).
mysql -uroot -p
use hive
-- Create the product information table, with id as the primary key
create table product_info(
id int(10) not null auto_increment primary key,
product_id int comment 'product ID',
type_name varchar(100) comment 'category name',
supplier_phone varchar(100) comment 'supplier phone number',
supplier_address varchar(100) comment 'supplier address',
product_price varchar(100) comment 'product price',
product_desc varchar(100) comment 'product description',
start_time varchar(100) comment 'validity start time',
end_time varchar(100) comment 'validity end time',
product_name varchar(100) comment 'product name',
create_time varchar(100) comment 'creation time',
update_time varchar(100) comment 'update time'
) engine=innodb default charset=utf8;
-- Create the supermarket branch information table
create table market_info(
id int(10) not null auto_increment primary key,
market_id varchar(100) comment 'supermarket branch ID',
market_address varchar(100) comment 'supermarket branch address',
start_time varchar(100) comment 'validity start time',
end_time varchar(100) comment 'validity end time',
market_name varchar(100) comment 'supermarket branch name',
create_time varchar(100) comment 'creation time',
update_time varchar(100) comment 'update time'
) engine=innodb default charset=utf8;
-- Create the order record table
create table sale_info(
id int(10) not null auto_increment primary key,
order_id varchar(100) comment 'order ID',
order_status varchar(100) comment 'order status',
market_id varchar(100) comment 'supermarket branch ID',
product_num int comment 'product quantity',
product_id int comment 'product ID',
create_time varchar(100) comment 'creation time',
update_time varchar(100) comment 'update time'
) engine=innodb default charset=utf8;
# Verify the three newly created tables
mysql> show tables;
| market_info |
| product_info |
| sale_info |
+-------------------------------+
77 rows in set (0.01 sec)
mysql -uroot -p
use hive
insert into market_info(market_id,market_address,start_time,end_time,market_name,create_time,update_time) values ('1000001','湖南省长沙市开福区万达广场1021号','2021-12-16','2028-12-17','大润发开福万达店','2021-12-12','2021-12-12');
insert into market_info(market_id,market_address,start_time,end_time,market_name,create_time,update_time) values ('1000002','湖南省长沙市岳麓区万达广场1021号','2021-12-16','2028-12-17','大润发岳麓万达店','2021-12-12','2021-12-12');
insert into market_info(market_id,market_address,start_time,end_time,market_name,create_time,update_time) values ('1000003','湖南省长沙市雨花区万达广场1021号','2021-12-16','2028-12-16','大润发雨花万达店','2021-12-12','2021-12-12');
mysql -uroot -p
use hive
drop procedure if exists insert_product_info;
delimiter //
create procedure insert_product_info(type_name varchar(100),product_price varchar(100),start_time varchar(100),end_time varchar(100),create_time varchar(100),update_time varchar(100),num int)
begin
declare str char(62) default 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789';
declare product_name char(100);
declare product_id int;
declare i int default 0;
while i<= num DO
-- Generate a random product name
set product_name=concat("商品名称",substring(str,1+floor(rand()*61),2),substring(str,1+floor(rand()*61),3));
-- Generate a random product ID
set product_id = floor(rand()*1000);
set i=i+1;
INSERT INTO `hive`.`product_info` (`product_id`, `type_name`, `supplier_phone`, `supplier_address`, `product_price`, `product_desc`, `start_time`, `end_time`, `product_name`, `create_time`, `update_time`) VALUES (product_id, type_name, '18576759590', '湖南省常德市', product_price, '产品描述', start_time, end_time, product_name, create_time, update_time);
end while;
end;
//
# Call the procedure as follows; the final argument is the number of rows to insert (note the loop's <=, which actually inserts num+1 rows per call, hence the 202 below)
mysql> call insert_product_info('食品','50','2021-12-16', '2022-12-17', '2021-12-15','2021-12-15',100) //
mysql> call insert_product_info('酒水','100','2021-12-16', '2022-12-17', '2021-12-15','2021-12-15',100) //
# Verify the inserted rows
mysql> select count(*) from product_info;//
+----------+
| count(*) |
+----------+
| 202 |
+----------+
mysql -uroot -p
use hive
drop procedure if exists insert_sale_info;
delimiter //
create procedure insert_sale_info(order_status varchar(10),market_id varchar(100),product_num int,product_id int,create_time varchar(100),update_time varchar(100),num int)
begin
declare order_id int;
declare i int default 0;
while i<= num DO
set i=i+1;
-- Generate a random order ID
set order_id = floor(rand()*100);
INSERT INTO `hive`.`sale_info` (`order_id`, `order_status`, `market_id`, `product_num`, `product_id`, `create_time`, `update_time`) VALUES (order_id, order_status, market_id, product_num, product_id, create_time, update_time);
end while;
end;
//
# Note: // is the delimiter that marks the end of the stored procedure. When calling the procedure, market_id and product_id must be values that actually exist in the branch and product tables.
mysql> call insert_sale_info('待付款','1000001',5, 221,'2021-12-15','2021-12-15',100) //
mysql> call insert_sale_info('已付款','1000002',10, 182, '2021-12-14','2021-12-14',100) //
# Verify the rows just inserted
mysql> select count(*) from sale_info;//
+----------+
| count(*) |
+----------+
| 203 |
+----------+
1 row in set (0.00 sec)
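Because sale_info references market_id and product_id, it is worth checking that every generated order points at an existing branch and product; a minimal sketch in MySQL (run `delimiter ;` first if the // delimiter is still active):
-- both counts should be 0
select count(*) from sale_info s left join market_info m on s.market_id = m.market_id where m.market_id is null;
select count(*) from sale_info s left join product_info p on s.product_id = p.product_id where p.product_id is null;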
wget https://github.com/alibaba/DataX/archive/master.zip
unzip master.zip
sudo wget --no-check-certificate https://dlcdn.apache.org/maven/maven-3/3.8.4/binaries/apache-maven-3.8.4-bin.tar.gz
tar -zxvf apache-maven-3.8.4-bin.tar.gz
# Configure the Maven environment variables
vi /etc/profile
export M2_HOME=/usr/local/apache-maven-3.8.4   # local Maven installation directory
export PATH=$PATH:$M2_HOME/bin
# Apply the new environment variables
source /etc/profile
vi /usr/local/apache-maven-3.8.4/conf/settings.xml
Make the following changes. Set the local repository path:
<localRepository>/usr/local/apache-maven-3.8.4/repo</localRepository>
Set the Aliyun mirror:
<mirror>
<id>nexus-aliyun</id>
<mirrorOf>central</mirrorOf>
<name>Nexus aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>
[root@VM-24-13-centos resp]# mvn -version
Apache Maven 3.8.4 (9b656c72d54e5bacbed989b64718c159fe39b537)
Maven home: /usr/local/apache-maven-3.8.4
Java version: 1.8.0_311, vendor: Oracle Corporation, runtime: /usr/local/jdk1.8.0_311/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-1160.11.1.el7.x86_64", arch: "amd64", family: "unix"
Before building, adjust the DataX pom files. In the root pom.xml, update the MySQL driver version and keep only the modules this example needs:
<mysql.driver.version>8.0.26</mysql.driver.version>
<!-- reader -->
<module>mysqlreader</module>
<module>hdfsreader</module>
<module>streamreader</module>
<!-- writer -->
<module>mysqlwriter</module>
<module>hdfswriter</module>
<module>streamwriter</module>
<!-- common support module -->
<module>plugin-rdbms-util</module>
<module>plugin-unstructured-storage-util</module>
<module>hbase20xsqlreader</module>
<module>hbase20xsqlwriter</module>
<module>kuduwriter</module>
In the hdfswriter module's pom.xml, align the Hive and Hadoop versions with the installed cluster:
<properties>
<hive.version>3.1.2</hive.version>
<hadoop.version>3.0.3</hadoop.version>
</properties>
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
[WARNING] Assembly file: /usr/local/DataX-master/target/datax is not a regular file (it may be a directory). It cannot be attached to the project build for installation or deployment.
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for datax-all 0.0.1-SNAPSHOT:
[INFO] kuduwriter ......................................... SUCCESS [ 2.148 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[root@VM-24-13-centos target]# cp datax.tar.gz /usr/local/
cd /usr/local/
tar -zxvf datax.tar.gz
Next, create the partition information. The partition path can be created by hand:
hdfs dfs -mkdir -p /user/hive/warehouse/hive.db/ods_market_info/dt=2021-12-21
Oddly, though, when the partition was created this way, the DataX import failed because it could not find the partition. The likely reason is that mkdir only creates the HDFS directory and does not register the partition in the Hive metastore, whereas an insert such as
insert into ods_market_info partition(dt = '2021-12-21') ...
registers the partition as well.
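The partition can also be registered explicitly without inserting a row; a minimal sketch:
hive> alter table ods_market_info add if not exists partition (dt = '2021-12-21');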
# Switch users
su hadoop
# Start the Hive CLI
hive
# Switch to the hive database
use hive;
# Insert a row by hand to create the partition
hive>insert into ods_market_info partition(dt = '2021-12-21') values ('111','222','33','44','55','66','77');
# Check the row just inserted
hive>select * from ods_market_info;
# After the partition exists, the test row can be deleted (truncate) or kept
hive>truncate table ods_market_info;
hive>exit;
[hadoop@VM-24-13-centos root]$ hadoop fs -ls /user/hive/warehouse/hive.db/ods_market_info/
drwxr-xr-x - hadoop supergroup 0 2021-12-22 15:57 /user/hive/warehouse/hive.db/ods_market_info/dt=2021-12-21
drwxr-xr-x - hadoop supergroup 0 2021-12-22 15:57 /user/hive/warehouse/hive.db/ods_market_info/dt=2021-12-22
[root@VM-24-13-centos job]# ls
job.json mysql_hive_ods_market_info.json
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"market_id",
"market_address",
"start_time",
"end_time",
"market_name",
"create_time",
"update_time"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://localhost:3306/hive"
],
"table": [
"market_info"
]
}
],
"password": "hive1234",
"username": "hive"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://localhost:9000",
"fileType": "text",
"path": "/user/hive/warehouse/hive.db/ods_market_info/dt=2021-12-21",
"fileName": "ods_market_info",
"column": [{
"name": "market_id",
"type": "string"
},
{
"name": "market_address",
"type": "string"
},
{
"name": "start_time",
"type": "string"
},
{
"name": "end_time",
"type": "string"
},
{
"name": "market_name",
"type": "string"
},
{
"name": "create_time",
"type": "string"
},
{
"name": "update_time",
"type": "string"
}
],
"writeMode": "append",
"fieldDelimiter": "\t",
}
}
}]
}
}
hive> show create table hive.ods_market_info;
.....
LOCATION
'hdfs://localhost:9000/user/hive/warehouse/hive.db/ods_market_info'
....After the command runs, the LOCATION shown in the output is the table's storage directory in HDFS. Fill it into the writer's path, where dt is the partition just created.
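Before launching the job, the registered partitions can be double-checked as well:
hive> show partitions ods_market_info;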
# Return to the root user
[hadoop@VM-24-13-centos root]$ su
[root@VM-24-13-centos ~]# cd /usr/local/datax/bin/
# HADOOP_USER_NAME must be set to the hadoop user; any other user fails with a permission error
[root@VM-24-13-centos bin]# python datax.py -p "-DHADOOP_USER_NAME=hadoop" ../job/mysql_hive_ods_market_info.json
[root@VM-24-13-centos bin]# su hadoop
[hadoop@VM-24-13-centos bin]$ hive
hive> select * from hive.ods_market_info;
1000001 湖南省长沙市开福区万达广场1021号 2021-12-16 00:00:00 2028-12-16 23:59:59 大润发开福万达店 2021-12-12 16:00:00 2021-12-12 16:00:00 2021-12-21
1000002 湖南省长沙市岳麓万达广场1021号 2021-12-16 00:00:00 2028-12-16 23:59:59 大润发岳麓万达店 2021-12-12 16:00:00 2021-12-12 16:00:00 2021-12-21
# Switch users
su hadoop
# Start the Hive CLI
hive
# Switch to the hive database
use hive;
# Insert a row by hand to create the partition
hive>insert into ods_product_info partition(dt = '2021-12-21') VALUES (11, '222', '18576759590', '湖南省常德市', '222', '产品描述', '333', '444', '555', '666', '77');
# Check the row just inserted
hive>select * from ods_product_info;
# After the partition exists, the test row can be deleted (truncate) or kept
hive>truncate table ods_product_info;
hive>exit;
[hadoop@VM-24-13-centos root]$ hadoop fs -ls /user/hive/warehouse/hive.db/ods_product_info/
drwxr-xr-x - hadoop supergroup 0 2021-12-23 09:31 /user/hive/warehouse/hive.db/ods_product_info/dt=2021-12-21
[root@VM-24-13-centos job]# ls
-rwxrwxrwx 1 root root 1587 Dec 21 18:05 job.json
-rw-r--r-- 1 root root 1861 Dec 22 15:54 mysql_hive_ods_market_info.json
-rw-r--r-- 1 root root 1861 Dec 23 09:36 mysql_hive_ods_product_info.json
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"product_id",
"type_name",
"supplier_phone",
"supplier_address",
"product_price",
"product_desc",
"start_time",
"end_time",
"product_name",
"create_time",
"update_time",
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://localhost:3306/hive"
],
"table": [
"product_info"
]
}
],
"password": "hive1234",
"username": "hive"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://localhost:9000",
"fileType": "text",
"path": "/user/hive/warehouse/hive.db/ods_product_info/dt=2021-12-21",
"fileName": "ods_product_info",
"column": [{
"name": "product_id",
"type": "int"
},
{
"name": "type_name",
"type": "string"
},
{
"name": "supplier_phone",
"type": "string"
},
{
"name": "supplier_address",
"type": "string"
},
{
"name": "product_price",
"type": "string"
},
{
"name": "product_desc",
"type": "string"
},
{
"name": "start_time",
"type": "string"
},
{
"name": "end_time",
"type": "string"
},
{
"name": "product_name",
"type": "string"
},
{
"name": "create_time",
"type": "string"
},
{
"name": "update_time",
"type": "string"
}
],
"writeMode": "append",
"fieldDelimiter": "\t",
}
}
}]
}
}
# Return to the root user
[hadoop@VM-24-13-centos root]$ su
[root@VM-24-13-centos ~]# cd /usr/local/datax/bin/
# HADOOP_USER_NAME must be set to the hadoop user; any other user fails with a permission error
[root@VM-24-13-centos bin]# python datax.py -p "-DHADOOP_USER_NAME=hadoop" ../job/mysql_hive_ods_product_info.json
Job start time       : 2021-12-23 10:30:52
Job end time         : 2021-12-23 10:31:05
Total elapsed time   : 12s
Average traffic      : 2.22KB/s
Record write speed   : 20rec/s
Records read         : 201
Read/write failures  : 0
su hadoop
hive
use hive;
hive> select count(product_id) from ods_product_info;
Total MapReduce CPU Time Spent: 0 msec
OK
201
Time taken: 2.154 seconds, Fetched: 1 row(s)
# Switch users
su hadoop
# Start the Hive CLI
hive
# Switch to the hive database
use hive;
# Insert a row by hand to create the partition
hive>insert into ods_sale_info partition(dt = '2021-12-21') values (1, '222', '333', 4, 55, '666', '77');
# Check the row just inserted
hive>select * from ods_sale_info;
# After the partition exists, the test row can be deleted (truncate) or kept
hive>truncate table ods_sale_info;
hive>exit;
[hadoop@VM-24-13-centos root]$ hadoop fs -ls /user/hive/warehouse/hive.db/ods_sale_info/
drwxr-xr-x - hadoop supergroup 0 2021-12-23 11:09 /user/hive/warehouse/hive.db/ods_sale_info/dt=2021-12-21
[root@VM-24-13-centos job]# ls
-rwxrwxrwx 1 root root 1587 Dec 21 18:05 job.json
-rw-r--r-- 1 root root 1861 Dec 22 15:54 mysql_hive_ods_market_info.json
-rw-r--r-- 1 root root 2267 Dec 23 10:28 mysql_hive_ods_product_info.json
-rw-r--r-- 1 root root 2267 Dec 23 11:06 mysql_hive_ods_sale_info.json
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"order_id",
"order_status",
"market_id",
"product_num",
"product_id",
"create_time",
"update_time"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://localhost:3306/hive"
],
"table": [
"sale_info"
]
}
],
"password": "hive1234",
"username": "hive"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://localhost:9000",
"fileType": "text",
"path": "/user/hive/warehouse/hive.db/ods_sale_info/dt=2021-12-21",
"fileName": "ods_sale_info",
"column": [{
"name": "order_id",
"type": "string"
},
{
"name": "order_status",
"type": "string"
},
{
"name": "market_id",
"type": "string"
},
{
"name": "product_num",
"type": "int"
},
{
"name": "product_id",
"type": "int"
},
{
"name": "create_time",
"type": "string"
},
{
"name": "update_time",
"type": "string"
}
],
"writeMode": "append",
"fieldDelimiter": "\t",
}
}
}]
}
}
# Return to the root user
[hadoop@VM-24-13-centos root]$ su
[root@VM-24-13-centos ~]# cd /usr/local/datax/bin/
# HADOOP_USER_NAME must be set to the hadoop user; any other user fails with a permission error
[root@VM-24-13-centos bin]# python datax.py -p "-DHADOOP_USER_NAME=hadoop" ../job/mysql_hive_ods_sale_info.json
Job start time       : 2021-12-23 11:17:21
Job end time         : 2021-12-23 11:17:34
Total elapsed time   : 12s
Average traffic      : 1.07KB/s
Record write speed   : 20rec/s
Records read         : 202
Read/write failures  : 0
su hadoop
hive
use hive;
hive> select count(order_id) from ods_sale_info;
Stage-Stage-1: HDFS Read: 27232 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
202
# Running the same job a second time appends the identical rows again
[root@VM-24-13-centos bin]# python datax.py -p "-DHADOOP_USER_NAME=hadoop" ../job/mysql_hive_ods_sale_info.json
su hadoop
hive
use hive;
hive> select count(order_id) from ods_sale_info;
Stage-Stage-1: HDFS Read: 27232 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
404
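The count rises from 202 to 404 because writeMode "append" simply writes the same rows a second time. The repeated order_ids can be inspected directly; a quick sketch:
hive> select order_id, count(*) as cnt from ods_sale_info group by order_id having count(*) > 1 limit 10;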
To remove the duplicates, keep only the most recent record per order_id with ROW_NUMBER:
su hadoop
hive
use hive;
hive>drop table if exists tmp_ods_to_dwd_sale_info;
create table tmp_ods_to_dwd_sale_info
as select a.order_id,a.order_status,a.market_id,a.product_num,a.product_id,a.create_time,a.update_time from
(select order_id,order_status,market_id,product_num,product_id,create_time,update_time, ROW_NUMBER() OVER(partition by order_id order BY create_time DESC) rn FROM ods_sale_info) a
WHERE a.rn=1;
# Only 85 rows remain: order_id was generated with floor(rand()*100), so the 202 source rows collide on roughly 85 distinct IDs
hive> select count(*) from tmp_ods_to_dwd_sale_info;
OK
85
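A quick sanity check should now return no rows, since each order_id appears exactly once:
hive> select order_id from tmp_ods_to_dwd_sale_info group by order_id having count(*) > 1;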
# Switch to the hadoop user
su hadoop
# Start the Hive CLI
hive
-- Create the supermarket branch dimension table
DROP TABLE IF EXISTS dw_dim_market_info;
create table dw_dim_market_info(
market_id string comment 'supermarket branch ID',
market_address string comment 'supermarket branch address',
effective_date string comment 'validity start time',
expiry_date string comment 'validity end time',
market_name string comment 'supermarket branch name'
) comment 'supermarket branch dimension table'
partitioned by(dt string)
row format delimited fields terminated by '\t';
-- Create the product dimension table
DROP TABLE IF EXISTS dw_dim_product_info;
CREATE TABLE dw_dim_product_info(
product_id int comment 'product ID',
type_name string comment 'category name',
supplier_phone string comment 'supplier phone number',
supplier_address string comment 'supplier address',
product_price string comment 'product price',
product_desc string comment 'product description',
effective_date string comment 'validity start time',
expiry_date string comment 'validity end time',
product_name string comment 'product name'
) comment 'product dimension table'
partitioned by(dt string)
row format delimited fields terminated by '\t';
-- Create the date dimension table
DROP TABLE IF EXISTS dw_dim_date_info;
CREATE TABLE dw_dim_date_info(
date_id string comment 'date ID',
year_value string comment 'year',
month_value string comment 'month',
day_value string comment 'day',
date_value string comment 'date as yyyy-MM-dd',
is_weekend string comment 'weekend flag', -- 0 = weekday, 1 = weekend
day_of_week string comment 'day of the week'
) comment 'date dimension table'
PARTITIONED BY (dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
-- Create the sales fact table
DROP TABLE IF EXISTS dwd_sale_fact;
CREATE TABLE dwd_sale_fact(
order_id string comment 'order ID',
order_status string comment 'order status',
market_id string comment 'supermarket branch ID',
date_id string comment 'date ID',
product_num int comment 'product quantity',
product_id int comment 'product ID',
create_time string comment 'creation time',
update_time string comment 'update time'
) comment 'sales fact table'
PARTITIONED BY (dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
# Verify the newly created tables
hive> show tables;
OK
course
dw_dim_date_info
dw_dim_market_info
dw_dim_product_info
dwd_sale_fact
ods_market_info
ods_product_info
ods_sale_info
stu
stu1
tmp_ods_to_dwd_sale_info
The date_value column of the date dimension table must match the create_time values in tmp_ods_to_dwd_sale_info for the join that comes later, so insert a few date rows whose date_value equals dates present in the generated data:
su hadoop
hive
hive>use hive;
hive>insert into dw_dim_date_info partition(dt = '2021-12-21') VALUES ('2021122101', '2021', '12','15','2021-12-15', '0', '51');
hive>insert into dw_dim_date_info partition(dt = '2021-12-21') VALUES ('2021122102', '2021', '12', '24','2021-12-24', '0', '51');
hive>insert into dw_dim_date_info partition(dt = '2021-12-21') VALUES ('2021122103', '2021', '12', '25','2021-12-25', '1', '51');
hive> select * from dw_dim_date_info;
OK
2021122101 2021 12 15 2021-12-15 0 51 2021-12-21
2021122102 2021 12 24 2021-12-24 0 51 2021-12-21
2021122103 2021 12 25 2021-12-25 1 51 2021-12-21
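Hand-inserted rows are enough for this walkthrough; for a full calendar the dimension can also be generated over a date range. A sketch using Hive's date_format, date_add, and posexplode built-ins (note it uses the ISO 1-7 day-of-week convention, unlike the hand-inserted rows above):
insert into dw_dim_date_info partition(dt = '2021-12-21')
select
date_format(d, 'yyyyMMdd') as date_id,
date_format(d, 'yyyy') as year_value,
date_format(d, 'MM') as month_value,
date_format(d, 'dd') as day_value,
date_format(d, 'yyyy-MM-dd') as date_value,
case when date_format(d, 'u') in ('6', '7') then '1' else '0' end as is_weekend,
date_format(d, 'u') as day_of_week
from (
  select date_add('2021-12-01', pos) as d
  from (select posexplode(split(space(30), ' ')) as (pos, val)) t -- 31 days of December 2021
) dates;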
hive>insert into dw_dim_market_info partition(dt = '2021-12-21') select market_id,market_address,start_time as effective_date,end_time as expiry_date,market_name
from ods_market_info;
hive> select * from dw_dim_market_info;
OK
1000001 湖南省长沙市开福区万达广场1021号 2021-12-16 2028-12-17 大润发开福万达店 2021-12-21
1000002 湖南省长沙市岳麓区万达广场1021号 2021-12-16 2028-12-17 大润发岳麓万达店 2021-12-21
1000003 湖南省长沙市雨花区万达广场1021号 2021-12-16 2028-12-16 大润发雨花万达店 2021-12-21
hive>insert into dw_dim_product_info partition(dt = '2021-12-21') select product_id,type_name,supplier_phone,supplier_address,product_price,product_desc,start_time as effective_date,end_time as expiry_date,product_name
from ods_product_info;
hive> select * from dw_dim_product_info;
OK
221 食品 18576759590 湖南省常德市 50 产品描述 2021-12-16 2022-12-17 商品名称78ABC 2021-12-21
545 食品 18576759590 湖南省常德市 50 产品描述 2021-12-16 2022-12-17 商品名称XYyzA 2021-12-21
639 食品 18576759590 湖南省常德市 50 产品描述 2021-12-16 2022-12-17 商品名称GHdef 2021-12-21
459 食品 18576759590 湖南省常德市 50 产品描述 2021-12-16 2022-12-17 商品名称cdtuv 2021-12-21
The tmp_ods_to_dwd_sale_info table is the deduplicated temporary table built above from the order records. Join it with the date dimension to load the sales fact table:
hive>insert into dwd_sale_fact partition(dt = '2021-12-21') select a.order_id,a.order_status,a.market_id,b.date_id,a.product_num,a.product_id,a.create_time,a.update_time
from tmp_ods_to_dwd_sale_info a
inner join dw_dim_date_info b
on a.create_time=b.date_value;
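Because this is an inner join, orders whose create_time has no matching date_value are silently dropped; the number of unmatched rows can be checked with a left join, a quick sketch:
hive> select count(*) from tmp_ods_to_dwd_sale_info a left join dw_dim_date_info b on a.create_time = b.date_value where b.date_id is null;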
# The join matches the records created on 2021-12-15
hive> select * from dwd_sale_fact;
OK
11 待付款 1000001 2021122101 5 221 2021-12-15 2021-12-15 2021-12-21
12 待付款 1000001 2021122101 5 221 2021-12-15 2021-12-15 2021-12-21
14 待付款 1000001 2021122101 5 221 2021-12-15 2021-12-15 2021-12-21
16 待付款 1000001 2021122101 5 221 2021-12-15 2021-12-15 2021-12-21
17 待付款 1000001 2021122101 5 221 2021-12-15 2021-12-15 2021-12-21
20 待付款 1000001 2021122101 5 221 2021-12-15 2021-12-15 2021-12-21
....
-- Create the DWS-layer order summary table
drop table if exists dws_order_info;
create table dws_order_info (
order_id string comment 'order ID',
order_status string comment 'order status',
market_id string comment 'supermarket branch ID',
include_product_a int comment 'whether the order includes product A',
date_id string comment 'date ID',
a_num int comment 'quantity of product A',
product_info string comment 'product details',
create_time string comment 'creation time',
update_time string comment 'update time'
) comment 'order summary table'
PARTITIONED BY (dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
-- Create an intermediate table that adds the is_product_a flag
drop table if exists tmp_dwd_to_dws_order_info;
create table tmp_dwd_to_dws_order_info as select
order_id,order_status,market_id,date_id,
case
when product_id=221 then 1
else 0
end as is_product_a, -- whether this row is product A
case
when product_id=221 then product_num
else 0
end as a_num, -- quantity of product A
product_id,
product_num,
create_time,
update_time
from dwd_sale_fact;
# Query the flagged rows: product_id 221 is now marked as product A
hive> select * from tmp_dwd_to_dws_order_info;
OK
11 待付款 1000001 2021122101 1 5 221 5 2021-12-15 2021-12-15
12 待付款 1000001 2021122101 1 5 221 5 2021-12-15 2021-12-15
14 待付款 1000001 2021122101 1 5 221 5 2021-12-15 2021-12-15
16 待付款 1000001 2021122101 1 5 221 5 2021-12-15 2021-12-15
17 待付款 1000001 2021122101 1 5 221 5 2021-12-15 2021-12-15
20 待付款 1000001 2021122101 1 5 221 5 2021-12-15 2021-12-15
22 待付款 1000001 2021122101 1 5 221 5 2021-12-15 2021-12-15
23 待付款 1000001 2021122101 1 5 221 5 2021-12-15 2021-12-15
24 待付款 1000001 2021122101 1 5 221 5 2021-12-15 2021-12-15
25 待付款 1000001 2021122101 1 5 221 5 2021-12-15 2021-12-15
-- Aggregate the order data by order number
hive>insert into dws_order_info partition(dt = '2021-12-21')
select order_id,order_status,market_id,date_id,
case
when sum(is_product_a)>0 then 1
else 0
end as include_product_a,
sum(a_num) as a_num,
concat_ws('_',collect_list(cast(product_id as string)),collect_list(cast(product_num as string))) as product_info,
create_time,update_time
from tmp_dwd_to_dws_order_info group by order_id,order_status,market_id,date_id,create_time,update_time;
hive> select * from dws_order_info;
OK
11 待付款 1000001 2021122101 1 5 221_5 2021-12-15 2021-12-15 2021-12-21
12 待付款 1000001 2021122101 1 5 221_5 2021-12-15 2021-12-15 2021-12-21
14 待付款 1000001 2021122101 1 5 221_5 2021-12-15 2021-12-15 2021-12-21
16 待付款 1000001 2021122101 1 5 221_5 2021-12-15 2021-12-15 2021-12-21
17 待付款 1000001 2021122101 1 5 221_5 2021-12-15 2021-12-15 2021-12-21
20 待付款 1000001 2021122101 1 5 221_5 2021-12-15 2021-12-15 2021-12-21
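Note that concat_ws over two collect_lists emits all the product IDs first and then all the quantities (a two-product order would come out like 221_182_5_10), so IDs and quantities are hard to pair up. Collecting an ID_quantity pair per product keeps them aligned; a sketch of the alternative:
select order_id,
concat_ws(',', collect_list(concat_ws('_', cast(product_id as string), cast(product_num as string)))) as product_info
from tmp_dwd_to_dws_order_info
group by order_id;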
With a single product per order the field looks fine, but as soon as an order spans several products, product_info has exactly this pairing problem (the sketch above keeps each ID next to its quantity). Finally, summarize the daily purchases of product A:
drop table if exists dwn_order_info_by_day;
create table dwn_order_info_by_day
as select
count(distinct c.order_id) as consumption_num, -- number of sales orders
sum(c.a_num) as day_num, -- total quantity of product A sold
sum(c.include_product_a)/count(distinct c.order_id) as buy_a_rate -- proportion of orders that bought product A
from
(
select
a.order_id as order_id,
a.a_num as a_num,
a.include_product_a as include_product_a,
b.year_value as year_value,
b.month_value as month_value,
b.day_value as day_value
from dws_order_info a
left join dw_dim_date_info b on a.date_id=b.date_id) c
group by c.day_value;
-- Query the purchase statistics for product A
hive> select * from dwn_order_info_by_day;
OK
62 310 2.021122101E9
Time taken: 0.098 seconds, Fetched: 1 row(s)