当前位置: 移动技术网 > IT编程>数据库>其他数据库 > spark计算模型RDD

spark计算模型RDD

2020年03月23日  | 移动技术网IT编程  | 我要评论

rdd介绍

1.rdd概念以及特性

rdd(resilient distributed dataset)叫做弹性分布式数据集,是spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。rdd具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。rdd允许用户在执行多个查询时显式地将数据缓存在内存中,后续的查询能够重用这些数据,这极大地提升了查询速度。(a resilient distributed dataset)弹性分布式数据集合。并且是spark最基本的编程抽象,而且rdd是只读、可分区的、可以进行并行计算的一个对象。

  • 数据集:一个数据集合,用于存放数据的。rdd是一个数据容器,用来组织管理数据的。跟array和list类似,并且都能够进行map、flatmap、filter等等

  • 分布式:rdd中的数据是分布式存储的,可用于分布式计算。rdd的数据是分布存储的,也就是spark集群中每个节点上只存储了rdd的部分数据。计算同样也是分布式并行计算的

  • 弹性:

    • 存储的弹性:rdd的数据可以在内存和磁盘之间进行自由切换

    • 可靠性的弹性:rdd的在丢失数据的时候能够自动恢复。rdd在计算过程中会出现失败的情况,失败以后会进行一定次数的重试(4次)

    • 并行度的弹性:rdd的数据分区可以改变,进而增加并行计算的粒度

  • rdd其他特点:

    • rdd的数据是只读,每次操作都会产生新的rdd。安全。

    • rdd中数据可以缓存在内存、磁盘、hdfs之上

1.1rdd弹性

1) 自动进行内存和磁盘数据存储的切换

​ spark优先把数据放到内存中,如果内存放不下,就会放到磁盘里面,程序进行自动的存储切换

2) 基于血统的高效容错机制

​ 在rdd进行转换和动作的时候,会形成rdd的lineage依赖链,当某一个rdd失效的时候,可以通过重新计算上游的rdd来重新生成丢失的rdd数据。

3) task如果失败会自动进行特定次数的重试

​ rdd的计算任务如果运行失败,会自动进行任务的重新计算,默认次数是4次。

4) stage如果失败会自动进行特定次数的重试

​ 如果job的某个stage阶段计算失败,框架也会自动进行任务的重新计算,默认次数也是4次。

5) checkpoint和persist可主动或被动触发

​ rdd可以通过persist持久化将rdd缓存到内存或者磁盘,当再次用到该rdd时直接读取就行。也可以将rdd进行检查点,检查点会将数据存储在hdfs中,该rdd的所有父rdd依赖都会被移除。

6) 数据调度弹性

​ spark把这个job执行模型抽象为通用的有向无环图dag,可以将多stage的任务串联或并行执行,调度引擎自动处理stage的失败以及task的失败。

7) 数据分片的高度弹性

​ 可以根据业务的特征,动态调整数据分片的个数,提升整体的应用执行效率。

​ rdd是一种分布式的内存抽象,表示一个只读的记录分区的集合,它只能通过其他rdd转换而创建,为此,rdd支持丰富的转换操作(如map, join, filter, groupby等),通过这种转换操作,新的rdd则包含了如何从其他rdds衍生所必需的信息,所以说rdds之间是有依赖关系的。基于rdds之间的依赖,rdds会形成一个有向无环图dag,该dag描述了整个流式计算的流程,实际执行的时候,rdd是通过血缘关系(lineage)一气呵成的,即使出现数据分区丢失,也可以通过血缘关系重建分区,总结起来,基于rdd的流式计算任务可描述为:从稳定的物理存储(如分布式文件系统)中加载记录,记录被传入由一组确定性操作构成的dag,然后写回稳定存储(hdfs或磁盘)。另外rdd还可以将数据集缓存到内存中,使得在多个操作之间可以重用数据集,基于这个特点可以很方便地构建迭代型应用(图计算、机器学习等)或者交互式数据分析应用。可以说spark最初也就是实现rdd的一个分布式系统,后面通过不断发展壮大成为现在较为完善的大数据生态系统,简单来讲,spark-rdd的关系类似于hadoop-mapreduce关系。

1.2rdd的五大属性

1) 一组分片(partition),即数据集的基本组成单位。对于rdd来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。

如果文件的block个数 <=2 那么 sc.textfile(“file:///wordcount.txt”)分区个数为2

如果文件的block块个数 >2 那么 sc.textfile(“file:///wordcount.txt”)分区的个数等于block块的个数

2) 一个计算每个分区的函数。spark中rdd的计算是以分片为单位的,每个rdd都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。rdd的每一个算子操作比如map 都会通过compute方法作用在每个分区之上

3) rdd之间的依赖关系。rdd的每次转换都会生成一个新的rdd,所以rdd之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对rdd的所有分区进行重新计算。每一个rdd都有其依赖列表rdd的依赖关系 都是存在一个序列集合中,作用:容错 以及构建起血统机制

