一、需求描述

统计Youtube视频网站的常规指标,各种TopN指标:

--统计视频观看数Top10

--统计视频类别热度Top10

--统计视频观看数Top20所属类别包含这Top20视频的个数

--统计视频观看数Top50所关联视频的所属类别Rank

--统计每个类别中的视频热度Top10

--统计每个类别中视频流量Top10

--统计上传视频最多的用户Top10以及他们上传的视频

--统计每个类别视频观看数Top10

二、知识储备梳理

2.1、order by,sort by,distribute by,cluster by

背景表结构

在讲解中我们需要贯串一个 例子,所以需要设计一个情景,对应 还要有一个表结构和填充数据。如下:有3个字段,分别为personId标识某一个人,company标识一家公司名称,money标识该公司每年盈利收入(单位:万元人民币)

personId

company

money

p1

公司1

100

p2

公司2

200

p1

公司3

150

p3

公司4

300

建表导入数据:

create table company_info(

    personId string,

    company string,

    money float

)row format delimited fields terminated by "\t"

load data local inpath “company_info.txt” into table company_info;

2.1.1、order by

hive中的order by语句会对查询结果做一次全局排序,即,所有的mapper产生的结果都会交给一个reducer去处理,无论数据量大小,job任务只会启动一个reducer,如果数据量巨大,则会耗费大量的时间。

尖叫提示:如果在严格模式下,order by需要指定limit数据条数,不然数据量巨大的情况下会造成崩溃无输出结果。涉及属性:set hive.mapred.mode=nonstrict/strict

例如:按照money排序的例子

select * from company_info order by money desc;

2.1.2、sort by

hive中的sort by语句会对每一块局部数据进行局部排序,即,每一个reducer处理的数据都是有序的,但是不能保证全局有序。

2.1.3、distribute by

hive中的distribute by一般要和sort by一起使用,即将某一块数据归给(distribute by)某一个reducer处理,然后在指定的reducer中进行sort by排序。

尖叫提示:distribute by必须写在sort by之前

尖叫提示:涉及属性mapreduce.job.reduces,hive.exec.reducers.bytes.per.reducer

例如:不同的人(personId)分为不同的组,每组按照money排序。

select * from company_info distribute by personId sort by personId, money desc;

2.1.4、cluster by

hive中的cluster by在distribute by和sort by排序字段一致的情况下是等价的。同时,cluster by指定的列只能是降序,即默认的descend,而不能是ascend。

例如:写一个等价于distribute by 与sort by的例子

select * from company_info distribute by personId sort by personId;

等价于

select * from compnay_info cluster by personId;

 

2.2、行转列、列转行(UDAF与UDTF)

2.2.1、行转列

表结构:

name

constellation

blood_type

孙悟空

白羊座

A

大海

射手座

A

宋宋

白羊座

B

猪八戒

白羊座

A

凤姐

射手座

A

创建表及数据导入:

create table person_info(

name string,

constellation string,

blood_type string)

row format delimited fields terminated by "\t";

load data local inpath “person_info.tsv” into table person_info;

例如:把星座和血型一样的人归类到一起

select

    t1.base,

    concat_ws('|', collect_set(t1.name)) name

from

    (select

        name,

        concat(constellation, ",", blood_type) base

    from

        person_info) t1

group by

    t1.base;

 

2.2.2、列转行

表结构:

movie

category

《疑犯追踪》

悬疑,动作,科幻,剧情

《Lie to me》

悬疑,警匪,动作,心理,剧情

《战狼2》

战争,动作,灾难

创建表及导入数据:

create table movie_info(

    movie string,

    category array<string>)

row format delimited fields terminated by "\t"

collection items terminated by ",";

load data local inpath "movie_info.tsv" into table movie_info;

例如:将电影分类中的数组数据展开

select

    movie,

    category_name

from

    movie_info lateral view explode(category) table_tmp as category_name;

 

2.3、数组操作

“fields terminated by”:字段与字段之间的分隔符。

“collection items terminated by”:一个字段中各个子元素item的分隔符。

2.4、orc存储

