Spark性能优化简单分析

资源调优

Spark作业运行基本原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
1. spark-submit 根据部署模式(本地或集群中某工作节点)启动一个对应的Driver进程,
Driver进程本身会根据我们设置的参数,占有一定数量的内存和CPU core

2. Driver进程要做的第一件事情就是向sparkstandalone集群也可以是yarn集群申请
spark作业的资源(executor),集群管理器会根据我们为Spark作业设置的资源参数,在各
个工作节点上,启动一定数量的Executor进程,每个Executor进程都占有一定数量的内存和CPU core

3. 再申请到作业所需的资源,Driver进程就会开始调度和执行编写的作业代码,
Driver进程会将我们编写的Spark作业代码分拆为多个stage,每个stage执行一部分代码片段,并为每个stage创建一批task

4. Driver将这些task分配到各个executor进程中.task是最小的计算单位,
一个stage中所有的task都执行完毕之后,会在各个节点本地的磁盘文件中
写入计算中间结果,然后Driver就会调度运行下一个stage

5. 下一个stage的task的输入数据就是上一个stage输出的中间结果。如此循环往复,
直到将我们自己编写的代码逻辑全部执行完,并且计算完所有的数据,得到我们想要的结果为止
(Spark是根据shuffle类算子来进行stage的划分,shuffle算子执行以及之后的代码会被
划分为下一个stage.因此一个stage刚开始执行的时候,它的每个task可能都会从上一个
stage的task所在的节点,去通过网络传输拉取需要自己处理的所有key,然后对拉取到的
所有相同的key使用我们自己编写的算子函数执行聚合操作)

6. 当我们在代码中执行了cache/persist等持久化操作时,根据我们选择的持久化级别
的不同,每个task计算出来的数据也会保存到Executor进程的内存或者所在节点的磁盘文件
中.executor的内存分为三个部分(task代码执行内存占20%task通过shuffle拉取上游
结果进行聚合等占据20%,RDD持久化占据60%),ask的执行速度是跟每个Executor进程的
CPU core数量有直接关系的.一个CPU core同一时间只能执行一个线程

提交任务时合理分配资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
– total-executor-cores
该参数用于设置Spark作业总共可以使用多少个core
不要超过队列总CPU core的1/3~1/2左右比较合适

– num-executors
该参数用于设置Spark作业总共要用多少个Executor进程来执行
每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,
设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;
设置的太多的话,大部分队列可能无法给予充分的资源

– executor-memory
该参数用于设置每个Executor进程的内存.Executor内存的大小,很多时候直接
决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联
每个Executor进程的内存设置4G~8G较为合适.但是这只是一个参考值,
具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列
的最大内存限制是多少,num-executors乘以executor-memory,是不能超过队列的最大内存量的
此外,如果你是跟团队里其他人共享这个资源队列,那么申请的内存量最好不要
超过资源队列最大总内存的1/3~1/2,避免你自己的Spark作业占用了队列所
有的资源,导致别的同事的作业无法运行

– executor-cores
该参数用于设置每个Executor进程的CPU core数量.这个参数决定了每个
Executor进程并行执行task线程的能力.因为每个CPU core同一时间只能
执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程
Executor的CPU core数量设置为2~4个较为合适.同样得根据不同部门的
资源队列来定,可以看看自己的资源队列的最大CPU core限制是多少,
再依据设置的Executor数量,来决定每个Executor进程可以分配到几个
CPU core.同样建议,如果是跟他人共享这个队列,那么num-executors * executor-cores
不要超过队列总CPU core的1/3~1/2左右比较合适,也是避免影响其他同事的作业运行

– driver-memory
该参数用于设置Driver进程的内存
Driver的内存通常来说不设置,或者设置1G左右应该就够了
唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到
Driver上进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题

提交的任务要充分使用资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
- 降低HDFS的block块的大小
因为Spark用的读取文件的方法是MR的方法,所以读取文件的时候,首先划分成
一个一个的split。RDD的partition数=split数,而在默认情况下,split数=block数
,所以partition数=block数,所以降低block块的大小可以增加block块的个数
,从而增加partition的个数,从而提供并行度

- sparkContext.textFile(path, numPartitions)
在读取文件的时候可以指定分区数

- coalesce(numPartitions, isshuffle)
改变RDD分区数,一般设置第二个参数为false,用于减少分区

- repartition(numPartitions)
改变RDD分区数,一般用于增加分区数

- reduceByKey、groupByKey这些算子都可以指定分区数,决定返回的RDD的分区个数

- spark.default.parallelism
该参数用于设置每个stage的默认task数量
这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能
Spark作业的默认task数量为500~1000个较为合适
很多人常犯的一个错误就是不去设置这个参数,那么此时就会导致
Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task
通常来说,Spark默认设置的数量是偏少的(比如就几十个task),
如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃
试想一下,无论你的Executor进程有多少个,内存和CPU有多大,
但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源
因此Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores
的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源

