spark on yarn
Yarn-Cluster
Spark-Standalone
- spark-submit执行脚本在client模式下,脚本通过反射调用程序的业务逻辑
- sparksubmit将任务的信息发送给master
- master向worker通信,需要的资源信息,application,driver信息发送给worker
- worker启动executor
- executor跟driver反向注册
- driver创建RDD Action算子会调用runjob触发执行
- 根据最后一个RDD往前推,根据依赖关系stage,RDD递归类似于类似于栈的结构,递归的出口是没有父RDD
- 先提交前面的stage,再提交后面的stage,一个stage对应一个stageset,一个stageset有多个task(shuffleMaptask Resulttask),然后把taskset传递给tasksecheduler
- asksecheduler将taskset中的task进行序列化,然后根据executor的资源情况,将序列化的task发送给executor
- 将taskdesccription反序列化用taskrunner包装放在线程池中
- 调用task的run方法,传入到taskcontext中,然后根据具体的task类型,如果是shufflemaptask,就调用其runtask
- 将数据先应用分区器返回ID,然后写入到APPendonlymap的内存中,默认达到5M溢写到磁盘,生成两个文件一个索引文件和数据文件
- mappartitionRDD向shuffledRDD要数据,shuffleRDD获取shuffledReader,从上游拉取属于自己分区的数据,然后进行全局的聚合,最后将聚合的结果写入到hdfs中
Spark通信分析
内存管理
作为一个JVM进程,Executor的内存管理建立在JVM的内存管理之上,
Spark对JVM的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存.
同时,Spark引入了堆外(Off-heap)内存,
使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用.
堆内内存(On-heap Memory)
堆内内存的大小,由Spark应用程序启动时的–executor-memory或spark.executor.memory参数配置.
Executor内运行的并发任务共享JVM堆内内存,这些任务在缓存RDD和广播(Broadcast)
数据时占用的内存被规划为存储(Storage)内存,而这些任务在执行Shuffle时占用的内存
被规划为执行(Execution)内存,剩余的部分不做特殊规划,那些Spark内部的对象实例,
或者用户定义的Spark应用程序中的对象实例,均占用剩余的空间.
不同的管理模式下,这三部分占用的空间大小各不相同
堆内内存分析
我们知道,堆内内存采用JVM来进行管理.而JVM的对象可以以序列化的方式存储,
序列化的过程是将对象转换为二进制字节流,本质上可以理解为将非连续空间的链式存储转
化为连续空间或块存储,在访问时则需要进行序列化的逆过程——反序列化,
将字节流转化为对象,序列化的方式可以节省存储空间,但增加了存储和读取时候的计算开销.
对于Spark中序列化的对象,由于是字节流的形式,其占用的内存大小可直接计算.
对于Spark中非序列化的对象,其占用的内存是通过周期性地采样近似估算而得,
即并不是每次新增的数据项都会计算一次占用的内存大小.但是这种方法
降低了时间开销但是有可能误差较大,导致某一时刻的实际内存有可能远远超出预期
此外,在被Spark标记为释放的对象实例,很有可能在实际上并没有被JVM回收,
导致实际可用的内存小于Spark记录的可用内存.所以Spark并不能准确记录实际
可用的堆内内存,从而也就无法完全避免内存溢出(OOM, Out of Memory)的异常.
虽然不能精准控制堆内内存的申请和释放,但Spark通过对存储内存和执行内存各
自独立的规划管理,可以决定是否要在存储内存里缓存新的RDD,以及是否为新的
任务分配执行内存,在一定程度上可以提升内存的利用率,减少异常的出现.
堆外内存(Off-heap Memory)
默认情况下,Spark 仅仅使用了堆内内存.Executor端的堆内内存区域大致可以分为以下四大块:
Execution内存 主要用于存放 Shuffle、Join、Sort、Aggregation 等计算过程中的临时数据
Storage内存 主要用于存储 spark 的 cache 数据,例如RDD的缓存、unroll数据
用户内存(User Memory) 主要用于存储 RDD 转换操作所需要的数据,例如 RDD 依赖等信息
预留内存(Reserved Memory) 系统预留内存,会用来存储Spark内部对象
Spark 1.6 开始引入了Off-heap memory.这种模式不在JVM 内申请内存,
而是调用 Java 的 unsafe 相关 API 进行诸如 C 语言里面的malloc() 直接向操作系统申请内存.
由于这种方式不经过 JVM 内存管理,所以可以避免频繁的 GC,这种内存申请的缺点是必须自己编写内存申请和释放的逻辑.
利用JDK Unsafe API(从Spark 2.0开始,在管理堆外的存储内存时不再基于
Tachyon,而是与堆外的执行内存一样,基于JDK Unsafe API实现),
Spark可以直接操作系统堆外内存,减少了不必要的内存开销,以及频繁的GC扫描和回收,
提升了处理性能.堆外内存可以被精确地申请和释放,而且序列化的数据占用的空间
可以被精确计算,所以相比堆内内存来说降低了管理的难度,也降低了误差.
动态占用机制
程序提交的时候我们都会设定基本的Execution内存和
Storage内存区域(通过 spark.memory.storageFraction 参数设置);
在程序运行时,双方的空间都不足时,则存储到硬盘;将内存中的块存储到磁盘的策略是按照 LRU
规则(Least Recently Used)进行的.若己方空间不足而对方空余时,
可借用对方的空间;(存储空间不足是指不足以放下一个完整的 Block)
Execution内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后”归还”借用的空间
Storage内存的空间被对方占用后,目前的实现是无法让对方”归还”,
因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂;而且
Shuffle 过程产生的文件在后面一定会被使用到,而 Cache 在内存的数据不一定在后面使用.
注意,上面说的借用对方的内存需要借用方和被借用方的内存类型都一样,
都是堆内内存或者都是堆外内存,不存在堆内内存不够去借用堆外内存的空间.
shuffle阶段分析
HashShuffle普通机制:
每一个map task将不同结果写到不同的buffer中,每个buffer的大小为32K。buffer起到数据缓存的作用
每个buffer文件最后对应一个磁盘小文件
reduce task来拉取对应的磁盘小文件
map task的计算结果会根据分区器(默认是hashPartitioner)来决定写入到哪一个磁盘小文件中去.ReduceTask会去Map端拉取相应的磁盘小文件.
产生的磁盘小文件的个数: M(map task的个数)* R(reducetask的个数)
合并机制
产生磁盘小文件的个数: C(core的个数)*R(reduce的个数)
SortShuffle普通机制
没有开启mapsideconbine && 分区数小于 spark.shuffle.sort.bypassMergeThreshold的参数值.这个值默认是200 bypass 例如
groupbykey或者groupby并且分区分区数小于200
判断Serializer是否relacation,没有聚合器 && 分区数小于2^24 unsafeshuffle 例如repartitionandsortwithinpartition
否则 sortshuffle
每个maptask将计算结果写入内存数据结构中,这个内存默认大小为5M
会有一个“监控器”来不定时的检查这个内存的大小,如果写满了5M,比如达到了5.01M,那么再给这个内存申请5.02M(5.01M * 2 – 5M
= 5.02)的内存,此时这个内存空间的总大小为10.02M
当“定时器”再次发现数据已经写满了,大小10.05M,会再次给它申请内存,大小为 10.05M * 2 – 10.02M = 10.08M
假如此时总的内存只剩下5M,不足以再给这个内存分配10.08M,那么这个内存会被锁起来,把里面的数据按照相同的key为一组,进行排
序后,分别写到不同的缓存中,然后溢写到不同的小文件中,而map task产生的新的计算结果会写入总内存剩余的5M中
buffer中的数据(已经排好序)溢写的时候,会分批溢写,默认一次溢写10000条数据,假如最后一部分数据不足10000条,那么剩下多少
条就一次性溢写多少条
每个map task产生的小文件,最终合并成一个大文件来让reduce拉取数据,合成大文件的同时也会生成这个大文件的索引文件,里面记录
着分区信息和偏移量(比如:key为hello的数据在第5个字节到第8097个字节)
最终产生的小文件数为2*m(map task的数量)
BypassMergeSortShuffle
BypassMergeSortShuffleWriter 所有的中间数据都是在磁盘里,并没有利用内存.而且它只保证分区索引的排序,而并不保证数据的排序
IndexShuffleBlockResolver类负责创建索引文件,存储到ShuffleIndexBlock数据块中.它提供了writeIndexFileAndCommit方法创建索引.
因为创建索引文件,有线程竞争.所以它会先建立临时索引文件,然后再去检查索引文件是否已经存在,并且与临时索引文件是否相同.如果一
致,则删除临时索引文件.如果不一致,则会更新索引文件
BypassMergeSortShuffleWriter 算法适用于没有聚合,数据量不大的场景.给每个分区分配一个临时文件,对每个 record 的 key 使用分区器
(模式是hash,如果用户自定义就使用自定义的分区器)找到对应分区的输出文件并写入文件对应的文件
Spark-submit分析
1 | // CoarseGrainedExecutorBackend类 |
1 | override def receive: PartialFunction[Any, Unit] = { |
1 | //executor的本质 |
1 | case LaunchTask(data) => |
1 | //executor重新把task给包装为taskrunner放到线程池中 |
1 | //TaskRunner 调用run方法 |
1 | //shufflemaptask |
Spark Shuffle 内存使用
Spark Shuffle OOM 可能性分析
1 | 1. 首先需要注意 Executor 端的任务并发度,多个同时运行的 Task 会共享 Executor 端的内存,使得单个 Task 可使用的内存减少 |
Sort 采样分析
HashPartitioner
1 | def getPartition(key: Any): Int = key match { |
RangePartitioner
- 先从迭代器的前k个元素,存储到蓄水池
- 依次遍历余下的元素,比如遍历第m个元素,然后生成[0, m)区间的随机数 i.如果 i 小于 k,则替换掉原来的第 i 个元素
1 | def reservoirSampleAndCount[T: ClassTag]( |
1 | //对每个分区进行采样 |
1 | //分区边界问题 |
默认分区器
当触发shuffle,但没有指定partitioner.spark会自动生成默认的分区器
首先去寻找父类rdd(注意不是所有祖先的rdd,而仅仅是上一级的rdd)的partitioner,则返回其中的最大partitioner (按照分区数量排序)
如果父类rdd没有指定partitioner,但是spark.default.parallelism有在配置中指定,则使用该数值,创建HashPartitioner
否则,就找到父类rdd的最大分区数目,使用该数值,创建HashPartitioner
1 | def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = { |
Sample采样
1 | -- 参数withReplacement |