一、需求描述
统计Youtube视频网站的常规指标,各种TopN指标:
--统计视频观看数Top10
--统计视频类别热度Top10
--统计视频观看数Top20所属类别包含这Top20视频的个数
--统计视频观看数Top50所关联视频的所属类别Rank
--统计每个类别中的视频热度Top10
--统计每个类别中视频流量Top10
--统计上传视频最多的用户Top10以及他们上传的视频
--统计每个类别视频观看数Top10
二、知识储备梳理
2.1、order by,sort by,distribute by,cluster by
背景表结构
在讲解中我们需要贯串一个 例子,所以需要设计一个情景,对应 还要有一个表结构和填充数据。如下:有3个字段,分别为personId标识某一个人,company标识一家公司名称,money标识该公司每年盈利收入(单位:万元人民币)
personId | company | money |
p1 | 公司1 | 100 |
p2 | 公司2 | 200 |
p1 | 公司3 | 150 |
p3 | 公司4 | 300 |
建表导入数据:
create table company_info( personId string, company string, money float )row format delimited fields terminated by "\t" load data local inpath “company_info.txt” into table company_info; |
2.1.1、order by
hive中的order by语句会对查询结果做一次全局排序,即,所有的mapper产生的结果都会交给一个reducer去处理,无论数据量大小,job任务只会启动一个reducer,如果数据量巨大,则会耗费大量的时间。
尖叫提示:如果在严格模式下,order by需要指定limit数据条数,不然数据量巨大的情况下会造成崩溃无输出结果。涉及属性:set hive.mapred.mode=nonstrict/strict
例如:按照money排序的例子
select * from company_info order by money desc; |
2.1.2、sort by
hive中的sort by语句会对每一块局部数据进行局部排序,即,每一个reducer处理的数据都是有序的,但是不能保证全局有序。
2.1.3、distribute by
hive中的distribute by一般要和sort by一起使用,即将某一块数据归给(distribute by)某一个reducer处理,然后在指定的reducer中进行sort by排序。
尖叫提示:distribute by必须写在sort by之前
尖叫提示:涉及属性mapreduce.job.reduces,hive.exec.reducers.bytes.per.reducer
例如:不同的人(personId)分为不同的组,每组按照money排序。
select * from company_info distribute by personId sort by personId, money desc; |
2.1.4、cluster by
hive中的cluster by在distribute by和sort by排序字段一致的情况下是等价的。同时,cluster by指定的列只能是降序,即默认的descend,而不能是ascend。
例如:写一个等价于distribute by 与sort by的例子
select * from company_info distribute by personId sort by personId; 等价于 select * from compnay_info cluster by personId; |
2.2、行转列、列转行(UDAF与UDTF)
2.2.1、行转列
表结构:
name | constellation | blood_type |
孙悟空 | 白羊座 | A |
大海 | 射手座 | A |
宋宋 | 白羊座 | B |
猪八戒 | 白羊座 | A |
凤姐 | 射手座 | A |
创建表及数据导入:
create table person_info( name string, constellation string, blood_type string) row format delimited fields terminated by "\t"; load data local inpath “person_info.tsv” into table person_info; |
例如:把星座和血型一样的人归类到一起
select t1.base, concat_ws('|', collect_set(t1.name)) name from (select name, concat(constellation, ",", blood_type) base from person_info) t1 group by t1.base; |
2.2.2、列转行
表结构:
movie | category |
《疑犯追踪》 | 悬疑,动作,科幻,剧情 |
《Lie to me》 | 悬疑,警匪,动作,心理,剧情 |
《战狼2》 | 战争,动作,灾难 |
创建表及导入数据:
create table movie_info( movie string, category array<string>) row format delimited fields terminated by "\t" collection items terminated by ","; load data local inpath "movie_info.tsv" into table movie_info; |
例如:将电影分类中的数组数据展开
select movie, category_name from movie_info lateral view explode(category) table_tmp as category_name; |
2.3、数组操作
“fields terminated by”:字段与字段之间的分隔符。
“collection items terminated by”:一个字段中各个子元素item的分隔符。
2.4、orc存储
orc即Optimized Row Columnar (ORC) file,在RCFile的基础上演化而来,可以提供一种高效的方法在Hive中存储数据,提升了读、写、处理数据的效率。
2.5、Hive分桶
Hive可以将表或者表的分区进一步组织成桶,以达到:
1、数据取样效率更高
2、数据处理效率更高
桶通过对指定列进行哈希来实现,将一个列名下的数据切分为“一组桶”,每个桶都对应了一个该列名下的一个存储文件。
2.5.1、直接分桶
开始操作之前,需要将hive.enforce.bucketing属性设置为true,以标识Hive可以识别桶。
create table music( id int, name string, size float) row format delimited fields terminated by "\t" clustered by (id) into 4 buckets; |
该代码的意思是将music表按照id将数据分成了4个桶,插入数据时,会对应4个 reduce操作,输出4个文件。
2.5.2、在分区中分桶
当数据量过大,需要庞大分区数量时,可以考虑桶,因为分区数量太大的情况可能会导致文件系统挂掉,而且桶比分区有更高的查询效率。数据最终落在哪一个桶里,取决于clustered by的那个列的值的hash数与桶的个数求余来决定。虽然有一定离散性,但不能保证每个桶中的数据量是一样的。
create table music2( id int, name string, size float) partitioned by (date string) clustered by (id) sorted by(size) into 4 bucket row format delimited fields terminated by "\t";
load data local inpath 'demo/music.txt' into table music2 partition(date='2017-08-30'); |
三、项目
3.1、数据结构
3.1.1、视频表
字段 | 备注 | 详细描述 |
video id | 视频唯一id | 11位字符串 |
uploader | 视频上传者 | 上传视频的用户名String |
age | 视频年龄 | 视频上传日期和2007年2月15日之间的整数天(Youtube的独特设定) |
category | 视频类别 | 上传视频指定的视频分类 |
length | 视频长度 | 整形数字标识的视频长度 |
views | 观看次数 | 视频被浏览的次数 |
rate | 视频评分 | 满分5分 |
ratings | 流量 | 视频的流量,整型数字 |
conments | 评论数 | 一个视频的整数评论数 |
related ids | 相关视频id | 相关视频的id,最多20个 |
3.1.2、用户表
字段 | 备注 | 字段类型 |
uploader | 上传者用户名 | string |
videos | 上传视频数 | int |
friends | 朋友数量 | int |
3.2原始数据存放地
HDFS目录:
视频数据集:/youtube/video/2008
用户数据集:/youtube/users/2008
3.3、技术选型
* Hadoop 2.7.2
* Hive 1.2.2
* Mysql 5.6
3.3.1、数据清洗
Hadoop MapReduce
3.3.2、数据分析
MapReduce or Hive
3.4、ETL原始数据
通过观察原始数据形式,可以发现,视频可以有多个所属分类,每个所属分类用&符号分割,且分割的两边有空格字符,同时相关视频也是可以有多个元素,多个相关视频又用“\t”进行分割。为了分析数据时方便对存在多个子元素的数据进行操作,我们首先进行数据重组清洗操作。即:将所有的类别用“&”分割,同时去掉两边空格,多个相关视频id也使用“&”进行分割。
该项目的pom.xml文件:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>com.z</groupId> <artifactId>youtube</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging>
<name>youtube</name> <url>http://maven.apache.org</url>
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties>
<repositories> <repository> <id>centor</id> <url>http://central.maven.org/maven2/</url> </repository> </repositories>
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency>
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.2</version> </dependency>
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-yarn-server-resourcemanager</artifactId> <version>2.7.2</version> </dependency> </dependencies> </project> |
3.6.1、ETL之ETLUtil
package com.z.youtube.util;
public class ETLUtils { /** * 1、过滤不合法数据 * 2、去掉&符号左右两边的空格 * 3、\t换成&符号 * @param ori * @return */ public static String getETLString(String ori){ String[] splits = ori.split("\t"); //1、过滤不合法数据 if(splits.length < 9) return null; //2、去掉&符号左右两边的空格 splits[3] = splits[3].replaceAll(" ", "");
StringBuilder sb = new StringBuilder(); //3、\t换成&符号 for(int i = 0; i < splits.length; i++){ sb.append(splits[i]); if(i < 9){ if(i != splits.length - 1){ sb.append("\t"); } }else{ if(i != splits.length - 1){ sb.append("&"); } } } return sb.toString(); } } |
3.6.2、ETL之Mapper
package com.z.youtube.mr.etl;
import java.io.IOException;
import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
import com.z.youtube.util.ETLUtil;
public class VideoETLMapper extends Mapper<Object, Text, NullWritable, Text>{ Text text = new Text();
@Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String etlString = ETLUtil.oriString2ETLString(value.toString());
if(StringUtils.isBlank(etlString)) return;
text.set(etlString); context.write(NullWritable.get(), text); }
} |
3.6.3、ETL之Runner
package com.z.youtube.mr.etl;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner;
public class VideoETLRunner implements Tool { private Configuration conf = null;
@Override public void setConf(Configuration conf) { this.conf = conf; }
@Override public Configuration getConf() {
return this.conf; }
@Override public int run(String[] args) throws Exception { conf = this.getConf(); conf.set("inpath", args[0]); conf.set("outpath", args[1]);
Job job = Job.getInstance(conf, "youtube-video-etl");
job.setJarByClass(VideoETLRunner.class);
job.setMapperClass(VideoETLMapper.class); job.setMapOutputKeyClass(NullWritable.class); job.setMapOutputValueClass(Text.class); job.setNumReduceTasks(0);
this.initJobInputPath(job); this.initJobOutputPath(job);
return job.waitForCompletion(true) ? 0 : 1; }
private void initJobOutputPath(Job job) throws IOException { Configuration conf = job.getConfiguration(); String outPathString = conf.get("outpath");
FileSystem fs = FileSystem.get(conf);
Path outPath = new Path(outPathString); if(fs.exists(outPath)){ fs.delete(outPath, true); }
FileOutputFormat.setOutputPath(job, outPath);
}
private void initJobInputPath(Job job) throws IOException { Configuration conf = job.getConfiguration(); String inPathString = conf.get("inpath");
FileSystem fs = FileSystem.get(conf);
Path inPath = new Path(inPathString); if(fs.exists(inPath)){ FileInputFormat.addInputPath(job, inPath); }else{ throw new RuntimeException("HDFS中该文件目录不存在:" + inPathString); } }
public static void main(String[] args) { try { int resultCode = ToolRunner.run(new VideoETLRunner(), args); if(resultCode == 0){ System.out.println("Success!"); }else{ System.out.println("Fail!"); } System.exit(resultCode); } catch (Exception e) { e.printStackTrace(); System.exit(1); } } } |
3.6.4、执行ETL
赠送编译打包命令提示:-P local clean package
$ bin/yarn jar ~/softwares/jars/youtube-0.0.1-SNAPSHOT.jar \ com.z.youtube.etl.ETLYoutubeVideosRunner \ /youtube/video/2008/0222 \ /youtube/output/video/2008/0222 |
3.5、准备工作
3.5.1、创建表
创建表:youtube_ori,youtube_user_ori,
创建表:youtube_orc,youtube_user_orc
youtube_ori:
create table youtube_ori( videoId string, uploader string, age int, category array<string>, length int, views int, rate float, ratings int, comments int, relatedId array<string>) row format delimited fields terminated by "\t" collection items terminated by "&" stored as textfile; |
youtube_user_ori:
create table youtube_user_ori( uploader string, videos int, friends int) clustered by (uploader) into 24 buckets row format delimited fields terminated by "\t" stored as textfile; |
然后把原始数据插入到orc表中
youtube_orc:
create table youtube_orc( videoId string, uploader string, age int, category array<string>, length int, views int, rate float, ratings int, comments int, relatedId array<string>) clustered by (uploader) into 8 buckets row format delimited fields terminated by "\t" collection items terminated by "&" stored as orc; |
youtube_user_orc:
create table youtube_user_orc( uploader string, videos int, friends int) clustered by (uploader) into 24 buckets row format delimited fields terminated by "\t" stored as orc; |
3.5.2、导入ETL后的数据
youtube_ori:
load data inpath "/youtube/output/video/2008/0222" into table youtube_ori; |
youtube_user_ori:
load data inpath "/youtube/user/2008/0903" into table youtube_user_ori; |
3.5.3、向ORC表插入数据
youtube_orc:
insert into table youtube_orc select * from youtube_ori; |
youtube_user_orc:
insert into table youtube_user_orc select * from youtube_user_ori; |
3.6、业务分析
3.6.1、统计视频观看数Top10
思路:
1) 使用order by按照views字段做一个全局排序即可,同时我们设置只显示前10条。
最终代码:
select videoId, uploader, age, category, length, views, rate, ratings, comments from youtube_orc order by views desc limit 10; |
3.6.2、统计视频类别热度Top10
思路:
1) 即统计每个类别有多少个视频,显示出包含视频最多的前10个类别。
2) 我们需要按照类别group by聚合,然后count组内的videoId个数即可。
3) 因为当前表结构为:一个视频对应一个或多个类别。所以如果要group by类别,需要先将类别进行列转行(展开),然后再进行count即可。
4) 最后按照热度排序,显示前10条。
最终代码:
select category_name as category, count(t1.videoId) as hot from ( select videoId, category_name from youtube_orc lateral view explode(category) t_catetory as category_name) t1 group by t1.category_name order by hot desc limit 10; |
3.6.3、统计出视频观看数最高的20个视频的所属类别以及类别包含这Top20视频的个数
思路:
1) 先找到观看数最高的20个视频所属条目的所有信息,降序排列
2) 把这20条信息中的category分裂出来(列转行)
3) 最后查询视频分类名称和该分类下有多少个Top20的视频
最终代码:
select category_name as category, count(t2.videoId) as hot_with_views from ( select videoId, category_name from ( select * from youtube_orc order by views desc limit 20) t1 lateral view explode(category) t_catetory as category_name) t2 group by category_name order by hot_with_views desc; |
3.6.4、统计视频观看数Top50所关联视频的所属类别的热度排名
思路:
1) 查询出观看数最多的前50个视频的所有信息(当然包含了每个视频对应的关联视频),记为临时表t1
t1:观看数前50的视频 select * from youtube_orc order by views desc limit 50; |
2) 将找到的50条视频信息的相关视频relatedId列转行,记为临时表t2
t2:将相关视频的id进行列转行操作 select explode(relatedId) as videoId from t1; |
3) 将相关视频的id和youtube_orc表进行inner join操作
t5:得到两列数据,一列是category,一列是之前查询出来的相关视频id (select distinct(t2.videoId), t3.category from t2 inner join youtube_orc t3 on t2.videoId = t3.videoId) t4 lateral view explode(category) t_catetory as category_name; |
4) 按照视频类别进行分组,统计每组视频个数,然后排行
最终代码:
select category_name as category, count(t5.videoId) as hot from ( select videoId, category_name from ( select distinct(t2.videoId), t3.category from ( select explode(relatedId) as videoId from ( select * from youtube_orc order by views desc limit 50) t1) t2 inner join youtube_orc t3 on t2.videoId = t3.videoId) t4 lateral view explode(category) t_catetory as category_name) t5 group by category_name order by hot desc; |
3.6.5、统计每个类别中的视频热度Top10,以Music为例
思路:
1) 要想统计Music类别中的视频热度Top10,需要先找到Music类别,那么就需要将category展开,所以可以创建一张表用于存放categoryId展开的数据。
2) 向category展开的表中插入数据。
3) 统计对应类别(Music)中的视频热度。
最终代码:
创建表类别表:
create table youtube_category( videoId string, uploader string, age int, categoryId string, length int, views int, rate float, ratings int, comments int, relatedId array<string>) row format delimited fields terminated by "\t" collection items terminated by "&" stored as orc; |
向类别表中插入数据:
insert into table youtube_category select videoId, uploader, age, categoryId, length, views, rate, ratings, comments, relatedId from youtube_orc lateral view explode(category) catetory as categoryId; |
统计Music类别的Top10(也可以统计其他)
select videoId, views from youtube_category where categoryId = "Music" order by views desc limit 10; |
3.6.6、统计每个类别中视频流量Top10,以Music为例
思路:
1) 创建视频类别展开表(categoryId列转行后的表)
2) 按照ratings排序即可
最终代码:
select videoId, views, ratings from youtube_category where categoryId = "Music" order by ratings desc limit 10; |
3.6.7、统计上传视频最多的用户Top10以及他们上传的观看次数在前20的视频
思路:
1) 先找到上传视频最多的10个用户的用户信息
select * from youtube_user_orc order by videos desc limit 10; |
2) 通过uploader字段与youtube_orc表进行join,得到的信息按照views观看次数进行排序即可。
最终代码:
select t2.videoId, t2.views, t2.ratings, t1.videos, t1.friends from ( select * from youtube_user_orc order by videos desc limit 10) t1 join youtube_orc t2 on t1.uploader = t2.uploader order by views desc limit 20; |
3.6.8、统计每个类别视频观看数Top10
思路:
1) 先得到categoryId展开的表数据
2) 子查询按照categoryId进行分区,然后分区内排序,并生成递增数字,该递增数字这一列起名为rank列
3) 通过子查询产生的临时表,查询rank值小于等于10的数据行即可。
最终代码:
select t1.* from ( select videoId, categoryId, views, row_number() over(partition by categoryId order by views desc) rank from youtube_category) t1 where rank <= 10; |
四、可能出现的问题
4.1、JVM堆内存溢出
描述:java.lang.OutOfMemoryError: Java heap space
解决:在yarn-site.xml中加入如下代码
<property> <name>yarn.scheduler.maximum-allocation-mb</name> <value>2048</value> </property> <property> <name>yarn.scheduler.minimum-allocation-mb</name> <value>2048</value> </property> <property> <name>yarn.nodemanager.vmem-pmem-ratio</name> <value>2.1</value> </property> <property> <name>mapred.child.java.opts</name> <value>-Xmx1024m</value> </property> |