网站建设基本流程前期互联网100个创业项目
源头RDD有自己的分区计算逻辑,一般没有分区器,并行度是根据分区算法自动计算的,RDD的compute函数中记录了数据如何而来,如何分区的
hadoopRDD,根据XxxinputFormat.getInputSplits()来决定,比如默认的TextInputFormat将文件按照0-128M进行切割,剩余部分是否小于128M的1.1倍
JdbcRDD,需要指定一个数字类型的字段,而且指定上界,下届,然后指定并行度进行分割
大部分源头RDD都有传入并行度的参数
窄依赖RDD,没有自己的分区器,默认集成它的上级RDD的并行度
有个特殊的窄依赖算子,coalesce(2,true),他需要指定并行度
宽依赖 RDD(ShuffledRDD),这种RDD是通过进行shuffle的算子得到的,shuffle算子必须要一个分区器,要么传,要么走默认分区器
shuffle算子指定并行度,groupBy(f(),4) //分区器为:hashpartitioner,并行度为4
有个特殊的shuffle算子,默认分区器不是hashpartitioner。那就是sortBy(),他是RangePartitiner,但是sortBy其实用的是sortByKey然后取value 最终.values把分区器搞丢了
shuffle算子只当分区函数,groupBy(f(),RangePartitioner(4)) /分区器为:RangePartitioner,并行度为4
shuffle算子不传并行度,也不传分区器
按照 Partitioner.defaultPartitioner( )获取分区器,
默认并行度为:spark.default.parallelism值,如果没设置这个参数,则取上游Rdd中并行度的最大值
最大分区器:上游所有有分区器的rdd中,并行度最大的rdd的分区器
上游rdd有分区器
如果上游所有rdd中的最大并行度/最大分区器所在rdd的并行度<10 或者 默认并行度小于最大分区器所在rdd的并行度,那么直接用最大分区器
上游rdd没有分区器
默认以hashPatitioner作为分区器,并行度为默认并行度
按照backend.defaultParallelism()获取默认并行度
本地模式:getInt("spark.default.parallelism", totalCores),根据spark.default.parallelism参数,如果没配置就是机器的总逻辑核数,setMaster(local[*]) *代表全部逻辑核
yarn:getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)),根据spark.default.parallelism参数,如果没配置就是yarn的executor的总逻辑核数,最小也得两个并行度