orc即Optimized Row Columnar (ORC) file,在RCFile的基础上演化而来,可以提供一种高效的方法在Hive中存储数据,提升了读、写、处理数据的效率。

2.5、Hive分桶

Hive可以将表或者表的分区进一步组织成桶,以达到:

1、数据取样效率更高

2、数据处理效率更高

桶通过对指定列进行哈希来实现,将一个列名下的数据切分为“一组桶”,每个桶都对应了一个该列名下的一个存储文件。

2.5.1、直接分桶

开始操作之前,需要将hive.enforce.bucketing属性设置为true,以标识Hive可以识别桶。

create table music(

    id int,

    name string,

    size float)

row format delimited

fields terminated by "\t"

clustered by (id) into 4 buckets;

该代码的意思是将music表按照id将数据分成了4个桶,插入数据时,会对应4个 reduce操作,输出4个文件。

2.5.2、在分区中分桶

当数据量过大,需要庞大分区数量时,可以考虑桶,因为分区数量太大的情况可能会导致文件系统挂掉,而且桶比分区有更高的查询效率。数据最终落在哪一个桶里,取决于clustered by的那个列的值的hash数与桶的个数求余来决定。虽然有一定离散性,但不能保证每个桶中的数据量是一样的。

create table music2(

    id int,

    name string,

    size float)

partitioned by (date string)

clustered by (id) sorted by(size) into 4 bucket

row format delimited

fields terminated by "\t";

 

load data local inpath 'demo/music.txt' into table music2 partition(date='2017-08-30');

 

三、项目

3.1、数据结构

3.1.1、视频表

 

字段

备注

详细描述

video id

视频唯一id

11位字符串

uploader

视频上传者

上传视频的用户名String

age

视频年龄

视频上传日期和2007年2月15日之间的整数天(Youtube的独特设定)

category

视频类别

上传视频指定的视频分类

length

视频长度

整形数字标识的视频长度

views

观看次数

视频被浏览的次数

rate

视频评分

满分5分

ratings

流量

视频的流量,整型数字

conments

评论数

一个视频的整数评论数

related ids

相关视频id

相关视频的id,最多20个

3.1.2、用户表

字段

备注

字段类型

uploader

上传者用户名

string

videos

上传视频数

int

friends

朋友数量

int

3.2原始数据存放地

HDFS目录:

视频数据集:/youtube/video/2008

用户数据集:/youtube/users/2008

3.3、技术选型

* Hadoop 2.7.2

* Hive 1.2.2

* Mysql 5.6

3.3.1、数据清洗

Hadoop MapReduce

3.3.2、数据分析

MapReduce or Hive

3.4、ETL原始数据

通过观察原始数据形式,可以发现,视频可以有多个所属分类,每个所属分类用&符号分割,且分割的两边有空格字符,同时相关视频也是可以有多个元素,多个相关视频又用“\t”进行分割。为了分析数据时方便对存在多个子元素的数据进行操作,我们首先进行数据重组清洗操作。即:将所有的类别用“&”分割,同时去掉两边空格,多个相关视频id也使用“&”进行分割。

 

 

该项目的pom.xml文件:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

       <modelVersion>4.0.0</modelVersion>

 

       <groupId>com.z</groupId>

       <artifactId>youtube</artifactId>

       <version>0.0.1-SNAPSHOT</version>

       <packaging>jar</packaging>

 

       <name>youtube</name>

       <url>http://maven.apache.org</url>

 

       <properties>

              <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

       </properties>

 

       <repositories>

              <repository>

                     <id>centor</id>

                     <url>http://central.maven.org/maven2/</url>

              </repository>

       </repositories>

 

       <dependencies>

              <dependency>

                     <groupId>junit</groupId>

                     <artifactId>junit</artifactId>

                     <version>3.8.1</version>

                     <scope>test</scope>

              </dependency>

 

              <dependency>

                     <groupId>org.apache.hadoop</groupId>

                     <artifactId>hadoop-client</artifactId>

                     <version>2.7.2</version>

              </dependency>

 

              <dependency>

                     <groupId>org.apache.hadoop</groupId>

                     <artifactId>hadoop-yarn-server-resourcemanager</artifactId>

                     <version>2.7.2</version>

              </dependency>

       </dependencies>

