hive中的数据倾斜

1、数据倾斜的现象
分布式计算中最常见的,最容易遇到的问题就是数据倾斜,数据倾斜的现象是,当我们提交运行一个程序时,我们通过监控发现,这个程序的大多数的Task都已经运行结束了,只有某一个Task一直在运行,迟迟不能结束,导致整体的进度卡在99%或者100%,这时候我们就可以判定程序出现了数据倾斜的问题。

2 数据倾斜的原因
表面上看,发生数据倾斜的原因在于这个Task运行过慢,但是仔细分析我们会发现,这个Task运行过慢的原因在于这个Task的负载要比其他Task的负载要高,所以发生数据倾斜的直观原因在于Task的数据分配不均衡。

那为什么会出现多个Task数据分配不均衡的情况呢?
从两方面考虑,第一:数据本身就是倾斜的,数据中某种数据出现的次数过多。第二:分区规则导致这些相同的数据都分配给了同一个Task,导致这个Task拿到了大量的数据,而其他Task拿到的数据比较少,所以运行起来相比较于其他Task就比较慢一些。
综上所述,产生数据倾斜的根本原因在于分区规则。

3 group By的数据倾斜
当程序中出现group by或者count(distinct)等分组聚合的场景时,如果数据本身是倾斜的根据MapReduce的Hash分区规则,肯定会出现数据倾斜的现象。根本原因是因为分区规则导致的,所以我们可以通过以下几种方案来解决group by导致的数据倾斜的问题。

方案一:开启Map端聚合

-- 开启Map端聚合:Combiner
hive.map.aggr=true;

通过减少Reduce的输入量,避免每个Task数据差异过大导致数据倾斜

方案二:实现随机分区

-- SQL中避免数据倾斜,构建随机分区
select * from table distribute by rand();

distribute by用于指定底层的MapReduce按照哪个字段作为Key实现分区、分组等
默认由Hive自己选择,我们可以通过distribute by自己指定,通过rank函数随机值实现随机分区,避免数据倾斜

方案三:自动构建随机分区并自动聚合

-- 开启随机分区,走两个MapReduce 
hive.groupby.skewindata=true;

开启该参数以后,当前程序会自动通过两个MapReduce来运行
第一个MapReduce自动进行随机分区,然后实现聚合
第二个MapReduce将聚合的结果再按照业务进行处理,得到结果

4 Join的数据倾斜
实际业务需求中往往需要构建两张表的Join实现,如果两张表比较大,无法实现Map Join,只能走Reduce Join,那么当关联字段中某一种值过多的时候依旧会导致数据倾斜的问题,面对Join产生的数据倾斜,我们核心的思想是尽量避免Reduce Join的产生,优先使用Map Join来实现,但往往很多的Join场景不满足Map Join的需求,那么我们可以以下几种方案来解决Join产生的数据倾斜问题:

方案一:提前过滤,将大数据变成小数据,实现Map Join
实现两张表的Join时,我们要尽量考虑是否可以使用Map Join来实现Join过程。有些场景下看起来是大表Join大表,但是我们可以通过转换将大表Join大表变成大表Join小表,来实现Map Join。

方案二:使用Bucket Join
如果使用方案一来避免Reduce Join ,有些场景下依旧无法满足,例如过滤后的数据依旧是一张大表,那么最后的Join依旧是一个Reduce Join
这种场景下,我们可以将两张表的数据构建为桶表,实现Bucket Map Join,避免数据倾斜

方案三:使用Skew Join
Skew Join是Hive中一种专门为了避免数据倾斜而设计的特殊的Join过程,这种Join的原理是将Map Join和Reduce Join进行合并,如果某个值出现了数据倾斜,就会将产生数据倾斜的数据单独使用Map Join来实现,其他没有产生数据倾斜的数据由Reduce Join来实现,这样就避免了Reduce Join中产生数据倾斜的问题,最终将Map Join的结果和Reduce Join的结果进行Union合并

-- 开启运行过程中skewjoin
set hive.optimize.skewjoin=true;
-- 如果这个key的出现的次数超过这个范围
set hive.skewjoin.key=100000;
-- 在编译时判断是否会产生数据倾斜
set hive.optimize.skewjoinpiletime=true;
-- 不合并,提升性能
set hive.optimize.union.remove=true;
-- 如果Hive的底层走的是MapReduce,必须开启这个属性,才能实现不合并
set mapreduce.input.fileinputformat.input.dir.recursive=true;

