1.前言
输出这篇文章,至少参考了五个不同的spark优化文档,删除了不少调整不调整感觉对性能变化没啥用的内容,查漏补缺总结了如下十二条spark性能调优内容,感觉总结的也是相当全了。
2.调优一:资源配置
Spark性能调优的第一步,就是为任务分配更多的资源,在一定范围内,增加资源的分配与性能的提升是成正比的。实现了最优的资源配置后,在此基础上再考虑进行后面论述的性能调优策略。
调节原则:尽量将任务分配的资源调节到可以使用的资源的最大限度。如Spark Yarn模式,由于Yarn使用资源队列进行资源的分配和调度,在表写submit脚本的时候,就根据Spark作业要提交到的资源队列,进行资源的分配,比如资源队列有400G内存,100个CPU core,那么指定50个Executor,每个Executor分配8G内存,2个CPU core。
增加Executor个数
增加每个Executor的CPU core个数
在资源允许的情况下,增加每个Executor的Cpu core个数,可以提高执行task的并行度。比如有4个Executor,每个Executor有2个CPU core,那么可以并行执行8个task,如果将每个Executor的CPU core个数增加到4个(资源允许的情况下),那么可以并行执行16个task,此时的并行能力提升了一倍。
增加每个Executor的内存量
在资源允许的情况下,增加每个Executor的内存量以后,对性能的提升有三点:
1、可以缓存更多的数据(即对RDD进行cache),写入磁盘的数据相应减少,甚至可以不写入磁盘,减少了可能的磁盘IO;
2、可以为shuffle操作提供更多内存,即有更多空间来存放reduce端拉取的数据,写入磁盘的数据相应减少,甚至可以不写入磁盘,减少了可能的磁盘IO;
3、可以为task的执行提供更多内存,在task的执行过程中可能创建很多对象,内存较小时会引发频繁的GC,增加内存后,可以避免频繁的GC,提升整体性能。
spark-submit 命令参考:
/usr/local/spark/bin/spark-submit \ --class com.atguigu.spark.WordCount \ --num-executors 80 \ --driver-memory 6g \ --executor-memory 6g \ --executor-cores 3 \ --master yarn-cluster \ --queue root.default \ --conf spark.yarn.executor.memoryOverhead=2048 \ --conf spark.core.connection.ack.wait.timeout=300 \ /usr/local/spark/spark.jar
3.调优二:RDD优化
3.1.避免创建重复的RDD
对于同一份数据,只应该创建一个 RDD,不能创建多个 RDD 来代表同一份数据。
在开发 RDD lineage 极其冗长的Spark 作业时,可能会忘了自己之前对于某一份数据已经创建过一个 RDD 了,从而导致对于同一份数据,创建了多个 RDD。这就意味着,我们的 Spark 作业会进行多次重复计算来创建多个代表相同数据的 RDD,进而增加了作业的性能开销。
一个简单的例子:
// 需要对名为“hello.txt”的HDFS文件进行一次map操作,再进行一次reduce操作。也就是说,需要对一份数据执行两次算子操作。 // 错误的做法:对于同一份数据执行多次算子操作时,创建多个RDD。 // 这里执行了两次textFile方法,针对同一个HDFS文件,创建了两个RDD出来,然后分别对每个RDD都执行了一个算子操作。 // 这种情况下,Spark需要从HDFS上两次加载hello.txt文件的内容,并创建两个单独的RDD;第二次加载 HDFS文件以及创建RDD的性能开销,很明显是白白浪费掉的。 val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt") rdd1.map(...) val rdd2 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt") rdd2.reduce(...) // 正确的用法:对于一份数据执行多次算子操作时,只使用一个RDD。 // 这种写法很明显比上一种写法要好多了,因为我们对于同一份数据只创建了一个RDD,然后对这一个RDD执行了多次算子操作。 // 但是要注意到这里为止优化还没有结束,由于rdd1被执行了两次算子操作,第二次执行reduce操作的时候,还会再次从源头处重新计算一次rdd1的数据,因此还是会有重复计算的性能开销。 // 要彻底解决这个问题,必须结合“原则三:对多次使用的RDD进行持久化”,才能保证一个RDD被多次使用时只被计算一次。 val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt") rdd1.map(...) rdd1.reduce(...)
3.2.RDD复用
在对RDD进行算子时,要避免相同的算子和计算逻辑之下对RDD进行重复的计算
未调优前:
一个简单的例子:
// 错误的做法。 // 有一个<Long, String>格式的RDD,即rdd1。 // 接着由于业务需要,对rdd1执行了一个map操作,创建了一个rdd2,而rdd2中的数据仅仅是rdd1中的value值而已,也就是说,rdd2是rdd1的子集。 JavaPairRDD<Long, String> rdd1 = ... JavaRDD<String> rdd2 = rdd1.map(...) // 分别对rdd1和rdd2执行了不同的算子操作。 rdd1.reduceByKey(...) rdd2.map(...) // 正确的做法。 // 上面这个case中,其实rdd1和rdd2的区别无非就是数据格式不同而已,rdd2的数据完全就是rdd1的子集而已,却创建了两个rdd,并对两个rdd都执行了一次算子操作。 // 此时会因为对rdd1执行map算子来创建rdd2,而多执行一次算子操作,进而增加性能开销。 // 其实在这种情况下完全可以复用同一个RDD。 // 我们可以使用rdd1,既做reduceByKey操作,也做map操作。 // 在进行第二个map操作时,只使用每个数据的tuple._2,也就是rdd1中的value值,即可。 JavaPairRDD<Long, String> rdd1 = ... rdd1.reduceByKey(...) rdd1.map(tuple._2...) // 第二种方式相较于第一种方式而言,很明显减少了一次rdd2的计算开销。 // 但是到这里为止,优化还没有结束,对rdd1我们还是执行了两次算子操作,rdd1实际上还是会被计算两次。 // 因此还需要配合“RDD持久化:对多次使用的RDD进行持久化”进行使用,才能保证一个RDD被多次使用时只被计算一次。
3.3.RDD持久化
在Spark中,当多次对同一个RDD执行算子操作时,每一次都会对这个RDD以之前的父RDD重新计算一次,这种情况是必须要避免的,对同一个RDD的重复计算是对资源的极大浪费,因此,必须对多次使用的RDD进行持久化,通过持久化将公共RDD的数据缓存到内存/磁盘中,之后对于公共RDD的计算都会从内存/磁盘中直接获取RDD数据。
对于RDD的持久化,有两点需要说明:
第一,RDD的持久化是可以进行序列化的,当内存无法将RDD的数据完整的进行存放的时候,可以考虑使用序列化的方式减小数据体积,将数据完整存储在内存中。
第二,如果对于数据的可靠性要求很高,并且内存充足,可以使用副本机制,对RDD数据进行持久化。当持久化启用了副本机制时,对于持久化的每个数据单元都存储一个副本,放在其他节点上面,由此实现数据的容错,一旦一个副本数据丢失,不需要重新计算,还可以使用另外一个副本。
代码示例:
// 如果要对一个RDD进行持久化,只要对这个RDD调用cache()和persist()即可。 // 正确的做法。 // cache()方法表示:使用非序列化的方式将RDD中的数据全部尝试持久化到内存中。 // 此时再对rdd1执行两次算子操作时,只有在第一次执行map算子时,才会将这个rdd1从源头处计算一次。 // 第二次执行reduce算子时,就会直接从内存中提取数据进行计算,不会重复计算一个rdd。 val rdd1 = sc.textFile("hdfs://hadoop277ha/hello.txt").cache() rdd1.map(...) rdd1.reduce(...) // persist()方法表示:手动选择持久化级别,并使用指定的方式进行持久化。 // 比如说,StorageLevel.MEMORY_AND_DISK_SER表示,内存充足时优先持久化到内存中,内存不充足时持久化到磁盘文件中。 // 而且其中的_SER后缀表示,使用序列化的方式来保存RDD数据,此时RDD中的每个partition都会序列化成一个大的字节数组,然后再持久化到内存或磁盘中。 // 序列化的方式可以减少持久化的数据对内存/磁盘的占用量,进而避免内存被持久化数据占用过多,从而发生频繁GC。 val rdd1 = sc.textFile("hdfs://hadoop277ha/hello.txt").persist(StorageLevel.MEMORY_AND_DISK_SER) rdd1.map(...) rdd1.reduce(...)
对于persist()方法而言,我们可以根据不同的业务场景选择不同的持久化级别。
3.4.RDD尽可能早的filter操作
获取到初始RDD后,应该考虑尽早地过滤掉不需要的数据,进而减少对内存的占用,从而提升Spark作业的运行效率。
4.调优三:并行度调节
Spark作业中的并行度指各个stage的task的数量。
如果并行度设置不合理而导致并行度过低,会导致资源的极大浪费,例如,20个Executor,每个Executor分配3个CPU core,而Spark作业有40个task,这样每个Executor分配到的task个数是2个,这就使得每个Executor有一个CPU core空闲,导致资源的浪费。
理想的并行度设置,应该是让并行度与资源相匹配,简单来说就是在资源允许的前提下,并行度要设置的尽可能大,达到可以充分利用集群资源。合理的设置并行度,可以提升整个Spark作业的性能和运行速度。
Spark官方推荐,task数量应该设置为Spark作业总CPU core数量的2~3倍。之所以没有推荐task数量与CPU core总数相等,是因为task的执行时间不同,有的task执行速度快而有的task执行速度慢,如果task数量与CPU core总数相等,那么执行快的task执行完成后,会出现CPU core空闲的情况。如果task数量设置为CPU core总数的2~3倍,那么一个task执行完毕后,CPU core会立刻执行下一个task,降低了资源的浪费,同时提升了Spark作业运行的效率。
Spark作业并行度的设置代码如下:
val conf = new SparkConf() .set("spark.default.parallelism", "500")
5.调优四:广播大变量
默认情况下,task中的算子中如果使用了外部的变量,每个task都会获取一份变量的复本,这就造成了内存的极大消耗。一方面,如果后续对RDD进行持久化,可能就无法将RDD数据存入内存,只能写入磁盘,磁盘IO将会严重消耗性能;另一方面,task在创建对象的时候,也许会发现堆内存无法存放新创建的对象,这就会导致频繁的GC,GC会导致工作线程停止,进而导致Spark暂停工作一段时间,严重影响Spark性能。
假设当前任务配置了20个Executor,指定500个task,有一个20M的变量被所有task共用,此时会在500个task中产生500个副本,耗费集群10G的内存,如果使用了广播变量, 那么每个Executor保存一个副本,一共消耗400M内存,内存消耗减少了5倍。
广播变量在每个Executor保存一个副本,此Executor的所有task共用此广播变量,这让变量产生的副本数量大大减少。
在初始阶段,广播变量只在Driver中有一份副本。task在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的Executor对应的BlockManager中尝试获取变量,如果本地没有,BlockManager就会从Driver或者其他节点的BlockManager上远程拉取变量的复本,并由本地的BlockManager进行管理;之后此Executor的所有task都会直接从本地的BlockManager中获取变量。
6.调优五:Kryo序列化
序列化和反序列化: java对象,如果要进行IO操作,就需要被序列化成字节数组
1、java中的序列化机制: 让自定义的类实现Serializable接口, 这种方式太重。!! 除了序列化属性的值意外,还会序列这个类的信息。 2、Hadoop当中:专门为hadoop提供了一种序列化机制:让自定义的类实现Writable 但是这两种方式,spark都不想用! 3、spark提供了一种新的优化方式:Kryo
在 Spark 中,主要有三个地方涉及到了序列化:
1、在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输(见“原则七:广播大变量”中的讲解)。
2、将自定义的类型作为 RDD 的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。
3、使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。
对于这三种出现序列化的地方,我们都可以通过使用 Kryo 序列化类库,来优化序列化和反序列化的性能。Spark 默认使用的是 Java 的序列化机制,也就是 ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是 Spark 同时支持使用 Kryo序列化库,Kryo 序列化类库的性能比 Java序列化类库的性能要高很多。官方介绍,Kryo 序列化机制比 Java 序列化机制,性能高 10 倍左右。
Spark 之所以默认没有使用 Kryo 作为序列化类库,是因为 Kryo 要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。
以下是使用 Kryo 的代码示例,我们只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为 RDD 泛型类型的自定义类型等):
// 创建SparkConf对象。 val conf = new SparkConf().setMaster(...).setAppName(...) // 设置序列化器为KryoSerializer。 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 注册要序列化的自定义类型。 conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2])) // 提供一种最暴力的方案:只要是RDD的泛型类,都应该使用这种方式
7.调优六:调节本地化等待时长
Spark作业运行过程中,Driver会对每一个stage的task进行分配。根据Spark的task分配算法,Spark希望task能够运行在它要计算的数据算在的节点(数据本地化思想),这样就可以避免数据的网络传输。通常来说,task可能不会被分配到它处理的数据所在的节点,因为这些节点可用的资源可能已经用尽,此时,Spark会等待一段时间,默认3s,如果等待指定时间后仍然无法在指定节点运行,那么会自动降级,尝试将task分配到比较差的本地化级别所对应的节点上,比如将task分配到离它要计算的数据比较近的一个节点,然后进行计算,如果当前级别仍然不行,那么继续降级。
当task要处理的数据不在task所在节点上时,会发生数据的传输。task会通过所在节点的BlockManager获取数据,BlockManager发现数据不在本地时,户通过网络传输组件从数据所在节点的BlockManager处获取数据。
网络传输数据的情况是我们不愿意看到的,大量的网络传输会严重影响性能,因此,我们希望通过调节本地化等待时长,如果在等待时长这段时间内,目标节点处理完成了一部分task,那么当前的task将有机会得到执行,这样就能够改善Spark作业的整体性能。
在Spark项目开发阶段,可以使用client模式对程序进行测试,此时,可以在本地看到比较全的日志信息,日志信息中有明确的task数据本地化的级别,如果大部分都是PROCESS_LOCAL,那么就无需进行调节,但是如果发现很多的级别都是NODE_LOCAL、ANY,那么需要对本地化的等待时长进行调节,通过延长本地化等待时长,看看task的本地化级别有没有提升,并观察Spark作业的运行时间有没有缩短。
注意,过犹不及,不要将本地化等待时长延长地过长,导致因为大量的等待时长,使得Spark作业的运行时间反而增加了。
Spark本地化等待时长的设置如代码清单:
val conf = new SparkConf().set("spark.locality.wait", "6")
8.调优七:尽量避免使用Shuffle类算子
如果有可能的话,要尽量避免使用 shuffle 类算子。因为 Spark 作业运行过程中,最消耗性能的地方就 是 shuffle过程。shuffle 过程,简单来说,就是将分布在集群中多个节点上的同一个 key,拉取到同一 个节点上,进行聚合或 join 等操作。比如 reduceByKey、join 等算子,都会触发 shuffle 操作。
shuffle 过程中,各个节点上的相同 key 都会先写入本地磁盘文件中,然后其他节点需要通过网络传输 拉取各个节点上的磁盘文件中的相同 key。而且相同 key 都拉取到同一个节点进行聚合操作时,还有可 能会因为一个节点上处理的 key 过多,导致内存不够存放,进而溢写到磁盘文件中。因此在 shuffle 过 程中,可能会发生大量的磁盘文件读写的 IO 操作,以及数据的网络传输操作。磁盘 IO 和网络数据传输 也是 shuffle 性能较差的主要原因。
因此在我们的开发过程中,能避免则尽可能避免使用 reduceByKey、join、distinct、repartition 等会 进行 shuffle 的算子,尽量使用 map 类的非 shuffle 算子。这样的话,没有 shuffle 操作或者仅有较少 shuffle 操作的Spark 作业,可以大大减少性能开销。
Broadcast与map进行join代码示例:
// 传统的join操作会导致shuffle操作。 // 因为两个RDD中,相同的key都需要通过网络拉取到一个节点上,由一个task进行join操作。 val rdd3 = rdd1.join(rdd2) // Broadcast+map的join操作,不会导致shuffle操作。 // 使用Broadcast将一个数据量较小的RDD作为广播变量。 val rdd2Data = rdd2.collect() val rdd2DataBroadcast = sc.broadcast(rdd2Data) // 在rdd1.map算子中,可以从rdd2DataBroadcast中,获取rdd2的所有数据。 // 然后进行遍历,如果发现rdd2中某条数据的key与rdd1的当前数据的key是相同的,那么就判定可以进行join。 // 此时就可以根据自己需要的方式,将rdd1当前数据与rdd2中可以连接的数据,拼接在一起(String或Tuple)。 val rdd3 = rdd1.map(rdd2DataBroadcast...) // 注意,以上操作,建议仅仅在rdd2的数据量比较少(比如几百M,或者一两G)的情况下使用。 // 因为每个Executor的内存中,都会驻留一份rdd2的全量数据。
9.调优八:使用预聚合的算子操作
如果因为业务需要,一定要使用 shuffle 操作,无法用 map 类的算子来替代,那么尽量使用可以预聚合的算子。
所谓的预聚合,说的是在每个节点本地对相同的 key 进行一次聚合操作,类似于 MapReduce 中的本地 combiner。预聚合之后,每个节点本地就只会有一条相同的 key,因 为多条相同的 key都被聚合起来了。其他节点在拉取所有节点上的相同 key 时,就会大大减少需要拉取 的数据数量,从而也就减少了磁盘 IO 以及网络传输开销。通常来说,在可能的情况下,建议使用 reduceByKey 或者 aggregateByKey 算子来替代掉 groupByKey 算子。因为 reduceByKey 和 aggregateByKey 算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而 groupByKey 算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来 说比较差。
比如如下两幅图,就是典型的例子,分别基于 reduceByKey 和 groupByKey 进行单词计数。
这张图是 groupByKey 的原理图,可以看到,没有进行任何本地聚合时,所有数据都会在集群节点之间传输;
使用reduceByKey对性能的提升如下:
1、本地聚合后,在map端的数据量变少,减少了磁盘IO,也减少了对磁盘空间的占用;
2、本地聚合后,下一个stage拉取的数据量变少,减少了网络传输的数据量;
3、本地聚合后,在reduce端进行数据缓存的内存占用减少;
4、本地聚合后,在reduce端进行聚合的数据量减少。
10.调优九:使用高性能算子
10.1.使用mapPartitions替代普通map
mapPartitions 类的算子,一次函数调用会处理一个 partition 所有的数据,而不是一次函数调用处理 一条,性能相对来说会高一些。但是有的时候,使用 mapPartitions 会出现 OOM(内存溢出)的问 题。因为单次函数调用就要处理掉一个 partition 所有的数据,如果内存不够,垃圾回收时是无法回收 掉太多对象的,很可能出现 OOM 异常。所以使用这类操作时要慎重!
因此,mapPartitions算子适用于数据量不是特别大的时候,此时使用mapPartitions算子对性能的提升效果还是不错的。
10.2.使用foreachPartitions替代foreach
原理类似于“使用 mapPartitions 替代 map”,也是一次函数调用处理一个 partition 的所有数据,而不 是一次函数调用处理一条数据。在实践中发现,foreachPartitions 类的算子,对性能的提升还是很有帮 助的。比如在 foreach 函数中,将 RDD 中所有数据写 MySQL,那么如果是普通的 foreach 算子,就会 一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁 数据库连接,性能是非常低下;但是如果用 foreachPartitions 算子一次性处理一个 partition 的数据, 那么对于每个 partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高 的。实践中发现,对于 1 万条左右的数据量写 MySQL,性能可以提升 30% 以上。
在生产环境中,全部都会使用foreachPartition算子完成数据库操作。
foreachPartition算子存在一个问题,与mapPartitions算子类似,如果一个分区的数据量特别大,可能会造成OOM,即内存溢出。
10.3.
在Spark任务中我们经常会使用filter算子完成RDD中数据的过滤,在任务初始阶段,从各个分区中加载到的数据量是相近的,但是一旦进过filter过滤后,每个分区的数据量有可能会存在较大差异,如下图所示:
我们可以发现两个问题:
每个partition的数据量变小了,如果还按照之前与partition相等的task个数去处理当前数据,有点浪费task的计算资源;
2、每个partition的数据量不一样,会导致后面的每个task处理每个partition数据的时候,每个task要处理的数据量不同,这很有可能导致数据倾斜问题。
如上图所示:第二个分区的数据过滤后只剩100条,而第三个分区的数据过滤后剩下800条,在相同的处理逻辑下,第二个分区对应的task处理的数据量与第三个分区对应的task处理的数据量差距达到了8倍,这也会导致运行速度可能存在数倍的差距,这也就是数据倾斜问题。
针对上述的两个问题,我们分别进行分析:
1、针对第一个问题,既然分区的数据量变小了,我们希望可以对分区数据进行重新分配,比如将原来4个分区的数据转化到2个分区中,这样只需要用后面的两个task进行处理即可,避免了资源的浪费。
2、针对第二个问题,解决方法和第一个问题的解决方法非常相似,对分区数据重新分配,让每个partition中的数据量差不多,这就避免了数据倾斜问题。
那么具体应该如何实现上面的解决思路?我们需要coalesce算子。
repartition与coalesce都可以用来进行重分区,其中repartition只是coalesce接口中shuffle为true的简易实现,coalesce默认情况下不进行shuffle,但是可以通过参数进行设置。
假设我们希望将原本的分区个数A通过重新分区变为B,那么有以下几种情况:
1、A > B(多数分区合并为少数分区)
① A与B相差值不大
此时使用coalesce即可,无需shuffle过程。
② A与B相差值很大
此时可以使用coalesce并且不启用shuffle过程,但是会导致合并过程性能低下,所以推荐设置coalesce的第二个参数为true,即启动shuffle过程。
2、A < B(少数分区分解为多数分区)
此时使用repartition即可,如果使用coalesce需要将shuffle设置为true,否则coalesce无效。
我们可以在filter操作之后,使用coalesce算子针对每个partition的数据量各不相同的情况,压缩partition的数量,而且让每个partition的数据量尽量均匀紧凑,以便于后面的task进行计算操作,在某种程度上能够在一定程度上提升性能。
10.4.
有这样一个需求:
1、rdd.repartition().sort() // 先进行了重分区,然后进行分区的排序 这个效率相对来说没有下 面这种方高 1 + 1 = 2 2、rdd.repartitionAndSortWithinPartitions() // 边分区边排序 1 + 1 = 1.5
1、如果有reducer阶段就一定会进行shuffle 2、如果有shuffle,那么就一定会按照key排序 原因: 1、如果一个reduceTask执行计算的输入数据是无序的,则每个reduceTask进行分组聚合的时候,需要对这个输入文件进行多次扫描。 2、reduceTask期望它的输入数据是有序的。按照顺序来扫描这个文件一次,就能做完所有分组操作
11.结束
转载请注明:西门飞冰的博客 » Spark 性能调优总结