</project>

 

3.6.1、ETL之ETLUtil

package com.z.youtube.util;

 

public class ETLUtils {

       /**

        * 1、过滤不合法数据

        * 2、去掉&符号左右两边的空格

        * 3、\t换成&符号

        * @param ori

        * @return

        */

       public static String getETLString(String ori){

              String[] splits = ori.split("\t");

              //1、过滤不合法数据

              if(splits.length < 9) return null;

              //2、去掉&符号左右两边的空格

              splits[3] = splits[3].replaceAll(" ", "");

             

              StringBuilder sb = new StringBuilder();

              //3、\t换成&符号

              for(int i = 0; i < splits.length; i++){

                     sb.append(splits[i]);

                     if(i < 9){

                            if(i != splits.length - 1){

                                   sb.append("\t");

                            }

                     }else{

                            if(i != splits.length - 1){

                                   sb.append("&");

                            }

                     }

              }

              return sb.toString();

       }

}

 

3.6.2、ETL之Mapper

package com.z.youtube.mr.etl;

 

import java.io.IOException;

 

import org.apache.commons.lang.StringUtils;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

import com.z.youtube.util.ETLUtil;

 

public class VideoETLMapper extends Mapper<Object, Text, NullWritable, Text>{

       Text text = new Text();

      

       @Override

       protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {

              String etlString = ETLUtil.oriString2ETLString(value.toString());

             

              if(StringUtils.isBlank(etlString)) return;

             

              text.set(etlString);

              context.write(NullWritable.get(), text);

       }

      

 

}

 

3.6.3、ETL之Runner

package com.z.youtube.mr.etl;

 

import java.io.IOException;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

 

public class VideoETLRunner implements Tool {

       private Configuration conf = null;

 

       @Override

       public void setConf(Configuration conf) {

              this.conf = conf;

       }

 

       @Override

       public Configuration getConf() {

 

              return this.conf;

       }

 

       @Override

       public int run(String[] args) throws Exception {

              conf = this.getConf();

              conf.set("inpath", args[0]);

              conf.set("outpath", args[1]);

 

              Job job = Job.getInstance(conf, "youtube-video-etl");

             

              job.setJarByClass(VideoETLRunner.class);

             

              job.setMapperClass(VideoETLMapper.class);

              job.setMapOutputKeyClass(NullWritable.class);

              job.setMapOutputValueClass(Text.class);

              job.setNumReduceTasks(0);

             

              this.initJobInputPath(job);

              this.initJobOutputPath(job);

             

              return job.waitForCompletion(true) ? 0 : 1;

       }

 

       private void initJobOutputPath(Job job) throws IOException {

              Configuration conf = job.getConfiguration();

              String outPathString = conf.get("outpath");

             

              FileSystem fs = FileSystem.get(conf);

             

              Path outPath = new Path(outPathString);

              if(fs.exists(outPath)){

                     fs.delete(outPath, true);

              }

             

              FileOutputFormat.setOutputPath(job, outPath);

             

       }

 

       private void initJobInputPath(Job job) throws IOException {

              Configuration conf = job.getConfiguration();

              String inPathString = conf.get("inpath");

             

              FileSystem fs = FileSystem.get(conf);

             

              Path inPath = new Path(inPathString);

              if(fs.exists(inPath)){

                     FileInputFormat.addInputPath(job, inPath);

              }else{

                     throw new RuntimeException("HDFS中该文件目录不存在:" + inPathString);

              }

       }

 

       public static void main(String[] args) {

              try {

                     int resultCode = ToolRunner.run(new VideoETLRunner(), args);

                     if(resultCode == 0){

                            System.out.println("Success!");

                     }else{

                            System.out.println("Fail!");

                     }

                     System.exit(resultCode);

              } catch (Exception e) {

                     e.printStackTrace();

                     System.exit(1);

              }

       }

}

3.6.4、执行ETL

赠送编译打包命令提示:-P local clean package

$ bin/yarn jar ~/softwares/jars/youtube-0.0.1-SNAPSHOT.jar \

