Analysis of Common Spark Operators

Transformation

map and filter
import org.apache.spark.rdd.MapPartitionsRDD
import org.apache.spark.{SparkConf, SparkContext}

// Note: MapPartitionsRDD is private[spark], so this demo must be compiled inside an
// org.apache.spark.* package (or rewritten against the public map/filter API).
object MapAndFilterDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("mapfilter").setMaster("local[*]"))
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    // map: multiply every element by 10, built the same way RDD.map builds a MapPartitionsRDD
    val map: MapPartitionsRDD[Double, Int] = new MapPartitionsRDD[Double, Int](rdd, (_, _, iter) => iter.map(data => {
      data.toString.toDouble * 10
    }))
    // filter: keep only values greater than 4, mirroring RDD.filter
    val filter = new MapPartitionsRDD[Double, Double](map, (_, _, iter) => iter.filter(data => data > 4))
    filter.collect().foreach(println)
    sc.stop()
  }
}
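For comparison, the same pipeline written against the public RDD API builds an equivalent MapPartitionsRDD chain internally. A minimal sketch, assuming the same local setup as above (the object name is illustrative, not from the original demo):

import org.apache.spark.{SparkConf, SparkContext}

object MapAndFilterPublicApiDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("mapfilter-public").setMaster("local[*]"))
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    // RDD.map and RDD.filter each wrap their parent in a MapPartitionsRDD under the hood
    rdd.map(_ * 10.0).filter(_ > 4).collect().foreach(println)
    sc.stop()
  }
}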
map and mapPartitions
// mapPartitions hands the user function the whole partition iterator from the data source
new MapPartitionsRDD(
  this,
  (_: TaskContext, _: Int, iter: Iterator[T]) => cleanedF(iter))
// map pulls elements from the iterator one at a time
new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
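From the caller's side the difference is in granularity: mapPartitions runs the function once per partition on its iterator (handy for per-partition setup such as opening a connection), while map runs it once per element. A minimal sketch of the equivalence (object name and input data are assumptions for illustration):

import org.apache.spark.{SparkConf, SparkContext}

object MapVsMapPartitionsDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("map-vs-mappartitions").setMaster("local[*]"))
    val rdd = sc.makeRDD(1 to 10, 2)
    // map: the function is invoked once per element
    val byElement = rdd.map(_ * 10)
    // mapPartitions: the function is invoked once per partition and works on the whole iterator
    val byPartition = rdd.mapPartitions(iter => iter.map(_ * 10))
    println(byElement.collect().toList == byPartition.collect().toList) // same result, different granularity
    sc.stop()
  }
}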
repartitionAndSortWithinPartitions
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

import scala.collection.mutable

object repartitionAndSortWithinPartitionsDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]"))
    val rdd = sc.textFile(args(0))
    val maprdd: RDD[(String, Int)] = rdd.map(data => {
      val txt = data.split(",")
      (txt(0), txt(1).toInt)
    })
    // TODO: Build the partitioning criterion. To partition by name, first trigger a job
    // that collects the distinct names so we know how many partitions are needed.
    val groupcount: Array[String] = maprdd.map(_._1).distinct().collect()
    val partition = new MyPartition(groupcount)
    //val rerdd: RDD[(String, Int)] = maprdd.repartitionAndSortWithinPartitions(partition)
    // Sort keys within each partition in descending order
    implicit val sort = Ordering[String].on[String](t => t).reverse
    val shuffrdd = new ShuffledRDD[String, Int, Int](maprdd, partition)
    shuffrdd.setKeyOrdering(sort)
    shuffrdd.saveAsTextFile(args(1))
    // Keep the application alive so the Spark web UI can be inspected
    Thread.sleep(Int.MaxValue)
    sc.stop()
  }
}

// Custom partitioner: one partition per name, each name mapped to a fixed partition index
class MyPartition(groupcount: Array[String]) extends Partitioner {
  private val map = mutable.Map[String, Int]()
  var index = 0
  for (name <- groupcount) {
    map(name) = index
    index += 1
  }

  override def numPartitions: Int = groupcount.length

  // The argument passed in is the key of each record
  override def getPartition(key: Any): Int = {
    val value = key.asInstanceOf[String]
    map(value)
  }
}
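The commented-out line in the demo corresponds to the built-in operator, which sets up the same ShuffledRDD plus key ordering. A minimal sketch of that path, reusing the MyPartition class above (the object name and inline sample data are assumptions, not from the original demo):

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RepartitionAndSortBuiltInDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("repartitionAndSort-builtin").setMaster("local[*]"))
    val maprdd: RDD[(String, Int)] = sc.makeRDD(List(("tom", 3), ("jerry", 1), ("tom", 7), ("jerry", 5)))
    val names = maprdd.map(_._1).distinct().collect()
    // The descending key order is picked up implicitly by repartitionAndSortWithinPartitions
    implicit val ord: Ordering[String] = Ordering[String].reverse
    val result = maprdd.repartitionAndSortWithinPartitions(new MyPartition(names))
    result.collect().foreach(println)
    sc.stop()
  }
}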
groupbykey and groupby
import org.apache.spark.rdd.ShuffledRDD
import org.apache.spark.util.collection.CompactBuffer
import org.apache.spark.{Aggregator, HashPartitioner, SparkConf, SparkContext}

// Note: CompactBuffer is private[spark], so this demo must live in an org.apache.spark.* package.
object GroupByKeyDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("groupbykey").setMaster("local[*]"))
    val maprdd = sc.makeRDD(List("spark", "scala", "java", "js", "flink",
      "spark", "scala", "java", "js", "flink",
      "spark", "scala", "java", "js", "flink")).map(data => (data, 1))
    // The public API call; the lines below rebuild the same thing with a ShuffledRDD
    maprdd.groupByKey()
    val shufflerdd = new ShuffledRDD[String, Int, CompactBuffer[Int]](maprdd, new HashPartitioner(maprdd.partitions.length))
    // Disable map-side (partial) aggregation, just like groupByKey does
    shufflerdd.setMapSideCombine(false)
    // createCombiner: put the first value of each key into a buffer
    val createCombiner = (v: Int) => CompactBuffer(v)
    // mergeValue: append the remaining values of the same key to the buffer
    val mergeValue = (buf: CompactBuffer[Int], v: Int) => buf += v
    // mergeCombiners: merge a key's buffers on the reduce side
    val mergeCombiners = (c1: CompactBuffer[Int], c2: CompactBuffer[Int]) => c1 ++= c2
    val groupbykeyrdd = shufflerdd.setAggregator(new Aggregator[String, Int, CompactBuffer[Int]](
      createCombiner, mergeValue, mergeCombiners
    ))
    groupbykeyrdd.collect().foreach(println)
    sc.stop()
  }
}
// Spark source: groupBy is implemented on top of groupByKey
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
    : RDD[(K, Iterable[T])] = withScope {
  val cleanF = sc.clean(f)
  this.map(t => (cleanF(t), t)).groupByKey(p)
}
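As the source shows, groupBy(f) is just map(t => (f(t), t)) followed by groupByKey. A minimal sketch of that equivalence (object name and sample data are illustrative assumptions):

import org.apache.spark.{SparkConf, SparkContext}

object GroupByVsGroupByKeyDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("groupby-vs-groupbykey").setMaster("local[*]"))
    val words = sc.makeRDD(List("spark", "scala", "spark", "flink", "scala"))
    // groupBy keys each element by f(t) and keeps the element itself as the value
    val viaGroupBy = words.groupBy(w => w)
    // which is exactly map(t => (f(t), t)).groupByKey()
    val viaGroupByKey = words.map(w => (w, w)).groupByKey()
    viaGroupBy.collect().foreach(println)
    viaGroupByKey.collect().foreach(println)
    sc.stop()
  }
}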
reducebykey
import org.apache.spark.rdd.ShuffledRDD
import org.apache.spark.{Aggregator, HashPartitioner, SparkConf, SparkContext}

object ReduceByKeyDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("reducebykey").setMaster("local[*]"))
    val maprdd = sc.makeRDD(List("spark", "scala", "java", "js", "flink",
      "spark", "scala", "java", "js", "flink",
      "spark", "scala", "java", "js", "flink"), 2).map(data => (data, 1))
    // createCombiner: the first value of a key becomes the initial accumulator
    val createCombiner = (x: Int) => x
    // mergeValue: fold the remaining values of the key into the accumulator on the map side
    val mergeValue = (x: Int, y: Int) => x + y
    // mergeCombiners: merge the per-partition accumulators on the reduce side
    val mergeCombiners = (x1: Int, x2: Int) => x1 + x2
    maprdd.combineByKey(
      (x => x), (x: Int, y: Int) => x + y, (x1: Int, x2: Int) => x1 + x2
    ).saveAsTextFile("combinebykey")
    // The same aggregation built by hand with a ShuffledRDD
    val shufflerdd = new ShuffledRDD[String, Int, Int](maprdd, new HashPartitioner(maprdd.partitions.length))
    // Enable map-side (partial) aggregation, just like reduceByKey does
    shufflerdd.setMapSideCombine(true)
    shufflerdd.setAggregator(new Aggregator[String, Int, Int](
      createCombiner, mergeValue, mergeCombiners
    )).saveAsTextFile("shufflerdd")
    sc.stop()
  }
}
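For reference, the public reduceByKey call collapses the three functions into one, since the map-side and reduce-side merges are the same addition. A minimal sketch (the object name is illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object ReduceByKeyBuiltInDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("reducebykey-builtin").setMaster("local[*]"))
    val maprdd = sc.makeRDD(List("spark", "scala", "spark", "flink", "scala"), 2).map(w => (w, 1))
    // reduceByKey(_ + _) behaves like combineByKey(x => x, _ + _, _ + _):
    // partial sums on the map side, then the partial sums are merged after the shuffle
    maprdd.reduceByKey(_ + _).collect().foreach(println)
    sc.stop()
  }
}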
join
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object JoinDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("join").setMaster("local[*]"))
    val rdd1 = sc.makeRDD(List(("spark", 1), ("scala", 2), ("kafka", 4), ("flink", 9), ("spark", 1)))
    val rdd2 = sc.makeRDD(List(("spark", 3), ("scala", 4), ("kafka", 5), ("strom", 2), ("strom", 4)))
    // cogroup produces results like (spark,(CompactBuffer(1, 1),CompactBuffer(3)));
    // a for-comprehension then pairs every value on the left with every value on the right
    val cordd: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
    // flatMapValues flattens the nested collections; keys missing on either side yield
    // an empty result, so no empty pairs appear (inner-join semantics)
    val joinrdd: RDD[(String, (Int, Int))] = cordd.flatMapValues(data => {
      for (it <- data._1; it2 <- data._2) yield (it, it2)
    })
    joinrdd.collect().foreach(println)
    sc.stop()
  }
}
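The built-in join performs exactly this cogroup + flatMapValues combination internally. A minimal sketch for comparison (the object name is illustrative; the data matches the demo above):

import org.apache.spark.{SparkConf, SparkContext}

object BuiltInJoinDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("join-builtin").setMaster("local[*]"))
    val rdd1 = sc.makeRDD(List(("spark", 1), ("scala", 2), ("kafka", 4), ("flink", 9), ("spark", 1)))
    val rdd2 = sc.makeRDD(List(("spark", 3), ("scala", 4), ("kafka", 5), ("strom", 2), ("strom", 4)))
    // join keeps only keys present on both sides, pairing every left value with every right value;
    // (spark,(1,3)) appears twice because rdd1 contains two spark entries
    rdd1.join(rdd2).collect().foreach(println)
    sc.stop()
  }
}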
import org.apache.spark.{SparkConf, SparkContext}

object FullJoinDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("fulljoin").setMaster("local[*]"))
    val rdd1 = sc.makeRDD(List(("spark", 1), ("scala", 2), ("kafka", 4), ("flink", 9), ("spark", 1)))
    val rdd2 = sc.makeRDD(List(("spark", 3), ("scala", 4), ("kafka", 5), ("strom", 2), ("strom", 4)))
    val cordd = rdd1.cogroup(rdd2)
    // Same pattern as Spark's fullOuterJoin: keys missing on one side are padded with None
    cordd.flatMapValues {
      case (vs, Seq()) => vs.iterator.map(v => (Some(v), None))
      case (Seq(), ws) => ws.iterator.map(w => (None, Some(w)))
      case (vs, ws) => for (it <- vs.iterator; it2 <- ws.iterator) yield (Some(it), Some(it2))
    }.collect().foreach(println)
    sc.stop()
  }
}
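For comparison, the built-in fullOuterJoin produces the same Option-padded pairs directly. A minimal sketch (illustrative object name; data matches the demo above):

import org.apache.spark.{SparkConf, SparkContext}

object BuiltInFullOuterJoinDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("fullouterjoin-builtin").setMaster("local[*]"))
    val rdd1 = sc.makeRDD(List(("spark", 1), ("scala", 2), ("kafka", 4), ("flink", 9), ("spark", 1)))
    val rdd2 = sc.makeRDD(List(("spark", 3), ("scala", 4), ("kafka", 5), ("strom", 2), ("strom", 4)))
    // Keys missing on one side are padded with None, e.g. (flink,(Some(9),None)) and (strom,(None,Some(2)))
    rdd1.fullOuterJoin(rdd2).collect().foreach(println)
    sc.stop()
  }
}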

Action