4) 一个partitioner,即rdd的分片函数。当前spark中实现了两种类型的分片函数,一个是基于哈希的hashpartitioner,另外一个是基于范围的rangepartitioner。只有对于于key-value的rdd,才会有partitioner,非key-value的rdd的parititioner的值是none。partitioner函数不但决定了rdd本身的分片数量,也决定了parent rdd shuffle输出时的分片数量。

5) 一个列表,存取每个partition的优先位置(preferred location)。对于一个hdfs文件来说,这个列表保存的就是每个partition所在的块的位置。按照“移动数据不如移动计算”的理念,spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

 a list of preferred locations to compute each split on (e.g. block locations for
  *    an hdfs file)
  spark在读取hdfs文件的是,hdfs文件每一个block默认有多个备份,spark会获取每一个block块以及其备份的位置信息构建成列表,在进行计算的时候,spark会在位置列表中选取一个最佳位置进行任务分配。 移动数据不如移动计算的原则。
       移动数据不如移动计算的原则最高境界:数据在当前运行程序的进程之中
  rdd是如何确定优先位置?
      getpreferredlocations(split: partition): seq[string] 
  通过以上方法确定计算的最佳位置。
  rdd的数据本地化:
      5种

2.rdd的构建方式

3种构建方式

  • 根据以后数据集合构建rdd

    • val rdd1 = sc.parallelize(array(1,2,3,4,5,6,7,8))

    • val rdd1 = sc.parallelize(list(1,2,3,4,5,6,7,8))

  • 根据外部文件 可以是本地文件也可是hdfs上文件

    • sc.textfile(filepath)

  • 根据以后rdd创建新的rdd 需要经过算子操作

    • val newrdd=linerdd.flatmap(function)

3.rdd的算子操作

rdd的算子分为两类

  • 转换算子(transform算子)

    • 将一个rdd通过转换算子操作以后会构建新的rdd,比如map 、flatmap、reducebykey

    • 转换算子操作都是直接new新的rdd,此时rdd并没有进行真正的计算。转换算子只是对数据如何计算做了标记。转换算子都是懒加载。

  • 重要算子操作

    • mappartitions :作用于每个分区之上的

      • mappartitions 和map区别:

        • mappartitions 相当于partition批量操作

        • map作用于每一条数据

        • 重要区别:mappartitions 这个在大量task运行的时候可能会出现内存溢出的情况。小数据量的操作 mappartitions 要优于map操作

    • groupbykey算子和reducebykey算子的区别

      • 1.groupbykey 返回值:key->集合 reducebykey返回值: key-》值

      • 2.reducebykey操作会在本地进行初步merge操作,能够减少网络数据的传输

    • coalesce 减少分区数据的算子

      • 该算子可以进行shuffle也可以不进shuffle操作, coalesce(numpartitions: int, shuffle: boolean = false)

    • repartition 实际上是调用了 coalesce 算子 ,而且 repartition一定会进行shuffle操作,既可以增加也可以减少分区

  • action算子

    action算子内部都会有一个runjob方法进行提交一个job任务

  广播变量:

  • 广播变量需要数据传递

    • http协议:基于http协议将数据传递到executor。executor会driver端申请下载(已经被废弃)

    • torrent协议:默认的方式。 driver下载到executor上,然后executor会再次数据源,将数据传递到下一个需要数据executor之上。参考 (torrentbroadcast类)

4.rdd的依赖关系

rdd和它依赖的父rdd的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。

  • rdd的依赖类型

    • 窄依赖:父rdd中一个partition最多被子rdd中的一个partition所依赖,这种依赖关系就是窄依赖

    • 窄依赖算子:map 、filter 、union 、flatmap等

    • 宽依赖:父rdd中一个partition被子rdd中的多个partition所依赖,这种依赖关系就是宽依赖

    • 宽依赖算子:groupbykey、reducebykey。凡是by基本上都是宽依赖

    一对一或者多对==一:窄依赖==

    一对多或者多对==多:宽依赖==

  • 宽窄依赖算子的判断依据是转换算子是否会产生shuffle操作,如果有shuffle操作则是宽依赖,否则是窄依赖

  • join既是宽依赖算子也是窄依赖算子 (在一个shuffle操作之后,在使用join的时候,此时join就是窄依赖)

 

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网