com.z.youtube.etl.ETLYoutubeVideosRunner \

/youtube/video/2008/0222 \

/youtube/output/video/2008/0222

 

3.5、准备工作

3.5.1、创建表

创建表:youtube_ori,youtube_user_ori,

创建表:youtube_orc,youtube_user_orc

youtube_ori:

create table youtube_ori(

    videoId string,

    uploader string,

    age int,

    category array<string>,

    length int,

    views int,

    rate float,

    ratings int,

    comments int,

    relatedId array<string>)

row format delimited

fields terminated by "\t"

collection items terminated by "&"

stored as textfile;

youtube_user_ori:

create table youtube_user_ori(

    uploader string,

    videos int,

    friends int)

clustered by (uploader) into 24 buckets

row format delimited

fields terminated by "\t"

stored as textfile;

 

然后把原始数据插入到orc表中

youtube_orc:

create table youtube_orc(

    videoId string,

    uploader string,

    age int,

    category array<string>,

    length int,

    views int,

    rate float,

    ratings int,

    comments int,

    relatedId array<string>)

clustered by (uploader) into 8 buckets

row format delimited fields terminated by "\t"

collection items terminated by "&"

stored as orc;

youtube_user_orc:

create table youtube_user_orc(

    uploader string,

    videos int,

    friends int)

clustered by (uploader) into 24 buckets

row format delimited

fields terminated by "\t"

stored as orc;

3.5.2、导入ETL后的数据

youtube_ori:

load data inpath "/youtube/output/video/2008/0222" into table youtube_ori;

youtube_user_ori:

load data inpath "/youtube/user/2008/0903" into table youtube_user_ori;

3.5.3、向ORC表插入数据

youtube_orc

insert into table youtube_orc select * from youtube_ori;

youtube_user_orc

insert into table youtube_user_orc select * from youtube_user_ori;

3.6、业务分析

3.6.1、统计视频观看数Top10

思路:

1) 使用order by按照views字段做一个全局排序即可,同时我们设置只显示前10条。

最终代码:

select

    videoId,

    uploader,

    age,

    category,

    length,

    views,

    rate,

    ratings,

    comments

from

    youtube_orc

order by

    views

desc limit

    10;

3.6.2、统计视频类别热度Top10

思路:

1) 即统计每个类别有多少个视频,显示出包含视频最多的前10个类别。

2) 我们需要按照类别group by聚合,然后count组内的videoId个数即可。

3) 因为当前表结构为:一个视频对应一个或多个类别。所以如果要group by类别,需要先将类别进行列转行(展开),然后再进行count即可。

4) 最后按照热度排序,显示前10条。

最终代码:

select

    category_name as category,

    count(t1.videoId) as hot

from (

    select

        videoId,

        category_name

    from

        youtube_orc lateral view explode(category) t_catetory as category_name) t1

group by

    t1.category_name

order by

    hot

desc limit

    10;

3.6.3、统计出视频观看数最高的20个视频的所属类别以及类别包含这Top20视频的个数

思路:

1) 先找到观看数最高的20个视频所属条目的所有信息,降序排列

2) 把这20条信息中的category分裂出来(列转行)

3) 最后查询视频分类名称和该分类下有多少个Top20的视频

最终代码:

select

    category_name as category,

    count(t2.videoId) as hot_with_views

from (

    select

        videoId,

        category_name

    from (

        select

            *

        from

            youtube_orc

        order by

            views

        desc limit

            20) t1 lateral view explode(category) t_catetory as category_name) t2

group by

    category_name

order by

    hot_with_views

desc;

3.6.4、统计视频观看数Top50所关联视频的所属类别的热度排名

思路:

1) 查询出观看数最多的前50个视频的所有信息(当然包含了每个视频对应的关联视频),记为临时表t1

t1:观看数前50的视频

select

    *

from

    youtube_orc

order by

    views

desc limit

    50;

2) 将找到的50条视频信息的相关视频relatedId列转行,记为临时表t2

t2:将相关视频的id进行列转行操作

select

    explode(relatedId) as videoId

from

       t1;

3) 将相关视频的id和youtube_orc表进行inner join操作

