
[LeetCode - Interview 02.05] Sum Lists

I. Problem:

Given two integers represented by linked lists, where each node contains a single digit.
The digits are stored in reverse order, i.e. the ones digit is at the head of the list.
Write a function that adds the two integers and returns the result as a linked list.

Example:
Input: (7 -> 1 -> 6) + (5 -> 9 -> 2), i.e. 617 + 295
Output: 2 -> 1 -> 9, i.e. 912

Follow-up: Suppose the digits are stored in forward order. Solve the problem again.
Example:
Input: (6 -> 1 -> 7) + (2 -> 9 -> 5), i.e. 617 + 295
Output: 9 -> 1 -> 2, i.e. 912

II. Solution:

1. Method 1:

(1) Approach:
  • Create a new list newHead and a carry variable JinWei, used to indicate whether the addition carries
  • Create a pointer p pointing to newHead
  • Traverse with a while loop:
  • While neither l1 nor l2 is null, sum = l1.val + l2.val + JinWei
  • Then append an element to the new list: p.next = new ListNode(sum % 10)
  • Then move l1, l2 and the new list to the next node, and set JinWei = sum / 10
  • When l2 is exhausted but l1 is not, keep looping with sum = l1.val + JinWei
  • Then append an element to the new list: p.next = new ListNode(sum % 10)
  • Then move l1 and the new list to the next node, and set JinWei = sum / 10
  • Likewise, when l2 is not yet exhausted, proceed exactly as for l1, except sum = l2.val + JinWei
  • Finally, check whether JinWei is 1; if JinWei == 1, append one more element: p.next = new ListNode(1)
  • Return the newly built list, i.e. newHead.next
(2) Code:
/**
 * Definition for singly-linked list.
 * public class ListNode {
 *     int val;
 *     ListNode next;
 *     ListNode(int x) { val = x; }
 * }
 */
class Solution {
    public ListNode addTwoNumbers(ListNode l1, ListNode l2) {
        if (l1 == null) {
            return l2;
        }
        if (l2 == null) {
            return l1;
        }

        ListNode newHead = new ListNode(-1);
        ListNode p = newHead;

        int JinWei = 0;
        while (l1 != null && l2 != null) {
            int sum = l1.val + l2.val + JinWei;
            p.next = new ListNode(sum % 10);
            JinWei = sum / 10;

            p = p.next;
            l1 = l1.next;
            l2 = l2.next;
        }

        while (l1 != null) {
            int sum = l1.val + JinWei;
            p.next = new ListNode(sum % 10);
            JinWei = sum / 10;
            p = p.next;
            l1 = l1.next;
        }
        while (l2 != null) {
            int sum = l2.val + JinWei;
            p.next = new ListNode(sum % 10);
            JinWei = sum / 10;
            p = p.next;
            l2 = l2.next;
        }
        if (JinWei == 1) {
            p.next = new ListNode(JinWei);
        }

        return newHead.next;
    }
}
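
For the follow-up, where the digits are stored in forward order, one simple approach is to reverse both lists, reuse the solution above, and reverse the sum again. The sketch below only illustrates that idea and is not part of the original solution; it assumes the same ListNode class and the Solution class defined above, and the helper name reverse is made up here.

class SolutionForward {
    // Follow-up sketch: digits stored in forward order (most significant digit first).
    public ListNode addTwoNumbersForward(ListNode l1, ListNode l2) {
        // Reverse both inputs so the ones digit comes first, add them with the
        // reverse-order solution above, then reverse the result back.
        ListNode sumReversed = new Solution().addTwoNumbers(reverse(l1), reverse(l2));
        return reverse(sumReversed);
    }

    // Iteratively reverse a singly linked list.
    private ListNode reverse(ListNode head) {
        ListNode prev = null;
        while (head != null) {
            ListNode next = head.next;
            head.next = prev;
            prev = head;
            head = next;
        }
        return prev;
    }
}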

[LeetCode - Interview 02.04] Partition List

I. Problem:

Write a program to partition a linked list around a value x, so that all nodes less than x come before all nodes greater than or equal to x. If x is contained in the list, x only needs to appear after the elements less than x (as shown below). The partition element x only needs to be somewhere in the "right half"; it does not need to sit between the two halves.

Example:
Input: head = 3->5->8->5->10->2->1, x = 5
Output: 3->1->2->10->5->5->8

II. Solution:

1. Method 1:

(1) Approach:
  • First create two dummy head nodes, one for the less-than part and one for the greater-than-or-equal part
  • Traverse the list; if the current node is less than x, append it to the less-than list, otherwise append it to the greater-than-or-equal list
  • After the traversal, attach the greater-than-or-equal list to the end of the less-than list
(2) Code:
/**
 * Definition for singly-linked list.
 * public class ListNode {
 *     int val;
 *     ListNode next;
 *     ListNode(int x) { val = x; }
 * }
 */
class Solution {
    public ListNode partition(ListNode head, int x) {
        ListNode preHead = new ListNode(-1);
        ListNode nextHead = new ListNode(-1);
        ListNode preCurrent = preHead;
        ListNode nextCurrent = nextHead;
        ListNode current = head;

        while (current != null) {
            if (current.val < x) {
                preCurrent.next = current;
                preCurrent = preCurrent.next;
            } else {
                nextCurrent.next = current;
                nextCurrent = nextCurrent.next;
            }
            current = current.next;
        }
        preCurrent.next = nextHead.next;
        nextCurrent.next = null;

        return preHead.next;
    }
}
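
To try the partition solution locally (outside LeetCode), a minimal test driver could look like the sketch below. It assumes a plain ListNode class matching the commented definition above and the Solution class from this section; the class name PartitionDemo is made up.

public class PartitionDemo {
    public static void main(String[] args) {
        // Build the sample list 3->5->8->5->10->2->1
        int[] vals = {3, 5, 8, 5, 10, 2, 1};
        ListNode dummy = new ListNode(-1);
        ListNode tail = dummy;
        for (int v : vals) {
            tail.next = new ListNode(v);
            tail = tail.next;
        }
        // Partition around x = 5 and print the resulting list
        ListNode result = new Solution().partition(dummy.next, 5);
        for (ListNode p = result; p != null; p = p.next) {
            System.out.print(p.val + (p.next != null ? "->" : "\n"));
        }
    }
}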

Sqoop Integration with HBase: MySQL to HBase

I. Sqoop Integration with HBase

1. Use Sqoop to transfer data between HBase and an RDBMS.

2. Relevant parameters:

Parameter                    Description
--column-family <family>     The target column family for the import
--hbase-create-table         Whether to automatically create the HBase table if it does not exist (so there is no need to create the table in HBase manually beforehand)
--hbase-row-key <col>        Which MySQL column's value to use as the HBase rowkey; if the rowkey is a composite key, separate the columns with commas (note: avoid duplicate rowkeys)
--hbase-table <table-name>   The HBase table the data will be imported into
--hbase-bulkload             Whether to allow a bulk-load style import

II. Example

1. Goal: extract data from the RDBMS into HBase

2. Step-by-step:

(1) Configure sqoop-env.sh and add the following:
export HBASE_HOME=/opt/module/hbase-1.3.1
(2) In MySQL, create a database db_library and a table book

Log in to MySQL

mysql -uroot -p000000

Create the database and the table

CREATE DATABASE db_library;

CREATE TABLE db_library.book(
id int(4) PRIMARY KEY NOT NULL AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
price VARCHAR(255) NOT NULL);
(3) Insert some data into the table
INSERT INTO db_library.book (name, price) VALUES('Lie Sporting', '30');  
INSERT INTO db_library.book (name, price) VALUES('Pride & Prejudice', '70');
INSERT INTO db_library.book (name, price) VALUES('Fall of Giants', '50');


(4) Run the Sqoop import

Manually create the HBase table

hbase> create 'hbase_book','info'
(5) Scan this table in HBase; you will see the following
hbase> scan 'hbase_book'

Exercise: try using a composite key as the rowkey when importing the data.

(6) Import with Sqoop
bin/sqoop import \
--connect jdbc:mysql://hadoop2:3306/db_library \
--username root \
--password 000000 \
--table book \
--columns "id,name,price" \
--column-family "info" \
--hbase-create-table \
--hbase-row-key "id" \
--hbase-table "hbase_book" \
--num-mappers 1 \
--split-by id

(Screenshot: Sqoop import job output)

Note: Sqoop 1.4.6 only supports automatically creating the HBase table for HBase versions earlier than 1.0.1.

(7) Result:
hbase> scan 'hbase_book'


Integrating HBase with Hive

I. HBase vs. Hive

              Hive                                  HBase
Nature        SQL-like data warehouse               NoSQL store (key-value)
Use case      Offline data analysis and cleansing   Online workloads
Latency       High                                  Low
Storage       HDFS                                  HDFS

II. Using HBase and Hive Together

1. Environment preparation

Because later operations in Hive will also affect HBase, Hive must have the JARs needed to operate on HBase. So copy the JARs Hive depends on (or use symlinks), and remember to also copy the ZooKeeper JAR into Hive's lib directory.

(1) Add environment variables in /etc/profile
# Environment variables in /etc/profile
export HBASE_HOME=/opt/module/hbase-1.3.1
export HIVE_HOME=/opt/module/hive-1.2.1
(2) Go to Hive's lib directory
cd /opt/module/hive-1.2.1/lib/
(3) Create the symlinks
ln -s $HBASE_HOME/lib/hbase-common-1.3.1.jar  $HIVE_HOME/lib/hbase-common-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-server-1.3.1.jar $HIVE_HOME/lib/hbase-server-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-client-1.3.1.jar $HIVE_HOME/lib/hbase-client-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-protocol-1.3.1.jar $HIVE_HOME/lib/hbase-protocol-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-it-1.3.1.jar $HIVE_HOME/lib/hbase-it-1.3.1.jar
ln -s $HBASE_HOME/lib/htrace-core-3.1.0-incubating.jar $HIVE_HOME/lib/htrace-core-3.1.0-incubating.jar
ln -s $HBASE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-hadoop-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop-compat-1.3.1.jar

(Screenshot: symlinks in the Hive lib directory)

(4) Also set the ZooKeeper properties in hive-site.xml, as follows:
<property>
    <name>hive.zookeeper.quorum</name>
    <value>hadoop2,hadoop3,hadoop4</value>
    <description>The list of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>
<property>
    <name>hive.zookeeper.client.port</name>
    <value>2181</value>
    <description>The port of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>


III. Examples

1. Example 1

Goal: create a Hive table associated with an HBase table, so that inserting data into the Hive table also affects the HBase table.
Step-by-step:

(1) Create a table in Hive and associate it with HBase
CREATE TABLE hive_hbase_emp_table1(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table1");

Note: once this is done, check in Hive and in HBase separately; the corresponding table has been created in each.

If the bundled hive-hbase-handler-1.2.1.jar causes problems, replace it with a working one.
(Screenshot: the new tables in Hive and HBase)

(2) Create a temporary staging table in Hive, used to load the data from a file

Note: you cannot load data directly into the Hive table that is associated with HBase.

CREATE TABLE emp(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
row format delimited fields terminated by '\t';
(3) Load data into the Hive staging table
hive> load data local inpath '/opt/TestFolder/emp.txt' into table emp;
(4) Use insert to copy the data from the staging table into the Hive table associated with HBase
hive> insert into table hive_hbase_emp_table1 select * from emp;
(5) Check that the data has been synchronously inserted into both the Hive table and the associated HBase table

Hive:

hive> select * from hive_hbase_emp_table1;

(Screenshot: query result of hive_hbase_emp_table1 in Hive)

HBase:

hbase> scan 'hbase_emp_table1'

(Screenshot: scan of hbase_emp_table1 in HBase)

2. Example 2

Goal: a table hbase_emp_table is already stored in HBase; create an external table in Hive that maps to it, so that Hive can be used to analyze the data in that HBase table.
Note: Example 2 follows directly on from Example 1, so complete Example 1 first.
Step-by-step:

(1) Create an external table in Hive
CREATE EXTERNAL TABLE relevance_hbase_emp(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
STORED BY
'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" =
":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table1");
(2) Once associated, you can use Hive functions to run analyses
hive (default)> select * from relevance_hbase_emp;

(Screenshot: querying the external table)

Integrating Phoenix with HBase

I. Introduction to Phoenix

    Phoenix can be thought of as a query engine for HBase. It was open-sourced by Salesforce.com and later donated to Apache. It acts as a Java middleware layer that lets developers access the NoSQL database HBase much like they would access a relational database over JDBC.

    The tables and data that Phoenix operates on are stored in HBase. Phoenix only needs to be associated with the HBase tables; reads and writes are then done through its tools.

    In practice, you can view Phoenix simply as a tool that puts an alternative syntax in front of HBase. Although you can connect to Phoenix from Java over JDBC and then operate on HBase, it should not be used for OLTP in production. Online transaction processing needs low latency, and even though Phoenix applies some optimizations when querying HBase, its latency is still considerable. So it is still used for OLAP, with the results stored afterwards.
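
    As mentioned above, Phoenix can be reached from Java over JDBC. The sketch below is only a minimal illustration of that, not part of the installation steps: it assumes the Phoenix client JAR is on the classpath, that ZooKeeper is reachable at bigdata11:2181 (the same address used with sqlline below), and that the test table from the basic-commands section exists.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class PhoenixJdbcDemo {
    public static void main(String[] args) throws Exception {
        // The Phoenix JDBC URL points at the ZooKeeper quorum used by HBase.
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:bigdata11:2181")) {
            // Query the test table created in the basic-commands section.
            PreparedStatement ps = conn.prepareStatement("select id, name from test");
            ResultSet rs = ps.executeQuery();
            while (rs.next()) {
                System.out.println(rs.getInt("id") + "\t" + rs.getString("name"));
            }
        }
    }
}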

II. Installation

1. Unpack the Phoenix tarball and rename the directory

tar -zxvf apache-phoenix-4.14.1-HBase-1.2-bin.tar.gz -C /opt/module

mv apache-phoenix-4.14.1-HBase-1.2-bin phoenix-4.14.1

2. Update the environment variables

vi /etc/profile

# Append the following Phoenix settings as the last two lines
export PHOENIX_HOME=/opt/module/phoenix-4.14.1

export PATH=$PATH:$PHOENIX_HOME/bin

3. Apply the environment variable changes

source /etc/profile

4. Copy the Phoenix directory from the master node to the worker nodes

scp -r phoenix-4.14.1 root@bigdata13:/opt/module

scp -r phoenix-4.14.1 root@bigdata12:/opt/module

5. Copy hbase-site.xml (note: on all three nodes)

cp hbase-site.xml /opt/module/phoenix-4.14.1/bin/

Copy the following two JARs from /opt/module/phoenix-4.14.1 into HBase's lib directory, /opt/module/hbase-1.3.1/lib/
(note: on all three nodes)

phoenix-4.14.1-HBase-1.2-server.jar

phoenix-core-4.14.1-HBase-1.2.jar

6. Start Phoenix

After configuring, restart HBase first.

bin/sqlline.py bigdata11:2181

III. Basic Commands

1. List tables

!table

2. Create a table

create table test(id integer not null primary key,name varchar);

create table "Andy"(
id integer not null primary key,
name varchar);

3. Drop a table

drop table test;

4. Insert (upsert) data

upsert into test values(1,'Andy');

upsert into users(name) values('toms');

5. Query data

phoenix > select * from test;
hbase > scan 'test'

6. Exit Phoenix

!q

7. Delete data

delete from "Andy" where id=4;

8. Using the sum function

select sum(id) from "Andy";

9. Add a column

alter table "Andy" add address varchar;

10. Drop a column

alter table "Andy" drop column address;

For other syntax, see: http://phoenix.apache.org/language/index.html

IV. Table Mapping

1. Create a table in HBase

create 'teacher','info','contact'

2. Insert data

put 'teacher','1001','info:name','Jack'
put 'teacher','1001','info:age','28'
put 'teacher','1001','info:gender','male'
put 'teacher','1001','contact:address','shanghai'
put 'teacher','1001','contact:phone','13458646987'
put 'teacher','1002','info:name','Jim'
put 'teacher','1002','info:age','30'
put 'teacher','1002','info:gender','male'
put 'teacher','1002','contact:address','tianjian'
put 'teacher','1002','contact:phone','13512436987'

3. Create the mapping view in Phoenix

create view "teacher"(
"ROW" varchar primary key,
"contact"."address" varchar,
"contact"."phone" varchar,
"info"."age" varchar,
"info"."gender" varchar,
"info"."name" varchar
);

4. Query the data in Phoenix

select * from "teacher";

HBase-MapReduce in Practice: Using MR to Write Data from HDFS into the HBase Table fruit_mr2

1. Build the mapper

ReadFruitFromHDFSMapper.java

package HBaseMR.HDFSToHBase;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @ClassName ReadFruitFromHDFSMapper
 * @MethodDesc: TODO describe ReadFruitFromHDFSMapper
 * @Author Movle
 * @Date 5/10/20 10:16 PM
 * @Version 1.0
 * @Email movle_xjk@foxmail.com
 **/


public class ReadFruitFromHDFSMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //The line read from HDFS
        String lineValue = value.toString();
        //Split each line on \t and store the fields in a String array
        String[] values = lineValue.split("\t");

        //Pick out the fields by their meaning
        String rowKey = values[0];
        String name = values[1];
        String color = values[2];

        //Initialize the rowkey
        ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(Bytes.toBytes(rowKey));

        //Initialize the Put object
        Put put = new Put(Bytes.toBytes(rowKey));

        //Arguments: column family, column, value
        put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(color));

        context.write(rowKeyWritable, put);
    }
}

2. Build the reducer

WriteFruitMRFromTxtReducer.java

package HBaseMR.HDFSToHBase;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

/**
 * @ClassName WriteFruitMRFromTxtReducer
 * @MethodDesc: TODO describe WriteFruitMRFromTxtReducer
 * @Author Movle
 * @Date 5/10/20 10:18 PM
 * @Version 1.0
 * @Email movle_xjk@foxmail.com
 **/


public class WriteFruitMRFromTxtReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        //Write every Put that was read to the target table (fruit_mr2)
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }
}

3. Build the runner

Txt2FruitRunner.java

package HBaseMR.HDFSToHBase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @ClassName Txt2FruitRunner
 * @MethodDesc: TODO describe Txt2FruitRunner
 * @Author Movle
 * @Date 5/10/20 10:19 PM
 * @Version 1.0
 * @Email movle_xjk@foxmail.com
 **/

public class Txt2FruitRunner extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        //Get the Configuration
        Configuration conf = this.getConf();

        //Create the Job
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(Txt2FruitRunner.class);
        Path inPath = new Path("hdfs://hadoop2:9000/input_fruit/fruit.tsv");
        FileInputFormat.addInputPath(job, inPath);

        //Set the Mapper
        job.setMapperClass(ReadFruitFromHDFSMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        //Set the Reducer
        TableMapReduceUtil.initTableReducerJob("fruit_mr2", WriteFruitMRFromTxtReducer.class, job);

        //Set the number of reduce tasks; at least 1
        job.setNumReduceTasks(1);

        boolean isSuccess = job.waitForCompletion(true);
        if (!isSuccess) {
            throw new IOException("Job running with error");
        }
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        int status = ToolRunner.run(conf, new Txt2FruitRunner(), args);
        System.exit(status);
    }
}

4. Package, upload to the HBase cluster, and run:

/opt/module/hadoop-2.8.4/bin/yarn jar /opt/module/hbase-1.3.1/HBase-1.0-SNAPSHOT.jar  HBaseMR.HDFSToHBase.Txt2FruitRunner

5. Check the result:

scan 'fruit_mr2'


HBase-MapReduce in Practice: Using MR to Copy the HBase Table fruit into the HBase Table fruit_mr

1. Build the mapper class

ReadFruitMapper.java

package HBaseMR.HBaseToHBase;

import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * @ClassName ReadFruitMapper
 * @MethodDesc:
 * @Author Movle
 * @Date 5/10/20 9:27 PM
 * @Version 1.0
 * @Email movle_xjk@foxmail.com
 **/


public class ReadFruitMapper extends TableMapper<ImmutableBytesWritable, Put> {


    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        //Extract the name and color of each fruit row, i.e. read each row into a Put object
        Put put = new Put(key.get());
        //Iterate over the cells of the row
        for (Cell cell : value.rawCells()) {
            //Keep only the column family: info
            if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                //Keep the column: name
                if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                    //Add that cell to the Put object
                    put.add(cell);
                //Keep the column: color
                } else if ("color".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                    //Add that cell to the Put object
                    put.add(cell);
                }
            }
        }
        //Write each row read from fruit to the context as the map output
        context.write(key, put);
    }
}

2. Build the reducer class

WriteFruitMRReducer.java

package HBaseMR.HBaseToHBase;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

/**
 * @ClassName WriteFruitMRReducer
 * @MethodDesc: TODO describe WriteFruitMRReducer
 * @Author Movle
 * @Date 5/10/20 9:29 PM
 * @Version 1.0
 * @Email movle_xjk@foxmail.com
 **/

public class WriteFruitMRReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        //Write every Put that was read to the fruit_mr table
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }

}

3. Build the runner

FruitToFruitMRRunner.java

package HBaseMR.HBaseToHBase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @ClassName FruitToFruitMRRunner
 * @MethodDesc: TODO describe FruitToFruitMRRunner
 * @Author Movle
 * @Date 5/10/20 9:32 PM
 * @Version 1.0
 * @Email movle_xjk@foxmail.com
 **/


public class FruitToFruitMRRunner extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        //Get the Configuration
        Configuration conf = this.getConf();
        //Create the Job
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(FruitToFruitMRRunner.class);

        //Configure the Job: create a scanner
        Scan scan = new Scan();
        scan.setCacheBlocks(false);
        scan.setCaching(500);

        //Set the Mapper; note this comes from the mapreduce package, not the old mapred package
        TableMapReduceUtil.initTableMapperJob(
                "fruit",
                scan,
                ReadFruitMapper.class,
                ImmutableBytesWritable.class,
                Put.class,
                job
        );
        // TableMapReduceUtil.initTableMapperJob(
        //         "fruit",                       //the table to read from
        //         scan,                          //the scanner
        //         ReadFruitMapper.class,         //the map class
        //         ImmutableBytesWritable.class,  //the mapper output key type
        //         Put.class,                     //the mapper output value type
        //         job                            //the configured job
        // );
        //Set the Reducer
        TableMapReduceUtil.initTableReducerJob(
                "fruit_mr",
                WriteFruitMRReducer.class,
                job);
        // TableMapReduceUtil.initTableReducerJob(
        //         "fruit_mr",                    //the table to write to
        //         WriteFruitMRReducer.class,     //the reduce class
        //         job);

        //Set the number of reduce tasks; at least 1
        job.setNumReduceTasks(1);


        boolean isSuccess = job.waitForCompletion(true);
        if (!isSuccess) {
            throw new IOException("Job running with error");
        }
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        int status = ToolRunner.run(conf, new FruitToFruitMRRunner(), args);
        System.exit(status);
    }
}

4. Package, upload to the HBase cluster, and run

/opt/module/hadoop-2.8.4/bin/yarn jar /opt/module/hbase-1.3.1/HBase-1.0-SNAPSHOT.jar  HBaseMR.HBaseToHBase.FruitToFruitMRRunner

Note: before running the job, if the target table does not exist, create it first (see the sketch below).
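
If fruit_mr does not yet exist, it can be created in the HBase shell with create 'fruit_mr','info', or programmatically. The sketch below is only an illustration of the programmatic route, following the same Admin API pattern used elsewhere in these notes; the class name CreateFruitMrTable is made up.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateFruitMrTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            TableName table = TableName.valueOf("fruit_mr");
            if (!admin.tableExists(table)) {
                // The reducer writes cells from the "info" family, so the target table needs it.
                HTableDescriptor htd = new HTableDescriptor(table);
                htd.addFamily(new HColumnDescriptor("info"));
                admin.createTable(htd);
            }
        }
    }
}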

5. Check the result:

scan 'fruit_mr'


HBase-MapReduce Examples: Counting the Rows in a Table, and Importing Local Data into HBase with MapReduce

I. Running HBase MapReduce Jobs

1. View the dependencies required by HBase MapReduce jobs:

cd /opt/module/hbase-1.3.1

bin/hbase mapredcp

2. Export the required environment variables

export HBASE_HOME=/opt/module/hbase-1.3.1
export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`

3. Run the bundled MapReduce jobs

II. Example 1: Count how many rows the default:student table has

cd /opt/module/hbase-1.3.1

/opt/module/hadoop-2.8.4/bin/yarn jar lib/hbase-server-1.3.1.jar rowcounter default:student

(Screenshot: rowcounter command and its output)

III. Example 2: Use MapReduce to import local data into HBase

(1) Create a TSV file fruit.tsv locally, with the fields separated by \t
1001	Apple	Red
1002	Pear	Yellow
1003	Pineapple	Yellow

Important: do not copy this data straight out of Word; it introduces formatting errors.

(2) Create the HBase table
hbase(main):001:0> create 'fruit','info'
(3) Create the input_fruit directory in HDFS and upload fruit.tsv
hdfs dfs -mkdir /input_fruit/
hdfs dfs -put fruit.tsv /input_fruit/
(4) Run the MapReduce import into the HBase fruit table
/opt/module/hadoop-2.8.4/bin/yarn jar lib/hbase-server-1.3.1.jar importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit \
hdfs://hadoop2:9000/input_fruit

(Screenshot: importtsv job output)

(5) Use scan to check the imported data
hbase(main):001:0> scan 'fruit' 


HBase Utility Classes

1.HBASEUtil.java
package HBaseUtil;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.text.DecimalFormat;
import java.util.Iterator;
import java.util.TreeSet;

/**
 * @ClassName HBASEUTIL
 * @MethodDesc: TODO describe HBASEUTIL
 * @Author Movle
 * @Date 5/10/20 8:04 PM
 * @Version 1.0
 * @Email movle_xjk@foxmail.com
 **/


public class HBASEUtil {

    /**
     * Create a namespace
     * */
    public static void create_Namecpace(Configuration conf, String namecpace) throws IOException {
        //Get an HBase client
        Connection connection = ConnectionFactory.createConnection(conf);
        HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
        //Build the namespace descriptor
        NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(namecpace).
                addConfiguration("author", "plus").build();

        //Create the namespace
        admin.createNamespace(namespaceDescriptor);
        System.out.println("Namespace initialized");
        close(admin, connection);
    }

    /**
     * Release resources
     * */
    private static void close(Admin admin, Connection connection) throws IOException {
        if (admin != null) {
            admin.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * Create an HBase table
     * @param conf
     * @param tableName
     * @param regions
     * @param columnFamily
     */
    public static void createTable(Configuration conf, String tableName, int regions, String... columnFamily) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        //Check whether the table already exists
        if (isExistTable(conf, tableName)) {
            return;
        }
        //Table descriptor: HTableDescriptor

        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
        for (String cf : columnFamily) {
            //Column family descriptor: HColumnDescriptor
            htd.addFamily(new HColumnDescriptor(cf));
        }
        //htd.addCoprocessor("hbase.CalleeWriteObserver");
        //Create the table with pre-split regions
        admin.createTable(htd, genSplitKeys(regions));
        System.out.println("Table created");
        //Close the resources
        close(admin, connection);
    }

    /**
     * Generate the split keys
     * @param regions the number of regions
     * @return splitKeys
     */
    private static byte[][] genSplitKeys(int regions) {
        //Array holding the split keys
        String[] keys = new String[regions];
        //Format the split keys as zero-padded strings: 000, 010, 020, ...
        DecimalFormat df = new DecimalFormat("000");
        for (int i = 0; i < regions; i++) {
            keys[i] = df.format(i * 10) + "";
        }


        byte[][] splitKeys = new byte[regions][];
        //Sort, so that the split keys are in order
        TreeSet<byte[]> treeSet = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
        for (int i = 0; i < regions; i++) {
            treeSet.add(Bytes.toBytes(keys[i]));
        }

        //Copy them out
        Iterator<byte[]> iterator = treeSet.iterator();
        int index = 0;
        while (iterator.hasNext()) {
            byte[] next = iterator.next();
            splitKeys[index++] = next;
        }

        return splitKeys;
    }


    /**
     * Check whether a table exists
     * @param conf the Configuration
     * @param tableName the table name
     */
    public static boolean isExistTable(Configuration conf, String tableName) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();


        boolean result = admin.tableExists(TableName.valueOf(tableName));
        close(admin, connection);
        return result;
    }
}
2.HBaseDAO.java
package HBaseUtil;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

/**
 * @ClassName HBaseDAO
 * @MethodDesc:
 * @Author Movle
 * @Date 5/10/20 8:06 PM
 * @Version 1.0
 * @Email movle_xjk@foxmail.com
 **/


public class HBaseDAO {

    private static String namespace = PropertiesUtil.getProperty("hbase.calllog.namespace");
    private static String tableName = PropertiesUtil.getProperty("hbase.calllog.tablename");
    private static Integer regions = Integer.valueOf(PropertiesUtil.getProperty("hbase.calllog.regions"));

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "hadoop2");
        conf.set("zookeeper.znode.parent", "/hbase");

        if (!HBASEUtil.isExistTable(conf, tableName)) {
            HBASEUtil.create_Namecpace(conf, namespace);
            HBASEUtil.createTable(conf, tableName, regions, "f1", "f2");
        }
    }
}
3.PropertiesUtil.java
package HBaseUtil;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * @ClassName PropertiesUtil
 * @MethodDesc: TODO describe PropertiesUtil
 * @Author Movle
 * @Date 5/10/20 8:07 PM
 * @Version 1.0
 * @Email movle_xjk@foxmail.com
 **/


public class PropertiesUtil {

    public static Properties properties = null;

    static {
        //Load the configuration file, so settings are easy to maintain
        InputStream is = ClassLoader.getSystemResourceAsStream("hbase_consumer.properties");
        properties = new Properties();

        try {
            properties.load(is);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    /**
     * Get a property value
     * @param key the property name
     * @return the property value
     */
    public static String getProperty(String key) {
        return properties.getProperty(key);
    }
}
4.hbase_consumer.properties
hbase.calllog.namespace=plus
hbase.calllog.tablename=plus:split
hbase.calllog.regions=5

HBase API

0. Purpose

Through HBase's Java API you can build MapReduce jobs that work alongside HBase, for example using MapReduce to import data from the local file system into an HBase table, or reading raw data out of HBase and then analyzing it with MapReduce.

1.pom.xml

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.3.1</version>
</dependency>

2. Writing the HBase API code

(1) Get a Configuration object
public static Configuration conf;
static {
    //Instantiate via HBaseConfiguration's factory method
    conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "bigdata111");
    conf.set("hbase.zookeeper.property.clientPort", "2181");
    conf.set("zookeeper.znode.parent", "/hbase");
}
(2) Check whether a table exists
public static boolean isTableExist(String tableName) throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
    //Managing and accessing tables in HBase requires an HBaseAdmin object
    Connection connection = ConnectionFactory.createConnection(conf);
    HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
    //HBaseAdmin admin = new HBaseAdmin(conf);
    return admin.tableExists(tableName);
}
(3) Create a table
public static void createTable(String tableName, String... columnFamily) throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
    HBaseAdmin admin = new HBaseAdmin(conf);
    //Check whether the table exists
    if (isTableExist(tableName)) {
        System.out.println("Table " + tableName + " already exists");
        //System.exit(0);
    } else {
        //Create the table descriptor; the table name must be converted
        HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
        //Add the column families
        for (String cf : columnFamily) {
            descriptor.addFamily(new HColumnDescriptor(cf));
        }
        //Create the table from the descriptor
        admin.createTable(descriptor);
        System.out.println("Table " + tableName + " created successfully!");
    }
}
(4) Delete a table
public static void dropTable(String tableName) throws Exception {
    HBaseAdmin admin = new HBaseAdmin(conf);
    if (isTableExist(tableName)) {
        admin.disableTable(tableName);
        admin.deleteTable(tableName);
        System.out.println("Table " + tableName + " deleted successfully!");
    } else {
        System.out.println("Table " + tableName + " does not exist!");
    }
}
(5) Insert data into a table
public static void addRowData(String tableName, String rowKey, String columnFamily, String column, String value) throws Exception {
    //Create the HTable object
    HTable hTable = new HTable(conf, tableName);
    //Insert data into the table
    Put put = new Put(Bytes.toBytes(rowKey));
    //Assemble the data into the Put object
    put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
    hTable.put(put);
    hTable.close();
    System.out.println("Data inserted successfully");
}
(6) Delete multiple rows
public static void deleteMultiRow(String tableName, String... rows) throws IOException {
    //Create the HTable object
    HTable hTable = new HTable(conf, tableName);
    //Collect one Delete per rowkey
    List<Delete> deleteList = new ArrayList<Delete>();
    for (String row : rows) {
        deleteList.add(new Delete(Bytes.toBytes(row)));
    }
    hTable.delete(deleteList);
    hTable.close();
}
(7) Get all data
public static void getAllRows(String tableName) throws IOException {
    HTable hTable = new HTable(conf, tableName);
    //Create a Scan that covers the whole table
    Scan scan = new Scan();
    //Get a ResultScanner from the HTable
    ResultScanner resultScanner = hTable.getScanner(scan);
    for (Result result : resultScanner) {
        for (Cell cell : result.rawCells()) {
            System.out.println("rowkey: " + Bytes.toString(CellUtil.cloneRow(cell)));
            System.out.println("column family: " + Bytes.toString(CellUtil.cloneFamily(cell)));
            System.out.println("column: " + Bytes.toString(CellUtil.cloneQualifier(cell)));
            System.out.println("value: " + Bytes.toString(CellUtil.cloneValue(cell)));
        }
    }
    hTable.close();
}
(8) Get all data of a given row
public static void getRow(String tableName, String rowKey) throws IOException {
    HTable table = new HTable(conf, tableName);
    Get get = new Get(Bytes.toBytes(rowKey));
    //get.setMaxVersions();  show all versions
    //get.setTimeStamp();    show the version with the given timestamp
    Result result = table.get(get);
    for (Cell cell : result.rawCells()) {
        System.out.println("rowkey: " + Bytes.toString(result.getRow()));
        System.out.println("column family: " + Bytes.toString(CellUtil.cloneFamily(cell)));
        System.out.println("column: " + Bytes.toString(CellUtil.cloneQualifier(cell)));
        System.out.println("value: " + Bytes.toString(CellUtil.cloneValue(cell)));
        System.out.println("timestamp: " + cell.getTimestamp());
    }
}
(9) Get the data of a specific "column family:column" of a given row
public static void getRowQualifier(String tableName, String rowKey, String family, String qualifier) throws IOException {
    HTable table = new HTable(conf, tableName);
    Get get = new Get(Bytes.toBytes(rowKey));
    get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
    Result result = table.get(get);
    for (Cell cell : result.rawCells()) {
        System.out.println("rowkey: " + Bytes.toString(result.getRow()));
        System.out.println("column family: " + Bytes.toString(CellUtil.cloneFamily(cell)));
        System.out.println("column: " + Bytes.toString(CellUtil.cloneQualifier(cell)));
        System.out.println("value: " + Bytes.toString(CellUtil.cloneValue(cell)));
    }
}
(10) Complete code: HBASE_API.java
package HBase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;


public class HBASE_API {
    //The configuration
    private static Configuration conf = null;

    /**
     * Initialization
     * */
    static {
        //Get the conf via HBaseConfiguration
        conf = HBaseConfiguration.create();
        //Note: before using a hostname here, make sure the Windows hosts file maps the Linux IP to that hostname
        conf.set("hbase.zookeeper.quorum", "192.168.31.132");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("zookeeper.znode.parent", "/hbase");
    }

    /**
     * Check whether a table exists
     * */
    public static boolean is_table_exists(String table_Name) throws IOException {
        //1. Create the HBase client
        Connection connection = ConnectionFactory.createConnection(conf);
        HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
        //2. Check whether the table exists in meta
        return admin.tableExists(Bytes.toBytes(table_Name));

    }

    public static void main(String[] args) throws IOException {
        //System.out.println(is_table_exists("aa"));
        //create_table("idea1","cf1","cf2","cf3");
        //for (int i = 0; i < 100; i++) {
        //    put_table("idea1", String.valueOf(i), "cf1", "name", "plus" + i);
        //}
        scan_table("idea1");
    }

    /**
     * Create a table: table name plus one or more column families
     * */
    public static void create_table(String table_name, String... columnFamily) throws IOException {
        //1. Create the HBase client
        Connection connection = ConnectionFactory.createConnection(conf);
        HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();

        if (is_table_exists(table_name)) {
            System.out.println(table_name + " already exists");
            return;
        } else {
            //Create the table: build a table descriptor
            HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(table_name));
            //Add the column families
            for (String cf : columnFamily) {
                hTableDescriptor.addFamily(new HColumnDescriptor(cf));
            }
            admin.createTable(hTableDescriptor);
            System.out.println(table_name + " created successfully");
        }
    }

    /**
     * Insert data: table name, rowkey, column family, column, value
     * */
    public static void put_table(String name, String row,
                                 String columnfamily, String column,
                                 String value) throws IOException {
        //Create the HTable
        HTable hTable = new HTable(conf, name);
        //Create the Put object
        Put put = new Put(Bytes.toBytes(row));
        //Add column family, column and value
        put.add(Bytes.toBytes(columnfamily), Bytes.toBytes(column), Bytes.toBytes(value));
        hTable.put(put);
        hTable.close();

    }

    /**
     * Scan data
     * */
    public static void scan_table(String table_Name) throws IOException {
        HTable hTable = new HTable(conf, table_Name);

        //Create a Scan that sweeps the regions
        Scan scan = new Scan();
        //Use the HTable to create a ResultScanner
        ResultScanner scanner = hTable.getScanner(scan);
        for (Result result : scanner) {
            Cell[] cells = result.rawCells();
            for (Cell cell : cells) {
                System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + "\t" +
                        Bytes.toString(CellUtil.cloneFamily(cell)) + "," +
                        Bytes.toString(CellUtil.cloneQualifier(cell)) + "," +
                        Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }

    }

    public static void deleteMultiRow(String tableName, String... rows) throws IOException {
        HTable hTable = new HTable(conf, tableName);
        List<Delete> deleteList = new ArrayList<Delete>();

        for (String row : rows) {
            Delete delete = new Delete(Bytes.toBytes(row));

            deleteList.add(delete);
        }
        hTable.delete(deleteList);
        hTable.close();
    }
}