1.需求描述—统计video影音视频网站的常规指标,各种TopN指标:
-1-统计视频观看数Top5 -2-统计视频类别热度Top5 -3-统计出视频观看数最高的5个视频的所属类别以及类别包含Top5视频的个数
-4-统计视频观看数Top5所关联视频的所属类别Rank(排序) -5-统计每个类别中的视频热度Top5 -6-统计每个类别中视频流量Top5
-7-统计上传视频最多的用户Top5以及他们上传的观看次数在前5的视频 -8-统计每个类别视频观看数Top5
2.项目 1)数据结构
(1)视频表
字段 |
备注 |
详细描述 |
video_id |
视频唯一id |
11位字符串 |
up_loader |
视频上传者 |
上传视频的用户名String |
age |
视频年龄 |
视频在平台上的整数天 |
category |
视频类别 |
上传视频指定的视频分类 |
length |
视频长度 |
整形数字标识的视频长度 |
views |
观看次数 |
视频被浏览的次数 |
rate |
视频评分 |
满分5分 |
Ratings |
流量 |
视频的流量,整型数字 |
conments |
评论数 |
一个视频的整数评论数 |
related ids |
相关视频id |
相关视频的id,最多20个 |
(2)用户表
字段 |
备注 |
字段类型 |
up_loader |
上传者用户名 |
string |
videos |
上传视频数 |
int |
friends |
朋友数量 |
int |
2) ETL原始数据
通过观察原始数据形式,发现视频可以有多个所属分类,每个所属分类用&符号分割,且分割的两边有空格字符,同时相关视频也是可以有多个元素,多个相关视频又用“\t”进行分割。为了分析数据时方便对存在多个子元素的数据进行操作,我们首先进行数据重组清洗操作。即:将所有的类别用“&”分割,同时去掉两边空格,多个相关视频id也使用“&”进行分割。
—Java中处理数据—
(1)ETL之ETLUtil
public class ETLUtil {
public static String oriString2ETLString(String ori){
StringBuilder etlString = new StringBuilder();
String[] splits = ori.split("\t");
if(splits.length < 9) return null;
splits[3] = splits[3].replace(" ", "");
for(int i = 0; i < splits.length; i++){
if(i < 9){
if(i == splits.length - 1){
etlString.append(splits[i]);
}else{
etlString.append(splits[i] + "\t");
}
}else{
if(i == splits.length - 1){
etlString.append(splits[i]);
}else{
etlString.append(splits[i] + "&");
}
}
}
return etlString.toString();
}
}
(2)ETL之Mapper (包:IOException、StringUtils、NullWritable、Text、Mapper、ETLUtil)
public class VideoETLMapper extends Mapper<Object, Text, NullWritable, Text>{
Text text = new Text();
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String etlString = ETLUtil.oriString2ETLString(value.toString());
if(StringUtils.isBlank(etlString)) return;
text.set(etlString);
context.write(NullWritable.get(), text);
}
}
(3)ETL之Runner (包:IOException、Configuration、FileSystem、Path、NullWritable、Text、Job、FileInputFormat、FileOutputFormat、Tool、ToolRunner)
public class VideoETLRunner implements Tool {
private Configuration conf = null;
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public Configuration getConf() {
return this.conf;
}
@Override
public int run(String[] args) throws Exception {
conf = this.getConf();
conf.set("inpath", args[0]);
conf.set("outpath", args[1]);
Job job = Job.getInstance(conf);
job.setJarByClass(VideoETLRunner.class);
job.setMapperClass(VideoETLMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);
job.setNumReduceTasks(0);
this.initJobInputPath(job);
this.initJobOutputPath(job);
return job.waitForCompletion(true) ? 0 : 1;
}
private void initJobOutputPath(Job job) throws IOException {
Configuration conf = job.getConfiguration();
String outPathString = conf.get("outpath");
FileSystem fs = FileSystem.get(conf);
Path outPath = new Path(outPathString);
if(fs.exists(outPath)){
fs.delete(outPath, true);
}
FileOutputFormat.setOutputPath(job, outPath);
}
private void initJobInputPath(Job job) throws IOException {
Configuration conf = job.getConfiguration();
String inPathString = conf.get("inpath");
FileSystem fs = FileSystem.get(conf);
Path inPath = new Path(inPathString);
if(fs.exists(inPath)){
FileInputFormat.addInputPath(job, inPath);
}else{
throw new RuntimeException("HDFS中该文件目录不存在:" + inPathString);
}
}
public static void main(String[] args) {
try {
int resultCode = ToolRunner.run(new VideoETLRunner(), args);
if(resultCode == 0){
System.out.println("Success!");
}else{
System.out.println("Fail!");
}
System.exit(resultCode);
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
}
(4)执行ETL
[ ]# bin/yarn jar ~/softwares/jars/video-0.0.1-SNAPSHOT.jar/
com.atguigu.etl.ETLVideosRunner/
/video/2008/0222/
/output/video/2008/0222
3. 准备工作 1)创建表
创建表:video_ori,video_user_ori 创建表:video_orc,video_user_orc
(1)video_ori
create table video_ori(video_Id string,up_loader string,age int,
category array<string>,length int,views int,rate float,ratings int,comments int,relatedId array<string>)
row format delimited fields terminated by "\t" collection items terminated by "&" stored as textfile;
(2)video_user_ori
create table video_user_ori(up_loader string,videos int,friends int)
row format delimited fields terminated by "\t" stored as textfile;
—然后把原始数据插入到orc表中—
(3)video_orc:
create table video_orc(video_Id string,up_loader string,age int,
category array<string>,length int,views int,rate float,ratings int,comments int,relatedId array<string>)
row format delimited fields terminated by "\t" collection items terminated by "&" stored as orc;
(4)video_user_orc:
create table video_user_orc(up_loader string,videos int,friends int)
row format delimited fields terminated by "\t" stored as orc;
2)导入ETL后的数据
(1)video-ori: load data inpath "/output/video/2008/0222" into table video_ori;
(2)video_user_ori: load data inpath "/video/user/2008/0903" into table video_user_ori;
3)向ORC表插入数据
(1)video-orc: insert into table video_orc select * from video_ori;
(2)video_user_orc: insert into table video_user_orc select * from video_user_ori;
4.业务分析
-1-统计视频观看数Top5 (思路:使用order by按照views字段做一个全局排序,同时设置只显示前5条。)
select video_Id, up_loader, age, views from video_ori order by views desc limit 5;
-2-统计视频类别热度Top5 (思路:按照类别group by聚合,然后count组内的videoId个数,按照热度排序,显示前5条
因为当前表结构为:一个视频对应一个或多个类别。所以如果要group by类别,需要先将类别进行列转行(展开),然后再进行count)
select category_name as category, count(t1.video_Id) as hot from (select video_Id,category_name
from video_ori lateral view explode(category) t_catetory as category_name) t1
group by t1.category_name order by hot desc limit 5;
-3-统计出视频观看数最高的5个视频的所属类别以及类别包含Top5视频的个数 (思路:先找到观看数最高的20个视频所属条目的所有信息,降序排列
再把这20条信息中的category分裂出来(列转行),最后查询视频分类名称和该分类下有多少个Top5的视频)
select category_name as category, count(t2.video_Id) as hot_with_views from (
select video_Id, category_name from (select *from video_ori order by views desc limit 5)
t1 lateral view explode(category) t_catetory as category_name) t2 group by category_name order by hot_with_views desc;
-4-统计视频观看数Top5所关联视频的所属类别排序 (思路:查询出观看数最多的前5个视频的所有信息(当然包含了每个视频对应的关联视频),记为临时表t1,
将找到的5条视频信息的相关视频relatedId列转行,记为临时表t2,将相关视频的id和video_orc表进行inner join操作,按照视频类别进行分组,统计每组视频个数,然后排行)
t1表:
select *from video_orc order by views desc limit 5;
t2表:
select explode(relatedId) as videoId from t1;
t5表:
(select distinct(t2.video_Id),t3.category from t2 inner join video_orc t3 on t2.video_Id = t3.video_Id)
t4 lateral view explode(category) t_catetory as category_name;
最后代码:
select category_name as category, count(t5.video_Id) as hot from (select video_Id, category_name
from (select distinct(t2.video_Id), t3.category from (select explode(relatedId) as video_Id
from (select *from video_ori order by views desc limit 5) t1) t2 inner join video_ori t3 on t2.video_Id = t3.video_Id) t4 lateral
view explode(category) t_catetory as category_name) t5 group by category_name order by hot desc;
-5-统计每个类别中的视频热度Top5,以xy为例 (思路: 要想统计xy类别中的视频热度Top5,需要先找到xy类别,那么就需要将category展开,所以可以创建一张表用于存放categoryId展开的数据。 向category展开的表中插入数据。统计对应类别(xy)中的视频热度。)
1.创类别(展开)表:
create table video_category(video_Id string, up_loader string, age int,
categoryId string, length int, views int, rate float, ratings int, comments int, relatedId array<string>)
row format delimited fields terminated by "\t" collection items terminated by "&" stored as orc;
2.向类别表中插入数据:
insert into table video_category select video_Id,up_loader,age,categoryId,length,views,rate,
ratings,comments,relatedId from video_ori lateral view explode(category) catetory as categoryId;
3.统计xy类别的Top5(也可以统计其他)
select video_Id, views from video_category where categoryId = "xy" order by views desc limit 5;
-6-统计每个类别中视频流量Top5,以dz为例 (思路: 创建视频类别展开表(categoryId列转行后的表),按照ratings排序)
1.创类别(展开)表:
create table video_ratings(video_Id string, up_loader string, age int,
categoryId string, length int, views int, rate float, ratings int, comments int, relatedId array<string>)
row format delimited fields terminated by "\t" collection items terminated by "&" stored as orc;
2.向类别表中插入数据:
insert into table video_ratings select video_Id,up_loader,age,categoryId,length,
views,rate,ratings,comments,relatedId from video_ori lateral view explode(category) catetory as categoryId;
3. 按照ratings排序,以dz为例
select video_Id,views,ratings from video_ratings where categoryId = "dz" order by ratings desc limit 5;
-7-统计上传视频最多的用户Top5以及他们上传的观看次数在前5的视频 (思路:先找到上传视频最多的5个用户的用户信息,通过up_loader字段与video_orc表进行join,得到的信息按照views观看次数进行排序即可。)
1.找到上传视频最多的5个用户的用户信息
select * from video_user_ori order by videos desc limit 5;
2. 通过up_loader字段与video_ori表进行join,得到的信息按照views观看次数进行排序
select t2.video_Id, t2.views,t2.ratings,t1.videos,t1.friends from (select * from video_user_ori order by videos desc limit 5) t1
join video_ori t2 on t1.up_loader = t2.up_loader order by views desc limit 5;
-8-统计每个类别视频观看数Top5 (思路:先得到categoryId展开的表数据,子查询按照categoryId进行分区,然后分区内排序,并生成递增数字,该递增数字这一列起名为rank列,通过子查询产生的临时表,查询rank值小于等于5的数据行)
最终代码:
select t1.*from (select video_Id,categoryId,views,
row_number() over(partition by categoryId order by views desc) rank from video_category) t1 where rank <= 10;
原文:https://www.cnblogs.com/Hinine-zyx5852/p/11534718.html