当前位置: 移动技术网 > IT编程>数据库>其他数据库 > spark RDD,reduceByKey vs groupByKey

spark RDD,reduceByKey vs groupByKey

2018年10月30日  | 移动技术网IT编程  | 我要评论

spark 中有两个类似的api,分别是 reducebykey 和 groupbykey 。这两个的功能类似,但底层实现却有些不同,那么为什么要这样设计呢?我们来从源码的角度分析一下。

先看两者的调用顺序(都是使用默认的partitioner,即defaultpartitioner)

所用 spark 版本:spark 2.1.0

先看reducebykey

step1

  def reducebykey(func: (v, v) => v): rdd[(k, v)] = self.withscope {
    reducebykey(defaultpartitioner(self), func)
  }

setp2

  def reducebykey(partitioner: partitioner, func: (v, v) => v): rdd[(k, v)] = self.withscope {
    combinebykeywithclasstag[v]((v: v) => v, func, func, partitioner)
  }

setp3

def combinebykeywithclasstag[c](
      createcombiner: v => c,
      mergevalue: (c, v) => c,
      mergecombiners: (c, c) => c,
      partitioner: partitioner,
      mapsidecombine: boolean = true,
      serializer: serializer = null)(implicit ct: classtag[c]): rdd[(k, c)] = self.withscope {
    require(mergecombiners != null, "mergecombiners must be defined") // required as of spark 0.9.0
    if (keyclass.isarray) {
      if (mapsidecombine) {
        throw new sparkexception("cannot use map-side combining with array keys.")
      }
      if (partitioner.isinstanceof[hashpartitioner]) {
        throw new sparkexception("hashpartitioner cannot partition array keys.")
      }
    }
    val aggregator = new aggregator[k, v, c](
      self.context.clean(createcombiner),
      self.context.clean(mergevalue),
      self.context.clean(mergecombiners))
    if (self.partitioner == some(partitioner)) {
      self.mappartitions(iter => {
        val context = taskcontext.get()
        new interruptibleiterator(context, aggregator.combinevaluesbykey(iter, context))
      }, preservespartitioning = true)
    } else {
      new shuffledrdd[k, v, c](self, partitioner)
        .setserializer(serializer)
        .setaggregator(aggregator)
        .setmapsidecombine(mapsidecombine)
    }
  }

姑且不去看方法里面的细节,我们会只要知道最后调用的是 combinebykeywithclasstag 这个方法。这个方法有两个参数我们来重点看一下,

def combinebykeywithclasstag[c](
      createcombiner: v => c,
      mergevalue: (c, v) => c,
      mergecombiners: (c, c) => c,
      partitioner: partitioner,
      mapsidecombine: boolean = true,
      serializer: serializer = null)

首先是 partitioner 参数 ,这个即是 rdd 的分区设置。除了默认的 defaultpartitioner,spark 还提供了 rangepartitioner 和 hashpartitioner 外,此外用户也可以自定义 partitioner 。通过源码可以发现如果是 hashpartitioner 的话,那么是会抛出一个错误的。

然后是 mapsidecombine 参数 ,这个参数正是 reducebykey 和 groupbykey 最大不同的地方,它决定是是否会先在节点上进行一次 combine 操作,下面会有更具体的例子来介绍。

然后是groupbykey

step1

  def groupbykey(): rdd[(k, iterable[v])] = self.withscope {
    groupbykey(defaultpartitioner(self))
  }

step2

  def groupbykey(partitioner: partitioner): rdd[(k, iterable[v])] = self.withscope {
    // groupbykey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    val createcombiner = (v: v) => compactbuffer(v)
    val mergevalue = (buf: compactbuffer[v], v: v) => buf += v
    val mergecombiners = (c1: compactbuffer[v], c2: compactbuffer[v]) => c1 ++= c2
    val bufs = combinebykeywithclasstag[compactbuffer[v]](
      createcombiner, mergevalue, mergecombiners, partitioner, mapsidecombine = false)
    bufs.asinstanceof[rdd[(k, iterable[v])]]
  }

setp3

def combinebykeywithclasstag[c](
      createcombiner: v => c,
      mergevalue: (c, v) => c,
      mergecombiners: (c, c) => c,
      partitioner: partitioner,
      mapsidecombine: boolean = true,
      serializer: serializer = null)(implicit ct: classtag[c]): rdd[(k, c)] = self.withscope {
    require(mergecombiners != null, "mergecombiners must be defined") // required as of spark 0.9.0
    if (keyclass.isarray) {
      if (mapsidecombine) {
        throw new sparkexception("cannot use map-side combining with array keys.")
      }
      if (partitioner.isinstanceof[hashpartitioner]) {
        throw new sparkexception("hashpartitioner cannot partition array keys.")
      }
    }
    val aggregator = new aggregator[k, v, c](
      self.context.clean(createcombiner),
      self.context.clean(mergevalue),
      self.context.clean(mergecombiners))
    if (self.partitioner == some(partitioner)) {
      self.mappartitions(iter => {
        val context = taskcontext.get()
        new interruptibleiterator(context, aggregator.combinevaluesbykey(iter, context))
      }, preservespartitioning = true)
    } else {
      new shuffledrdd[k, v, c](self, partitioner)
        .setserializer(serializer)
        .setaggregator(aggregator)
        .setmapsidecombine(mapsidecombine)
    }
  }

结合上面 reducebykey 的调用链,可以发现最终其实都是调用 combinebykeywithclasstag 这个方法的,但调用的参数不同。
reducebykey的调用

combinebykeywithclasstag[v]((v: v) => v, func, func, partitioner)

groupbykey的调用

combinebykeywithclasstag[compactbuffer[v]](
      createcombiner, mergevalue, mergecombiners, partitioner, mapsidecombine = false)

正是两者不同的调用方式导致了两个方法的差别,我们分别来看

  • reducebykey的泛型参数直接是[v],而groupbykey的泛型参数是[compactbuffer[v]]。这直接导致了 reducebykey 和 groupbykey 的返回值不同,前者是rdd[(k, v)],而后者是rdd[(k, iterable[v])]

  • 然后就是mapsidecombine = false 了,这个mapsidecombine 参数的默认是true的。这个值有什么用呢,上面也说了,这个参数的作用是控制要不要在map端进行初步合并(combine)。可以看看下面具体的例子。

从功能上来说,可以发现 reducebykey 其实就是会在每个节点先进行一次合并的操作,而 groupbykey 没有。

这么来看 reducebykey 的性能会比 groupbykey 好很多,因为有些工作在节点已经处理了。那么 groupbykey 为什么存在,它的应用场景是什么呢?我也不清楚,如果观看这篇文章的读者知道的话不妨在评论里说出来吧。非常感谢!

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网