hive任务优化

2018年7月26日 67点热度 0条评论 来源: 不名一文

hive是基于大数据开发的一个用于数据仓库的工具,其主要功能是将HQL(HIVE SQL)转换成mapreduce执行。所以对hive语句的优化几乎等于对mapreduce的优化,主要在io和数据倾斜方面进行优化。本文将从语句、任务、模型三个方面简单阐述优化方法和理论

1.语句优化

1.1合并小文件

map针对每一个文件产生一个或多个map任务,如果输入小文件过多,则会产生许多map任务处理每个小文件,严重耗费了资源。通过如下设置可以对输入小文件进行合并操作

  set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; 

1.2压缩选择

可以通过压缩中间文件减少io消耗,提高效率

hive中存储格式和压缩格式如下:

1.2.1存储格式

  • Text File text格式,此为默认的格式。可以使用Gzip或者Bzip2压缩格式

  • SequenceFile 二进制文件格式,支持NONE/RECORD/BLOCK压缩格式

  • RCFile

  • Avro Files

  • ORC Files 

  • Parquet

  • Custom INPUTFORMAT and OUTPUTFORMAT 用户自定义文件格式

推荐orc files 和 parquet。其中orc用的较多

1.2.2压缩格式

压缩格式主要有 bzip2、gzip、lzo、snappy等

在进行shuffle中,由于进行数据传输,会产生较大的io。此时对map输出文件进行压缩,能够减小数据文件大小,降低io,提高执行效率,一般建议采用SnappyCodec压缩格式,此格式有较高的压缩比和低cpu消耗

set hive.exec.compress.intermediate=true;
set mapreduce.map.output.compress=true
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;

1.3join优化

数据倾斜指由于数据表中某些值数据量较大时,导致某些reducer上数据量较大。在执行过程中会出现其它reducer都已完成,某些reducer还在执行且进度条一直呈现99%,严重影响了整个任务的执行效率(也称长尾),数据倾斜优化就是要解决某些值数据量较大的情况。

1.3.1 mapjoin

当大表和小表join出现数据倾斜时,可以将小表缓存至内存,在map端进行join操作,设置如下:

  set hive.auto.convert.join.noconditionaltask = true;
  set hive.auto.convert.join.noconditionaltask.size = 10000000;

我司可缓存内存默认大小为500M,最高可达2G

1.3.2 空值导致长尾

对空值进行随机处理

select   ...
  from (select *
          from tbcdm.dim_tb_itm
         where ds = '${bizdate}'
       )t
  left join (select *
               from tbods.s_standard_brand
              where ds='${bizdate}'
                and status=3
       ) t1
   on  coalesce(t.org_brand_id,rand() * 9999 ) = t1.value_id; //  此处进行了随机处理,请确保随机后的数不和 value_id 碰撞

1.3.3 热点值导致长尾(手动处理)

