repartition 和 coalesce 算子

repartition 和 coalesce 都是Transformation算子,都可以实现RDD的重新分区功能。

1. coalesce

1.1 源码
  /**
   * Return a new RDD that is reduced into `numPartitions` partitions.
   *
   * This results in a narrow dependency, e.g. if you go from 1000 partitions
   * to 100 partitions, there will not be a shuffle, instead each of the 100
   * new partitions will claim 10 of the current partitions.
   *
   * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
   * this may result in your computation taking place on fewer nodes than
   * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
   * you can pass shuffle = true. This will add a shuffle step, but means the
   * current upstream partitions will be executed in parallel (per whatever
   * the current partitioning is).
   *
   * Note: With shuffle = true, you can actually coalesce to a larger number
   * of partitions. This is useful if you have a small number of partitions,
   * say 100, potentially with a few partitions being abnormally large. Calling
   * coalesce(1000, shuffle = true) will result in 1000 partitions with the
   * data distributed using a hash partitioner.
   */
  def coalesce(numPartitions: Int, shuffle: Boolean = false,
               partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
              (implicit ord: Ordering[T] = null)
      : RDD[T] = withScope {
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
    if (shuffle) {
      /** Distributes elements evenly across output partitions, starting from a random partition. */
      val distributePartition = (index: Int, items: Iterator[T]) => {
        var position = (new Random(index)).nextInt(numPartitions)
        items.map { t =>
          // Note that the hash code of the key will just be the key itself. The HashPartitioner
          // will mod it with the number of total partitions.
          position = position + 1
          (position, t)
        }
      } : Iterator[(Int, T)]

      // include a shuffle step so that our upstream tasks are still distributed
      new CoalescedRDD(
        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
        new HashPartitioner(numPartitions)),
        numPartitions,
        partitionCoalescer).values
    } else {
      new CoalescedRDD(this, numPartitions, partitionCoalescer)
    }
  }

根据方法体上的介绍:
该方法返回一个新的RDD(numPartitions个分区)。

如果从1000个分区变为100个分区,这将会产生一个窄依赖,没有shuffle过程,相反,新产生的100个分区中的每个分区都会拥有当前1000个分区中的10个。

但是,如果coalesce过程很剧烈(例如:numPartitions=1),可能导致计算发生在比你期望更少的节点上(例如当numPartitions=1时只有一个节点)。为了避免这种情况,可以将shuffle设置为true,这将会加入一个shuffle过程,那么coalesce之前的操作仍然按照之前的分区数并行执行(不用管coalesce重新设置了多少分区)

如果设置shuffle=true,那么可以将RDD重分区为更大的分区数,这对于如下情形非常有用:分区数较少(例如100)且有一些分区特别大(数据倾斜),调用coalesce(1000, shuffle = true)可以将数据分布到1000个分区上。

1.2 例子
public class WordCount1 implements ISparkJob {

    @Override
    public void run(JavaSparkContext sparkContext) {
        JavaRDD<String> javaRDD = sparkContext.parallelize(
                Arrays.asList("aaa bbb", "bbb ccc", "aaa", "bbb", "ccc", "aaa bbb", "aaa", "bbb", "ccc", "ddd"),
                5);

        // partition:5->1  shuffle=false
        javaRDD.coalesce(1, false).collect();

        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // partition:5->1  shuffle=true
        javaRDD.coalesce(1, true).collect();

        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // partition:5->10  shuffle=false
        javaRDD.coalesce(10, false).collect();

        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // partition:5->10  shuffle=true
        javaRDD.coalesce(10, true).collect();

        try {
            TimeUnit.MINUTES.sleep(30);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

运行过程:

  • 共有四个Job

  • Job0(partition:5->1,shuffle=false)

Job0只有一个Stage(ResultStage),没有shuffle过程,且Task数为1。
虽然原始RDD的分区数为5,但是coalesce操作将分区数变为了1,整个Stage过程的Task数由末尾分区数决定,所以Task数为1。
注意:整个Stage过程的Task数由末尾分区数决定。


  • Job1(partition:5->1,shuffle=true)

Job1有两个Stage(ResultStage和ShuffleMapStage),由于有shuffle过程,所以在shuffle处切分Stage。
ShuffleMapStage(原始RDD -> Shuffle Write)的Task数为5,因为原始RDD的分区数为5,且ShuffleMapStage中分区数未改变。
ResultStage(Shuffle Read -> Job结束)的Task数为1,因为coalesce
操作将分区数变为了1。


  • Job2(partition:5->10,shuffle=false)

Job2有一个Stage(ResultStage),没有shuffle过程,虽然coalesce操作中设置numPartitions=10,但是并不能生效,所以Task数为5。
注意:当shuffle=false时,设置numPartitions大于原始RDD分区数,无法生效。


  • Job3(partition:5->10,shuffle=true)

Job3有两个Stage(ResultStage和ShuffleMapStage),有shuffle过程,在shuffle处切分Stage。
ShuffleMapStage(原始RDD -> Shuffle Write)的Task数为5,因为原始RDD分区数为5,且Stage过程中未修改分区数。
ResultStage(Shuffle Read -> Job结束)的Task数为10,因为coalesce操作将分区数变为了10。

2. repartition

  /**
   * Return a new RDD that has exactly numPartitions partitions.
   *
   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
   * a shuffle to redistribute data.
   *
   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
   * which can avoid performing a shuffle.
   */
  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }

可见:repartition内部调用coalesce方法实现,且shuffle=true。

3. 关于stage的task数的测试

public class WordCount1 implements ISparkJob {

    @Override
    public void run(JavaSparkContext sparkContext) {
        JavaRDD<String> javaRDD = sparkContext.parallelize(
                Arrays.asList("aaa bbb", "bbb ccc", "aaa", "bbb", "ccc", "aaa bbb", "aaa", "bbb", "ccc", "ddd"),
                10);

        // shuffle=false
        javaRDD.coalesce(8, false)
                .coalesce(4, false)
                .coalesce(2, false)
                .coalesce(1, false)
                .collect();

        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // shuffle=true
        javaRDD.coalesce(8, true)
                .coalesce(4, true)
                .coalesce(2, true)
                .coalesce(1, true)
                .collect();

        try {
            TimeUnit.MINUTES.sleep(30);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  • 共有两个Job

  • Job0(Shuffle=false)

Job0没有Shuffle,只有一个Stage,虽然Stage过程中多次重分区,但是Stage末尾将分区数设为了1,所以整个Stage的Task数就是1。


  • Job1(Shuffle=true)

Job1有Shuffle,根据Shuffle划分为5个Stage:
Stage1(原始RDD -> Shuffle Write)有10个task,因为原始RDD分区数为10,且Stage1中未修改分区数;
Stage2(Shuffle Read -> Shuffle Write)有8个task,因为Stage1将分区数设为8,且Stage2中未修改分区数;
Stage3(Shuffle Read -> Shuffle Write)有4个task,因为Stage2将分区数设为4,且Stage3中未修改分区数;
Stage4(Shuffle Read -> Shuffle Write)有2个task,因为Stage3将分区数设为2,且Stage4中未修改分区数;
Stage5(Shuffle Read -> Job结束)有1个task,因为Stage4将分区数设为1,且Stage5中未修改分区数;

因此,Stage的Task数可以通过整个Stage末尾RDD的分区数来判定。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,270评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,489评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,630评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,906评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,928评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,718评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,442评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,345评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,802评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,984评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,117评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,810评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,462评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,011评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,139评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,377评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,060评论 2 355

推荐阅读更多精彩内容