当前位置: 首页 > news >正文

找人做网站要准备什么软件seo外包公司如何优化

找人做网站要准备什么软件,seo外包公司如何优化,wordpress 博客群,dw网页制作教程自我介绍代码背景 本文基于Spark 3.5.0 目前公司在做小文件合并的时候用到了 Spark Rebalance 这个算子,这个算子的主要作用是在AQE阶段的最后写文件的阶段进行小文件的合并,使得最后落盘的文件不会太大也不会太小,从而达到小文件合并的作用,…

背景

本文基于Spark 3.5.0
目前公司在做小文件合并的时候用到了 Spark Rebalance 这个算子,这个算子的主要作用是在AQE阶段的最后写文件的阶段进行小文件的合并,使得最后落盘的文件不会太大也不会太小,从而达到小文件合并的作用,这其中的主要原理是在于三个规则:OptimizeSkewInRebalancePartitions,CoalesceShufflePartitions,OptimizeShuffleWithLocalRead,这里主要说一下OptimizeSkewInRebalancePartitions规则,CoalesceShufflePartitions的作用主要是进行文件的合并,是得文件不会太小,OptimizeShuffleWithLocalRead的作用是加速shuffle fetch的速度。

结论

OptimizeSkewInRebalancePartitions的作用是对小文件进行拆分,使得罗盘的文件不会太大,这个会有个问题,如果我们在使用Rebalance(col)这种情况的时候,如果col的值是固定的,比如说值永远是20240320,那么这里就得注意一下,关于OptimizeSkewInRebalancePartitions涉及到的参数spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled,spark.sql.adaptive.advisoryPartitionSizeInBytes,spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 这些值配置,如果这些配置调整的不合适,就会导致写文件的时候有可能只有一个Task在运行,那么最终就只有一个文件。而且大大加长了整个任务的运行时间。

分析

直接到OptimizeSkewInRebalancePartitions中的代码中来:

  override def apply(plan: SparkPlan): SparkPlan = {if (!conf.getConf(SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED)) {return plan}plan transformUp {case stage: ShuffleQueryStageExec if isSupported(stage.shuffle) =>tryOptimizeSkewedPartitions(stage)}}

如果我们禁用掉对rebalance的倾斜处理,也就是spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled为false(默认是true),那么就不会应用此规则,那么如果Col为固定值的情况下,就只会有一个Task进行文件的写入操作,也就只有一个文件,因为一个Task会拉取所有的Map的数据(因为此时每个maptask上的hash(Col)都是一样的,此时只有一个reduce task去拉取数据),如图:

在这里插入图片描述
假如说hash(col)为0,那实际上只有reduceTask0有数据,其他的ReduceTask1等等都是没有数据的,所以最终只有ReduceTask0写文件,并且只有一个文件。

在看合并的计算公式,该数据流如下:

 tryOptimizeSkewedPartitions||\/optimizeSkewedPartitions||\/ShufflePartitionsUtil.createSkewPartitionSpecs||\/ShufflePartitionsUtil.splitSizeListByTargetSize

splitSizeListByTargetSize方法中涉及到的参数解释如下 :

  • 参数 sizes: Array[Long] 表示属于同一个reduce任务的maptask任务的大小数组,举例 sizes = [100,200,300,400]
    表明该任务有4个maptask,0表示maptask为0的所属reduce的大小,1表示maptask为1的所属reduce的大小,依次类推,图解如下:

