Spark Data Case Analysis

RDD

Case 1: Consecutive Logins
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ContinuouslandingRdd {

  // Input data (src/main/resources/date1.csv):
  // p1,2020-12-1
  // p1,2020-12-2
  // p1,2020-12-3
  // p1,2020-12-4
  // p1,2020-12-6
  // p1,2020-12-7
  // p1,2020-12-11
  // p1,2020-12-13
  // p2,2020-12-01
  // p2,2020-12-13
  // p2,2020-12-13
  // p2,2020-12-14
  // p2,2020-12-15
  // p3,2020-12-15
  // p4,2020-12-15

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]"))

    val rdd = sc.textFile("src/main/resources/date1.csv")
    val maprdd: RDD[(String, String)] = rdd.map(line => {
      val data = line.split(",")
      (data(0), data(1))
    })

    // Idea: groupByKey, deduplicate and sort each user's dates, then subtract each date's
    // position in the sorted list from the date itself; dates that map to the same anchor
    // value belong to one run of consecutive logins.
    val etlrdd: RDD[(String, (String, String))] = maprdd.groupByKey().flatMapValues(datas => {
      // Deduplicate and sort by date
      val sort = datas.toSet.toList.sorted
      val sdf = new SimpleDateFormat("yyyy-MM-dd")
      val calendar = Calendar.getInstance()
      var index = 0
      sort.map(data => {
        val date: Date = sdf.parse(data)
        calendar.setTime(date)
        calendar.add(Calendar.DATE, -index)
        index += 1
        (data, sdf.format(calendar.getTime))
      })
    })

    // For each run of consecutive logins, compute the start date, end date and number of days
    etlrdd.map({
      case (id, (dt, anchor)) => ((id, anchor), dt)
    }).groupByKey().mapValues(data => {
      val sort = data.toList.sorted
      val first_time = sort.head
      val last_time = sort.last
      val days = sort.size
      (days, first_time, last_time)
    }).filter(_._2._1 >= 3)
      .map({ case ((id, _), (_, first, last)) => (id, first, last) })
      .collect()
      .foreach(println)

    // Output:
    // (p2,2020-12-13,2020-12-15)
    // (p1,2020-12-2,2020-12-4)

    sc.stop()
  }
}
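The core of the solution is the in-code idea above: after deduplicating and sorting, subtracting each date's index from the date maps every run of consecutive days onto one anchor value. Below is a minimal sketch of just that trick on plain Scala collections, using p1's dates from the sample file (zero-padded here so they parse as ISO dates and sort correctly as strings); ConsecutiveAnchorDemo is an illustrative name, not part of the original code.

import java.time.LocalDate

object ConsecutiveAnchorDemo {
  def main(args: Array[String]): Unit = {
    // p1's distinct login dates from the sample file, zero-padded
    val dates = List("2020-12-01", "2020-12-02", "2020-12-03", "2020-12-04",
      "2020-12-06", "2020-12-07", "2020-12-11", "2020-12-13").sorted

    // date minus its index: consecutive dates collapse onto the same anchor date
    val anchored = dates.zipWithIndex.map { case (d, i) => (d, LocalDate.parse(d).minusDays(i)) }

    anchored.groupBy(_._2).values
      .map(run => (run.map(_._1).min, run.map(_._1).max, run.size)) // run start, end, length
      .filter(_._3 >= 3)
      .foreach(println) // prints (2020-12-01,2020-12-04,4)
  }
}

Note that this sketch finds the full four-day run 2020-12-01 to 2020-12-04, while the RDD output above reports (p1,2020-12-2,2020-12-4): the RDD version sorts the raw, non-zero-padded date strings of date1.csv lexicographically, which splits p1's run.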
Case 2: Grouped Aggregation (SQL logic rewritten as RDD operations)
import java.text.SimpleDateFormat

import org.apache.spark.{SparkConf, SparkContext}

// Grouped aggregation case: start a new group whenever a record's begin_time minus the
// previous record's end_time reaches 10 minutes
object GroupedIntoRdd {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]"))
    val rdd = sc.textFile("src/main/resources/data.csv")
    val maprdd = rdd.map(line => {
      val data = line.split(",")
      (data(0).toInt, (data(1), data(2), data(3).toInt))
    })

    // Process each id's records as one group
    val etlrdd = maprdd.groupByKey().flatMapValues(data => {
      // Sort by begin_time; when the gap to the previous end_time reaches 10 minutes,
      // the running counter moves the record into a new sub-group
      val sort = data.toSet.toList.sorted
      val sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm")
      var flag = 0 // 0 or 1: whether the gap to the previous record reaches 10 minutes
      var sum = 0 // running sub-group number
      var temp_time = 0L // end_time of the previous record
      sort.map(da => {
        val begin_time = sdf.parse(da._1).getTime
        val end_time = sdf.parse(da._2).getTime
        if (temp_time != 0L) {
          if ((begin_time - temp_time) / (1000 * 60) < 10) {
            flag = 0
          } else {
            flag = 1
          }
        }
        sum += flag
        temp_time = end_time
        (begin_time, end_time, da._3, sum)
      })
    }).map({ case (id, (begin, end, flow, group)) => ((id, group), (begin, end, flow)) })

    val value = etlrdd.reduceByKey((x, y) => {
      // merge records of the same (id, group): earliest begin_time, latest end_time, total flow
      val min = Math.min(x._1, y._1)
      val max = Math.max(x._2, y._2)
      val sum = x._3 + y._3
      (min, max, sum)
    }).mapPartitions(iter => {
      // one SimpleDateFormat per partition (it is not thread-safe; see the closure section below)
      val sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm")
      iter.map(data => {
        val start = sdf.format(data._2._1)
        val last = sdf.format(data._2._2)
        ((data._1._1, start), (last, data._2._3))
      })
    }).sortByKey()
    value.collect().foreach(println)
    sc.stop()
  }
}
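The heart of this solution is the running group counter: flag becomes 1 whenever the gap to the previous end_time reaches 10 minutes, and the prefix sum of the flags is the group id. A minimal sketch of just that counter on plain Scala collections; the intervals (in minutes) and the name GapGroupingDemo are made up for illustration.

object GapGroupingDemo {
  def main(args: Array[String]): Unit = {
    // (begin, end) in minutes since some origin; hypothetical values
    val intervals = List((0, 5), (10, 20), (40, 50), (55, 60))

    var group = 0
    var prevEnd = Int.MinValue
    val grouped = intervals.map { case (begin, end) =>
      // start a new group when the gap to the previous end reaches 10 minutes
      if (prevEnd != Int.MinValue && begin - prevEnd >= 10) group += 1
      prevEnd = end
      (begin, end, group)
    }
    grouped.foreach(println)
    // prints (0,5,0), (10,20,0), (40,50,1), (55,60,1): two groups
  }
}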

SparkSQL

Case 1: Consecutive Logins
import org.apache.spark.sql.SparkSession

object ContinuouslandingSql {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").getOrCreate()
    spark.read.csv("src/main/resources/date1.csv").toDF("id", "dt").createTempView("Continuouslanding")
    // t1: deduplicate; t2: number each user's logins by date; t3: subtract the row number from dt
    // so consecutive dates share the same anchor (landing); group by (id, landing) to get each
    // run's start, end and length
    spark.sql(
      """
        |select
        |  id,
        |  min(dt)        as begin_time,
        |  max(dt)        as end_time,
        |  count(landing) as days
        |from
        |(
        |  select
        |    id, dt,
        |    date_sub(dt, rn) as landing
        |  from
        |  (
        |    select
        |      id, dt,
        |      row_number() over(partition by id order by dt) as rn
        |    from
        |    (
        |      select distinct id, dt
        |      from Continuouslanding
        |    ) t1
        |  ) t2
        |) t3
        |group by id, landing
        |""".stripMargin).show(100, false)
    spark.stop()
  }
}
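As a quick check of the date_sub(dt, rn) step, take user p2 from the sample file in Case 1: after distinct, its dates are 2020-12-01, 2020-12-13, 2020-12-14 and 2020-12-15, and the intermediate columns come out as follows.

// id   dt           rn   landing = date_sub(dt, rn)
// p2   2020-12-01   1    2020-11-30
// p2   2020-12-13   2    2020-12-11
// p2   2020-12-14   3    2020-12-11
// p2   2020-12-15   4    2020-12-11
// Grouping by (id, landing) then yields (p2, 2020-12-13, 2020-12-15) with a run length of 3,
// matching the RDD output above.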
Case 2: Grouped Aggregation
-- Start a new group when begin_time minus the previous record's end_time exceeds 600 seconds (10 minutes)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

object GroupedIntoSql {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").getOrCreate()
    val schema = new StructType(Array(
      StructField("id", DataTypes.StringType),
      StructField("begin_time", DataTypes.StringType),
      StructField("end_time", DataTypes.StringType),
      StructField("down_flow", DataTypes.DoubleType)
    ))
    val frame = spark.read.format("csv").schema(schema).load("C:\\Users\\hp\\IdeaProjects\\Spark_Maven\\src\\main\\resources\\data.csv")
    frame.createTempView("GroupedInto")

    // t1: lag gives the previous record's end_time (defaulting to the row's own begin_time);
    // t2: flag = 1 when the gap exceeds 600 seconds; t3: the running sum of flag numbers the groups
    spark.sql(
      """
        |select
        |  id,
        |  min(begin_time) as begin_time,
        |  max(end_time)   as end_time,
        |  sum(down_flow)  as down_flow
        |from
        |(
        |  select
        |    id, begin_time, end_time, down_flow,
        |    sum(flag) over(partition by id order by begin_time
        |                   rows between unbounded preceding and current row) as sum_flag
        |  from
        |  (
        |    select
        |      id, begin_time, end_time, down_flow,
        |      if((to_unix_timestamp(begin_time, 'yyyy/M/dd HH:mm') -
        |          to_unix_timestamp(rn_time, 'yyyy/M/dd HH:mm')) > 600, 1, 0) as flag
        |    from
        |    (
        |      select
        |        id, begin_time, end_time, down_flow,
        |        lag(end_time, 1, begin_time) over(partition by id order by begin_time) as rn_time
        |      from
        |      GroupedInto
        |    ) t1
        |  ) t2
        |) t3
        |group by id, sum_flag
        |""".stripMargin).show(100)
    spark.stop()
  }
}
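To illustrate the flag / running-sum trick, here is a small hypothetical example (the times below are made up for illustration, not taken from data.csv): for one id with records 08:00-08:05, 08:10-08:20 and 08:40-08:50, the gaps to the previous end_time are 0, 300 and 1200 seconds.

// Hypothetical rows for one id (format yyyy/M/dd HH:mm):
// begin_time        end_time          rn_time (lag)     flag  sum_flag
// 2020/12/1 08:00   2020/12/1 08:05   2020/12/1 08:00   0     0
// 2020/12/1 08:10   2020/12/1 08:20   2020/12/1 08:05   0     0    (gap 300s)
// 2020/12/1 08:40   2020/12/1 08:50   2020/12/1 08:20   1     1    (gap 1200s)
// Grouping by (id, sum_flag) merges the first two records and keeps the third as a new group.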

DSL

Case 1: Consecutive Logins
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window

object ContinuouslandingDsl {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.functions._
    val dataFrame = spark.read.csv("src/main/resources/date1.csv").toDF("id", "dt")
    dataFrame
      .distinct()
      .select(
        col("id")
        , col("dt")
        , row_number().over(Window.partitionBy("id").orderBy(col("dt"))
          .rowsBetween(Window.unboundedPreceding, Window.currentRow)) as "rn"
      )
      .select(
        col("id")
        , col("dt")
        , date_sub(col("dt"), col("rn")) as "date_rn"
        // Spark 2.0 does not support doing this with expr()
      )
      .groupBy("id", "date_rn")
      .agg(
        min(col("dt")) as "begin_time"
        , max(col("dt")) as "end_time"
        , count(col("date_rn")) as "number"
      )
      .select("id", "begin_time", "end_time", "number")
      // .drop(col("date_rn"))
      .show(100)
    spark.stop()
  }
}
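The comment above flags a version dependency around this step. If the Column-based date_sub(col("dt"), col("rn")) call is not available in the Spark version you are on, one possible workaround is to push the arithmetic into SQL, mirroring the date_sub(dt, rn) used in the SparkSQL version above. A sketch, not from the original; withRn stands for a hypothetical DataFrame holding the id, dt and rn columns produced by the row_number step:

// Sketch: same anchor-date computation via a SQL expression instead of the Column overload
val anchored = withRn.selectExpr("id", "dt", "date_sub(dt, rn) as date_rn")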
Case 2: Grouped Aggregation
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window

object GroupedIntoSqlDsl {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").getOrCreate()
    val dataFrame = spark.read.csv("src/main/resources/data.csv").toDF("id", "begin_time", "end_time", "down_flow")
    import spark.implicits._
    import org.apache.spark.sql.functions._
    dataFrame.selectExpr(
      "id"
      , "begin_time"
      , "end_time"
      , "down_flow"
      , "lag(end_time, 1, begin_time) over(partition by id order by begin_time) as lag_time")
      .select(
        'id
        , 'begin_time
        , 'end_time
        , 'down_flow
        // equivalent idea: if(unix_timestamp('begin_time, "yyyy/M/dd HH:mm") - unix_timestamp(col("lag_time"), "yyyy/M/dd HH:mm") > 600, 1, 0)
        , expr("if((to_unix_timestamp(begin_time,'yyyy/M/dd HH:mm') - to_unix_timestamp(lag_time,'yyyy/M/dd HH:mm')) > 600, 1, 0)") as "flag"
      )
      .select(
        col("id")
        , col("begin_time")
        , col("end_time")
        , col("down_flow")
        , sum(col("flag")).over(Window.partitionBy(col("id")).orderBy(col("begin_time"))
          .rowsBetween(Window.unboundedPreceding, Window.currentRow)) as "rn"
      )
      .groupBy(col("id"), col("rn"))
      .agg(
        min(col("begin_time")) as "begin_time"
        , max(col("end_time")) as "end_time"
        , sum(col("down_flow")) as "down_flow"
      )
      // with the select below, drop is not strictly needed to remove the grouping column
      .drop(col("rn"))
      .orderBy(col("id"))
      .select("id", "begin_time", "end_time", "down_flow")
      .show(100, false)
    spark.stop()
  }
}

Closure / Multi-threading Issues

An object or class instance that is initialized on the driver and then used in code running on executors must implement the serialization interface (java.io.Serializable).

If the instance is a Scala object (a singleton), each executor shares a single instance; if it is a class instance created with new, each task gets its own copy.

If the instance is created inside the function instead: a singleton still means one instance per executor process, while for a class instance it depends on the operator. With map, a new instance is created for every record; with mapPartitions, one instance is created per partition.
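A minimal sketch of the first point above (a driver-side instance referenced inside executor code must be serializable); the DateParser class and ClosureDemo object are hypothetical names used only for illustration.

import java.text.SimpleDateFormat

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper: the instance is created on the driver and shipped to executors inside
// the map closure, so the class must extend Serializable (one deserialized copy per task)
class DateParser(pattern: String) extends Serializable {
  def parse(st: String): Long = new SimpleDateFormat(pattern).parse(st).getTime
}

object ClosureDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ClosureDemo").setMaster("local[*]"))
    val parser = new DateParser("yyyy-MM-dd HH:mm:ss") // created on the driver
    sc.parallelize(Seq("2020-12-01 10:00:00", "2020-12-02 11:30:00"))
      .map(parser.parse) // without Serializable this fails with a "Task not serializable" error
      .collect()
      .foreach(println)
    sc.stop()
  }
}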

// Multiple threads sharing one SimpleDateFormat instance is not thread-safe
object DateUntil {
  private val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  def parse(st: String): Long = {
    val date = format.parse(st)
    date.getTime
  }
}

// Improvement 1: add a lock
object DateUntil {
  private val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  def parse(st: String): Long = synchronized {
    val date = format.parse(st)
    date.getTime
  }
}

// Improvement 2: use mapPartitions so each partition gets its own SimpleDateFormat
rdd.mapPartitions(data => {
  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  data.map(txt => {
    val date = format.parse(txt)
    date.getTime
  })
})
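One further option, not used in the original code: java.time.format.DateTimeFormatter is immutable and thread-safe, so a single shared instance needs no locking. A minimal sketch (DateUtil2 is a hypothetical name):

import java.time.LocalDateTime
import java.time.ZoneId
import java.time.format.DateTimeFormatter

// DateTimeFormatter is immutable and thread-safe, so one shared instance per executor is safe
object DateUtil2 {
  private val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  def parse(st: String): Long =
    LocalDateTime.parse(st, formatter).atZone(ZoneId.systemDefault()).toInstant.toEpochMilli
}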