HBase MapReduce in Practice: Using MapReduce to Import the HBase Table fruit into the HBase Table fruit_mr

1. Build the Mapper class

ReadFruitMapper.java

package HBaseMR.HBaseToHBase;

import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * @ClassName ReadFruitMapper
 * @MethodDesc: Reads each row of the fruit table and repackages it as a Put
 * @Author Movle
 * @Date 5/10/20 9:27 PM
 * @Version 1.0
 * @Email movle_xjk@foxmail.com
 **/
public class ReadFruitMapper extends TableMapper<ImmutableBytesWritable, Put> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // Extract name and color from fruit, i.e. read each row into a Put object.
        Put put = new Put(key.get());
        // Iterate over the cells of the row
        for (Cell cell : value.rawCells()) {
            // Only keep cells from the column family "info"
            if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                // Column "name"
                if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                    // Add this cell to the Put object
                    put.add(cell);
                // Column "color"
                } else if ("color".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                    // Add this cell to the Put object
                    put.add(cell);
                }
            }
        }
        // Write each row read from fruit to the context as the map output
        context.write(key, put);
    }
}

2. Build the Reducer class

WriteFruitMRReducer.java

package HBaseMR.HBaseToHBase;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

/**
 * @ClassName WriteFruitMRReducer
 * @MethodDesc: Writes the Puts produced by the Mapper into the fruit_mr table
 * @Author Movle
 * @Date 5/10/20 9:29 PM
 * @Version 1.0
 * @Email movle_xjk@foxmail.com
 **/
public class WriteFruitMRReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        // Write every row read from fruit into the fruit_mr table
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }
}

3. Build the Runner

FruitToFruitMRRunner.java

package HBaseMR.HBaseToHBase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @ClassName FruitToFruitMRRunner
 * @MethodDesc: Configures and submits the fruit -> fruit_mr MapReduce job
 * @Author Movle
 * @Date 5/10/20 9:32 PM
 * @Version 1.0
 * @Email movle_xjk@foxmail.com
 **/
public class FruitToFruitMRRunner extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        // Get the Configuration
        Configuration conf = this.getConf();
        // Create the Job
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(FruitToFruitMRRunner.class);

        // Configure the job: create a scanner
        Scan scan = new Scan();
        scan.setCacheBlocks(false);
        scan.setCaching(500);

        // Set the Mapper. Note: use the classes from the mapreduce package,
        // not the mapred package (the latter is the old API).
        TableMapReduceUtil.initTableMapperJob(
                "fruit",                        // table to read from
                scan,                           // scanner
                ReadFruitMapper.class,          // mapper class
                ImmutableBytesWritable.class,   // mapper output key type
                Put.class,                      // mapper output value type
                job                             // the job to configure
        );
        // Set the Reducer
        TableMapReduceUtil.initTableReducerJob(
                "fruit_mr",                     // table to write to
                WriteFruitMRReducer.class,      // reducer class
                job);

        // Set the number of reduce tasks (at least 1)
        job.setNumReduceTasks(1);

        boolean isSuccess = job.waitForCompletion(true);
        if (!isSuccess) {
            throw new IOException("Job running with error");
        }
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        int status = ToolRunner.run(conf, new FruitToFruitMRRunner(), args);
        System.exit(status);
    }
}

4. Package the jar, upload it to the HBase cluster, and run it

/opt/module/hadoop-2.8.4/bin/yarn jar /opt/module/hbase-1.3.1/HBase-1.0-SNAPSHOT.jar HBaseMR.HBaseToHBase.FruitToFruitMRRunner
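As an aside (not part of the original steps): if the job fails at runtime because HBase classes cannot be found, a common approach is to put the HBase jars onto Hadoop's classpath before submitting, assuming the standard hbase mapredcp helper is available, for example:

export HADOOP_CLASSPATH=$(/opt/module/hbase-1.3.1/bin/hbase mapredcp)
/opt/module/hadoop-2.8.4/bin/yarn jar /opt/module/hbase-1.3.1/HBase-1.0-SNAPSHOT.jar HBaseMR.HBaseToHBase.FruitToFruitMRRunner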

Note: before running the job, if the target table does not exist, you must create it first, as shown below.
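For example, since the Mapper only keeps cells from the info column family, the target table can be created in the HBase shell with that same family (a minimal sketch; adjust the family name if your schema differs):

create 'fruit_mr','info'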

5. View the results:

scan 'fruit_mr'

Result: the scan output shows the copied rows in fruit_mr.
