Spark执行流程简单分析

spark on yarn

Yarn-Cluster

Spark-Standalone

  1. spark-submit执行脚本在client模式下,脚本通过反射调用程序的业务逻辑
  2. sparksubmit将任务的信息发送给master
  3. master向worker通信,需要的资源信息,application,driver信息发送给worker
  4. worker启动executor
  5. executor跟driver反向注册
  6. driver创建RDD Action算子会调用runjob触发执行
  7. 根据最后一个RDD往前推,根据依赖关系stage,RDD递归类似于类似于栈的结构,递归的出口是没有父RDD
  8. 先提交前面的stage,再提交后面的stage,一个stage对应一个stageset,一个stageset有多个task(shuffleMaptask Resulttask),然后把taskset传递给tasksecheduler
  9. asksecheduler将taskset中的task进行序列化,然后根据executor的资源情况,将序列化的task发送给executor
  10. 将taskdesccription反序列化用taskrunner包装放在线程池中
  11. 调用task的run方法,传入到taskcontext中,然后根据具体的task类型,如果是shufflemaptask,就调用其runtask
  12. 将数据先应用分区器返回ID,然后写入到APPendonlymap的内存中,默认达到5M溢写到磁盘,生成两个文件一个索引文件和数据文件
  13. mappartitionRDD向shuffledRDD要数据,shuffleRDD获取shuffledReader,从上游拉取属于自己分区的数据,然后进行全局的聚合,最后将聚合的结果写入到hdfs中

Spark通信分析

内存管理

作为一个JVM进程,Executor的内存管理建立在JVM的内存管理之上,

Spark对JVM的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存.

同时,Spark引入了堆外(Off-heap)内存,

使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用.

堆内内存(On-heap Memory)

堆内内存的大小,由Spark应用程序启动时的–executor-memory或spark.executor.memory参数配置.

Executor内运行的并发任务共享JVM堆内内存,这些任务在缓存RDD和广播(Broadcast)

数据时占用的内存被规划为存储(Storage)内存,而这些任务在执行Shuffle时占用的内存

被规划为执行(Execution)内存,剩余的部分不做特殊规划,那些Spark内部的对象实例,

或者用户定义的Spark应用程序中的对象实例,均占用剩余的空间.

不同的管理模式下,这三部分占用的空间大小各不相同

堆内内存分析

我们知道,堆内内存采用JVM来进行管理.而JVM的对象可以以序列化的方式存储,

序列化的过程是将对象转换为二进制字节流,本质上可以理解为将非连续空间的链式存储转

化为连续空间或块存储,在访问时则需要进行序列化的逆过程——反序列化,

将字节流转化为对象,序列化的方式可以节省存储空间,但增加了存储和读取时候的计算开销.

对于Spark中序列化的对象,由于是字节流的形式,其占用的内存大小可直接计算.

对于Spark中非序列化的对象,其占用的内存是通过周期性地采样近似估算而得,

即并不是每次新增的数据项都会计算一次占用的内存大小.但是这种方法

降低了时间开销但是有可能误差较大,导致某一时刻的实际内存有可能远远超出预期

此外,在被Spark标记为释放的对象实例,很有可能在实际上并没有被JVM回收,

导致实际可用的内存小于Spark记录的可用内存.所以Spark并不能准确记录实际

可用的堆内内存,从而也就无法完全避免内存溢出(OOM, Out of Memory)的异常.

虽然不能精准控制堆内内存的申请和释放,但Spark通过对存储内存和执行内存各

自独立的规划管理,可以决定是否要在存储内存里缓存新的RDD,以及是否为新的

任务分配执行内存,在一定程度上可以提升内存的利用率,减少异常的出现.

堆外内存(Off-heap Memory)

默认情况下,Spark 仅仅使用了堆内内存.Executor端的堆内内存区域大致可以分为以下四大块:

Execution内存 主要用于存放 Shuffle、Join、Sort、Aggregation 等计算过程中的临时数据

Storage内存 主要用于存储 spark 的 cache 数据,例如RDD的缓存、unroll数据

用户内存(User Memory) 主要用于存储 RDD 转换操作所需要的数据,例如 RDD 依赖等信息

预留内存(Reserved Memory) 系统预留内存,会用来存储Spark内部对象

Spark 1.6 开始引入了Off-heap memory.这种模式不在JVM 内申请内存,

而是调用 Java 的 unsafe 相关 API 进行诸如 C 语言里面的malloc() 直接向操作系统申请内存.

由于这种方式不经过 JVM 内存管理,所以可以避免频繁的 GC,这种内存申请的缺点是必须自己编写内存申请和释放的逻辑.