- spark.sql.shuffle.partitions
spark sql编程中产生shuffle,决定shuffle过程中reduce task的个数
default:200
参数调优建议:
同 spark.default.parallelism

- partitionBy(Partitioner)
通过自定义分区器来改变partition个数

- join算子也会改变RDD分区数
val rdd3 = rdd1.join(rdd2)
默认情况下(spark.default.parallelism没有设置)时,RDD3的分区数是
由父RDD中分区数最多的RDD决定的,比如rdd1有2个分区,rdd2有3个分区,那么rdd3有3个分区

代码调优

代码调优的十个原则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
1. 避免创建重复的RDD.对于同一份数据只应该创建一个RDD,否则spark作业会进行重复计算进而增加了作业开销

2. 尽可能复用同一个RDD.对不同的数据执行算子操作时还要尽可能地复用一个RDD.
比如说,有一个RDD的数据格式是key-value类型的,另一个是单value类型的,
这两个RDD的value数据是完全一样的。那么此时我们可以只使用key-value类型
的那个RDD,因为其中已经包含了另一个的数据。对于类似这种多个RDD的数据有重叠或者包含的情况,我们应该尽量复用一个RDD

3. 对多次使用的RDD进行持久化.Spark中对于一个RDD执行多次算子的默认原理是这样的:
每次你对一个RDD执行一个算子操作时,都会重新从源头处计算一遍,计算出那个RDD来,
然后再对这个RDD执行你的算子操作,对多次使用的RDD进行持久化。此时Spark就会根据你的持久化策略,
将RDD中的数据保存到内存或者磁盘中。以后每次对这个RDD进行算子操作时,都会直接从内存或磁盘中提取持久化的RDD数据

以下情况考虑使用checkpoint()算子
这个RDD是经过很长时间才计算完毕或者计算这个RDD的逻辑非常复杂,
这说明这个RDD的计算结果非常重要,一般选用checkpoint来持久化,这样数据更
安全了,task计算失败的时候会从checkpoint读取数据进行计算
你的业务逻辑非常非常的复杂,RDD的依赖关系(lineage)非常长,使用checkpoint截断依赖关系,提高容错的效率
对某个RDD执行 checkpoint() 之前,对该 RDD 执行 cache(),这样的话,
新启动的job只需要把内存中的数据上传到HDFS中即可,不需要重新计算

4. 尽量避免使用shuffle类算子.shuffle过程中,各个节点上的相同key
都会先写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key
.而且相同key都拉取到同一个节点进行聚合操作时,还有可能会因为一个节点上处理的key过多
,导致内存不够存放,进而溢写到磁盘文件中.因此在shuffle过程中,
可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作.磁盘IO和网络数据传输也是shuffle性能较差的主要原因

5. 如果因为业务需要,一定要使用shuffle操作,无法用map类的算子来替代,
那么尽量使用可以map-side预聚合的算子.所谓的map-side预聚合,
说的是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中
的本地combiner.map-side预聚合之后,每个节点本地就只会有一条相同的key,
因为多条相同的key都被聚合起来了.其他节点在拉取所有节点上的相同key时,
就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销.
通常来说,在可能的情况下,建议使用reduceByKey或者aggregateByKey算子
来替代掉groupByKey算子.因为reduceByKey和aggregateByKey算子都会
使用用户自定义的函数对每个节点本地的相同key进行预聚合.而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差

6. 使用高性能算子.
(1). 使用reduceByKey/aggregateByKey替代groupByKey

(2). 使用mapPartitions替代普通map.一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些.但是有的时候,使用mapPartitions会出现OOM(内存溢出)的问题.因为单次函数调用就要处理掉一个partition所有的数据,如果内存不够,垃圾回收时是无法回收掉太多对象的,很可能出现OOM异常

(3). 使用foreachPartitions替代foreach

(4). 使用filter之后进行coalesce操作.通常对一个RDD执行filter算子过滤掉RDD中较多数据后(比如30%以上的数据),建议使用coalesce算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去.因为filter之后,RDD的每个partition中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个task处理的partition中的数据量并不是很多,有一点资源浪费,而且此时处理的task越多,可能速度反而越慢.因此用coalesce减少partition数量,将RDD中的数据压缩到更少的partition之后,只要使用更少的task即可处理完所有的partition.在某些场景下,对于性能的提升会有一定的帮助

(5). 使用repartitionAndSortWithinPartitions替代repartition与sort类操作.如果需要在repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions算子.因为该算子可以一边进行重分区的shuffle操作,一边进行排序.shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的