在这里插入图片描述
比如说reduceTask0的从Maptask拉取的数据的大小分别是100,200,300,400.

  • 参数targetSize 为 spark.sql.adaptive.advisoryPartitionSizeInBytes的值,假如说是256MB
  • 参数smallPartitionFactor为spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 的值,默认是0.2
    这里有个计算公式:
    def tryMergePartitions() = {// When we are going to start a new partition, it's possible that the current partition or// the previous partition is very small and it's better to merge the current partition into// the previous partition.val shouldMergePartitions = lastPartitionSize > -1 &&((currentPartitionSize + lastPartitionSize) < targetSize * MERGED_PARTITION_FACTOR ||(currentPartitionSize < targetSize * smallPartitionFactor ||lastPartitionSize < targetSize * smallPartitionFactor))if (shouldMergePartitions) {// We decide to merge the current partition into the previous one, so the start index of// the current partition should be removed.partitionStartIndices.remove(partitionStartIndices.length - 1)lastPartitionSize += currentPartitionSize} else {lastPartitionSize = currentPartitionSize}}。。。while (i < sizes.length) {// If including the next size in the current partition exceeds the target size, package the// current partition and start a new partition.if (i > 0 && currentPartitionSize + sizes(i) > targetSize) {tryMergePartitions()partitionStartIndices += icurrentPartitionSize = sizes(i)} else {currentPartitionSize += sizes(i)}i += 1}tryMergePartitions()partitionStartIndices.toArray

这里的计算公式大致就是:从每个maptask中的获取到属于同一个reduce的数值,依次累加,如果大于targetSize就尝试合并,直至到最后一个maptask
可以看到tryMergePartitions有个计算公式:currentPartitionSize < targetSize * smallPartitionFactor,也就是说如果当前maptask的对应的reduce分区数据 小于 256MB*0.2 = 51.2MB 的话,也还是会合并到前一个分区中去,如果smallPartitionFactor设置过大,可能会导致所有的分区都会合并到一个分区中去,最终会导致一个文件会有几十GB(也就是targetSize * smallPartitionFactor`*shuffleNum),
比如说以下的测试案例:

    val targetSize = 100val smallPartitionFactor2 = 0.5// merge last two partition if their size is not bigger than smallPartitionFactor * targetval sizeList5 = Array[Long](50, 50, 40, 5)assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList5, targetSize, smallPartitionFactor2).toSeq ==Seq(0))val sizeList6 = Array[Long](40, 5, 50, 45)assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList6, targetSize, smallPartitionFactor2).toSeq ==Seq(0))

这种情况下,就会只有一个reduce任务运行。

http://www.fp688.cn/news/159607.html

相关文章:

  • 上海物流网站怎么建设谷歌排名规则
  • 移动端手机网站建设佛山网站营销推广
  • 系统开发需求文档seo优化技巧有哪些
  • 深圳做网站的公司网站运营维护的基本工作
  • 教育行业网站建设价格中国新闻最新消息今天
  • 创世网站网络建设淘宝指数查询
  • 太原企业建站模板广告软文营销平台
  • 用自己网站做邮箱域名2023年8月份新冠
  • php网站开发实例教程第七章h5制作网站
  • 网站单个页面做301灰色seo关键词排名
  • 遵义服务好的网站建设公司网站营销方案
  • 武进网站建设服务性能优化工具
  • 农场游戏系统开发网站建设推广百度引流推广怎么收费
  • 府网站建设运维情况自查报告网络营销的特点和优势
  • 做网站 指导seo查询工具
  • 网站开发代码交接文档书阿里巴巴国际站关键词推广
  • 做任务赚钱的网站四川省人民政府
  • 调查网站赚钱优化大师是什么意思
  • 某班级网站建设方案论文seo独立站
  • php源码下载网站网络营销推广的手段
  • 网站文字不能复制怎么做网优工程师前景和待遇
  • 网站开发经验教训科学新概念seo外链平台
  • 青海手机网站建设互动营销用在哪些推广上面
  • 西安专业网站建设公司中国职业培训在线官方网站
  • 自适应网页如何设计seo的主要内容
  • 龙华网站开发公司无锡网站建设公司
  • 青岛市城乡建设委员会网站电话轻松seo优化排名
  • 如何管理建好的网站网络营销方法有几种类型
  • 旅游网站前端模板做公司网站
  • 自我介绍网站html外贸网站建设推广