利用JDK Unsafe API(从Spark 2.0开始,在管理堆外的存储内存时不再基于

Tachyon,而是与堆外的执行内存一样,基于JDK Unsafe API实现),

Spark可以直接操作系统堆外内存,减少了不必要的内存开销,以及频繁的GC扫描和回收,

提升了处理性能.堆外内存可以被精确地申请和释放,而且序列化的数据占用的空间

可以被精确计算,所以相比堆内内存来说降低了管理的难度,也降低了误差.

动态占用机制

程序提交的时候我们都会设定基本的Execution内存和

Storage内存区域(通过 spark.memory.storageFraction 参数设置);

在程序运行时,双方的空间都不足时,则存储到硬盘;将内存中的块存储到磁盘的策略是按照 LRU

规则(Least Recently Used)进行的.若己方空间不足而对方空余时,

可借用对方的空间;(存储空间不足是指不足以放下一个完整的 Block)

Execution内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后”归还”借用的空间

Storage内存的空间被对方占用后,目前的实现是无法让对方”归还”,

因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂;而且

Shuffle 过程产生的文件在后面一定会被使用到,而 Cache 在内存的数据不一定在后面使用.

注意,上面说的借用对方的内存需要借用方和被借用方的内存类型都一样,

都是堆内内存或者都是堆外内存,不存在堆内内存不够去借用堆外内存的空间.

shuffle阶段分析

HashShuffle普通机制:

  1. 每一个map task将不同结果写到不同的buffer中,每个buffer的大小为32K。buffer起到数据缓存的作用

  2. 每个buffer文件最后对应一个磁盘小文件

  3. reduce task来拉取对应的磁盘小文件

  4. map task的计算结果会根据分区器(默认是hashPartitioner)来决定写入到哪一个磁盘小文件中去.ReduceTask会去Map端拉取相应的磁盘小文件.

  5. 产生的磁盘小文件的个数: M(map task的个数)* R(reducetask的个数)

合并机制

产生磁盘小文件的个数: C(core的个数)*R(reduce的个数)

SortShuffle普通机制

没有开启mapsideconbine && 分区数小于 spark.shuffle.sort.bypassMergeThreshold的参数值.这个值默认是200 bypass 例如

groupbykey或者groupby并且分区分区数小于200

判断Serializer是否relacation,没有聚合器 && 分区数小于2^24 unsafeshuffle 例如repartitionandsortwithinpartition

否则 sortshuffle

  1. 每个maptask将计算结果写入内存数据结构中,这个内存默认大小为5M

  2. 会有一个“监控器”来不定时的检查这个内存的大小,如果写满了5M,比如达到了5.01M,那么再给这个内存申请5.02M(5.01M * 2 – 5M

    = 5.02)的内存,此时这个内存空间的总大小为10.02M

  3. 当“定时器”再次发现数据已经写满了,大小10.05M,会再次给它申请内存,大小为 10.05M * 2 – 10.02M = 10.08M

  4. 假如此时总的内存只剩下5M,不足以再给这个内存分配10.08M,那么这个内存会被锁起来,把里面的数据按照相同的key为一组,进行排

    序后,分别写到不同的缓存中,然后溢写到不同的小文件中,而map task产生的新的计算结果会写入总内存剩余的5M中

  5. buffer中的数据(已经排好序)溢写的时候,会分批溢写,默认一次溢写10000条数据,假如最后一部分数据不足10000条,那么剩下多少

    条就一次性溢写多少条

  6. 每个map task产生的小文件,最终合并成一个大文件来让reduce拉取数据,合成大文件的同时也会生成这个大文件的索引文件,里面记录

    着分区信息和偏移量(比如:key为hello的数据在第5个字节到第8097个字节)

  7. 最终产生的小文件数为2*m(map task的数量)

BypassMergeSortShuffle

BypassMergeSortShuffleWriter 所有的中间数据都是在磁盘里,并没有利用内存.而且它只保证分区索引的排序,而并不保证数据的排序

IndexShuffleBlockResolver类负责创建索引文件,存储到ShuffleIndexBlock数据块中.它提供了writeIndexFileAndCommit方法创建索引.

因为创建索引文件,有线程竞争.所以它会先建立临时索引文件,然后再去检查索引文件是否已经存在,并且与临时索引文件是否相同.如果一

致,则删除临时索引文件.如果不一致,则会更新索引文件

BypassMergeSortShuffleWriter 算法适用于没有聚合,数据量不大的场景.给每个分区分配一个临时文件,对每个 record 的 key 使用分区器

(模式是hash,如果用户自定义就使用自定义的分区器)找到对应分区的输出文件并写入文件对应的文件