t5:得到两列数据,一列是category,一列是之前查询出来的相关视频id

(select

    distinct(t2.videoId),

    t3.category

from

    t2

inner join

    youtube_orc t3 on t2.videoId = t3.videoId) t4 lateral view explode(category) t_catetory as category_name;

4) 按照视频类别进行分组,统计每组视频个数,然后排行

最终代码:

select

    category_name as category,

    count(t5.videoId) as hot

from (

    select

        videoId,

        category_name

    from (

        select

            distinct(t2.videoId),

            t3.category

        from (

            select

                explode(relatedId) as videoId

            from (

                select

                    *

                from

                    youtube_orc

                order by

                    views

                desc limit

                    50) t1) t2

        inner join

            youtube_orc t3 on t2.videoId = t3.videoId) t4 lateral view explode(category) t_catetory as category_name) t5

group by

    category_name

order by

    hot

desc;

3.6.5、统计每个类别中的视频热度Top10,以Music为例

思路:

1) 要想统计Music类别中的视频热度Top10,需要先找到Music类别,那么就需要将category展开,所以可以创建一张表用于存放categoryId展开的数据。

2) 向category展开的表中插入数据。

3) 统计对应类别(Music)中的视频热度。

最终代码:

创建表类别表:

create table youtube_category(

    videoId string,

    uploader string,

    age int,

    categoryId string,

    length int,

    views int,

    rate float,

    ratings int,

    comments int,

    relatedId array<string>)

row format delimited

fields terminated by "\t"

collection items terminated by "&"

stored as orc;

向类别表中插入数据:

insert into table youtube_category 

    select

        videoId,

        uploader,

        age,

        categoryId,

        length,

        views,

        rate,

        ratings,

        comments,

        relatedId

    from

        youtube_orc lateral view explode(category) catetory as categoryId;

统计Music类别的Top10(也可以统计其他)

select

    videoId,

    views

from

    youtube_category

where

    categoryId = "Music"

order by

    views

desc limit

    10;

3.6.6、统计每个类别中视频流量Top10,以Music为例

思路:

1) 创建视频类别展开表(categoryId列转行后的表)

2) 按照ratings排序即可

最终代码:

select

    videoId,

    views,

    ratings

from

    youtube_category

where

    categoryId = "Music"

order by

    ratings

desc limit

    10;

3.6.7、统计上传视频最多的用户Top10以及他们上传的观看次数在前20的视频

思路:

1) 先找到上传视频最多的10个用户的用户信息

select

    *

from

    youtube_user_orc

order by

    videos

desc limit

    10;

2) 通过uploader字段与youtube_orc表进行join,得到的信息按照views观看次数进行排序即可。

最终代码:

select

    t2.videoId,

    t2.views,

    t2.ratings,

    t1.videos,

    t1.friends

from (

    select

        *

    from

        youtube_user_orc

    order by

        videos desc

    limit

        10) t1

join

    youtube_orc t2

on

    t1.uploader = t2.uploader

order by

    views desc

limit

    20;

3.6.8、统计每个类别视频观看数Top10

思路:

1) 先得到categoryId展开的表数据

2) 子查询按照categoryId进行分区,然后分区内排序,并生成递增数字,该递增数字这一列起名为rank列

3) 通过子查询产生的临时表,查询rank值小于等于10的数据行即可。

最终代码

select

    t1.*

from (

    select

        videoId,

        categoryId,

        views,

        row_number() over(partition by categoryId order by views desc) rank from youtube_category) t1

where

    rank <= 10;

四、可能出现的问题

4.1、JVM堆内存溢出

描述:java.lang.OutOfMemoryError: Java heap space

解决:在yarn-site.xml中加入如下代码

<property>

       <name>yarn.scheduler.maximum-allocation-mb</name>

       <value>2048</value>

</property>

<property>

     <name>yarn.scheduler.minimum-allocation-mb</name>

     <value>2048</value>

</property>

<property>

       <name>yarn.nodemanager.vmem-pmem-ratio</name>

       <value>2.1</value>

</property>

<property>

       <name>mapred.child.java.opts</name>

       <value>-Xmx1024m</value>

</property>