7. 尽量使用广播变量.在算子函数中使用到外部变量时,默认情况下
,Spark会将该变量复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本.
如果变量本身比较大的话(比如100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,
以及在各个节点的Executor中占用过多内存导致的频繁GC,都会极大地影响性能

8. 使用Kryo优化序列化性能.

9. 优化数据结构.Java中,有三种类型比较耗费内存:对象 字符串 集合类型
在Spark编码实现中,特别是对于算子函数中的代码,尽量不要使用上述三种数据结构,
尽量使用字符串替代对象,使用原始类型(比如Int、Long)替代字符串,
使用数组替代集合类型,这样尽可能地减少内存占用,从而降低GC频率,提升性能,
但是在编码实践中发现,要做到该原则其实并不容易

持久化策略

Spark的持久化级别
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
MEMORY_ONLY
使用未序列化的Java对象格式,将数据保存在内存中.如果内存不够存放所有的数据,
则数据可能就不会进行持久化.那么下次对这个RDD执行算子操作时,那些没有被持久化的数据,
需要从源头处重新计算一遍.这是默认的持久化策略,使用cache()方法时,实际就是使用的这种持久化策略

MEMORY_AND_DISK
使用未序列化的Java对象格式,优先尝试将数据保存在内存中.如果内存不够存放所有的数据,
会将数据写入磁盘文件中,下次对这个RDD执行算子时,持久化在磁盘文件中的数据会被读取出来使用

MEMORY_ONLY_SER
基本含义同MEMORY_ONLY.唯一的区别是,会将RDD中的数据进行序列化,RDD的每个
partition会被序列化成一个字节数组.这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC

MEMORY_AND_DISK_SER
基本含义同MEMORY_AND_DISK.唯一的区别是,会将RDD中的数据进行序列化,
RDD的每个partition会被序列化成一个字节数组.这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC

DISK_ONLY
使用未序列化的Java对象格式,将数据全部写入磁盘文件中

MEMORY_ONLY_2, MEMORY_AND_DISK_2
对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,
并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉,
节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可以使用该数据在其他节点上的副本。
如果没有副本的话,就只能将这些数据从源头处重新计算一遍了
持久化策略
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大,
可以绰绰有余地存放下整个RDD的所有数据.因为不进行序列化与反序列化操作,
就避免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作,
不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传送到其他节点上.
但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种策略的场景还是有限的,

如果RDD中数据比较多时(比如几十亿),直接用这种持久化级别,会导致JVM的OOM内存溢出异常
如果使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用MEMORY_ONLY_SER级别.
该级别会将RDD数据序列化后再保存在内存中,此时每个partition仅仅是一个字节数组而已,
大大减少了对象数量,并降低了内存占用.这种级别比MEMORY_ONLY多出来的性能开销,
主要就是序列化与反序列化的开销.但是后续算子可以基于纯内存进行操作,
因此性能总体还是比较高的.此外,可能发生的问题同上,如果RDD中的数据量过多的话,
还是可能会导致OOM内存溢出的异常
如果纯内存的级别都无法使用,那么建议使用MEMORY_AND_DISK_SER策略,
而不是MEMORY_AND_DISK策略.因为既然到了这一步,就说明RDD的数据量很大,
内存无法完全放下.序列化后的数据比较少,可以节省内存和磁盘的空间开销.
同时该策略会优先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘

通常不建议使用DISK_ONLY和后缀为_2的级别:因为完全基于磁盘文件进行数据的读写,
会导致性能急剧降低,有时还不如重新计算一次所有RDD.后缀为_2的级别,
必须将所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性能开销,
除非是要求作业的高可用性,否则不建议使用

数据本地化调优

task数据本地化级别

1
2
3
4
5
1. PROCESS_LOCAL:进程本地化:task要计算的数据在同一个executor进程的内存中
2. NODE_LOCAL:节点本地化: task要计算的数据在本机的磁盘上,或者本机另一个executor进程的内存中
3. NO_PREF:没有本地化级别:task要计算的数据在其他的机子上
4. RACK_LOCAL:机架本地化
5. ANY:跨机架取数据

TaskScheduler发送task选择本地化级别的策略

1
2
3
4
5
6
比如计算需要的数据在node01这台服务器中的Executor1这个进程中,那么TaskScheduler
会把TaskSet发往Executor1进程中执行,此时的数据本地化级别时PROCESS_LOCAL,Executor1
是最佳的计算位置,如果发送的task在等待了3秒,重试了5次之后仍然没有执行,那么TaskScheduler
就认为Executor1的资源不充足,不足以支撑计算,那么降低数据本地化级别,把task发往node01
的另外一个进程Executor2中,这时的数据本地化级别为NODE_LOCAL,如果还无法执行,
降低为RACK_LOCAL,ANY,直到Task可以开始计算

数据倾斜调优

定位数据倾斜

1
2
3
4
5
6
7
8
9
10
1. 在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,
比如按照key进行聚合或join等操作.此时如果某个key对应的数据量特别大的话,就会发生数据倾斜

2. 定位数据倾斜.如果是yarn-client模式直接查看本地log,如果是yarn-cluster模式查看spark-web-UI.
如何定位发生了数据倾斜:首先通过spark-UI定位发生在哪些stage的task,当然可能发生某个task突然内存溢出,
建议直接看yarn-client模式下本地log的异常栈,或者是通过YARN查看yarn-cluster模式下的log中的异常栈,
一般来说,通过异常栈信息就可以定位到你的代码中哪一行发生了内存溢出

3. 查看导致数据倾斜的key的数据分布情况.知道了数据倾斜发生在哪里之后,通常需要分析一下那个执行了
shuffle操作并且导致了数据倾斜的RDD/Hive表,查看一下其中key的分布情况,这主要是为之后选择哪一种技术方案提供依据.

数据倾斜的解决方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
1. 使用Hive ETL预处理数据
通过Hive ETL预先对数据按照key进行聚合,或者是预先和其他表进行join.治标不治本,Hive ETL中还是会发生数据倾斜
在一些Java系统与Spark结合使用的项目中,会出现Java代码频繁调用Spark作业的场景,
而且对Spark作业的执行性能要求很高,就比较适合使用这种方案。将数据倾斜提前到上游的Hive ETL,
每天仅执行一次,只有那一次是比较慢的,而之后每次Java调用Spark作业时,执行速度都会很快,能够提供更好的用户体验

2. 过滤少数导致倾斜的key
如果我们判断那少数几个数据量特别多的key,对作业的执行和计算结果不是特别重要的话,那么干脆就直接过滤掉那少数几个key
但是大多数情况下,导致倾斜的key还是很多的,并不是只有少数几个

3. 提高shuffle的并行度
设置一个参数,即spark.sql.shuffle.partitions,该参数代表了shuffle read task的并行度,
该值默认是200,对于很多场景来说都有点过小,只是缓解了数据倾斜而已,没有彻底根除问题,根据实践经验来看,其效果有限

4. 双重聚合
将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题.接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果.仅仅适用于聚合类的shuffle操作,适用范围相对较窄.如果是join类的shuffle操作,还得用其他的解决方案

5. 将reduce join转为map join
不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现

6. 采样倾斜key并分拆join操作
对于join导致的数据倾斜,如果只是某几个key导致了倾斜,可以将少数几个key分拆成独立RDD,并附加随机前缀打散成n份去进行join.如果导致倾斜的key特别多的话,比如成千上万个key都导致数据倾斜,那么这种方式也不适合

7. 使用随机前缀和扩容RDD进行join


Shuffle调优

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
1. spark.shuffle.file.buffer
默认值:32k
该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小.
将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘

调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k)
从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,
进而提升性能.在实践中发现,合理调节该参数,性能会有1%~5%的提升