hive中的数据倾斜

1、数据倾斜的现象
分布式计算中最常见的,最容易遇到的问题就是数据倾斜,数据倾斜的现象是,当我们提交运行一个程序时,我们通过监控发现,这个程序的大多数的Task都已经运行结束了,只有某一个Task一直在运行,迟迟不能结束,导致整体的进度卡在99%或者100%,这时候我们就可以判定程序出现了数据倾斜的问题。

2 数据倾斜的原因
表面上看,发生数据倾斜的原因在于这个Task运行过慢,但是仔细分析我们会发现,这个Task运行过慢的原因在于这个Task的负载要比其他Task的负载要高,所以发生数据倾斜的直观原因在于Task的数据分配不均衡。

那为什么会出现多个Task数据分配不均衡的情况呢?
从两方面考虑,第一:数据本身就是倾斜的,数据中某种数据出现的次数过多。第二:分区规则导致这些相同的数据都分配给了同一个Task,导致这个Task拿到了大量的数据,而其他Task拿到的数据比较少,所以运行起来相比较于其他Task就比较慢一些。
综上所述,产生数据倾斜的根本原因在于分区规则。

3 group By的数据倾斜
当程序中出现group by或者count(distinct)等分组聚合的场景时,如果数据本身是倾斜的根据MapReduce的Hash分区规则,肯定会出现数据倾斜的现象。根本原因是因为分区规则导致的,所以我们可以通过以下几种方案来解决group by导致的数据倾斜的问题。

方案一:开启Map端聚合

-- 开启Map端聚合:Combiner
hive.map.aggr=true;

通过减少Reduce的输入量,避免每个Task数据差异过大导致数据倾斜

方案二:实现随机分区

-- SQL中避免数据倾斜,构建随机分区
select * from table distribute by rand();

distribute by用于指定底层的MapReduce按照哪个字段作为Key实现分区、分组等
默认由Hive自己选择,我们可以通过distribute by自己指定,通过rank函数随机值实现随机分区,避免数据倾斜

方案三:自动构建随机分区并自动聚合

-- 开启随机分区,走两个MapReduce 
hive.groupby.skewindata=true;

开启该参数以后,当前程序会自动通过两个MapReduce来运行
第一个MapReduce自动进行随机分区,然后实现聚合
第二个MapReduce将聚合的结果再按照业务进行处理,得到结果

4 Join的数据倾斜
实际业务需求中往往需要构建两张表的Join实现,如果两张表比较大,无法实现Map Join,只能走Reduce Join,那么当关联字段中某一种值过多的时候依旧会导致数据倾斜的问题,面对Join产生的数据倾斜,我们核心的思想是尽量避免Reduce Join的产生,优先使用Map Join来实现,但往往很多的Join场景不满足Map Join的需求,那么我们可以以下几种方案来解决Join产生的数据倾斜问题:

方案一:提前过滤,将大数据变成小数据,实现Map Join
实现两张表的Join时,我们要尽量考虑是否可以使用Map Join来实现Join过程。有些场景下看起来是大表Join大表,但是我们可以通过转换将大表Join大表变成大表Join小表,来实现Map Join。

方案二:使用Bucket Join
如果使用方案一来避免Reduce Join ,有些场景下依旧无法满足,例如过滤后的数据依旧是一张大表,那么最后的Join依旧是一个Reduce Join
这种场景下,我们可以将两张表的数据构建为桶表,实现Bucket Map Join,避免数据倾斜

方案三:使用Skew Join
Skew Join是Hive中一种专门为了避免数据倾斜而设计的特殊的Join过程,这种Join的原理是将Map Join和Reduce Join进行合并,如果某个值出现了数据倾斜,就会将产生数据倾斜的数据单独使用Map Join来实现,其他没有产生数据倾斜的数据由Reduce Join来实现,这样就避免了Reduce Join中产生数据倾斜的问题,最终将Map Join的结果和Reduce Join的结果进行Union合并

-- 开启运行过程中skewjoin
set hive.optimize.skewjoin=true;
-- 如果这个key的出现的次数超过这个范围
set hive.skewjoin.key=100000;
-- 在编译时判断是否会产生数据倾斜
set hive.optimize.skewjoinpiletime=true;
-- 不合并,提升性能
set hive.optimize.union.remove=true;
-- 如果Hive的底层走的是MapReduce,必须开启这个属性,才能实现不合并
set mapreduce.input.fileinputformat.input.dir.recursive=true;