Spark-submit分析

1
2
3
4
5
6
7
8
9
10
11
// CoarseGrainedExecutorBackend类
driverConf.set(EXECUTOR_ID, arguments.executorId)
val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
//注册executor
env.rpcEnv.setupEndpoint("Executor",
backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))
arguments.workerUrl.foreach { url =>
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
env.rpcEnv.awaitTermination()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
override def receive: PartialFunction[Any, Unit] = {
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
try {
//启动executor
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
resources = _resources)
//向driver通信
driver.get.send(LaunchedExecutor(executorId))
} catch {
case NonFatal(e) =>
exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
}
//启动完成driver就开始创建rdd
1
2
3
4
5
6
7
8
9
//executor的本质
private val threadPool = {
val threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Executor task launch worker-%d")
.setThreadFactory((r: Runnable) => new UninterruptibleThread(r, "unused"))
.build()
Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}
1
2
3
4
5
6
7
8
9
10
case LaunchTask(data) =>
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
//driver发送task到executor
val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
taskResources(taskDesc.taskId) = taskDesc.resources
executor.launchTask(this, taskDesc)
}
1
2
3
4
5
6
//executor重新把task给包装为taskrunner放到线程池中
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
val tr = new TaskRunner(context, taskDescription)
runningTasks.put(taskDescription.taskId, tr)
threadPool.execute(tr)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//TaskRunner 调用run方法
// 在task类里用runtask方法,调用方法
val value = Utils.tryWithSafeFinally {
val res = task.run(
taskAttemptId = taskId,
attemptNumber = taskDescription.attemptNumber,
metricsSystem = env.metricsSystem,
resources = taskDescription.resources)
threwException = false
res
}

//runtask
override def runTask(context: TaskContext): MapStatus = {
// Deserialize the RDD using the broadcast variable.
val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTimeNs = System.nanoTime()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L

val rdd = rddAndDep._1
val dep = rddAndDep._2
// While we use the old shuffle fetch protocol, we use partitionId as mapId in the
// ShuffleBlockId construction.
val mapId = if (SparkEnv.get.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
partitionId
} else context.taskAttemptId()
dep.shuffleWriterProcessor.write(rdd, dep, mapId, context, partition)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//shufflemaptask
//条件都不满足调用compute

writer.write(
rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
getOrCompute(split, context)
} else {
computeOrReadCheckpoint(split, context)
}
}

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
if (isCheckpointedAndMaterialized) {
firstParent[T].iterator(split, context)
} else {
compute(split, context)
}
}

def compute(split: Partition, context: TaskContext): Iterator[T]
//mappartitionRDD调用了这个方法
//不断向上一个RDD要数据,直至数据源(hadoopRDD),这个RDD也有computer方法

Spark Shuffle 内存使用

Spark Shuffle OOM 可能性分析
1
2
3
4
1. 首先需要注意 Executor 端的任务并发度,多个同时运行的 Task 会共享 Executor 端的内存,使得单个 Task 可使用的内存减少
2. 无论是在 Map 还是在 Reduce 端,插入数据到内存,排序,归并都是比较都是比较占用内存的.因为有 Spill,理论上不会因为数据倾斜造成 OOM.但是,由于对堆内对象的分配和释放是由 JVM 管理的,而 Spark 是通过采样获取已经使用的内存情况,有可能因为采样不准确而不能及时 Spill,导致OOM
3. 在 Reduce 获取数据时,由于数据倾斜,有可能造成单个 Block 的数据非常的大,默认情况下是需要有足够的内存来保存单个 Block 的数据.因此,此时极有可能因为数据倾斜造成 OOM.可以设置 spark.maxRemoteBlockSizeFetchToMem 参数,设置这个参数以后,超过一定的阈值,会自动将数据 Spill 到磁盘,此时便可以避免因为数据倾斜造成 OOM 的情况.在我们的生产环境中也验证了这点,在设置这个参数到合理的阈值后,生产环境任务 OOM 的情况大大减少了
4. 在 Reduce 获取数据后,默认情况会对数据流进行解压校验(参数 spark.shuffle.detectCorrupt).正如在代码注释中提到,由于这部分没有 Spill 到磁盘操作,也有很大的可性能会导致 OOM.在我们的生产环境中也有碰到因为检验导致 OOM 的情况

Sort 采样分析

HashPartitioner

1
2
3
4
5
6
7
8
9
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}

def nonNegativeMod(x: Int, mod: Int): Int = {
val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0)
}