2. spark.reducer.maxSizeInFlight
默认值:48m
参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据

调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),
从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能.在实践中发现,合理调节该参数,性能会有1%~5%的提升.

3. spark.shuffle.io.maxRetries
默认值:3
参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时
如果因为网络异常导致拉取失败,是会自动进行重试的.该参数就代表了可以重试的最大次数.
如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败

调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次)
以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败.在实践中发现,
对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性.

4. spark.shuffle.io.retryWait
默认值:5s
参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s.

调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性.

5. spark.shuffle.memoryFraction
默认值:0.2
参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%.

调优建议:在资源调优中讲解过这个参数.如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘.在实践中发现,合理调节该参数可以将性能提升10%左右.

6. spark.shuffle.manager
默认值:sort
参数说明:该参数用于设置ShuffleManager的类型.Spark 1.5以后,有三个可选项:
hash、sort和tungsten-sort.HashShuffleManager是Spark 1.2以前的默认选项,
但是Spark 1.2以及之后的版本默认都是SortShuffleManager了.tungsten-sort
与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高.

调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的
业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;
而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,
通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能.这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug.

7. spark.shuffle.sort.bypassMergeThreshold
默认值:200
参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的
数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,
而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个
task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件.

调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议
将这个参数调大一些,大于shuffle read task的数量.那么此时就会自动启用
bypass机制,map-side就不会进行排序了,减少了排序的性能开销.但是这种方式下,
依然会产生大量的磁盘文件,因此shuffle write性能有待提高.

8. spark.shuffle.consolidateFiles
默认值:false
参数说明:如果使用HashShuffleManager,该参数有效.如果设置为true,
那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,
对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能.

调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用
bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,
使用HashShuffleManager,同时开启consolidate机制.在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%.