MapReduce in Practice

MapReduce API

MapReduce is a programming framework for distributed computation and the core framework for developing Hadoop-based data-analysis applications. Its core job is to combine the user's business-logic code with the framework's built-in default components into one complete distributed program that runs in parallel on a Hadoop cluster.

FileInputformatCase

package com.tongyongtao.BigData.MapReduce.FileInputformatCase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
* Created with IntelliJ IDEA.
* Description:
* User: tongyongtao
* Date: 2020-10-08
* Time: 20:29
* A simple implementation of TestInputformatCase in FileInputformatCase (uses the default TextInputFormat)
* Case: count how many times each word appears
*/
public class MR_WordCount1 {
static Configuration con;
static IntWritable wordtimes = new IntWritable(1);
static Text text = new Text();

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"C:\\Users\\hp\\IdeaProjects\\GitHub_Maven\\src\\main\\resources\\a.txt", "C:\\MapReduce3"};
con = new Configuration();
Job job = Job.getInstance(con, "");
job.setMapperClass(Map_MR_WordCount1.class);
job.setReducerClass(Reduce_MR_WordCount1.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setNumReduceTasks(1);
System.out.println(job.waitForCompletion(true) ? "正确" : "错误");

}

public static class Map_MR_WordCount1 extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
for (String word : value.toString().split("\\s+")) {
text.set(word);
context.write(text, wordtimes);
}
}
}

public static class Reduce_MR_WordCount1 extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int num = 0;
for (IntWritable value : values) {
num++;
}
context.write(key, new IntWritable(num));
}
}

}
package com.tongyongtao.BigData.MapReduce.FileInputformatCase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
* Created with IntelliJ IDEA.
* Description:
* User: tongyongtao
* Date: 2020-10-08
* Time: 21:02
* CombineTextInputFormat: one way to handle lots of small files
* Case: count how many times each word appears
*/
public class MR_WordCount2 {
static Configuration con;
static IntWritable wordtimes = new IntWritable(1);
static Text text = new Text();

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"C:\\Users\\hp\\Desktop\\wordcount", "C:\\MapReduce"};
con = new Configuration();
Job job = Job.getInstance(con, "");
job.setMapperClass(MR_WordCount1.Map_MR_WordCount1.class);
job.setReducerClass(MR_WordCount1.Reduce_MR_WordCount1.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//if no InputFormat is set, the default is TextInputFormat; switch to CombineTextInputFormat here
job.setInputFormatClass(CombineTextInputFormat.class);
//maximum size of a combined split, in bytes
CombineTextInputFormat.setMaxInputSplitSize(job,4192304);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setNumReduceTasks(1);
System.out.println(job.waitForCompletion(true) ? "正确" : "错误");

}

public static class Map_MR_WordCount1 extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
for (String word : value.toString().split("\\s+")) {
text.set(word);
context.write(text, wordtimes);
}
}
}

public static class Reduce_MR_WordCount1 extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int num = 0;
for (IntWritable value : values) {
num++;
}
context.write(key, new IntWritable(num));
}
}

}
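A note on the split-size call above: the setters inherited from FileInputFormat take a size in bytes, so a 4 MB ceiling is normally written as 4 * 1024 * 1024 = 4194304 (the value 4192304 above is presumably intended as roughly 4 MB). A minimal driver fragment under that assumption, using the corrected org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat import:

Job job = Job.getInstance(new Configuration(), "combine-small-files");
//pack many small files into fewer splits; the ceiling below is 4 MB expressed in bytes
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4 * 1024 * 1024L); // 4194304 bytes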
package com.tongyongtao.BigData.MapReduce.FileInputformatCase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
* Created with IntelliJ IDEA.
* Description:
* User: tongyongtao
* Date: 2020-10-08
* Time: 21:49
* Key/value input: KeyValueTextInputFormat splits each line into a key and a value
*/
public class MR_WordCount3 {
static Configuration con;
static IntWritable wordtimes = new IntWritable(1);
static Text text = new Text();

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"C:\\Users\\hp\\IdeaProjects\\GitHub_Maven\\src\\main\\resources\\word_keyvalue.txt", "C:\\MapReduce5"};
con = new Configuration();
//set the key/value separator to a single space
con.set(KeyValueLineRecordReader.KEY_VALUE_SEPARATOR," ");
Job job = Job.getInstance(con, "");
job.setMapperClass(Map_MR_WordCount1.class);
job.setReducerClass(Reduce_MR_WordCount1.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//switch from the default TextInputFormat to KeyValueTextInputFormat
job.setInputFormatClass(KeyValueTextInputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setNumReduceTasks(1);
System.out.println(job.waitForCompletion(true) ? "正确" : "错误");

}

public static class Map_MR_WordCount1 extends Mapper<Text, Text, Text, IntWritable> {
@Override
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
for (String word : value.toString().split("\\s+")) {
text.set(word);
context.write(key, wordtimes);
}
}
}

public static class Reduce_MR_WordCount1 extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int num = 0;
for (IntWritable value : values) {
num++;
}
context.write(key, new IntWritable(num));
}
}
}
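For reference, KeyValueTextInputFormat hands the mapper Text/Text pairs: each line is split at the first occurrence of the configured separator, the part before it becomes the key and the rest becomes the value. A minimal configuration sketch (the sample line in the comment is hypothetical):

Configuration conf = new Configuration();
//with a single-space separator, the line "hello this is a test" becomes
//key = "hello", value = "this is a test"
conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPARATOR, " ");
Job job = Job.getInstance(conf, "kv-input");
job.setInputFormatClass(KeyValueTextInputFormat.class);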
package com.tongyongtao.BigData.MapReduce.FileInputformatCase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
* Created with IntelliJ IDEA.
* Description:
* User:
* Date: 2020-10-08
* Time: 22:23
* NLineInputFormat: input splits are cut by line count
*/
public class MR_WordCount4 {
static Configuration con;
static IntWritable wordtimes = new IntWritable(1);
static Text text = new Text();

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"C:\\Users\\hp\\IdeaProjects\\GitHub_Maven\\src\\main\\resources\\word.nl.txt", "C:\\MapReduce3"};
con = new Configuration();
Job job = Job.getInstance(con, "");
//three lines per split
NLineInputFormat.setNumLinesPerSplit(job,3);
//use NLineInputFormat
job.setInputFormatClass(NLineInputFormat.class);
job.setMapperClass(MR_WordCount1.Map_MR_WordCount1.class);
job.setReducerClass(MR_WordCount1.Reduce_MR_WordCount1.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setNumReduceTasks(1);
System.out.println(job.waitForCompletion(true) ? "正确" : "错误");

}

public static class Map_MR_WordCount1 extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
for (String word : value.toString().split("\\s+")) {
text.set(word);
context.write(text, wordtimes);
}
}
}

public static class Reduce_MR_WordCount1 extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int num = 0;
for (IntWritable value : values) {
num++;
}
context.write(key, new IntWritable(num));
}
}
}
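For reference, NLineInputFormat.setNumLinesPerSplit(job, 3) stores the value in the mapreduce.input.lineinputformat.linespermap property, so a file of N lines yields roughly N/3 splits and the same number of map tasks. A sketch of the equivalent property-based configuration (assuming the LINES_PER_MAP constant of the mapreduce-API NLineInputFormat):

Configuration con = new Configuration();
//equivalent to NLineInputFormat.setNumLinesPerSplit(job, 3): three lines per split / map task
con.setInt(NLineInputFormat.LINES_PER_MAP, 3);
Job job = Job.getInstance(con, "nline-wordcount");
job.setInputFormatClass(NLineInputFormat.class);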

package com.tongyongtao.BigData.MapReduce.FileInputformatCase;
/**
* Created with IntelliJ IDEA.
* Description:
* User: tongyongtao
* Date: 2020-10-08
* Time: 22:31
* Custom input splitting (left as an empty placeholder)
*/
public class MR_WordCount5 {

}
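MR_WordCount5 is still an empty placeholder for the custom-split case. A hedged sketch of one possible shape (class name and behaviour are illustrative, not the author's implementation): a FileInputFormat subclass that refuses to split files, so every file is handled by exactly one map task.

package com.tongyongtao.BigData.MapReduce.FileInputformatCase;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

import java.io.IOException;

/**
 * Illustrative custom InputFormat: each input file becomes one unsplittable split.
 */
public class WholeFileTextInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false; //never split a file, whatever its size
    }

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new LineRecordReader(); //still read the file line by line
    }
}
//driver wiring: job.setInputFormatClass(WholeFileTextInputFormat.class);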

CombinerCase

public class MR_Combiner {
/**
* Created with IntelliJ IDEA.
* Description:
* User: tongyongtao
* Date: 2020-10-08
* Time: 10:58
*/
}
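MR_Combiner is likewise an empty placeholder. One caveat worth recording: the counting-style reducers above (num++ per record) cannot be reused as combiners, because the reduce side would then count pre-aggregated records instead of summing them. A minimal sketch of a sum-based reducer that can safely double as a combiner (package name inferred from the project layout):

package com.tongyongtao.BigData.MapReduce.CombinerCase;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Sum-based reducer/combiner sketch: summing partial counts is associative and
 * commutative, so applying it on the map side and again on the reduce side is safe.
 */
public class SumCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get(); //sum the partial counts, do not just count records
        }
        context.write(key, new IntWritable(sum));
    }
}
//driver wiring: job.setCombinerClass(SumCombiner.class); job.setReducerClass(SumCombiner.class);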

JoinCase

package com.tongyongtao.BigData.MapReduce.JoinCase;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
* Created with IntelliJ IDEA.
* Description:
* User: tongyongtao
* Date: 2020-10-08
* Time: 22:56
*/
public class TableBean1 implements Writable {
private int ID;
private String Pid;
private int amount;
private String Pname;
private String Flog;

@Override
public String toString() {
return
"ID=" + ID +
" Pid=" + Pid +
" amount=" + amount +
" Pname=" + Pname +
" Flog=" + Flog;

}

public int getID() {
return ID;
}

public void setID(int ID) {
this.ID = ID;
}

public String getPid() {
return Pid;
}

public void setPid(String pid) {
Pid = pid;
}

public int getAmount() {
return amount;
}

public void setAmount(int amount) {
this.amount = amount;
}

public String getPname() {
return Pname;
}

public void setPname(String pname) {
Pname = pname;
}

public String getFlog() {
return Flog;
}

public void setFlog(String flog) {
Flog = flog;
}

public TableBean1(int ID, String pid, int amount, String pname, String flog) {
this.ID = ID;
Pid = pid;
this.amount = amount;
Pname = pname;
Flog = flog;
}

public TableBean1() {
}

@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(ID);
dataOutput.writeUTF(Pid);
dataOutput.writeInt(amount);
dataOutput.writeUTF(Pname);
dataOutput.writeUTF(Flog);
}

@Override
public void readFields(DataInput dataInput) throws IOException {
this.ID = dataInput.readInt();
this.Pid = dataInput.readUTF();
this.amount = dataInput.readInt();
this.Pname = dataInput.readUTF();
this.Flog = dataInput.readUTF();
}

@Override
protected Object clone() throws CloneNotSupportedException {

return new TableBean1(getID(), getPid(), getAmount(), getPname(), getFlog());
}
}

package com.tongyongtao.BigData.MapReduce.JoinCase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* Created with IntelliJ IDEA.
* Description:
* User: tongyongtao
* Date: 2020-10-08
* Time: 22:32
* A simple reduce-side join of two tables.
* setup() first reads the name of the current file, e.g. order.txt or pd.txt.
* For the map output, the column shared by both tables (pid) is used as the key, so after the shuffle all matching records land in the same iterator;
* the reducer then buffers the order records in a collection and walks it to emit the joined rows.
* order.txt: id pid amount
* pd.txt: pid pname
* ...This case took a long time to debug. Bugs hit: the field order in readFields() did not match write(), the split[] indices were used inconsistently,
* and records were never added to the collection.
*/
public class MR_Join1 {
static Configuration con;

public static void main(String[] args) throws Exception {
args = new String[]{"C:\\Users\\hp\\IdeaProjects\\GitHub_Maven\\src\\main\\resources\\jointask", "C:\\MAPREDYCE1"};
con = new Configuration();
Job job = Job.getInstance(con, "MR_Join1");
job.setMapperClass(Map_Join1.class);
job.setReducerClass(Reduce_Join1.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(TableBean1.class);
job.setOutputKeyClass(TableBean1.class);
job.setOutputValueClass(NullWritable.class);
job.setNumReduceTasks(1);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.out.println(job.waitForCompletion(true) ? "zq" : "cw");
}

public static class Map_Join1 extends Mapper<LongWritable, Text, Text, TableBean1> {
String name = null;

@Override
protected void setup(Context context) throws IOException, InterruptedException {
//get the name of the file this split comes from
FileSplit inputSplit = (FileSplit) context.getInputSplit();
name = inputSplit.getPath().getName();
}

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//one line per map() call; split it into fields
TableBean1 tableBean = new TableBean1();
Text text = new Text();
String[] split = value.toString().split("\\s+");
if (name.startsWith("order")) {
tableBean.setID(Integer.parseInt(split[0]));
tableBean.setPid(split[1]);
tableBean.setAmount(Integer.parseInt(split[2]));
tableBean.setPname(" ");
tableBean.setFlog("order");
text.set(split[1]);
} else {
tableBean.setID(0);
tableBean.setPid(split[0]);
tableBean.setAmount(0);
tableBean.setPname(split[1]);
tableBean.setFlog("pd");
text.set(split[0]);
}
context.write(text, tableBean);
}
}

public static class Reduce_Join1 extends Reducer<Text, TableBean1, TableBean1, NullWritable> {
@Override
protected void reduce(Text key, Iterable<TableBean1> values, Context context) throws IOException, InterruptedException {
//buffer for the order records
List<TableBean1> tableBean1s = new ArrayList<>();
//holder for the pd record
TableBean1 tableBeanPD = new TableBean1();
for (TableBean1 value : values) {
TableBean1 clone = null;
if ("order".equals(value.getFlog())) {
try {
clone = (TableBean1) value.clone();
} catch (CloneNotSupportedException e) {
e.printStackTrace();
}
tableBean1s.add(clone);
} else {
try {
tableBeanPD = (TableBean1) value.clone();
} catch (CloneNotSupportedException e) {
e.printStackTrace();
}
}
}
for (TableBean1 tableBean1 : tableBean1s) {
//fill in the product name from the pd record (the join on pid)
tableBean1.setPname(tableBeanPD.getPname());
context.write(tableBean1, NullWritable.get());
}
}
}
}
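The javadoc above mentions that a mismatch between the write() and readFields() field order was one of the bugs in this case. As a reminder, Writable deserialization just replays the byte stream, so readFields() must read the fields in exactly the order and with exactly the types that write() used; a minimal illustrative sketch (the PairWritable class is hypothetical):

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class PairWritable implements Writable {
    private int left;
    private String right;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(left);  //written first
        out.writeUTF(right); //written second
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.left = in.readInt();   //must be read first, as an int
        this.right = in.readUTF();  //must be read second, as a UTF string
    }
}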

OutputformatCase

package com.tongyongtao.BigData.MapReduce.OutputformatCase;


import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class HbaseBean1 implements Writable {
private String movie;
private double rate;
private long timeStamp;
private String uid;

public HbaseBean1() {
}

public String getMovie() {
return movie;
}

public void setMovie(String movie) {
this.movie = movie;
}

public double getRate() {
return rate;
}

public void setRate(double rate) {
this.rate = rate;
}

public long getTimeStamp() {
return timeStamp;
}

public void setTimeStamp(long timeStamp) {
this.timeStamp = timeStamp;
}

public String getUid() {
return uid;
}

public void setUid(String uid) {
this.uid = uid;
}

public HbaseBean1(String movie, double rate, long timeStamp, String uid) {
this.movie = movie;
this.rate = rate;
this.timeStamp = timeStamp;
this.uid = uid;
}

@Override
public String toString() {
return "Movie{" +
"movie='" + movie + '\'' +
", rate=" + rate +
", timeStamp=" + timeStamp +
", uid='" + uid + '\'' +
'}';
}


@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(movie);
dataOutput.writeDouble(rate);
dataOutput.writeLong(timeStamp);
dataOutput.writeUTF(uid);
}

@Override
public void readFields(DataInput dataInput) throws IOException {
this.movie = dataInput.readUTF();
this.rate = dataInput.readDouble();
this.timeStamp = dataInput.readLong();
this.uid = dataInput.readUTF();
}
}

package com.tongyongtao.BigData.MapReduce.OutputformatCase;

import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

/**
* Created with IntelliJ IDEA.
* Description:
* User: tongyongtao
* Date: 2020-10-12
* Time: 22:19
* Interaction between HBase and MapReduce.
* Case: movie ratings; the main design decision is the rowkey (written as an ImmutableBytesWritable).
*/
public class MR_Hbase1 {
public static void main(String[] args) throws Exception {
args = new String[]{"C:\\Users\\hp\\IdeaProjects\\GitHub_Maven\\src\\main\\resources\\hbase.json"};
Configuration con = HBaseConfiguration.create();
con.set("hbase.zookeeper.quorum", "linux03,linux04,linux05");
Job job = Job.getInstance(con, "MR_Hbase");
job.setMapperClass(MAP_Hbase1.class);
job.setReducerClass(Reduce_Hbase1.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(HbaseBean1.class);
// Create a Scan object for reading all rows from a source HBase table (not used here, since the input is a local file)
Scan scan = new Scan();
// Number of rows fetched per scan round trip
scan.setCaching(200);
scan.setCacheBlocks(false);
job.setJarByClass(HbaseBean1.class);

// Input data path
FileInputFormat.setInputPaths(job, new Path(args[0]));
// The target table must already exist before inserting
TableMapReduceUtil.initTableReducerJob("movie1", Reduce_Hbase1.class, job);
System.out.println(job.waitForCompletion(true) ? "zq" : "cw");
}

public static class MAP_Hbase1 extends Mapper<LongWritable, Text, Text, HbaseBean1> {
static Gson gson = new Gson();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//the movie id and the timestamp together form the rowkey
HbaseBean1 hbaseBean = null;
Text text = null;
try {
hbaseBean = gson.fromJson(value.toString(), HbaseBean1.class);
String movie = StringUtils.leftPad(hbaseBean.getMovie(), 4, '0');
String movieStamp = StringUtils.leftPad( String.valueOf(hbaseBean.getTimeStamp()),9,'0');
text = new Text();
text.set(movie+"_"+movieStamp);
} catch (Exception e) {
//skip lines that cannot be parsed as JSON (otherwise text stays null and the write below fails)
return;
}
context.write(text, hbaseBean);


}
}

public static class Reduce_Hbase1 extends TableReducer<Text, HbaseBean1, ImmutableBytesWritable> {

@Override
protected void reduce(Text key, Iterable<HbaseBean1> values, Context context) throws IOException, InterruptedException {
//the key is the rowkey; the values iterator holds the single bean written for it
Put put = new Put(key.toString().getBytes());
for (HbaseBean1 value : values) {
put.addColumn("cf".getBytes(),"moviename".getBytes(),value.getMovie().getBytes());
put.addColumn("cf".getBytes(),"movierate".getBytes(),String.valueOf(value.getRate()).getBytes());
put.addColumn("cf".getBytes(),"moviestamp".getBytes(),String.valueOf(value.getTimeStamp()).getBytes());
put.addColumn("cf".getBytes(),"movieuid".getBytes(),value.getUid().getBytes());
}
context.write(null,put);


}
}
}
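The driver comment notes that the target table must already exist. A hedged sketch of pre-creating it with the classic HBase client API (table name movie1 and column family cf are taken from the reducer above; assumes an HBase 1.x/2.x client where HTableDescriptor and HColumnDescriptor are still available):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateMovieTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "linux03,linux04,linux05");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            TableName tableName = TableName.valueOf("movie1");
            if (!admin.tableExists(tableName)) {
                //one column family "cf", matching the Put calls in Reduce_Hbase1
                HTableDescriptor descriptor = new HTableDescriptor(tableName);
                descriptor.addFamily(new HColumnDescriptor("cf"));
                admin.createTable(descriptor);
            }
        }
    }
}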

PartitionAndGroupingCase

package com.tongyongtao.BigData.MapReduce.PartitionAndGrouping;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

/**
* Created with IntelliJ IDEA.
* Description:
* User: tongyongtao
* Date: 2020-10-08
* Time: 14:36
*/
public class Movie implements WritableComparable<Movie> {
private String movie;
private double rate;
private long timeStamp;
private String uid;

public Movie() {
}

public String getMovie() {
return movie;
}

public void setMovie(String movie) {
this.movie = movie;
}

public double getRate() {
return rate;
}

public void setRate(double rate) {
this.rate = rate;
}

public long getTimeStamp() {
return timeStamp;
}

public void setTimeStamp(long timeStamp) {
this.timeStamp = timeStamp;
}

public String getUid() {
return uid;
}

public void setUid(String uid) {
this.uid = uid;
}

public Movie(String movie, double rate, long timeStamp, String uid) {
this.movie = movie;
this.rate = rate;
this.timeStamp = timeStamp;
this.uid = uid;
}

@Override
public String toString() {
return "Movie{" +
"movie='" + movie + '\'' +
", rate=" + rate +
", timeStamp=" + timeStamp +
", uid='" + uid + '\'' +
'}';
}

@Override
public int compareTo(Movie movie) {
//sort rule: records with the same uid sort by rate in descending order, otherwise by uid
return this.uid.equals(movie.uid) ? Double.compare(movie.rate, this.rate) : this.uid.compareTo(movie.uid);

}

@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(movie);
dataOutput.writeDouble(rate);
dataOutput.writeLong(timeStamp);
dataOutput.writeUTF(uid);
}

@Override
public void readFields(DataInput dataInput) throws IOException {
this.movie = dataInput.readUTF();
this.rate = dataInput.readDouble();
this.timeStamp = dataInput.readLong();
this.uid = dataInput.readUTF();
}

}

package com.tongyongtao.BigData.MapReduce.PartitionAndGrouping;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
* Created with IntelliJ IDEA.
* Description:
* User: tongyongtao
* Date: 2020-10-08
* Time: 16:32
*/
public class Movie1 implements WritableComparable<Movie1> {
private String movie;
private double rate;
private long timeStamp;
private String uid;

@Override
public String toString() {
return "Movie_TOPN{" +
"movie='" + movie + '\'' +
", rate=" + rate +
", timeStamp=" + timeStamp +
", uid='" + uid + '\'' +
'}';
}

public Movie1(String movie, double rate, long timeStamp, String uid) {
this.movie = movie;
this.rate = rate;
this.timeStamp = timeStamp;
this.uid = uid;
}

public String getMovie() {
return movie;
}

public void setMovie(String movie) {
this.movie = movie;
}

public double getRate() {
return rate;
}

public void setRate(double rate) {
this.rate = rate;
}

public long getTimeStamp() {
return timeStamp;
}

public void setTimeStamp(long timeStamp) {
this.timeStamp = timeStamp;
}

public String getUid() {
return uid;
}

public void setUid(String uid) {
this.uid = uid;
}

public Movie1() {
}

@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(movie);
dataOutput.writeDouble(rate);
dataOutput.writeLong(timeStamp);
dataOutput.writeUTF(uid);
}

@Override
public void readFields(DataInput dataInput) throws IOException {
this.movie = dataInput.readUTF();
this.rate = dataInput.readDouble();
this.timeStamp = dataInput.readLong();
this.uid = dataInput.readUTF();
}

@Override
public int compareTo(Movie1 movie1) {
return this.movie.equals(movie1.movie) ? Double.compare(movie1.rate, this.rate) : this.movie.compareTo(movie1.movie);
}

@Override
protected Object clone() throws CloneNotSupportedException {

return new Movie1(getMovie(),getRate(),getTimeStamp(),getUid());
}
}

package com.tongyongtao.BigData.MapReduce.PartitionAndGrouping;

import com.google.gson.Gson;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
* Created with IntelliJ IDEA.
* Description:
* User:tongyongtao
* Date: 2020-10-08
* Time: 10:52
* Custom partitioner and grouping comparator.
* Case: from the movie ratings, get each user's top-3 rated movies (records sort by uid, and by rate in descending order within the same uid).
*/
public class MR_PartitionAndGroup {
static Configuration con;
static Gson gson = new Gson();

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"C:\\Users\\hp\\IdeaProjects\\GitHub_Maven\\src\\main\\resources\\test.json", "C:\\MapReduce"};
con = new Configuration();
Job job = Job.getInstance(con, "MR_PartitionAndGroup");
//set the Mapper and Reducer classes
job.setMapperClass(Map_PartitionAndGroup.class);
job.setReducerClass(Reduce_PartitionAndGroup.class);
//map output and final output key/value types
job.setMapOutputKeyClass(Movie.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Movie.class);
job.setOutputValueClass(NullWritable.class);
//custom partitioner: partition by uid
job.setPartitionerClass(MyPartion.class);
//custom grouping comparator: group by uid
job.setGroupingComparatorClass(MyComparator.class);
//input and output paths
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//number of reduce tasks
job.setNumReduceTasks(3);
//submit the job and wait for it to finish
System.out.println(job.waitForCompletion(true) ? "true" : "false");
}

public static class Map_PartitionAndGroup extends Mapper<LongWritable, Text, Movie, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
try {
Movie movie = gson.fromJson(value.toString(), Movie.class);
context.write(movie, NullWritable.get());
} catch (Exception e) {
//swallow lines that fail to parse as JSON
}
}
}

public static class Reduce_PartitionAndGroup extends Reducer<Movie, NullWritable, Movie, NullWritable> {
@Override
protected void reduce(Movie key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
int num = 0;
for (NullWritable value : values) {
context.write(key, NullWritable.get());
num++;
if (num == 3) {
return;
}
}
}
}

//custom partitioner: partition by uid
public static class MyPartion extends Partitioner<Movie, NullWritable> {
@Override
public int getPartition(Movie movie, NullWritable nullWritable, int i) {
//same formula as the default HashPartitioner; any custom scheme would do
return (movie.getUid().hashCode() & Integer.MAX_VALUE) % i;
}
}

//custom grouping comparator: records with the same uid go into one reduce() call
public static class MyComparator extends WritableComparator {
public MyComparator() {
super(Movie.class, true);
}

@Override
public int compare(WritableComparable a, WritableComparable b) {
Movie A = (Movie) a;
Movie B = (Movie) b;
return A.getUid().compareTo(B.getUid());
}
}
}

package com.tongyongtao.BigData.MapReduce.PartitionAndGrouping;

import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.*;

/**
* Created with IntelliJ IDEA.
* Description:
* User: tongyongtao
* Date: 2020-10-08
* Time: 16:30
* Use cleanup() to finish work after all reduce() calls are done.
* Case: get the top-10 movies by number of ratings (note that the code below actually keeps only the top 3 per reducer).
*/
public class MR_PartitionAndGroup1 {
static Configuration con;
static Gson gson = new Gson();


public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"C:\\Users\\hp\\IdeaProjects\\GitHub_Maven\\src\\main\\resources\\test.json", "C:\\MapReduce2"};
con = new Configuration();
Job job = Job.getInstance(con, "MR_PartitionAndGroup1");
job.setMapperClass(Map_PartitionAndGroup1.class);
job.setReducerClass(Reduce_PartitionAndGroup1.class);
job.setMapOutputKeyClass(Movie1.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Movie1.class);
job.setOutputValueClass(IntWritable.class);
job.setPartitionerClass(MyPartion.class);
job.setGroupingComparatorClass(MyComparator.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//for a global top-N the number of reduce tasks is best set to 1
job.setNumReduceTasks(2);
System.out.println(job.waitForCompletion(true) ? "true" : "false");

}

public static class Map_PartitionAndGroup1 extends Mapper<LongWritable, Text, Movie1, IntWritable> {
// IntWritable intWritable = new IntWritable(1);

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
try {
Movie1 movie = gson.fromJson(value.toString(), Movie1.class);
context.write(movie, new IntWritable(1));
} catch (Exception e) {

}
}
}

static Map<Movie1, Integer> movie_topn = new HashMap<>();

public static class Reduce_PartitionAndGroup1 extends Reducer<Movie1, IntWritable, Movie1, IntWritable> {


@Override
protected void reduce(Movie1 key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//the reduce task pulls the map output; all records that compare equal under the grouping comparator share one iterator
//note that Hadoop reuses a single key object while iterating, so copy or clone it before storing it
// BeanUtils.copyProperties() would work just as well
Movie1 movie = null;
try {
movie = (Movie1) key.clone();
} catch (CloneNotSupportedException e) {
e.printStackTrace();
}
int sum = 0;
for (IntWritable value : values) {


sum++;
}
// context.write(key,new IntWritable(sum));
movie_topn.put(movie, sum);

}

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
//cleanup() runs exactly once, after the last reduce() call, so the accumulated map is sorted and emitted here
//the HashMap maps each movie to its rating count; sort the entries by count and take the top ones
List<Map.Entry<Movie1, Integer>> TopNmovies = new ArrayList<>(movie_topn.entrySet());
TopNmovies.sort((t1, t2) -> Integer.compare(t2.getValue(), t1.getValue()));
for (int i = 0; i < Integer.min(3, TopNmovies.size()); i++) {
context.write(TopNmovies.get(i).getKey(), new IntWritable(TopNmovies.get(i).getValue()));
}
}
}

//partition by movie
public static class MyPartion extends Partitioner<Movie1, IntWritable> {
@Override
public int getPartition(Movie1 movie_topn, IntWritable intWritable, int i) {
return (movie_topn.getMovie().hashCode() & Integer.MAX_VALUE) % i;
}
}

//group by movie: records with the same movie name go into one reduce() call
public static class MyComparator extends WritableComparator {
public MyComparator() {
super(Movie1.class, true);
}

@Override
public int compare(WritableComparable a, WritableComparable b) {
Movie1 A = (Movie1) a;
Movie1 B = (Movie1) b;
return A.getMovie().compareTo(B.getMovie());
}
}
}
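One caveat on the driver above: the comment recommends a single reduce task, yet job.setNumReduceTasks(2) is used. With more than one reducer, the ranking in cleanup() is no longer computed over one global view (each task only sees its own partition, and the static map behaves differently in local and distributed runs), so for a true global top-N the simpler setting is:

//one reducer sees every (movie, count) group, so the ranking done in cleanup() is global
job.setNumReduceTasks(1);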