站点图标 IDC铺

智能运维大数据处理时发生数据倾斜的解决方案

为何会发生数据倾斜?

19世纪末意大利经济学家帕累托发现在任何一组东西中,最重要的只占其中一小部分,约20%,其余80%尽管是多数,却是次要的,因此又称二八定律,又叫帕累托法则。

因此,正常的数据分布理论上来说都是会发生倾斜的,例如,在进行运维大数据分析时,80%的故障异常都是由20%的常见运维问题导致的,因此,会导致少数的问题有非常多的记录。

数据倾斜产生原因:

MapReduce模型中,数据倾斜问题是很常见的,因为同一个Key的Values,在进行groupByKey、countByKey、reduceByKey、join等操作时一定是分配到某一个节点上的一个Task中进行处理的,如果某个Key对应的数据量特别大的话,就会造成某个节点的堵塞甚至宕机的情况。在并行计算的作业中,整个作业的进度是由运行时间最长的那个Task决定的,在出现数据倾斜时,整个作业的运行将会非常缓慢,甚至会发生OOM异常。

解决方案

一、使用Hive ETL(提取、转换和加载) 预处理数据

Spark作业数据通常来源于Hive中,如果发生倾斜的数据来源于Hive表,可以通过Hive来进行数据预处理。直接通过Hive ETL,对数据预先进行聚合或join等操作,直接生成结果数据表,这样在Spark作业中针对的数据源就是预处理过后的Hive表,就不需要再去执行Shuffle类的算子了,从根源上解决了数据倾斜。

但是,因为数据本身就存在着分布不均的问题,所以在Hive ETL进行数据处理时,还是会发生数据倾斜,作业速度变慢的问题,这种解决方案只是避免的Spark作业时的数据倾斜问题。

二、提高Reduce任务的并行度

将Reduce Task的数量变多,就可以让每个Reduce Task分配到更少的数据量,这样可以缓解数据倾斜的问题。

在调用groupByKey、countByKey、reduceByKey时,传入Reduce端的并行度参数,这样在进行Shuffle操作时,会创建指定的Reduce Task,可以让每个Reduce Task分配到更少量的数据,避免了Spark作业时OOM的情况。

三、使用随机Key进行分步聚合

在groupByKey、reduceByKey操作时,第一次聚合的时候,对Key加一个随机前缀,例如10以内的随机数,将Key进行打散操作,将Key分为多组后,先进行局部聚合,在第二次聚合的时候,去除掉每个Key的前缀,再所有Key进行全局的聚合。

四、使用广播数据避免Reduce操作

当两个RDD要进行join操作时,其中一个RDD是比较小的,将较小的那个RDD进行Broadcast操作后,不使用join进行两个RDD的连接,因为普通的join操作是会触发Shuffle过程的,一旦触发Shuffle会将相同Key的数据都拉取到同一个Task中进行处理,而使用map join从广播变量中获取较小的RDD中的数据进行连接操作,不会触发Shuffle,避免了数据倾斜的发生。

但是,如果两个RDD都比较大,将其中一个RDD的数据进行Broadcast后,数据将会在每个Executor的Block Manger中都驻留一份,很有可能导致内存溢出,程序崩溃。

五、将发生数据倾斜的key单独进行join

首先通过Spark中的sample算子对数据进行随机抽样,然后对抽样出的数据中Key出现的次数进行排序,这样就可以找到导致数据倾斜的一个或多个Key,然后对数据进行Filter操作,可以将产生数据倾斜的Key和普通Key的数据分离,生成两个RDD。

对于可能产生数据倾斜的RDD,给每条数据都打上随机数进行打散操作后,再去进行join,然后去除前缀后再跟普通RDD进行join后的结果,进行union操作。

这种方案只需要针对数据中有少量导致数据倾斜的Key,如果导致数据倾斜的Key特别多,则不适用。

六、使用随机数和扩容表进行join

若在join操作时,RDD中有大量的Key导致了数据倾斜,可以考虑选择一个RDD进行扩容,将每条数据映射为多条数据,每条数据都带有0~n的前缀。另一个RDD的每条数据都打上一个n以内的随机值前缀。然后,将两个RDD进行join操作后,再去掉前缀。

此方案与方案五的区别在于,当有大量导致数据倾斜Key的情况时,没法将部分Key拆分出来单独处理,因此只能对整个RDD 进行数据扩容,对资源要求很高。

总结

方案1使用Hive进行数据预处理,适用于对Spark作业执行性能要求较高的场景,将数据倾斜在Hive ETL中解决,只有在Hive ETL周期性处理数据时作业速度较慢,其余的每次Spark作业速度都将很快;

方案2提高Reduce任务的并行度,指标不治本,因为,没有从根本上改变数据倾斜的本质和问题,只是缓解了执行Reduce Task时的压力;

方案3使用随机Key进行分步聚合,适用于groupByKey、reduceByKey等聚合类的操作;

方案4、5、6都是针对于join操作时发生的数据倾斜,方案4使用广播数据避免Reduce操作适用于其中一个RDD是比较小的情况,方案5将发生数据倾斜的key单独进行join适用于少量数据倾斜是由少量的Key导致的,方案6使用随机数和扩容表进行join对资源要求很高,适用于大量的Key导致了数据倾斜。

退出移动版