1.前言
默认情况下,Spark可以将一个作业切分多个任务后,发送给Executor节点并行计算,而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建RDD时指定。注意:这里的并行执行的任务数量,并不是指的切分任务的数量。
Spark分区的目的是为了并行计算,因为一个分区就是一个task进行计算,多个task可以同时在不同Executor执行。注意一个Executor只能运行一个task,属于并发执行,而不是并行执行。
一个任务需要多少个分区,是可以进行手动设置的。
spark可以进行分区的位置有两个:
一个是最初始从文件和集合读取数据的时候,根据分区规则给数据进行分区
一个是在使用Shuffle的时候,使用分区器给数据进行重分区
分区的好处:
1、创建足够的分区,在足够的资源下可以并行计算。
2、面对数据倾斜的场景,可以通过增加分区数量和自定义分区规则,进行规避。
2.RDD 分区规则
2.1.从集合中创建RDD
public class list_partition { public static void main(String[] args) { // 1 创建Spark配置 SparkConf sparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[2]"); // 2 创建SparkContext JavaSparkContext sc = new JavaSparkContext(sparkConf); // 从集合当中创建RDD的分区规则: // parallelize 方法传递的第二个参数表示分区的数量,不传递则使用默认值 // 分区默认值:是当前运行环境的最大可用核数 JavaRDD<Integer> javaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2); // 数据分区的情况 // 利用整数除机制 左闭右开 // 分区0 => start 0*5/3=0 end 1*5/3=1 => 存储数据1 // 分区1 => start 1*5/3=1 end 2*5/3=3 => 存储数据2,3 // 分区2 => start 2*5/3=3 end 3*5/3=5 => 存储数据4,5 javaRDD.saveAsTextFile("output"); // 4 关闭资源 sc.stop(); } }
2.2.从文件中创建RDD
1.txt 文件内容如下
public class file_partition { public static void main(String[] args) { // 1 创建SparkConf SparkConf sparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[*]"); // 2 创建SparkContext JavaSparkContext sc = new JavaSparkContext(sparkConf); // 3 编写代码 // textFile可以将文件作为数据处理的数据源,也可以设定分区 // minPartitions:最小分区数量 // 这里设置了2个分区,output实际输出了三个分区 // 分区规则:具体的分区个数需要通过公式计算 // 首先获取文件的总长度:totalSize = 7Byte 文件总字节数包含 回车换行等字符 // 计算平均长度:7 / 2 = 3Byte goalSize = totalSize / numSplits // 获取块大小 128M // minSize = 1Byte // 计算切分大小 splitSize = Math.max(minSize, Math.min(goalSize, blockSize)); // splitSize = 3byte Math.max(1Byte, Math.min(3Byte, 128M)); // 判断剩余的totalSize大小, 是否大于 splitSize的1.1倍 大于 切片, 小于不切 // 7 / 3 = 2... 1(1.1) + 1 = 3(分区) // 如果数据源为多个文件,那么计算分区时以文件为单位进行分区 JavaRDD<String> javaRDD = sc.textFile("input/1.txt", 2); // 数据分区规则: // 1、数据以行为单位进行读取 // spark读取文件,采用的是Hadoop的方式一行一行读取,和字节数没有关系 // 2、数据读取时以偏移量为单位,偏移量不会被重复读取 // (文件数据)1@@ => (偏移量) 012 // (文件数据)2@@ => (偏移量) 345 // (文件数据)3 => (偏移量) 6 // 3、数据分区的偏移量范围的计算,和分区的计算规则一致,分区0和分区1偏移量长度是3是因为分区平均长度是3Byte,分区2偏移量长度是1,是因为计算分区最后余数是1Byte // 分区0 => 偏移量[0,3] ,这里的偏移量3包含3 => 存储数据1,2 // 分区1 => 偏移量[3,6], 这里的偏移量6包含6 => 存储数据3 // 分区2 => 偏移量[6,7] => 没有数据存储 // 4、如果切分的位置位于一行的中间 会在当前分区读完一整行数据 javaRDD.saveAsTextFile("output"); // 4 关闭资源 sc.stop(); } }
3.分区器介绍
什么时候会用到分区器?
Shuffle的算子才会用到分区器,给我们的数据进行重分区
注意:
1、分区器是pairRDD专属的,并且只有Key-Value类型的pairRDD才有分区器,非Key-Value类型的RDD分区的值是None
2、Java RDD没有分区器,他只是有一些分区和分区的规则而已
3、每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。
3.1.Hash分区器原理
Hash 分区原理:对于给定的key,计算其hashCode,并除以分区的个数取余,如果余数小于0,则用余数+分区的个数(否则加0),最后返回的值就是这个key所属的分区ID。
Hash 分区弊端:可能导致每个分区中数据量的不均匀,极端情况下会导致某些分区拥有RDD的全部数据,从而造成数据倾斜。
3.2.Range分区器原理
RangePartitioner作用:将一定范围内的数映射到某一个分区内,尽量保证每个分区中数据量均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或者大,但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。
实现过程为:
第一步:先从整个RDD中采用水塘抽样算法,抽取出样本数据,将样本数据排序,计算出每个分区的最大key值,形成一个Array[KEY]类型的数组变量rangeBounds;
第二步:判断key在rangeBounds中所处的范围,给出该key值在下一个RDD中的分区id下标;该分区器要求RDD中的KEY类型必须是可以排序的
1)我们假设有100万条数据要分4个区
2)从100万条中抽100个数(1,2,3, ….. 100)
3)对100个数进行排序,然后均匀的分为4段
4)获取100万条数据,每个值与4个分区的范围比较,放入合适分区
Range 分区弊端:因为每一个数据集数据都不一样,也可能导致出现数据倾斜问题
Range 分区器在实际工作中基本没有使用场景
3.3.自定义分区器实践
无论是Hash分区器还是Range分区器都没有办法从根源上解决数据倾斜的问题。要是想从根本上解决数据倾斜问题,一般需要使用自定义分区器来根据数据特征自定义数据分区规则。
如下案例是一个通过自定义分区器解决数据倾斜问题的场景
RDD 代码:
public class Test_CustomPartitioner { public static void main(String[] args) { // 1 创建SparkConf SparkConf sparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[2]"); // 2 创建SparkContext JavaSparkContext sc = new JavaSparkContext(sparkConf); // 3 编写代码 // 手动模拟的数据倾斜数据集 ArrayList<Tuple2<Integer, Integer>> list = new ArrayList<>(); list.add(new Tuple2<>(1, 1));list.add(new Tuple2<>(1, 1));list.add(new Tuple2<>(1, 1)); list.add(new Tuple2<>(1, 1));list.add(new Tuple2<>(1, 1));list.add(new Tuple2<>(1, 1)); list.add(new Tuple2<>(1, 1));list.add(new Tuple2<>(1, 1));list.add(new Tuple2<>(1, 1)); list.add(new Tuple2<>(3, 5)); list.add(new Tuple2<>(5, 5)); list.add(new Tuple2<>(4, 5)); list.add(new Tuple2<>(2, 2));list.add(new Tuple2<>(2, 2));list.add(new Tuple2<>(2, 2)); list.add(new Tuple2<>(2, 2));list.add(new Tuple2<>(2, 2));list.add(new Tuple2<>(2, 2)); list.add(new Tuple2<>(2, 2));list.add(new Tuple2<>(2, 2));list.add(new Tuple2<>(2, 2)); // 创建一个RDD,指定两个分区 JavaPairRDD<Integer, Integer> pairRDD = sc.parallelizePairs(list, 2); // 对数据进行排序,1和2两个大key进入一个分区,这里就出现了数据倾斜 JavaPairRDD<Integer, Integer> sortByKey = pairRDD.sortByKey(); // 使用自定义分区器 JavaPairRDD<Integer, Integer> partitionBy = sortByKey.partitionBy(new CustomPartitioner(2)); // 查看自定义分区器后的结果 JavaRDD<String> partitionsWithIndex = partitionBy.mapPartitionsWithIndex(new Function2<Integer, Iterator<Tuple2<Integer, Integer>>, Iterator<String>>() { @Override public Iterator<String> call(Integer v1, Iterator<Tuple2<Integer, Integer>> v2) throws Exception { ArrayList<String> result = new ArrayList<>(); while (v2.hasNext()) { Tuple2<Integer, Integer> next = v2.next(); result.add(v1 + "----" + next._1); } return result.iterator(); } }, true); partitionsWithIndex.collect().forEach(System.out::println); // 4 关闭资源 sc.stop(); } }
自定义分区器代码:
// 自定义分区器需要继承Partitioner类 public class CustomPartitioner extends Partitioner { private int numPartitions; public CustomPartitioner() { } // 带参构造器,自定义分区数量 public CustomPartitioner(int numPartitions) { this.numPartitions = numPartitions; } @Override public int numPartitions() { return numPartitions; } //return 的结果就是分区号 @Override public int getPartition(Object key) { // Object类型转换成int类型,这里传递进来是什么类型,就转换成什么类型 int newKey = (int) key; // 判断key等于1放到0号分区,等于2放到1号分区,其他数据根据传入的分区数量取模决定 if (newKey == 1) { return 0; } else if (newKey == 2) { return 1; } else { return newKey % numPartitions(); } } public int getNumPartitions() { return numPartitions; } public void setNumPartitions(int numPartitions) { this.numPartitions = numPartitions; } }
转载请注明:西门飞冰的博客 » Spark RDD 分区规则和分区器