RangePartitioner

  1. 先从迭代器的前k个元素,存储到蓄水池
  2. 依次遍历余下的元素,比如遍历第m个元素,然后生成[0, m)区间的随机数 i.如果 i 小于 k,则替换掉原来的第 i 个元素
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def reservoirSampleAndCount[T: ClassTag](
input: Iterator[T],
k: Int,
seed: Long = Random.nextLong())
: (Array[T], Long) = {
val reservoir = new Array[T](k)
// Put the first k elements in the reservoir.
var i = 0
while (i < k && input.hasNext) {
val item = input.next()
reservoir(i) = item
i += 1
}

// If we have consumed all the elements, return them. Otherwise do the replacement.
if (i < k) {
// 如果分区的数据全部才完了,则返回结果
// If input size < k, trim the array to return only an array of input size.
val trimReservoir = new Array[T](i)
System.arraycopy(reservoir, 0, trimReservoir, 0, i)
(trimReservoir, i)
} else {
// 如果数据还有剩余,则进行蓄水池采样
// If input size > k, continue the sampling process.
var l = i.toLong
val rand = new XORShiftRandom(seed)
while (input.hasNext) {
val item = input.next()
l += 1
// There are k elements in the reservoir, and the l-th element has been
// consumed. It should be chosen with probability k/l. The expression
// below is a random long chosen uniformly from [0,l)
// 生成随机数
val replacementIndex = (rand.nextDouble() * l).toLong
// 如果随机数小于k, 则替代原来的元素
if (replacementIndex < k) {
reservoir(replacementIndex.toInt) = item
}
}
//返回结果
(reservoir, l)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//对每个分区进行采样
def sketch[K : ClassTag](
rdd: RDD[K],
sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
val shift = rdd.id
// val classTagK = classTag[K] // to avoid serializing the entire partitioner object
val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
val seed = byteswap32(idx ^ (shift << 16))
val (sample, n) = SamplingUtils.reservoirSampleAndCount(
iter, sampleSizePerPartition, seed)
Iterator((idx, n, sample))
}.collect()
val numItems = sketched.map(_._2).sum
(numItems, sketched)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
//分区边界问题
def determineBounds[K: Ordering : ClassTag](
candidates: ArrayBuffer[(K, Float)],
partitions: Int): Array[K] = {
val ordering = implicitly[Ordering[K]]
// 按照数据进行数据排序,默认升序排列
val ordered = candidates.sortBy(_._1)
// 获取总的样本数量大小
val numCandidates = ordered.size
// 计算总的权重大小
val sumWeights = ordered.map(_._2.toDouble).sum
// 计算步长
val step = sumWeights / partitions
var cumWeight = 0.0
var target = step
val bounds = ArrayBuffer.empty[K]
var i = 0
var j = 0
var previousBound = Option.empty[K]
while ((i < numCandidates) && (j < partitions - 1)) {
// 获取排序后的第i个数据及权重
val (key, weight) = ordered(i)
// 累计权重
cumWeight += weight
if (cumWeight >= target) {
// Skip duplicate values.
// 权重已经达到一个步长的范围,计算出一个分区id的值
if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
// 上一个边界值为空,或者当前边界key数据大于上一个边界的值,那么当前key有效,进行计算
// 添加当前key到边界集合中
bounds += key
// 累计target步长界限
target += step
// 分区数量加1
j += 1
// 上一个边界的值重置为当前边界的值
previousBound = Some(key)
}
}
i += 1
}
// 返回结果
bounds.toArray
}

默认分区器

当触发shuffle,但没有指定partitioner.spark会自动生成默认的分区器

首先去寻找父类rdd(注意不是所有祖先的rdd,而仅仅是上一级的rdd)的partitioner,则返回其中的最大partitioner (按照分区数量排序)

如果父类rdd没有指定partitioner,但是spark.default.parallelism有在配置中指定,则使用该数值,创建HashPartitioner

否则,就找到父类rdd的最大分区数目,使用该数值,创建HashPartitioner

1
2
3
4
5
6
7
8
9
10
11
12
13
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
val rdds = (Seq(rdd) ++ others)
val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
if (hasPartitioner.nonEmpty) {
hasPartitioner.maxBy(_.partitions.length).partitioner.get
} else {
if (rdd.context.conf.contains("spark.default.parallelism")) {
new HashPartitioner(rdd.context.defaultParallelism)
} else {
new HashPartitioner(rdds.map(_.partitions.length).max)
}
}
}

Sample采样

1
2
3
4
5
6
7
-- 参数withReplacement
是否放回就是概论中中的放回抽样和不放回抽样
-- 参数fraction表示抽样比例
0~1之间的浮点数
--参数seed
随机种子
--用于抽样排查数据倾斜问题