Apache-HBase-API

Apache HBase

HbaseUtils

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import java.io.IOException;

/**
 * Created with IntelliJ IDEA.
 * Description: shared HBase connection helper.
 * User: tongyongtao
 * Date: 2020-11-06
 * Time: 20:01
 */
public class HbaseUtils {
    private static Configuration configuration;
    private static Connection connection;

    static {
        configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "linux03,linux04,linux05");
    }

    public static Connection getConnection() {
        try {
            // Reuse the cached connection; only create a new one when it is
            // missing or has been closed (Connection is heavyweight and
            // intended to be shared across the application).
            if (connection == null || connection.isClosed()) {
                connection = ConnectionFactory.createConnection(configuration);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return connection;
    }
}
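
A minimal usage sketch (assuming an org.apache.hadoop.hbase.client.Admin import; listTableNames is standard Admin API). Table and Admin handles are lightweight and closed per use, while the shared Connection is closed once at shutdown:

Connection connection = HbaseUtils.getConnection();
try (Admin admin = connection.getAdmin()) {
    // Count the tables visible to this client
    System.out.println(admin.listTableNames().length);
}
connection.close();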

DDLTable

import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;

import java.io.IOException;

/**
 * Created with IntelliJ IDEA.
 * Description: HBase DDL operations (create/drop tables, namespaces).
 * User: tongyongtao
 * Date: 2020-11-06
 * Time: 20:34
 */
public class DDLTable {
    public static void main(String[] args) throws IOException {
        // System.out.println(createTable("tab_111", "f1", "f2"));
        // System.out.println(isTableExist("tab_1"));
        // System.out.println(addFamily("tab_1", "f3"));
        // System.out.println(dropTable("tab_1"));
        System.out.println(createNamespace("myspace"));
    }

    // Check whether a table exists
    public static Boolean isTableExist(String tableName) throws IOException {
        Connection connection = HbaseUtils.getConnection();
        // Admin handles all table-level (DDL) operations
        Admin admin = connection.getAdmin();
        return admin.tableExists(TableName.valueOf(tableName));
    }

    public static Boolean createTable(String tableName, String... cfs) throws IOException {
        Connection connection = HbaseUtils.getConnection();
        // Only create the table if it does not exist yet and at least one
        // column family was supplied. (Note: forgetting the ! on the
        // existence check here once caused a hard-to-find bug.)
        if (!isTableExist(tableName)) {
            if (cfs.length > 0) {
                Admin admin = connection.getAdmin();
                // The 2.x client uses builders instead of HTableDescriptor
                TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
                // Attach every requested column family
                for (String cf : cfs) {
                    ColumnFamilyDescriptorBuilder columnFamily = ColumnFamilyDescriptorBuilder.newBuilder(cf.getBytes());
                    tableDescriptorBuilder.setColumnFamily(columnFamily.build());
                }
                admin.createTable(tableDescriptorBuilder.build());
                return true;
            }
        }
        return false;
    }

    public static boolean createTable1(String table, String... cfs) throws IOException {
        Connection connection = HbaseUtils.getConnection();
        // At least one column family is required
        if (cfs == null || cfs.length == 0) {
            return false;
        }
        // Nothing to do if the table already exists
        if (isTableExist(table)) {
            return true;
        }
        Admin admin = connection.getAdmin();
        // Build the table descriptor
        TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(table));
        // Build and attach each column family
        for (String cf : cfs) {
            ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cf.getBytes());
            tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptorBuilder.build());
        }
        admin.createTable(tableDescriptorBuilder.build());
        return true;
    }

    // Add column families to an existing table
    public static Boolean addFamily(String tableName, String... cfs) throws IOException {
        Connection connection = HbaseUtils.getConnection();
        Admin admin = connection.getAdmin();
        // The table must exist first
        if (isTableExist(tableName)) {
            for (String cf : cfs) {
                ColumnFamilyDescriptorBuilder columnFamily = ColumnFamilyDescriptorBuilder.newBuilder(cf.getBytes());
                admin.addColumnFamily(TableName.valueOf(tableName), columnFamily.build());
            }
            return true;
        }
        return false;
    }

    // Drop a table (it must be disabled before it can be deleted)
    public static Boolean dropTable(String tableName) throws IOException {
        Connection connection = HbaseUtils.getConnection();
        if (isTableExist(tableName)) {
            Admin admin = connection.getAdmin();
            admin.disableTable(TableName.valueOf(tableName));
            admin.deleteTable(TableName.valueOf(tableName));
            return true;
        }
        return false;
    }

    // Create a namespace
    public static Boolean createNamespace(String spacename) throws IOException {
        Connection connection = HbaseUtils.getConnection();
        Admin admin = connection.getAdmin();
        NamespaceDescriptor build = NamespaceDescriptor.create(spacename).build();
        admin.createNamespace(build);
        return true;
    }
}
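
createNamespace above throws NamespaceExistException when the namespace is already present. A guarded variant, as a sketch reusing the same HbaseUtils helper (listNamespaceDescriptors is standard Admin API; createNamespaceIfAbsent is a hypothetical name):

public static Boolean createNamespaceIfAbsent(String spacename) throws IOException {
    Admin admin = HbaseUtils.getConnection().getAdmin();
    // Walk the existing namespaces and bail out if ours is among them
    for (NamespaceDescriptor ns : admin.listNamespaceDescriptors()) {
        if (ns.getName().equals(spacename)) {
            return false;
        }
    }
    admin.createNamespace(NamespaceDescriptor.create(spacename).build());
    return true;
}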

DeleteData

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

/**
 * Created with IntelliJ IDEA.
 * Description: deleting rows and column families.
 * User: tongyongtao
 * Date: 2020-11-07
 * Time: 14:41
 */
public class DeleteData {
    public static void main(String[] args) throws IOException {
        // deleteData("tab_2", "10001");
        deleteData1("tab_2", "40001", "status");
    }

    // Delete an entire row by rowkey
    public static void deleteData(String tableName, String rowkey) throws IOException {
        Connection connection = HbaseUtils.getConnection();
        Table table = connection.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(Bytes.toBytes(rowkey));
        table.delete(delete);
    }

    // Delete one column family from a row
    public static void deleteData1(String tableName, String rowkey, String cf) throws IOException {
        Table table = HbaseUtils.getConnection().getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(Bytes.toBytes(rowkey));
        delete.addFamily(Bytes.toBytes(cf));
        table.delete(delete);
    }
}
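
deleteData1 removes a whole column family from the row. To remove a single column instead, Delete also offers addColumn (newest version only) and addColumns (all versions). A sketch with a hypothetical deleteColumn helper:

public static void deleteColumn(String tableName, String rowkey, String cf, String cm) throws IOException {
    Table table = HbaseUtils.getConnection().getTable(TableName.valueOf(tableName));
    Delete delete = new Delete(Bytes.toBytes(rowkey));
    // addColumns deletes every version of the cell; addColumn only the newest
    delete.addColumns(Bytes.toBytes(cf), Bytes.toBytes(cm));
    table.delete(delete);
}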

DMLTable


import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Created with IntelliJ IDEA.
 * Description: HBase DML operations (put/get/scan) and scan filters.
 * User: tongyongtao
 * Date: 2020-11-06
 * Time: 22:00
 */
public class DMLTable {

    public static void main(String[] args) throws Exception {
        // putData("tab_2", "60005", "status", "name", "小胡");
        // getData("tab_2", "10001");
        // scanData("tab_2", "", "");
        // filterData("tab_2");
        // filterData1("tab_2", "status", "age", "19");
        // filterData2("tab_2", "status");
        filterData3("tab_2", "name");
    }

    public static void putData(String tableName, String rowkey, String cf, String cm, String values) throws IOException {
        Connection connection = HbaseUtils.getConnection();
        Table table = connection.getTable(TableName.valueOf(tableName));
        Put put = new Put(Bytes.toBytes(rowkey));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cm), Bytes.toBytes(values));
        table.put(put);
    }

    // Fetch a single row with Get
    public static void getData(String tableName, String rowkey) throws IOException {
        Connection connection = HbaseUtils.getConnection();
        Table table = connection.getTable(TableName.valueOf(tableName));
        Get get = new Get(Bytes.toBytes(rowkey));
        Result result = table.get(get);
        for (Cell cell : result.rawCells()) {
            System.out.println(Bytes.toString(CellUtil.cloneFamily(cell)));
            System.out.println(Bytes.toString(CellUtil.cloneQualifier(cell)));
            System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
            System.out.println(Bytes.toString(CellUtil.cloneRow(cell)));
        }
    }

    // Fetch several rows in one batched Get call
    public static void getData1(String tableName, String rowkey) throws IOException {
        Connection connection = HbaseUtils.getConnection();
        Table table = connection.getTable(TableName.valueOf(tableName));

        List<Get> gets = new ArrayList<>();
        // Collect multiple rowkeys into a single request
        gets.add(new Get(Bytes.toBytes(rowkey)));
        gets.add(new Get(Bytes.toBytes("10002")));
        Result[] results = table.get(gets);
        for (Result result : results) {
            for (Cell cell : result.rawCells()) {
                System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
    }

    // Scan a range of rows
    public static void scanData(String tableName, String start, String stop) throws IOException {
        Connection connection = HbaseUtils.getConnection();
        Table table = connection.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        // An empty start/stop leaves the scan unbounded (full-table scan)
        if (!start.isEmpty()) {
            scan.withStartRow(Bytes.toBytes(start));
        }
        if (!stop.isEmpty()) {
            scan.withStopRow(Bytes.toBytes(stop));
        }
        try (ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
                for (Cell cell : result.rawCells()) {
                    System.out.println(Bytes.toString(CellUtil.cloneRow(cell)));
                    System.out.println(Bytes.toString(CellUtil.cloneFamily(cell)));
                }
            }
        }
    }

    // Scan filter: match rowkeys containing a substring
    public static void filterData(String tableName) throws IOException {
        Connection connection = HbaseUtils.getConnection();
        Table table = connection.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        Filter filter = new RowFilter(CompareOperator.EQUAL, new SubstringComparator("10001"));
        scan.setFilter(filter);
        ResultScanner scanner = table.getScanner(scan);
        for (Result result : scanner) {
            for (Cell cell : result.rawCells()) {
                System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
    }

    // Scan filter on cell values; SingleColumnValueFilter is the usual choice
    public static void filterData1(String tableName, String cf, String cm, String values) throws IOException {
        Connection connection = HbaseUtils.getConnection();
        Table table = connection.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        Filter filter = new SingleColumnValueFilter
                (Bytes.toBytes(cf), Bytes.toBytes(cm), CompareOperator.EQUAL, Bytes.toBytes(values));
        scan.setFilter(filter);
        ResultScanner scanner = table.getScanner(scan);
        for (Result result : scanner) {
            for (Cell cell : result.rawCells()) {
                System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
    }

    // Scan filter on the column family
    public static void filterData2(String tableName, String cf) throws Exception {
        Connection connection = HbaseUtils.getConnection();
        Table table = connection.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        FamilyFilter filter = new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes(cf)));
        scan.setFilter(filter);
        ResultScanner scanner = table.getScanner(scan);
        for (Result result : scanner) {
            for (Cell cell : result.rawCells()) {
                System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
    }

    // Scan filter on the column qualifier (substring match)
    public static void filterData3(String tableName, String cm) throws IOException {
        Connection connection = HbaseUtils.getConnection();
        Table table = connection.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        QualifierFilter filter = new QualifierFilter(CompareOperator.EQUAL, new SubstringComparator(cm));
        scan.setFilter(filter);
        ResultScanner scanner = table.getScanner(scan);
        for (Result result : scanner) {
            for (Cell cell : result.rawCells()) {
                System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
    }
}
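
The scans above each apply a single filter; several can be combined with FilterList (MUST_PASS_ALL is a logical AND, MUST_PASS_ONE an OR). A sketch under the same imports, with a hypothetical filterCombined helper:

public static void filterCombined(String tableName, String cf, String cm) throws IOException {
    Table table = HbaseUtils.getConnection().getTable(TableName.valueOf(tableName));
    Scan scan = new Scan();
    // Keep only cells whose family equals cf AND whose qualifier contains cm
    FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL,
            new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes(cf))),
            new QualifierFilter(CompareOperator.EQUAL, new SubstringComparator(cm)));
    scan.setFilter(filters);
    try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result result : scanner) {
            for (Cell cell : result.rawCells()) {
                System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
    }
}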

MR Interaction

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class HbaseBean1 implements Writable {
    private String movie;
    private double rate;
    private long timeStamp;
    private String uid;

    public HbaseBean1() {
    }

    public String getMovie() {
        return movie;
    }

    public void setMovie(String movie) {
        this.movie = movie;
    }

    public double getRate() {
        return rate;
    }

    public void setRate(double rate) {
        this.rate = rate;
    }

    public long getTimeStamp() {
        return timeStamp;
    }

    public void setTimeStamp(long timeStamp) {
        this.timeStamp = timeStamp;
    }

    public String getUid() {
        return uid;
    }

    public void setUid(String uid) {
        this.uid = uid;
    }

    public HbaseBean1(String movie, double rate, long timeStamp, String uid) {
        this.movie = movie;
        this.rate = rate;
        this.timeStamp = timeStamp;
        this.uid = uid;
    }

    @Override
    public String toString() {
        return "Movie{" +
                "movie='" + movie + '\'' +
                ", rate=" + rate +
                ", timeStamp=" + timeStamp +
                ", uid='" + uid + '\'' +
                '}';
    }

    // Serialize the fields for the shuffle; order must match readFields
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(movie);
        dataOutput.writeDouble(rate);
        dataOutput.writeLong(timeStamp);
        dataOutput.writeUTF(uid);
    }

    // Deserialize in the exact order written above
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.movie = dataInput.readUTF();
        this.rate = dataInput.readDouble();
        this.timeStamp = dataInput.readLong();
        this.uid = dataInput.readUTF();
    }
}
import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

/**
 * Created with IntelliJ IDEA.
 * Description: HBase/MapReduce interaction.
 * User: tongyongtao
 * Date: 2020-10-12
 * Time: 22:19
 * Example: movie ratings. The rowkey is designed as movie_timestamp, and the
 * reducer writes to HBase via ImmutableBytesWritable.
 */
public class MR_Hbase1 {
    public static void main(String[] args) throws Exception {
        // Local test input
        args = new String[]{"C:\\Users\\hp\\IdeaProjects\\GitHub_Maven\\src\\main\\resources\\hbase.json"};
        Configuration con = HBaseConfiguration.create();
        con.set("hbase.zookeeper.quorum", "linux03,linux04,linux05");
        Job job = Job.getInstance(con, "MR_Hbase");
        job.setMapperClass(MAP_Hbase1.class);
        job.setReducerClass(Reduce_Hbase1.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(HbaseBean1.class);
        // Note: this Scan is never wired into the job; it would only be needed
        // with TableMapReduceUtil.initTableMapperJob when HBase is the source.
        // Here the source is a JSON file.
        Scan scan = new Scan();
        // Number of rows fetched per RPC when scanning
        scan.setCaching(200);
        scan.setCacheBlocks(false);
        job.setJarByClass(HbaseBean1.class);

        // Input path of the JSON data
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // The sink table must already exist before the job runs
        TableMapReduceUtil.initTableReducerJob("movie1", Reduce_Hbase1.class, job);
        System.out.println(job.waitForCompletion(true) ? "success" : "failure");
    }

    public static class MAP_Hbase1 extends Mapper<LongWritable, Text, Text, HbaseBean1> {
        static Gson gson = new Gson();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // The movie id and the timestamp together form the rowkey,
            // zero-padded so that lexicographic order matches numeric order
            try {
                HbaseBean1 hbaseBean = gson.fromJson(value.toString(), HbaseBean1.class);
                String movie = StringUtils.leftPad(hbaseBean.getMovie(), 4, '0');
                String movieStamp = StringUtils.leftPad(String.valueOf(hbaseBean.getTimeStamp()), 9, '0');
                Text text = new Text();
                text.set(movie + "_" + movieStamp);
                context.write(text, hbaseBean);
            } catch (Exception e) {
                // Skip malformed JSON lines instead of failing the task
            }
        }
    }

    public static class Reduce_Hbase1 extends TableReducer<Text, HbaseBean1, ImmutableBytesWritable> {

        @Override
        protected void reduce(Text key, Iterable<HbaseBean1> values, Context context) throws IOException, InterruptedException {
            // The key is the rowkey; each rowkey carries a single bean
            Put put = new Put(key.toString().getBytes());
            for (HbaseBean1 value : values) {
                put.addColumn("cf".getBytes(), "moviename".getBytes(), value.getMovie().getBytes());
                put.addColumn("cf".getBytes(), "movierate".getBytes(), String.valueOf(value.getRate()).getBytes());
                put.addColumn("cf".getBytes(), "moviestamp".getBytes(), String.valueOf(value.getTimeStamp()).getBytes());
                put.addColumn("cf".getBytes(), "movieuid".getBytes(), value.getUid().getBytes());
            }
            context.write(null, put);
        }
    }
}
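
As the comment in main notes, the sink table must exist before the job is submitted. One way to guarantee that is to reuse createTable1 from DDLTable above, a sketch assuming both classes share a package (table and family names taken from the job):

// Run once before submitting the job
if (DDLTable.createTable1("movie1", "cf")) {
    System.out.println("movie1 is ready");
}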

Coprocessor

Phoenix