1.构建mapper类
ReadFruitMapper.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| package HBaseMR.HBaseToHBase;
import java.io.IOException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes;
public class ReadFruitMapper extends TableMapper<ImmutableBytesWritable, Put> {

    /**
     * Copies the {@code info:name} and {@code info:color} cells of each
     * scanned source row into a {@link Put} keyed by the same row key.
     *
     * @param key     row key of the current source row
     * @param value   full scan {@link Result} for the row
     * @param context framework context used to emit (rowkey, Put) pairs
     * @throws IOException          on HBase / serialization failure
     * @throws InterruptedException if the framework interrupts the emit
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        Put put = new Put(key.get());
        for (Cell cell : value.rawCells()) {
            // Keep only the two columns being migrated: info:name and info:color.
            // (The original had two identical branches, one per qualifier.)
            if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                if ("name".equals(qualifier) || "color".equals(qualifier)) {
                    put.add(cell);
                }
            }
        }
        // HBase rejects a Put with no columns at write time, so skip rows
        // that carried neither of the wanted cells instead of crashing.
        if (!put.isEmpty()) {
            context.write(key, put);
        }
    }
}
|
2.构建reduce类
WriteFruitMRReducer.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| package HBaseMR.HBaseToHBase;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.io.NullWritable;
import java.io.IOException;
public class WriteFruitMRReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    /**
     * Writes every buffered {@link Put} for the row into the target table
     * unchanged. The output key is ignored by the table sink, hence
     * {@link NullWritable}.
     *
     * @param key     row key the Puts were grouped under (unused here)
     * @param values  all Puts emitted by the mapper for this row
     * @param context framework context backing the table output format
     * @throws IOException          on HBase write failure
     * @throws InterruptedException if the framework interrupts the write
     */
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        for (Put rowPut : values) {
            context.write(NullWritable.get(), rowPut);
        }
    }
}
|
3.构建runner
FruitToFruitMRRunner.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
| package HBaseMR.HBaseToHBase;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
public class FruitToFruitMRRunner extends Configured implements Tool {

    /**
     * Configures and runs the table-to-table copy job: scans the source
     * table {@code fruit} and writes selected cells into {@code fruit_mr}.
     *
     * @param strings command-line arguments (unused)
     * @return 0 on success; never returns a failure code because failure
     *         is signalled by throwing instead
     * @throws IOException if the job completes unsuccessfully
     * @throws Exception   on job setup / submission failure
     */
    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = this.getConf();
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(FruitToFruitMRRunner.class);

        // One-off full scan: skip the block cache and fetch 500 rows per RPC.
        Scan scan = new Scan();
        scan.setCacheBlocks(false);
        scan.setCaching(500);

        TableMapReduceUtil.initTableMapperJob(
                "fruit",                      // source table
                scan,
                ReadFruitMapper.class,
                ImmutableBytesWritable.class, // mapper output key class
                Put.class,                    // mapper output value class
                job);
        TableMapReduceUtil.initTableReducerJob(
                "fruit_mr",                   // target table (must exist beforehand)
                WriteFruitMRReducer.class,
                job);
        job.setNumReduceTasks(1);

        boolean isSuccess = job.waitForCompletion(true);
        if (!isSuccess) {
            throw new IOException("Job running with error");
        }
        // The original "return isSuccess ? 0 : 1;" was dead code: the failure
        // branch already threw above, so only success can reach this point.
        return 0;
    }

    /** Entry point: builds an HBase configuration and runs the tool. */
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        int status = ToolRunner.run(conf, new FruitToFruitMRRunner(), args);
        System.exit(status);
    }
}
|
4.打包上传到HBase集群并运行
1
| /opt/module/hadoop-2.8.4/bin/yarn jar /opt/module/hbase-1.3.1/HBase-1.0-SNAPSHOT.jar HBaseMR.HBaseToHBase.FruitToFruitMRRunner
|
提示:运行任务前,如果待数据导入的表不存在,则需要提前创建之
5.查看结果:
结果