1.依赖关系
1.1.血缘关系介绍
多个连续的RDD的依赖关系,称之为血缘关系,通过RDD的血缘关系,就可以构建出DAG 有向无环图。
RDD为了提高容错性,需要将RDD间的关系保存下来,一旦出现错误,可以根据血缘关系将数据源重新读取进行计算。
1.2.查看血缘关系
任意转换算子使用 toDebugString方法打印血缘关系
代码示例:
public class Test01_dep { public static void main(String[] args) throws InterruptedException { // 1 创建SparkConf SparkConf sparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[2]"); // 2 创建SparkContext JavaSparkContext sc = new JavaSparkContext(sparkConf); // 3 编写代码 JavaRDD<String> lineRDD = sc.textFile("input/1.txt", 2); JavaRDD<String> flatMap = lineRDD.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String line) throws Exception { String[] words = line.split(" "); return Arrays.stream(words).iterator(); } }); JavaRDD<String> filter = flatMap.filter(new Function<String, Boolean>() { @Override public Boolean call(String word) throws Exception { return !"".equals(word) && word != null; } }); JavaRDD<String> coalesce = filter.coalesce(1, false); JavaPairRDD<String, Integer> mapToPair = coalesce.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<>(word, 1); } }); JavaPairRDD<String, Integer> reduceByKey = mapToPair.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer sum, Integer elem) throws Exception { return sum + elem; } }); // 打印血缘关系 System.out.println(reduceByKey.toDebugString()); reduceByKey.collect(); Thread.sleep(999999); // 4 关闭资源 sc.stop(); } }
输出结果:
(1) ShuffledRDD[6] at reduceByKey at Test01_dep.java:51 [] +-(1) MapPartitionsRDD[5] at mapToPair at Test01_dep.java:44 [] | CoalescedRDD[4] at coalesce at Test01_dep.java:43 [] | MapPartitionsRDD[3] at filter at Test01_dep.java:36 [] | MapPartitionsRDD[2] at flatMap at Test01_dep.java:28 [] | input/1.txt MapPartitionsRDD[1] at textFile at Test01_dep.java:25 [] | input/1.txt HadoopRDD[0] at textFile at Test01_dep.java:25 []
血缘关系输出结果说明:
(1)从下往上看RDD的转化 [0], 中括号里面的数字越小, 他的血缘关系越靠顶层
(2)同一个算子,如结果中textFile 有两个RDD, 证明他做了两个RDD转化,分别是读取文件和分区
(3)+- 分别表示进行了shuffe读和shuffe写
(4)小括号代表整个阶段的分区数量,也就是task的数量(注意:在同一个阶段中分区的数量始终一致)
1.3.窄依赖和宽依赖
窄依赖:表示每一个父RDD的Partition最多被子RDD的一个Partition使用(一对一or多对一),窄依赖我们形象的比喻为独生子女。
宽依赖:表示同一个父RDD的Partition被多个子RDD的Partition依赖(只能是一对多),会引起Shuffle,总结:宽依赖我们形象的比喻为超生。
新的RDD的一个分区的数据依赖于旧的RDD多个分区的数据
宽依赖依赖称之为Shuffle依赖。
具有宽依赖的转换算子包括:sort、reduceByKey、groupByKey、join和调用rePartition函数的任何操作。
宽依赖对Spark去评估一个转换算子有更加重要的影响,比如对性能的影响。
在不影响业务要求的情况下,要尽量避免使用有宽依赖的转换算子,因为有宽依赖,就一定会走shuffle,影响性能。
2.阶段任务划分
任务和阶段划分的位置是在Driver端执行的
任务和阶段划分一共有四个流程,具体如下:
RDD Objects:负责把所有RDD的算子串联起来,串联起来之后就形成了DAG图,这个时候还没有阶段和任务的划分,也没有其他的优化
DAGScheduler:负责切割DAG图,划分具体的阶段和任务
TaskScheduler:把DAGScheduler模块划分好的task调度到worker或者Execute当中执行
Worker:在Execute当中执行Task
Spark 任务切分中间分为:Application、Job、Stage和Task
(1)Application:初始化一个SparkContext即生成一个Application;
(2)Job:一个Action算子就会生成一个Job;
(3)Stage:Stage等于宽依赖的个数加1;一个Stage就是一个阶段,因为宽依赖走了Shuffle
(4)Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。
注意:
1、Application->Job->Stage->Task每一层都是1对n的关系。
2、spark 任务执行是一个Stage一个Stage执行的,不能多个Stage同时执行
为什么spark 把每一个阶段最后一个行动算子的分区数量当作整个阶段的分区?
因为在一个阶段当中最后一个行动算子性能最差,因为他要写Shuffle和磁盘文件进行交互
只要把最后一个算子性能优化好了,那么整个Stage的性能一般也不会差,因为前面的算子都在内存中交互,只有最后一个算子在磁盘中交互
如何调整一个阶段的分区数量:
public class Dep { public static void main(String[] args) throws InterruptedException { // 1.创建配置对象 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore"); // 2. 创建sparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 3. 编写代码 JavaRDD<String> lineRDD = sc.textFile("input/2.txt"); System.out.println(lineRDD.toDebugString()); System.out.println("-------------------"); JavaRDD<String> wordRDD = lineRDD.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String s) throws Exception { List<String> stringList = Arrays.asList(s.split(" ")); return stringList.iterator(); } }); System.out.println(wordRDD); System.out.println("-------------------"); JavaPairRDD<String, Integer> tupleRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<>(s, 1); } }); System.out.println(tupleRDD.toDebugString()); System.out.println("-------------------"); // 缩减分区 JavaPairRDD<String, Integer> coalesceRDD = tupleRDD.coalesce(1); JavaPairRDD<String, Integer> wordCountRDD = coalesceRDD.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } },4); System.out.println(wordCountRDD.toDebugString()); System.out.println("-------------------"); wordCountRDD.collect().forEach(System.out::println); Thread.sleep(600000); // 4. 关闭sc sc.stop(); } }
3.任务调度原理
Spark的任务调度总体来说分两路进行,一路是Stage级的调度,一路是Task级的调度,总体调度流程如下图所示:
DAGScheduler负责Stage级的调度,主要是将job切分成若干Stages,并将每个Stage打包成TaskSet交给TaskScheduler调度。TaskScheduler负责Task级的调度,将DAGScheduler给过来的TaskSet按照指定的调度策略分发到Executor上执行,调度过程中SchedulerBackend负责提供可用资源,其中SchedulerBackend有多种实现,分别对接不同的资源管理系统
下面两张图描述了Spark-On-Yarn模式下在任务调度期间,ApplicationMaster、Driver以及Executor内部模块的交互过程:
模块交互过程:
3.1.Stage调度
前面提到过,Stage和Task在DAGScheduler中,根据宽依赖做Stage划分,根据分区数做Task划分,Task包含在Stage中。
一个Stage 划分完毕后Stage是否被提交,需要判断它的父Stage是否执行,只有在父Stage执行完毕才能提交当前Stage,如果一个Stage没有父Stage,那么从该Stage开始提交。
TaskScheduler会监控Stage的运行状态,只有Executor丢失或者Task由于Fetch失败才需要重新提交失败的Stage以调度运行失败的任务,其他类型的Task失败会在TaskScheduler的调度过程中重试。
相对来说Stage调度比较简单,仅仅是通过DAGScheduler在Stage层面上划分DAG,在提交Stage并监控相关状态信息。TaskScheduler则相对较为复杂,下面详细阐述其细节。
3.2.Task调度
Spark Task的调度是由TaskScheduler来完成,由前文可知,DAGScheduler将Stage打包到TaskSet交给TaskScheduler,TaskScheduler会将TaskSet封装为TaskSetManager加入到调度队列中,TaskSetManager结构如下图所示。
TaskSetManager负责监控管理同一个Stage中的Tasks,TaskScheduler就是以TaskSetManager为单元来调度任务。
TaskScheduler初始化后会启动SchedulerBackend,它负责跟外界打交道,接收Executor的注册信息,并维护Executor的状态,所以说SchedulerBackend是管“粮食”的,同时它在启动后会定期地去“询问”TaskScheduler有没有任务要运行,也就是说,它会定期地“问”TaskScheduler“我有这么余粮,你要不要啊”,TaskScheduler在SchedulerBackend“问”它的时候,会从调度队列中按照指定的调度策略选择TaskSetManager去调度运行。
3.3.Task调度策略
TaskScheduler会先把DAGScheduler给过来的TaskSet封装成TaskSetManager扔到任务队列里,然后再从任务队列里按照一定的规则把它们取出来在SchedulerBackend给过来的Executor上运行。这个调度过程实际上还是比较粗粒度的,是面向TaskSetManager的。
TaskScheduler是以树的方式来管理任务队列,树中的节点类型为Schdulable,叶子节点为TaskSetManager,非叶子节点为Pool,下图是它们之间的继承关系。
TaskScheduler。
3.3.1.FIFO调度策略
3.3.2.FAIR调度策略
FAIR调度策略的树结构如下图所示:
FAIR模式中有一个rootPool和多个子Pool,各个子Pool中存储着所有待分配的TaskSetMagager。
在FAIR模式中,需要先对子Pool进行排序,再对子Pool里面的TaskSetMagager进行排序,因为Pool和TaskSetMagager都继承了Schedulable特质,因此使用相同的排序算法。
排序过程的比较是基于Fair-share来比较的,每个要排序的对象包含三个属性: runningTasks值(正在运行的Task数)、minShare值、weight值,比较时会综合考量runningTasks值,minShare值以及weight值。
注意,minShare、weight的值均在公平调度配置文件fairscheduler.xml中被指定,调度池在构建阶段会读取此文件的相关配置。
1、如果A对象的runningTasks大于它的minShare,B对象的runningTasks小于它的minShare,那么B排在A前面;(runningTasks比minShare小的先执行)
2、如果A、B对象的runningTasks都小于它们的minShare,那么就比较runningTasks与minShare的比值(minShare使用率),谁小谁排前面;(minShare使用率低的先执行)
3、如果A、B对象的runningTasks都大于它们的minShare,那么就比较runningTasks与weight的比值(权重使用率),谁小谁排前面。(权重使用率低的先执行)
4、如果上述比较均相等,则比较名字。
整体上来说就是通过minShare和weight这两个参数控制比较过程,可以做到让minShare使用率和权重使用率少(实际运行task比例较少)的先运行。
FAIR模式排序完成后,所有的TaskSetManager被放入一个ArrayBuffer里,之后依次被取出并发送给Executor执行。
从调度队列中拿到TaskSetManager后,由于TaskSetManager封装了一个Stage的所有Task,并负责管理调度这些Task,那么接下来的工作就是TaskSetManager按照一定的规则一个个取出Task给TaskScheduler,TaskScheduler再交给SchedulerBackend去发到Executor上执行。
3.4.Task本地化调度
任务分配原则:根据每个Task的优先位置,确定Task的Locality(本地化)级别,本地化一共有五种,优先级由高到低顺序:
移动数据不如移动计算。
名称 | 解析 |
---|---|
PROCESS_LOCAL | 进程本地化,task和数据在同一个Executor中,性能最好。 |
NODE_LOCAL | 节点本地化,task和数据在同一个节点中,但是task和数据不在同一个Executor中,数据需要在进程间进行传输。 |
RACK_LOCAL | 机架本地化,task和数据在同一个机架的两个节点上,数据需要通过网络在节点之间进行传输。 |
NO_PREF | 对于task来说,从哪里获取都一样,没有好坏之分。 |
ANY | task和数据可以在集群的任何地方,而且不在一个机架中,性能最差。 |
在调度执行时,Spark调度总是会尽量让每个task以最高的本地性级别来启动,当一个task以本地性级别启动,但是该本地性级别对应的所有节点都没有空闲资源而启动失败,此时并不会马上降低本地性级别启动而是在某个时间长度内再次以本地性级别来启动该task,若超过限时时间则降级启动,去尝试下一个本地性级别,依次类推。
可以通过调大每个类别的最大容忍延迟时间,在等待阶段对应的Executor可能就会有相应的资源去执行此task,这就在在一定程度上提到了运行性能。
3.5.Task失败重试与黑名单机制
Task运行失败会被告知给TaskSetManager,对于失败的Task,会记录它失败的次数,如果失败次数还没有超过最大重试次数,那么就把它放回待调度的Task池子中,否则整个Application失败。
转载请注明:西门飞冰的博客 » Spark RDD 依赖关系和阶段任务划分及任务调度原理