前言
数据倾斜是大数据计算中一个最棘手的问题,出现数据倾斜后,Spark 作业的性能会比期望值差很多,两大直接后果:Spark 任务 OOM 异常退出和数据倾斜拖慢整个任务的执行。数据倾斜的调优,就是利用各种技术方案解决不同类型的数据倾斜问题,保证 Spark 作业的性能。
导致分布式计算应用出现数据倾斜的原因就是shuffle 数据倾斜的调优,都是围绕着:
1、要么就不要使用shuffle
2、要么就让shuffle在执行过程中均匀分发数据
最终的目的:Spark中的同一个stage中的多个Task处理的数据量大小几乎是一致的。
什么是数据倾斜
数据倾斜指的是,并行处理的数据集中,某一部分(如Spark或Kafka的一个Partition)的数据显著多于其他部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈。
数据倾斜发生的现象
绝大多数task执行得都非常快,但个别task执行极慢。比如,总共有1000个task,997个task都在1分钟之内执行完了,但是剩余两三个task却要一两个小时。这种情况很常见。
原本能够正常执行的spark作业,某天突然报出OOM(内存溢出)异常,观察异常栈,是我们写的业务代码造成的。这种情况比较少见。
总结:
1、大部分任务都很快执行完,用时也相差无几,但个别task执行耗时很长,整个应用程序一直处于99%左右的状态。
2、一直运行正常的Spark Application昨晚突然OOM了。
数据倾斜发生的原理
数据倾斜的原因很简单:在进行shuffle的时候,必须将各个节点上相同的key的数据拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此时如果某个key对应的数据量特别大的话,就会发生倾斜。比如大部分key对应10条数据,但是个别key却对应了100万条数据,那么大部分task可能就只会分配到10条数据,然后1秒钟就运行完了;但是个别task可能分配到了100万数据,要运行一两个小时。因此,整个spark作业的运行进度是由运行时间最长的那个task决定的。
因此出现数据倾斜的时候,Spark作业看起来会运行得非常缓慢,甚至可能因为某个task处理的数据量过大导致内存溢出。
下图就是一个很清晰的例子:hello这个key,在三个节点上对应了总共7条数据,这些数据都会被拉取到同一个task中进行处理;而world和you这两个key分别才对应1条数据,所以另外两个task只要分别处理1条数据即可。此时第一个task的运行时间可能是另外两个task的7倍,而整个stage的运行速度也由运行最慢的那个task所决定。
数据倾斜的危害
当出现数据倾斜时,小量任务耗时远高于其他任务,从而使得整体耗时过大,未能充分发挥分布式系统的并行计算优势。另外,当发生数据倾斜时,少量部分任务处理的数据量过大,可能造成内存不足使得任务失败,并行而引进整个应用失败。如果应用并没有因此失败,但是大量正常任务都早早完成处于等待状态,资源得不到充分利用。
总结:
1、整体耗时过大(整个任务的完成由执行时间最长的那个task决定)
2、应用程序可能异常退出(某个Task执行时处理的数据量远远大于正常节点,则需要的资源容易出现瓶颈, 当资源不足,则应用程序退出)
3、资源闲置(处理等待状态的Task资源得不到及时的释放,处于闲置浪费状态)
如何从数据源避免倾斜
如果spark本身的逻辑没有问题,就需要从数据来源判断是否有造成数据倾斜的可能,如果spark读取了倾斜的数据,那么spark本身也就会倾斜。
避免数据源倾斜HDFS
Spark 以通过 textFile(path, minPartitions) 方法读取文件时,使用 TextInputFormat。
此时可通过在数据生成端将不可切分文件存储为可切分文件,或者保证各文件包含数据量相同的方式避免数据倾斜。
总结:
对于不可切分文件可能出现数据倾斜,对于可切分文件,一般来说,不存在数据倾斜问题。
1、可切分: 基本上不会! 默认数据块大小:128M
2、不可切分: 源文件不均匀,最终导致 分布式引用程序计算产生数据倾斜
避免数据源倾斜KafKa
场景:在Kafka生产者向指定主题的topic写入数据时,要是使用了Hash或者跟业务有关的分区规则,大概率会导致topic 个别分区出现大量数据,而其他分区数据极小,我们知道Kafka一个分区只能有一个消费者,一旦一个分区数据量过于庞大,也会影响到下游的消费者处理速度。
解决方案:
不要让Kafka的存储涉及到计算和业务逻辑
1、Kafka不是计算引擎,只是一个用来在流式项目架构中起削峰填谷作用的消息中转平台,所以为保证一个 Topic的分布式平衡,尽量不要使用Hash散列或者是跟业务有关的自定义分区规则等方式来进行数据分区,否 则会造成下游消费者一开始就产生了数据倾斜。
2、Kafka尽量使用随机,轮询等不会造成数据倾斜的数据分区规则
各司其职挺好的,不然的话,会得不偿失
避免数据源倾斜hive
场景:导致数据倾斜的是 Hive 表。如果该 Hive 表中的数据本身很不均匀(比如某个 key 对应了 100 万数据,其他 key 才对应了 10 条数据),而且业务场景需要频繁使用 Spark 对 Hive 表执行某个分析操作
解决方案:
此时可以评估一下,是否可以通过Hive来进行数据预处理(即通过 Hive ETL 预先对数据按照 key 进行聚合,或者是预先和其他表进行join),然后在 Spark 作业中针对的数据源就不是原来的 Hive 表了,而是预处理后的Hive表。此时由于数据已经预先进行过聚合或join操作了,那么在 Spark 作业中也就不需要使用原先的 shuffle 类算子执行这类操作了。
方案缺点:
这种方案虽然从根源上解决了spark的数据倾斜,但是这种方式属于治标不治本。因为毕竟数据本身就存在分布不均匀的问题,所以Hive ETL中进行group by或者join等shuffle操作时,还是会出现数据倾斜,导致Hive ETL的速度很慢。我们只是把数据倾斜的发生提前到了Hive ETL中,避免Spark程序发生数据倾斜而已。
企业最佳实践:
在一些 Java 系统与 Spark 结合使用的项目中,会出现 Java 代码频繁调用 Spark 作业的场景,而且对Spark 作业的执行性能要求很高,就比较适合使用这种方案。将数据倾斜提前到上游的 Hive ETL,每天仅执行一次,只有那一次是比较慢的,而之后每次 Java 调用 Spark作业时,执行速度都会很快,能够提供更好的用户体验。
数据采样
定位处理逻辑 – Stage 和 Task
目的:知道那块的代码处理当中导致的倾斜
归根结底,数据倾斜产生的原因,就是两个 stage 中的 shuffle 过程导致的。所以我们只需要研究Shuffle 算子即可,比如 distinct、groupByKey、reduceByKey、aggergateByKey、join、cogroup、repartition 等。
出现数据倾斜,可以通过spark Web UI 管理监控界面,查看各stage的运行情况,如果某一个stage的运行很长,并且这个stage的大部分task都很快,只有极个别task慢,那么就研究这个Stage的Shuffle算子就行了,问题一定就出现在这里。
查看导致倾斜的key的数据分布情况
目的:知道处理过程当中数据变成什么样的分布了
知道了数据倾斜发生在哪里之后,通常需要分析一下那个执行了shuffle操作并且导致了数据倾斜的 RDD/Hive表,查看一下其中key的分布情况。这主要是为之后选择哪一种技术方案提供依据。针对不同 的key分布与不同的shuffle算子组合起来的各种情况,可能需要选择不同的技术方案来解决。
此时根据执行操作的情况不同,可以有很多种查看key分布的方式:
1、如果是Spark SQL中的group by、join语句导致的数据倾斜,那么就查询一下 SQL 中使用的表的key 分布情况。
2、如果是对 Spark RDD执行shuffle算子导致的数据倾斜,那么可以在Spark作业中加入查看 key 分布 的代码,比如 RDD.countByKey()。然后对统计出来的各个key出现的次数,collect/take到客户端打印 一下,就可以看到key的分布情况。
举例来说,对于上面所说的单词计数程序,如果确定了是 stage1 的 reduceByKey 算子导致了数据倾 斜,那么就应该看看进行 reduceByKey 操作的 RDD 中的 key 分布情况,在这个例子中指的就是 pairs RDD。如下示例,我们可以先对 pairs 采样 10% 的样本数据,然后使用 countByKey 算子统计出每个 key 出现的次数,最后在客户端遍历和打印样本数据中各个 key 的出现次数。
数据倾斜解决方案
任何一种解决方案都是有对应的前提的,没有通用的解决方案。
前提:
第一:知道那块的代码处理当中导致的倾斜
第二:知道处理过程当中数据变成什么样的分布了
方案一:调整shuffle的并行度
核心思路:碰运气
适用场景:
大量不同的Key被分配到了相同的Task造成该Task数据量过大。
如果我们必须要对数据倾斜迎难而上,那么建议优先使用这种方案,因为这是处理数据倾斜最简单的一种方案。但是也是一种属于碰运气的方案。因为这种方案,并不能让你一定解决数据倾斜,甚至有可能加重。那当然,总归,你会调整到一个合适的并行度是能解决的。前提是这种方案适用于 Hash散列的分区方式。凑巧的是,各种分布式计算引擎,比如MapReduce,Spark 等默认都是使用 Hash散列的方式来进行数据分区。
Spark 在做 Shuffle 时,默认使用 HashPartitioner(非Hash Shuffle)对数据进行分区。如果并行度设置的不合适,可能造成大量不相同的 Key 对应的数据被分配到了同一个 Task 上,造成该 Task 所处理的数据远大于其它 Task,从而造成数据倾斜。
如果调整 Shuffle 时的并行度,使得原本被分配到同一 Task 的不同 Key 发配到不同 Task 上处理,则可降低原 Task 所需处理的数据量,从而缓解数据倾斜问题造成的短板效应。
实现思路:
在对 RDD 执行 Shuffle 算子时,给 Shuffle 算子传入一个参数,比如 reduceByKey(1000),该参数就设置了这个 shuffle 算子执行时shuffle read task 的数量。对于 Spark SQL 中的 Shuffle 类语句,比如group by、join 等,需要设置一个参数,即 spark.sql.shuffle.partitions,该参数代表了 shufflereadTask 的并行度,该值默认是 200,对于很多场景来说都有点过小。
实现原理:
增加 shuffle read task 的数量,可以让原本分配给一个 task 的多个 key 分配给多个 task,从而让每个task 处理比原来更少的数据。举例来说,如果原本有 5 个key,每个 key 对应 10 条数据,这 5 个 key都是分配给一个 task 的,那么这个 task 就要处理 50 条数据。而增加了 shuffle read task 以后,每个task 就分配到一个 key,即每个 task 就处理 10 条数据,那么自然每个 task 的执行时间都会变短了。
具体原理如下图所示。
方案优缺点:
优点:
实现起来比较简单,可以有效缓解和减轻数据倾斜的影响。实现简单,可在需要Shuffle的操作算子上直接设 置并行度或者使用spark.default.parallelism设置。如果是Spark SQL,还可通过SET spark.sql.shuffle.partitions=[num_tasks]设置并行度。可用最小的代价解决问题。一般如果出现 数据倾斜,都可以通过这种方法先试验几次,如果问题未解决,再尝试其它方法。
缺点:
只是缓解了数据倾斜而已,没有彻底根除问题,根据实践经验来看,其效果有限。适用场景少,只能将分配到 同一Task的不同Key分散开,但对于同一Key倾斜严重的情况该方法并不适用。并且该方法一般只能缓解数据 倾斜,没有彻底消除问题。
最佳实践:
该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个key对应的数据量有100万,那么无论你的task数量增加到多少,这个对应着100万数据的key肯定还是会分配到一个task中去处理,因此注定还是会发生数据倾斜的。所以这种方案只能说是在发现数据倾斜时尝试使用的第一种手段,尝试去用最简单的方法缓解数据倾斜而已,或者是和其他方案结合起来使用。
方案二:过滤少数导致倾斜的key
核心思想: 横向切分
适用场景:
如果发现导致倾斜的 key 就少数几个,而且对计算本身的影响并不大的话,那么很适合使用这种方案。
比如 99% 的 key 就对应 10 条数据,但是只有一个 key 对应了 100 万数据,从而导致了数据倾斜。
实现思路:
如果我们判断那少数几个数据量特别多的 key,对作业的执行和计算结果不是特别重要的话,那么干脆就直接过滤掉那少数几个 key。比如,在 Spark SQL 中可以使用 where 子句过滤掉这些 key 或者在SparkCore 中对 RDD 执行 filter 算子过滤掉这些 key。如果需要每次作业执行时,动态判定哪些 key的数据量最多然后再进行过滤,那么可以使用 sample 算子对 RDD 进行采样,然后计算出每个 key 的数量,取数据量最多的 key 过滤掉即可。
实现原理:
1、将导致数据倾斜的 key 给过滤掉之后,这些 key 就不会参与计算了,自然不可能产生数据倾斜。
2、剩下的所有的key的所有数据也单独处理,最终把两个任务的结果合起来即可!
SQL举例:
原始的SQL: select a.*, b.* from a join b on a.id = b.id; 倾斜的原因:(id=1,id=2,id=3 这三个key的数据特别多) 改进后的SQL: select a.*, b.* from a join b on a.id = b.id where a.id not in (1,2,3) union select a.*, b.* from a join b on a.id = b.id where a.id in (1,2,3);
方案优缺点:
优点:实现简单,而且效果也很好,可以完全规避掉数据倾斜。
缺点:适用场景不多,大多数情况下,导致倾斜的key还是很多的,并不是只有少数几个。
方案三:将reduce join转为map join
适用场景:
在对 RDD 使用 join 类操作,或者是在 Spark SQL 中使用 join 语句时,而且 join 操作中的一个 RDD 或表的数据量比较小(比如几百M或者一两G),比较适用此方案。
在分布式计算引擎中,实现Join的思路有两种:
1、MapJoin,顾名思义,Join逻辑的完成是在 Mapper 阶段就完成了,这是假定执行的是 MapReduce任务,如果是 Spark任务,表示只用一个 Stage 就执行完了 Join 操作。
优点:避免了两阶段之间的shuffle,效率高,没有shuffle也就没有了倾斜。
缺点:多使用内存资源,只适合大小表做join的场景
2、ReduceJoin,顾名思义,Join逻辑的完成是在 Reducer 阶段完成的。那么如果是MapReduce任务,则表示 Maper阶段执行完之后把数据 Shuffle到 Reducer阶段来执行 Join 逻辑,那么就可能导致数据倾斜。如果是 Spark任务,意味着,上一个stage的执行结果数据shuffle到 下一个stage中来完成Join 操作,同样也可能产生数据倾斜。
优点:这是一种通用的join,在不产生数据倾斜的情况下,能完成各种类型的join
缺点:会发生数据倾斜的情况
实现思路:
不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。
实现原理:
普通的 join 是会走 shuffle 过程的,而一旦 shuffle,就相当于会将相同 key 的数据拉取到一个 shuffleread task 中再进行 join,此时就是 reduce join。但是如果一个 RDD 是比较小的,则可以采用广播小RDD 全量数据 +map 算子来实现与 join 同样的效果,也就是 map join,此时就不会发生 shuffle 操作,也就不会发生数据倾斜。
具体原理如下图所示。
方案优缺点:
优点:对join操作导致的数据倾斜,效果非常好,因为根本就不会发生shuffle,也就根本不会发生数据倾斜
缺点:适用场景较少,因为这个方案只适用于一个大表和一个小表的情况。毕竟我们需要将小表进行广播,此时会比较消耗内存资源,driver 和每个Executor 内存中都会驻留一份小 RDD 的全量数据。如果我们广播出去的 RDD 数据比较大,比如 10G 以上,那么就可能发生内存溢出了。因此并不适合两个都是大表的情况
方案四:两阶段聚合(局部聚合+全局聚合)
分拆的动作:核心思想:横向切分
分阶段执行:核心思想:纵向切分
适用场景:
对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案。
实现思路:
这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello,1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。
实现原理:
将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。具体原理见下图。
方案优缺点:
优点:对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大 幅度缓解数据倾斜,将Spark作业的性能提升数倍以上。
缺点:仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案。
方案五:使用随机前缀和扩容 RDD 进行 join
适用场景:
如果在进行 join 操作时,RDD 中有大量的 key 导致数据倾斜,那么进行分拆 key 也没什么意义。
实现思路:
1、首先查看 RDD/Hive 表中的数据分布情况,找到那个造成 数据倾斜的 RDD/Hive 表,比如有多个key 都对应了超过1万条数据。
2、然后将该RDD的每条数据都打上一个n以内的随机前缀。
3、同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个 0~n的前缀。
4、最后将两个处理后的RDD进行join即可。
实现原理:
将原先一样的 key 通过附加随机前缀变成不一样的key,然后就可以将这些处理后的“不同key”分散到多个task中去处理,而不是让一个task处理大量的相同key。该方案与”两阶段聚合”的不同之处就在于,上一种方案是尽量只对少数倾斜key对应的数据进行特殊处理,由于处理过程需要扩容RDD,因此上一种方案扩容RDD后对内存的占用并不大;而这一种方案是针对有大量倾斜key的情况,没法将部分key拆分出来进行单独处理,因此只能对整个RDD进行数据扩容,对内存资源要求很高。
方案优缺点:
优点:对join类型的数据倾斜基本都可以处理,而且效果也相对比较显著,性能提升效果非常不错
缺点:该方案更多的是缓解数据倾斜,而不是彻底避免数据倾斜。而且需要对整个RDD进行扩容,对内存资源要求很高。
方案六:任务横切,一分为二,单独处理
适用场景:
有时候,一个Spark应用程序中,导致倾斜的因素不是一个单一的,比如有一部分倾斜的因素是null,有一部分倾斜的因素是某些个key分布特别多。那么拆分出来也得使用不同的手段来处理
实现思路:
导致数据倾斜的因素比较多,比较复杂的场景中。
1、所有导致倾斜的数据拿出来处理
2、所有不导致倾斜的数据拿出来处理
实现原理:
在了解清楚数据的分布规律,以及确定了数据倾斜是由何种原因导致的,那么按照这些原因,进行数据的拆分,然后单独处理每个部分的数据,最后把结果合起来。
方案优缺点:
优点:将多种简单的方案综合起来,解决一个复杂的问题。可以算上一种万能的方案。
缺点:确定数据倾斜的因素比较复杂,导致解决该数据倾斜的方案比较难实现落地。代码复杂度也较高。
方案七:多种方案组合使用
在实践中发现,很多情况下,如果只是处理较为简单的数据倾斜场景,那么使用上述方案中的某一种基本就可以解决。但是如果要处理一个较为复杂的数据倾斜场景,那么可能需要将多种方案组合起来使用。
如果多种方案,组合使用也不行,请看最后一招:自定义分区规则
方案八:自定义分区
适用场景:
使用自定义的 Partitioner 实现类代替默认的 HashPartitioner,尽量将所有不同的 Key 均匀分配到不同的 Task中
适用场景:
大量不同的Key被分配到了相同的Task造成该Task数据量过大。
实现思路:
先通过抽样,了解数据的 key 的分布规律,然后根据规律,去定制自己的数据分区规则,尽量保证所有的 Task 的数据量相差无几。
实现原理:
使用自定义的 Partitioner(默认为HashPartitioner),将原本被分配到同一个 Task 的不同 Key 分配到不同 Task。
方案优缺点:
优点:灵活,通用。
缺点:必须根据对应的场景设计合理的分区方案。没有现成的方案可用,需临时实现。
转载请注明:西门飞冰的博客 » Spark 数据倾斜原理和解决方案