
Hive -- 从日志分析学习Hive(二)
继续上篇,上一篇中数据已经导入到Hive中,并且去除双引号的UDF也编写完成,该篇将围绕着业务建立相关的表。
动态分区
从源数据中直接导入的数据可能并不符合最终的要求,一般都需要清洗阶段,这里新建一个dwd层表,该表与ods层的不同之处在于使用了时间作为分区字段,分区的本质对于Hive来说是表空间下的不同文件夹,当查询时如果指定分区就不需要扫描全部的文件。另外该表使用了orcfile
格式存储,并且开启了snappy
压缩算法,一般CPU速率远远大于硬盘速率,因此这是一种消耗CPU性能来弥补硬盘性能不足的策略,先立个flag,后面专门研究下hive中的存储格式以及压缩算法。
清单1:清洗后的中间层表
1 | create external table if not exists dwd_mrdear_access_log( |
表建立后,自然要想办法把数据从源表中导入进来,这里使用动态分区,动态分区的原理是根据查询出来的字段列表最后的列作为分区字段,比如下面sql中分区字段为dt
且只有一个,因此会使用formatDate(time)
(自定义时间处理的UDF)的结果作为该分区值,然后进行分区处理,分区处理判断该分区有没有建立,没有则创建,然后插入数据。
清单2:数据清洗ETL
1 | // 单个reduce对内存有限制,如果oom,则可以设置多个reduce处理 |
PS
这里可能会报错,有几个参数控制着Hive的动态分区,根据错误进行调整。
清单3:动态分区相关配置
1 | hive.exec.dynamic.partition=true/false #是否开启动态分区 |
动态分区的原理
动态分区的隐藏条件是根据分区字段作为中间结果的分区输出条件,举个例子,在上述导入数据的SQL中,当执行完select xx from ods_mrdear_access_src_log DISTRIBUTE BY dt
时所产生的临时结果数据已经是分区后的结果,如下所示:
然后利用MoveOperator
从临时目录移动到最终表空间下,需要合并的话还会执行MergeOperator
把多个结果集合并成一个,完成动态分区。
查询计划explain
通过上面动态分区所需要的业务中间表已经建立,接下来是完成业务需求,比如统计HTTP状态码的分布,也就是200,400,404等请求的个数,通过hive很容易写出以下sql。
清单4:统计状态码分布
1 | select status,count(1) as total from dwd_mrdear_access_log group by status; |
那么对于Hive来说,该SQL到底是怎么执行的呢?Hive通过查询计划Explain向开发人员展示整个查询流程,使用方式是在查询语句前加explain
或者explain extended
关键字,后者能看到更加详细的信息。在这之前先了解Hive的基本操作分类,如下表所示:
操作符 | 描述 | 描述类 |
---|---|---|
TableScanOperator | 扫描hive表数据 | org.apache.hadoop.hive.ql.plan.TableScanDesc |
ReduceOutOperator | 创建将发送到Reduce端的<key,reduce>对 | org.apache.hadoop.hive.ql.plan.ReduceSinkDesc |
JoinOperator | Join两份数据 | org.apache.hadoop.hive.ql.plan.JoinDesc |
SelectOperator | 选择输出列 | org.apache.hadoop.hive.ql.plan.SelectDesc |
FileOutOperator | 建立结果数据,输出至文件 | org.apache.hadoop.hive.ql.plan.FileSinkDesc |
FilterOperator | 过滤输入数据 | org.apache.hadoop.hive.ql.plan.FilterDesc |
GroupByOperator | Group By语句 | org.apache.hadoop.hive.ql.plan.GroupByDesc |
MapJoinOperator | /+mapjoin(t)/ | org.apache.hadoop.hive.ql.plan.MapJoinDesc |
LimitOperator | Limit语句 | org.apache.hadoop.hive.ql.plan.LimitDesc |
UnionOperator | Union语句 | org.apache.hadoop.hive.ql.plan.UnionDesc |
FetchOperator | 客户端直接读取数据 | org.apache.hadoop.hive.ql.plan.FetchWork |
MoveOperator | 移动数据文件 | org.apache.hadoop.hive.ql.plan.MoveWork |
更多的Operator可以在Hive源码中定位到org.apache.hadoop.hive.ql.plan.Explain 注解,然后查看相应的逻辑。 |
explain
会把SQL拆分为多个STAGE
,STAGE
之间会构成一个DAG图的依赖关系,根据DAG的关系决定执行方式。在执行流程就会拆分为对应的Operator
,对于select status,count(1) as total from dwd_mrdear_access_log group by status;
其执行流程如下(注释很详细):
清单5:查询计划注释
1 | STAGE DEPENDENCIES: # 这里展示任务依赖关系,即DAG图 |
转换为图则如下:
Map以及Reduce数量
在Hive中可以使用其提供的参数来控制Map以及Reduce的任务数量,从而针对不同的需求最大化性能。
Map数量
Hive是基于Hadoop的上层抽象,其Map本质是使用org.apache.hadoop.mapred.InputFormat
从HDFS中读取数据,因此能起多少Map由具体的实现类中getSplits
方法确定,一般情况下一个org.apache.hadoop.mapred.InputSplit
就是一个Map。
在Hive中如果想影响拆分算法,一般使用org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
读取且合并小文件数据,然后使用以下参数影响拆分合并算法:
- mapreduce.input.fileinputformat.split.minsize:每一个块最小size
- mapreduce.input.fileinputformat.split.maxsize:每一个块最大size
- mapreduce.input.fileinputformat.split.minsize.per.node:同一节点的数据块形成切片时,切片大小的最小值
- mapreduce.input.fileinputformat.split.minsize.per.rack:同一机架数据块切片时最小值
Reduce数量
Reduce数量主要由以下三个参数控制,其逻辑在org.apache.hadoop.hive.ql.exec.mr.MapRedTask#setNumberOfReducers
方法中。
- hive.exec.reducers.bytes.per.reducer (默认值: 256000000):根据处理文件大小决定reduce数量,默认256Mb,如果是1G输入文件则对应4个Reduce任务。
- hive.exec.reducers.max (默认值: 1009) :控制最大的Reduce数量
- mapreduce.job.reduces (默认值: -1):直接指定Reduce数量
业务需求
查询top受访页面
清单6:top受访页面结果
1 | # SQL |
根据查询可以看出来,RSS订阅地址以及首页访问频率最大,不过search页面为什么访问也这么大,并且还是POST,很不合理。查询下数据select * from dwd_mrdear_access_log where request_url='POST /search/ HTTP/1.1' limit 10;
确认是某个客户端在一直请求,状态返回都是405.
查询top带宽页面
清单7:top带宽页面结果
1 | select request_url, count(request_url) as total, sum(size/1024/1024) as size from dwd_mrdear_access_log group by request_url sort by size desc limit 10; |
根据结果live2d是真的占流量,拖慢网站速度,不过由于启用了PWA有一定程度的缓解。
参考
- 版权声明: 感谢您的阅读,本文由屈定's Blog版权所有。如若转载,请注明出处。
- 文章标题: Hive -- 从日志分析学习Hive(二)
- 文章链接: https://mrdear.cn/posts/framework-hive-study2.html