如果是因为热点值导致长尾,并且JOIN的输入比较大无法用MAP JOIN,可以先将热点Key取出,对于主表数据用热点Key切分成热点数据和非热点数据两部分分别处理,最后合并。以淘宝的PV日志表关联商品维表取商品属性为例:

  1. 取出热点Key:将PV大于50000的商品ID取出到临时表。
    insert overwrite table topk_item partition (ds = '${bizdate}')
    select item_id
         , count(1) as cnt
      from dwd_tb_log_pv_di
     where ds = '${bizdate}'
       and url_type = 'ipv'
       and item_id is not null
     group by item_id
    having cnt >= cnt >= 50000;
  2. 取出非热点数据。将主表(sdwd_tb_log_pv_di)和热点key表(topk_item)外关联后通过条件b1.item_id is null,取出关联不到的数据即非热点商品的日志数据,此时需要用map join。再用非热点数据关联商品维表,因为已经排除了热点数据,不会存在长尾。
    select   ...
      from (select   *
              from     dim_tb_itm
             where    ds = '${bizdate}'
            ) a
     right join (select   /*  mapjoin(b1) */
                         b2.*
                    from (select item_id
                            from topk_item
                           where ds = '${bizdate}'
                          ) b1
                    right join (select *
                                  from dwd_tb_log_pv_di
                                 where ds = '${bizdate}'
                                   and url_type = 'ipv'
                               ) b2
                       on b1.item_id = coalesce(b2.item_id,concat("tbcdm",rand())
                    where b1.item_id is null
             ) l
        on a.item_id = coalesce(l.item_id,concat("tbcdm",rand());
  3. 取出热点数据。将主表(sdwd_tb_log_pv_di)和热点Key表(topk_item)内关联,此时需要用MAP JOIN,取到热点商品的日志数据。同时,需要将商品维表(dim_tb_itm)和热点Key表(topk_item)内关联,取到热点商品的维表数据,然后将第一部分数据外关联第二部分数据,因为第二部分只有热点商品的维表,数据量比较小,可以用MAP JOIN避免长尾。
    select   /*  mapjoin(a) */
             ...
    from
            (select   /*  mapjoin(b1) */
                      b2.*
             from
                     (select   item_id
                      from     topk_item
                      where    ds = '${bizdate}'
                      )b1
             join
                     (select   *
                      from     dwd_tb_log_pv_di
                      where    ds = '${bizdate}'
                      and      url_type = 'ipv'
                      and      item_id is not null
                      ) b2
             on       (b1.item_id = b2.item_id)
             ) l
    left outer join
            (select   /*  mapjoin(a1) */
                      a2.*
             from
                     (select   item_id
                      from     topk_item
                      where    ds = '${bizdate}'
                      ) a1
             join
                     (select   *
                      from     dim_tb_itm
                      where    ds = '${bizdate}'
                      ) a2
             on       (a1.item_id = a2.item_id)
             ) a
    on       a.item_id = l.item_id;
  4. 将步骤2和步骤3的数据通过union all合并后即得到完整的日志数据,并且关联了商品的信息。

注:代码来自阿里云官网。仅供参考,写的属实有点乱

1.3.4热点值导致长尾(自动处理)

对于skewjoin,平台在执行job时会自动进行如下操作

  1. 将它们存入临时的HDFS目录。其它数据正常执行
  2. 对倾斜数据开启map join操作,对非倾斜值采取普通join操作
  3. 将倾斜数据集和非倾斜数据集进行合并操作

hive.optimize.skewjoin.compiletime

如果建表语句元数据中指定了skew key,则使用set hive.optimize.skewjoin.compiletime=true开启skew join。

可以通过如下建表语句指定skewed key:

 create table list_bucket_single (key string, value string)
    skewed by (key) on (1,5,6) [stored as directories];

hive.optimize.skewjoin

该参数为在运行时动态指定数据进行skewjoin,一般和hive.skewjoin.key参数一起使用

  set hive.optimize.skewjoin=true;
  set hive.skewjoin.key=100000;

以上参数表示当key记录条数超过100000时采用skewjoin操作

  • 区别

    hive.optimize.skewjoin.compiletime和hive.optimize.skewjoin区别为前者为编译时参数(倾斜key事先知道),后者为运行时参数(运行时动态判断) 前者在生成执行计划时根据元数据生成skewjoin,此参数要求倾斜值一定;后者为运行过程中根据数据条数进行skewjoin优化。hive.optimize.skewjoin实际上应该重名为为hive.optimize.skewjoin.runtime参数,考虑兼容性没有进行重命名

参考文档: 文档一 文档二  参考文档  

1.3.5 大表join优化(未倾斜)

如果数据量超大,请 采用 bucket mapside join 或 sort-merge-bucket join ,具体参考  join详解

1.4group by

1.4.1map端聚合

set hive.map.aggr=true;
select count(*) from table2;

1.4.2热点值导致长尾(手动)

group by Key出现长尾,是因为某个Key内的计算量特别大。

对于确定的倾斜值,先均匀分布到各个reducer上,然后开启新一轮reducer进行统计操作。写法如下


// 此处只是一个例,实际情况可以在map端开启聚(此处忽略)
  -- 正常写法  //  
  select key
       , count(1) as cnt
    from tb_name
   group  by  key;
  ​
  -- 改进后写法 ,假设热点key值为 key001
  select a.key
       , sum(cnt) as cnt
   from (select key
              , if(key = 'key001',random(),0)
              , count(1) as cnt
           from tb_name
          group by key, 
                   if(key = 'key001',random(),0)
         ) t
   group by t.key;

由上可见,这次的执行计划变成了M>R>R。虽然执行的步骤变长了,但是长尾的Key经过2个步骤的处理,整体的时间消耗可能反而有所减少

1.4.3热点值导致长尾(自动)

如果在不确定倾斜值的情况下,可以设置hive.groupby.skewindata参数

  set hive.groupby.skewindata=true;
  select key
       , count(1) as cnt
    from tb_name
   group by key;

其原理和上述写法调整中类似,是先对key值进行均匀分布,然后开启新一轮reducer求值

1.4.4 distinct 长尾

对于Distinct,把长Key进行拆分的策略已经不生效了。对这种场景,您可以考虑通过其它方式解决。

--原始sql,不考虑uid为空。
select count(uid) as pv
     , count(distinct uid) as uv
  from userlog;


-- 改写后写法
select sum(pv) as pv
     , count(*) as uv
  from (select count(*) as pv
             , uid
          from userlog
         group by uid
       )t;

参考文档

2.任务优化

一个任务中有多个语句,或者join时涉及到多张表且涉及表数据量较大,计算逻辑复杂,建议将各语句进行拆分,多表join也可以进行拆开。好处如下

  • 代码清晰,各任务各司其职

  • 有些任务可以串行执行,提供执行效率

  • 如果任务依赖于多张上游表,上游表产出时间有先后,可以将先产出表先进行计算,避免最后上游表产出后任务统一计算,大大延迟结果表产出时间(分散计算)

  • 方便观察发现瓶颈任务,针对瓶颈任务单独进行优化

3.模型优化

3.1一个栗子

在生产中需要对tb_a_df全量表和tb_b_df全量表进行关联,补齐A表中value1中信息,表数据信息如下

tb_a_df:1.2 亿条数据,存储4GB
tb_b_df:130 亿条数据,存储11TB

3.2优化

优化前

-- yyyymmdd 为当天分区
insert overwrite table result_table partition (ds = '${yyyymmdd}')
select t.key
     , t.value
     , t1.value1
  from tb_a_df t
  left join tb_b_df t1
    on t.key = t1.key
   and t1.ds = '${yyyymmdd}'
 where t.ds = '${yyyymmdd}'

其中tb_b_df表数据量过大,join过程需要shuffle,极大消耗资源,以上任务运行时长为40分钟左右,且运行时长不稳定,影响数据产出时间

优化思路:

经验证发现:

tb_a_df表对应的增量表 tb_a_delta 最大更新35w条左右

tb_a_df 对应的增量表 tb_b_delta   每天更新量为9kw,100GB左右。

可以考虑将全量数据分为变动数据和非变动数据

于是我们从增量表入手将任务拆分为3个子task

3.3优化后

-- taska
-- 计算 tb_a 中有变动的数据
-- tb_a_delta 表数据量很小,可以采用map join,极大降低了消耗
insert overwrite table result_table_delta_a partition(ds = '${yyyymmdd}')
select /*+ mapjoin(t) */
       t.key
     , t.value
     , t1.value1
  from tb_a_delta t   -- 数据量 35w
  join tb_b_df t1     -- 130 亿条数据
    on t.key = t1.key
   and t1.ds = '${yyyymmdd}'
 where t.ds = '${yyyymmdd}'

--  taskb
-- 计算 tb_b 中有变动的数据,此时数据量小了很多,大大降低了shuffle的成本
insert overwrite table result_table_delta_b partition(ds = '${yyyymmdd}')
select t.key
     , t.value
     , t1.value1
  from tb_a_df t        -- 数据量 1.2亿
  join tb_b_delta t1    -- 数据量 9kw
    on t.key = t1.key
   and t1.ds = '${yyyymmdd}'
 where t.ds = '${yyyymmdd}'

-- taskc
-- 
with tmp as
(
-- a表变动数据
select t.key
     , t.value
     , t.value1
     , 1          flg
  from result_table_delta_a t
 where t.ds = '${yyyymmdd}'

 union all

-- b表变动数据
select t.key
     , t.value
     , t.value1
     , 2          flg
  from result_table_delta_b t
 where t.ds = '${yyyymmdd}'
),
tmp1 as 
(
-- 去重
select t.key
     , t.value
     , t.value1
     , row_number() over(partition by key order by flg desc) as rn  -- 优先取b变动数据
  from tmp t
)
insert overwrite table result_table partition (ds = '${yyyymmdd}')
-- 变动数据
select key
     , value
     , value1
  from tmp1 t
 where rn = 1

union all

-- 非变动数据,用昨天分区来补全
-- 注意,此处可以直接用昨天分区数据 left anti join tmp1 即可。由于生产中未作验证,因此此处没表述
select t.key
     , t.value
     , t2.value1
  from tb_a_df t
  left anti join tmp1 t1  -- 排除变动数据
    on t.key = t1.key
   and t1.rn = 1
  left join tb_a_df t2
    on t.key = t2.key
   and t2.ds = '${yyyymmdd-1}'  -- 昨天分区
 where t.ds = '${yyyymmdd}'

3.4效果&收益

效果

  • taska平均运行时长 5min

  • taskb平均运行时长 2.5min

  • taskc运行时长 7min

其中taska和taskb并行运行,待运行完成后再运行taskc,因此优化后任务时长为 5min + 7min = 12 min ,时长从 40min -> 12 min,时长降为原来的30%,也大大降低了计算资源

3.5其它方案

此处刚好 tb_a_delta 表较小,刚好可以采用 mapjoin,如果不能使用 mapjoin,需作何解?

可以采用hive中的buckted sort merge join方案,我司针对两个大表之间的join大量采用了此方案。

关于join介绍,请点击此处

4.系统级别

请参考 hive简介  9.企业级调优

5.其它拓展

5.1调整map数

5.1.1 map分割文件大小计算公式

计算公式 computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M

详情请看 mapreduce简介 -> 3.1.2 Job提交流程和切片源码 -> 2.FileInputFormat切片源码解析(input.getSplits(job))

5.1.2 hive设置

hive中调整 maxSize 最大值即可

set mapreduce.input.fileinputformat.split.maxsize=100;

5.2调整reduce数

reduce个数的设定极大影响任务执行效率,不指定reduce个数的情况下,Hive会猜测确定一个reduce个数,基于以下两个设定:

  1. hive.exec.reducers.bytes.per.reducer(每个reduce任务处理的数据量,默认为1000^3=1G)
  2. hive.exec.reducers.max(每个任务最大的reduce数,默认为999)

计算reducer数的公式N=min(参数2,总输入数据量/参数1)

即,如果reduce的输入(map的输出)总大小不超过1G,那么只会有一个reduce任务;
 

select user_id
     , count(1) 
  from dwd_tb_crm_trd_rfd_case_df 
 where ds= '20201001'
 group by user_id;
// dwd_tb_crm_trd_rfd_case_df 总大小为9G多,因此这句有10个reduce

感悟

通过这个栗子我们可以看到模型对日常设计的重要性(此处只是一个案例,还有其它案例笔者后续总结出来)。是采用增量还是全量计算,是否将核心字段和非核心字段剥离 是平时需要关注的重要点

以上优化方式为一般且常见的优化方式,对于具体问题应该进行具体分析

    原文作者:不名一文
    原文地址: https://blog.csdn.net/u012485099/article/details/80801364
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系管理员进行删除。