Fork me on GitHub

ZooKeeper实战-分布式秒杀

1.添加所需依赖:

pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
 <!-- zookeeper所需依赖 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>16.0.1</version>
</dependency>

<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
</dependency>

2.编写代码:

(1)不启动锁:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package SimpleZKlock;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class TestDistributedLock {

//定义共享资源
private static int count = 10;

private static void printCountNumber() {
System.out.println("***********" + Thread.currentThread().getName() + "**********");
System.out.println("当前值:" + count);
count--;

//睡2秒
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("***********" + Thread.currentThread().getName() + "**********");
}


public static void main(String[] args) {
//定义客户端重试的策略
RetryPolicy policy = new ExponentialBackoffRetry(1000, //每次等待的时间
10); //最大重试的次数

//定义ZK的一个客户端
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.1.121:2181")
.retryPolicy(policy)
.build();

//在ZK生成锁 ---> 就是ZK的目录
client.start();
final InterProcessMutex lock = new InterProcessMutex(client, "/mylock");

// 启动10个线程去访问共享资源
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {

public void run() {
try {
//请求得到锁
//lock.acquire();
//访问共享资源
printCountNumber();
} catch (Exception ex) {
ex.printStackTrace();
} finally {
//释放锁
try {
// lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}).start();
}
}
}

不启用锁

(2)启用锁:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package SimpleZKlock;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class TestDistributedLock {

//定义共享资源
private static int count = 10;

private static void printCountNumber() {
System.out.println("***********" + Thread.currentThread().getName() + "**********");
System.out.println("当前值:" + count);
count--;

//睡2秒
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("***********" + Thread.currentThread().getName() + "**********");
}


public static void main(String[] args) {
//定义客户端重试的策略
RetryPolicy policy = new ExponentialBackoffRetry(1000, //每次等待的时间
10); //最大重试的次数

//定义ZK的一个客户端
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.1.121:2181")
.retryPolicy(policy)
.build();

//在ZK生成锁 ---> 就是ZK的目录
client.start();
final InterProcessMutex lock = new InterProcessMutex(client, "/mylock");

// 启动10个线程去访问共享资源
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {

public void run() {
try {
//请求得到锁
lock.acquire();
//访问共享资源
printCountNumber();
} catch (Exception ex) {
ex.printStackTrace();
} finally {
//释放锁
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}).start();
}
}
}

启用锁

ZooKeeper客户端命令行操作

0.基础语法

命令基本语法 功能描述
help 显示所有操作命令
ls path [watch] 使用 ls 命令来查看当前znode中所包含的内容
ls2 path [watch] 查看当前节点数据并能看到更新次数等数据
create 普通创建(永久节点)
-s 含有序列
-e 临时(重启或者超时消失)
get path [watch] 获得节点的值
set 设置节点的具体值
stat 查看节点状态
delete 删除节点
rmr 递归删除节点

1.启动客户端

1
bin/zkCli.sh

2.显示所有操作命令

1
help

help

3.查看当前znode中所包含的内容

1
ls /

ls /

4.查看当前节点数据并能看到更新次数等数据

1
ls2 /

ls2 /

5.创建普通节点

1
create /app1 "hello app1"

create /app1 "hello app1"

1
create /app1/server101 "192.168.1.101"

create /app1/server101 "192.168.1.101"

6.获得节点的值

1
get /app1

get /app1

1
get /app1/server101

get /app1/server101

7.创建短暂节点

1
create -e /app-emphemeral 8888

(1)在当前客户端是能查看到的

1
ls /

ls /

(2)退出当前客户端然后再重启客户端

1
2
3
quit

bin/zkCli.sh

(3)再次查看根目录下短暂节点已经删除

1
ls /

ls /

8.创建带序号的节点

(1)先创建一个普通的根节点app2

1
create /app2 "app2"

(2)创建带序号的节点

1
create -s /app2/aa 888
1
create -s /app2/bb 888
1
create -s /app2/cc 888

如果原节点下有1个节点,则再排序时从1开始,以此类推。

1
create -s /app1/aa 888

9.修改节点数据值

1
set /app1 999

set /app1 999

10.节点的值变化监听

(1)在104主机上注册监听/app1节点数据变化

1
get /app1 watch

get /app1 watch

(2)在103主机上修改/app1节点的数据

1
set /app1  777

(3)观察104主机收到数据变化的监听

11.节点的子节点变化监听(路径变化)

(1)在104主机上注册监听/app1节点的子节点变化

1
ls /app1 watch

image.png

(2)在103主机/app1节点上创建子节点

1
create /app1/bb 666

(3)观察104主机收到子节点变化的监听

12.删除节点

1
delete /app1/bb

13.递归删除节点

1
rmr /app2

14.查看节点状态

1
stat /app1

ZooKeeper之zoo.cfg配置参数解读

1.解读zoo.cfg文件中参数含义

参数解读

(1)tickTime=2000:通信心跳数,Zookeeper服务器心跳时间,单位毫秒

    Zookeeper使用的基本时间,服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个tickTime时间就会发送一个心跳,时间单位为毫秒。
它用于心跳机制,并且设置最小的session超时时间为两倍心跳时间。(session的最小超时时间是2*tickTime)

(2)initLimit=10:Leader和Follower初始通信时限

    集群中的follower跟随者服务器与leader领导者服务器之间初始连接时能容忍的最多心跳数(tickTime的数量),用它来限定集群中的Zookeeper服务器连接到Leader的时限。
投票选举新leader的初始化时间
    Follower在启动过程中,会从Leader同步所有最新数据,然后确定自己能够对外服务的起始状态。
    Leader允许Follower在initLimit时间内完成这个工作。

(3)syncLimit=5:Leader和Follower同步通信时限

    集群中Leader与Follower之间的最大响应时间单位,假如响应超过syncLimit * tickTime,Leader认为Follwer死掉,从服务器列表中删除Follwer。
    在运行过程中,Leader负责与ZK集群中所有机器进行通信,例如通过一些心跳检测机制,来检测机器的存活状态。
    如果L发出心跳包在syncLimit之后,还没有从F那收到响应,那么就认为这个F已经不在线了。

(4)dataDir:数据文件目录+数据持久化路径

    保存内存数据库快照信息的位置,如果没有其他说明,更新的事务日志也保存到数据库。

(5)clientPort=2181:客户端连接端口

    监听客户端连接的端口

Zookeeper之内部原理

一.选举机制

Server ID: myid(权重越大)
Zxid:数据ID(先一数据低进行选择)

1.半数机制(Paxos 协议):

    集群中半数以上机器存活,集群可用。所以zookeeper适合装在奇数台机器上。

2.Zookeeper虽然在配置文件中并没有指定master和slave。

    但是,zookeeper工作时,是有一个节点为leader,其他则为follower,Leader是通过内部的选举机制临时产生的。

3.以一个简单的例子来说明整个选举的过程。

    假设有五台服务器组成的zookeeper集群,它们的id从1-5,同时它们都是最新启动的,也就是没有历史数据,在存放数据量这一点上,都是一样的。假设这些服务器依序启动,来看看会发生什么。

  • (1)服务器1启动,此时只有它一台服务器启动了,它发出去的报没有任何响应,所以它的选举状态一直是LOOKING状态。
  • (2)服务器2启动,它与最开始启动的服务器1进行通信,互相交换自己的选举结果,由于两者都没有历史数据,所以id值较大的服务器2胜出,但是由于没有达到超过半数以上的服务器都同意选举它(这个例子中的半数以上是3),所以服务器1、2还是继续保持LOOKING状态。
  • (3)服务器3启动,根据前面的理论分析,服务器3成为服务器1、2、3中的老大,而与上面不同的是,此时有三台服务器选举了它,所以它成为了这次选举的leader。
  • (4)服务器4启动,根据前面的分析,理论上服务器4应该是服务器1、2、3、4中最大的,但是由于前面已经有半数以上的服务器选举了服务器3,所以它只能接收当小弟的命了。
  • (5)服务器5启动,同4一样当小弟。

二.节点类型

1.Znode有两种类型:

  • 短暂(ephemeral):客户端和服务器端断开连接后,创建的节点自己删除
  • 持久(persistent):客户端和服务器端断开连接后,创建的节点不删除

2.Znode有四种形式的目录节点(默认是persistent)

(1)持久化目录节点(PERSISTENT)(小写:persistent)

    客户端与zookeeper断开连接后,该节点依旧存在。

(2)持久化顺序编号目录节点(PERSISTENT_SEQUENTIAL)(小写:persistent_sequential)

    客户端与zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号。

(3)临时目录节点(EPHEMERAL)(ephemeral)

    客户端与zookeeper断开连接后,该节点被删除。

(4)临时顺序编号目录节点(EPHEMERAL_SEQUENTIAL)(ephemeral_sequential)

    客户端与zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号。

3.创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护

4.在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序

三.stat结构体

1.czxid- 引起这个znode创建的zxid,创建节点的事务的zxid

    每次修改ZooKeeper状态都会收到一个zxid形式的时间戳,也就是ZooKeeper事务ID。
    事务ID是ZooKeeper中所有修改总的次序。每个修改都有唯一的zxid,如果zxid1小于zxid2,那么zxid1在zxid2之前发生。

2.ctime - znode被创建的毫秒数(从1970年开始)

3.mzxid - znode最后更新的zxid

4.mtime - znode最后修改的毫秒数(从1970年开始)

5.pZxid-znode最后更新的子节点zxid

6.cversion - znode子节点变化号,znode子节点修改次数

7.dataversion - znode数据变化号

8.aclVersion - znode访问控制列表的变化号

9.ephemeralOwner- 如果是临时节点,这个是znode拥有者的session id。如果不是临时节点则是0。

10.dataLength- znode的数据长度

11.numChildren - znode子节点数量

四. 监听器原理

1.监听原理详解:

(1) 首先要有一个main()线程

(2) 在main线程中创建ZK客户端,这是会创建两个线程,一个负责网络连接通信(connect),一个负责监听(listener)

(3) 通过connect线程将注册的监听事件发送给ZK

(4) 在ZK的注册监听器列表中将注册的监听事件添加到列表中

(5) ZK监听到有数据或路径发生变化时,就会将这个消息发送给listener线程

(6) Listener线程内部调用process()方法

2.常见的监听

1.监听节点数据的变化

1
Get path [watch]

2.监听子节点增减的变化

1
Ls path [watch]

监听器原理

五. 写数据流程

写数据流程

    读是局部性的,即client只需要从与它相连的server上读取数据即可;而client有写请求的话,与之相连的server会通知leader,然后leader会把写操作分发给所有server。所以定要比读慢很多。

ZooKeeper分布式安装部署

1.集群规划

在hadoop1、hadoop2和hadoop3三个节点上部署Zookeeper。

2.解压安装

(1)解压zookeeper安装包到/opt/module/目录下

1
tar -zxvf zookeeper-3.4.10.tar.gz -C /opt/module/

(2)在/opt/module/zookeeper-3.4.10/这个目录下创建zkData

1
mkdir -p zkData

(3)重命名/opt/module/zookeeper-3.4.10/conf这个目录下的zoo_sample.cfg为zoo.cfg

1
mv zoo_sample.cfg zoo.cfg

3.配置zoo.cfg文件

(1)具体配置

1
vi /opt/module/zookeeper-3.4.10/conf/zoo.cfg

修改内容:

1
dataDir=/opt/module/zookeeper-3.4.10/zkData

增加如下配置

1
2
3
4
5
6
7
#######################cluster##########################

server.1=hadoop1:2888:3888

server.2=hadoop2:2888:3888

server.3=hadoop3:2888:3888

zoo.cfg

(2)配置参数解读

1
Server.A=B:C:D。
  • A是一个数字,表示这个是第几号服务器;

  • B是这个服务器的ip地址;

  • C是这个服务器与集群中的Leader服务器交换信息的端口;

  • D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

    集群模式下配置一个文件myid,这个文件在dataDir目录下,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。

3.集群操作

(1)在/opt/module/zookeeper-3.4.10/zkData目录下创建一个myid的文件

1
touch myid

添加myid文件,注意一定要在linux里面创建,在notepad++里面很可能乱码

(2)编辑myid文件

1
vi myid

在文件中添加与server对应的编号:如1
1
myid

(3)拷贝配置好的zookeeper到其他机器上
并分别修改myid文件中内容为2、3,并修改环境变量,和hadoop1一样

1
2
3
scp -r /opt/module/zookeeper-3.4.10/ root@hadoop2:/opt/module/

scp -r /opt/module/zookeeper-3.4.10/ root@hadoop3:/opt/module/

拷贝
2
myid
3
myid
(4)三台都修改环境变量

1
vi /etc/profile

修改内容:

1
2
export ZOOKEEPER_HOME=/opt/module/zookeeper-3.4.10
export PATH=$PATH:$ZOOKEEPER_HOME/bin

使环境变量生效:

1
source /etc/profile

修改环境变量

(5)分别启动zookeeper
hadoop1:

1
bin/zkServer.sh start

hadoop2

1
bin/zkServer.sh start

hadoop3

1
bin/zkServer.sh start

(5)查看状态

1
bin/zkServer.sh status

hadoop1查看状态

1
bin/zkServer.sh status

hadoop2查看状态

1
bin/zkServer.sh status

hadoop3查看状态

ZooKeeper本地模式安装

1.安装前准备:

(1)安装jdk
(2)上传zookeeper到linux系统下
(3)解压到指定目录

1
tar -zxvf zookeeper-3.4.10.tar.gz -C /opt/module/

解压

(4)配置环境变量

输入命令:

1
vi /etc/profile

添加内容:

1
2
export ZOOKEEPER_HOME=/opt/module/zookeeper-3.4.10
export PATH=$PATH:$ZOOKEEPER_HOME/bin

环境变量

2.配置修改

(1)将/opt/module/zookeeper-3.4.10/conf这个路径下的zoo_sample.cfg修改为zoo.cfg;

1
mv zoo_sample.cfg  zoo.cfg

(2)进入zoo.cfg文件:

1
vim zoo.cfg

(3)修改dataDir路径为

1
dataDir=/opt/module/zookeeper-3.4.10/zkData

(4)在/opt/module/zookeeper-3.4.10/这个目录上创建zkData文件夹

1
2
3
cd /opt/module/zookeeper-3.4.10 

mkdir zkData

创建文件夹

3.操作zookeeper

(1)启动zookeeper
1
bin/zkServer.sh start
(2)查看进程是否启动
1
jps
(3)查看状态:
1
bin/zkServer.sh status

(4)启动客户端:
1
bin/zkCli.sh

启动客户端

(5)退出客户端:
1
quit

退出客户端

(6)停止zookeeper
1
bin/zkServer.sh stop

停止zookeeper

Zookeeper概述

一.概述:

    Zookeeper是一个开源的分布式的,为分布式应用提供协调服务的Apache项目。Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

二.特点:

(1)Zookeeper:一个领导者(leader),多个跟随者(follower)组成的集群。

(2)Leader负责进行投票的发起和决议,更新系统状态。

(3)Follower用于接收客户请求并向客户端返回结果,在选举Leader过程中参与投票。

(4)集群中奇数台服务器只要有半数以上节点存活,Zookeeper集群就能正常服务。

(5)全局数据一致:每个server保存一份相同的数据副本,client无论连接到哪个server,数据都是一致的。

(6)更新请求顺序进行,来自同一个client的更新请求按其发送顺序依次执行。

(7)数据更新原子性,一次数据更新要么成功,要么失败。

(8)实时性,在一定时间范围内,client能读到最新数据。

三.数据结构:

    ZooKeeper数据模型的结构与Unix文件系统很类似,整体上可以看作是一棵树,每个节点称做一个ZNode。每一个ZNode默认能够存储1MB的元数据,每个ZNode都可以通过其路径唯一标识

ZooKeeper数据结构

四.应用场景:

1.统一命名服务

统一命名服务

2.统一配置管理

(1).分布式环境下,配置文件管理和同步是一个常见问题

(a)一个集群中,所有节点的配置信息是一致的,比如hadoop集群

(b)对配置文件修改后,希望能够快速同步到各个节点上

(2).配置管理可交由ZK实现

(a)可配置信息写入ZK上的一个Znode

(b)各个节点监听这个ZNode

(c )一旦Znode中的数据被修改,ZK将通知各个节点

配置管理

3.统一集群管理

集群管理结构图如下所示。

(1).分布式环境中,实时掌握每个节点的状态是必要的。

(a)可根据节点实时做出一些调整

(2).可交由Zk实现

(a)可将节点信息写入ZK上的一个ZNode

(b)监听这个Znode可获取它的实时状态变化

(3).典型应用

(a)HBase中Master状态监控与选举

集群管理

4.服务器节点动态上下线

服务器动态上下线

5.软负载均衡

软负载均衡

Hadoop之优化

一.MapReduce 跑的慢的原因

Mapreduce 程序效率的瓶颈在于两点:

1.计算机性能

CPU、内存、磁盘健康、网络

2.I/O 操作优化

(1)数据倾斜
(2)map和reduce数设置不合理
(3)map运行时间太长,导致reduce等待过久
(4)小文件过多
(5)大量的不可分块的超大文件
(6)spill次数过多
(7)merge次数过多等。

二. MapReduce优化方法

MapReduce优化方法主要从六个方面考虑:数据输入、Map阶段、Reduce阶段、IO传输、数据倾斜问题和常用的调优参数。

1.数据输入

(1)合并小文件:在执行mr任务前将小文件进行合并,大量的小文件会产生大量的map任务,增大map任务装载次数,而任务的装载比较耗时,从而导致mr运行较慢。

(2)采用CombineTextInputFormat来作为输入,解决输入端大量小文件场景。

2 Map阶段

(1)减少溢写(spill)次数:

通过调整io.sort.mb及sort.spill.percent参数值,增大触发spill的内存上限,减少spill次数,从而减少磁盘IO。

(2)减少合并(merge)次数:

通过调整io.sort.factor参数,增大merge的文件数目,减少merge的次数,从而缩短mr处理时间。

(3)在map之后,不影响业务逻辑前提下,先进行combine处理,减少 I/O。

3 Reduce阶段

(1)合理设置map和reduce数:

    两个都不能设置太少,也不能设置太多。太少,会导致task等待,延长处理时间;太多,会导致 map、reduce任务间竞争资源,造成处理超时等错误。

(2)设置map、reduce共存:

    调整slowstart.completedmaps参数,使map运行到一定程度后,reduce也开始运行,减少reduce的等待时间。

(3)规避使用reduce:

    因为reduce在用于连接数据集的时候将会产生大量的网络消耗。

(4)合理设置reduce端的buffer:

    默认情况下,数据达到一个阈值的时候,buffer中的数据就会写入磁盘,然后reduce会从磁盘中获得所有的数据。也就是说,buffer和reduce是没有直接关联的,中间多个一个写磁盘->读磁盘的过程,既然有这个弊端,那么就可以通过参数来配置,使得buffer中的一部分数据可以直接输送到reduce,从而减少IO开销:mapred.job.reduce.input.buffer.percent,默认为0.0。当值大于0的时候,会保留指定比例的内存读buffer中的数据直接拿给reduce使用。这样一来,设置buffer需要内存,读取数据需要内存,reduce计算也要内存,所以要根据作业的运行情况进行调整。

4 IO传输

(1)采用数据压缩的方式,减少网络IO的的时间。安装Snappy和LZO压缩编码器。

(2)使用SequenceFile二进制文件。

5 数据倾斜问题

(1)数据倾斜现象
  • 数据频率倾斜——某一个区域的数据量要远远大于其他区域。
  • 数据大小倾斜——部分记录的大小远远大于平均值。
(2)如何收集倾斜数据

在reduce方法中加入记录map输出键的详细情况的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static final String MAX_VALUES = "skew.maxvalues"; 
private int maxValueThreshold;

@Override
public void configure(JobConf job) {
maxValueThreshold = job.getInt(MAX_VALUES, 100);
}
@Override
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
int i = 0;
while (values.hasNext()) {
values.next();
i++;
}

if (++i > maxValueThreshold) {
log.info("Received " + i + " values for key " + key);
}
}
(3)减少数据倾斜的方法
  • 方法1:抽样和范围分区
    可以通过对原始数据进行抽样得到的结果集来预设分区边界值。

  • 方法2:自定义分区
    基于输出键的背景知识进行自定义分区。例如,如果map输出键的单词来源于一本书。且其中某几个专业词汇较多。那么就可以自定义分区将这这些专业词汇发送给固定的一部分reduce实例。而将其他的都发送给剩余的reduce实例。

  • 方法3:Combine
    使用Combine可以大量地减小数据倾斜。在可能的情况下,combine的目的就是聚合并精简数据。

  • 方法4:采用Map Join,尽量避免Reduce Join。

6 常用的调优参数

(1)资源相关参数
(a).以下参数是在用户自己的mr应用程序中配置就可以生效(mapred-default.xml)
配置参数 参数说明
mapreduce.map.memory.mb 一个Map Task可使用的资源上限(单位:MB),默认为1024。如果Map Task实际使用的资源量超过该值,则会被强制杀死。
mapreduce.reduce.memory.mb 一个Reduce Task可使用的资源上限(单位:MB),默认为1024。如果Reduce Task实际使用的资源量超过该值,则会被强制杀死。
mapreduce.map.cpu.vcores 每个Map task可使用的最多cpu core数目,默认值: 1
mapreduce.reduce.cpu.vcores 每个Reduce task可使用的最多cpu core数目,默认值: 1
mapreduce.reduce.shuffle.parallelcopies 每个reduce去map中拿数据的并行数。默认值是5
mapreduce.reduce.shuffle.merge.percent buffer中的数据达到多少比例开始写入磁盘。默认值0.66
mapreduce.reduce.shuffle.input.buffer.percent buffer大小占reduce可用内存的比例。默认值0.7
mapreduce.reduce.input.buffer.percent 指定多少比例的内存用来存放buffer中的数据,默认值是0.0
(b).应该在yarn启动之前就配置在服务器的配置文件中才能生效(yarn-default.xml)
配置参数 参数说明
yarn.scheduler.minimum-allocation-mb 1024 给应用程序container分配的最小内存
yarn.scheduler.maximum-allocation-mb 8192 给应用程序container分配的最大内存
yarn.scheduler.minimum-allocation-vcores 1 每个container申请的最小CPU核数
yarn.scheduler.maximum-allocation-vcores 32 每个container申请的最大CPU核数
yarn.nodemanager.resource.memory-mb 8192 给containers分配的最大物理内存
(c ).shuffle性能优化的关键参数,应在yarn启动之前就配置好(mapred-default.xml)
配置参数 参数说明
mapreduce.task.io.sort.mb 100 shuffle的环形缓冲区大小,默认100m
mapreduce.map.sort.spill.percent 0.8 环形缓冲区溢出的阈值,默认80%
(2)容错相关参数(mapreduce性能优化)
配置参数 参数说明
mapreduce.map.maxattempts 每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
mapreduce.reduce.maxattempts 每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
mapreduce.task.timeout Task超时时间,经常需要设置的一个参数,该参数表达的意思为:如果一个task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该task处于block状态,可能是卡住了,也许永远会卡主,为了防止因为用户程序永远block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是600000。如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”

三. HDFS小文件优化方法

1 HDFS小文件弊端

    HDFS上每个文件都要在namenode上建立一个索引,这个索引的大小约为150byte,这样当小文件比较多的时候,就会产生很多的索引文件,一方面会大量占用namenode的内存空间,另一方面就是索引文件过大是的索引速度变慢。

2 解决方案

(1)Hadoop Archive:

    是一个高效地将小文件放入HDFS块中的文件存档工具,它能够将多个小文件打包成一个HAR文件,这样就减少了namenode的内存使用。

(2)Sequence file:

    sequence file由一系列的二进制key/value组成,如果key为文件名,value为文件内容,则可以将大批小文件合并成一个大文件。

(3)CombineFileInputFormat:

    CombineFileInputFormat是一种新的inputformat,用于将多个文件合并成一个单独的split,另外,它会考虑数据的存储位置。

(4)开启JVM重用

    对于大量小文件Job,可以开启JVM重用会减少45%运行时间。
    JVM重用理解:一个map运行一个jvm,重用的话,在一个map在jvm上运行完毕后,jvm继续运行其他map。
    具体设置:mapreduce.job.jvm.numtasks值在10-20之间。

Hadoop之数据压缩

1.概述

    压缩技术能够有效减少底层存储系统(HDFS)读写字节数。压缩提高了网络带宽和磁盘空间的效率。在Hadoop下,尤其是数据规模很大和工作负载密集的情况下,使用数据压缩显得非常重要。在这种情况下,I/O操作和网络数据传输要花大量的时间。还有,Shuffle与Merge过程同样也面临着巨大的I/O压力。

    鉴于磁盘I/O和网络带宽是Hadoop的宝贵资源,数据压缩对于节省资源、最小化磁盘I/O和网络传输非常有帮助。不过,尽管压缩与解压操作的CPU开销不高,其性能的提升和资源的节省并非没有代价。

    如果磁盘I/O和网络带宽影响了MapReduce作业性能,在任意MapReduce阶段启用压缩都可以改善端到端处理时间并减少I/O和网络流量。

    压缩Mapreduce的一种优化策略:通过压缩编码对Mapper或者Reducer的输出进行压缩,以减少磁盘IO,提高MR程序运行速度(但相应增加了cpu运算负担)

    注意:压缩特性运用得当能提高性能,但运用不当也可能降低性能。

(1)基本原则:
  • (a)运算密集型的job,少用压缩
  • (b)IO密集型的job,多用压缩

2.MR支持的压缩编码

压缩格式 hadoop自带? 算法 文件扩展名 是否可切分 换成压缩格式后,原来的程序是否需要修改
DEFAULT 是,直接使用 DEFAULT .deflate 和文本处理一样,不需要修改
Gzip 是,直接使用 DEFAULT .gz 和文本处理一样,不需要修改
bzip2 是,直接使用 bzip2 .bz2 和文本处理一样,不需要修改
LZO 否,需要安装 LZO .lzo 需要建索引,还需要指定输入格式
Snappy 否,需要安装 Snappy .snappy 和文本处理一样,不需要修改

为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器,如下表所示

压缩格式 对应的编码/解码器
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
Snappy org.apache.hadoop.io.compress.SnappyCodec

压缩性能的比较

压缩算法 原始文件大小 压缩文件大小 压缩速度 解压速度
gzip 8.3GB 1.8GB 17.5MB/s 58MB/s
bzip2 8.3GB 1.1GB 2.4MB/s 9.5MB/s
LZO 8.3GB 2.9GB 49.3MB/s 74.6MB/s
http://google.github.io/snappy/

3 压缩方式选择

(1) Gzip压缩
  • 优点:压缩率比较高,而且压缩/解压速度也比较快;hadoop本身支持,在应用中处理gzip格式的文件就和直接处理文本一样;大部分linux系统都自带gzip命令,使用方便。

  • 缺点:不支持split。

应用场景:当每个文件压缩之后在130M以内的(1个块大小内),都可以考虑用gzip压缩格式。例如说一天或者一个小时的日志压缩成一个gzip文件,运行mapreduce程序的时候通过多个gzip文件达到并发。hive程序,streaming程序,和java写的mapreduce程序完全和文本处理一样,压缩之后原来的程序不需要做任何修改。

(2) Bzip2压缩
  • 优点:支持split;具有很高的压缩率,比gzip压缩率都高;hadoop本身支持,但不支持native(java和c互操作的API接口);在linux系统下自带bzip2命令,使用方便。

  • 缺点:压缩/解压速度慢;不支持native。

应用场景:适合对速度要求不高,但需要较高的压缩率的时候,可以作为mapreduce作业的输出格式;或者输出之后的数据比较大,处理之后的数据需要压缩存档减少磁盘空间并且以后数据用得比较少的情况;或者对单个很大的文本文件想压缩减少存储空间,同时又需要支持split,而且兼容之前的应用程序(即应用程序不需要修改)的情况。

(3) Lzo压缩
  • 优点:压缩/解压速度也比较快,合理的压缩率;支持split,是hadoop中最流行的压缩格式;可以在linux系统下安装lzop命令,使用方便。

  • 缺点:压缩率比gzip要低一些;hadoop本身不支持,需要安装;在应用中对lzo格式的文件需要做一些特殊处理(为了支持split需要建索引,还需要指定inputformat为lzo格式)。

应用场景:一个很大的文本文件,压缩之后还大于200M以上的可以考虑,而且单个文件越大,lzo优点越越明显。

(4)Snappy压缩
  • 优点:高速压缩速度和合理的压缩率。

  • 缺点:不支持split;压缩率比gzip要低;hadoop本身不支持,需要安装;

应用场景:当Mapreduce作业的Map输出的数据比较大的时候,作为Map到Reduce的中间数据的压缩格式;或者作为一个Mapreduce作业的输出和另外一个Mapreduce作业的输入。

4 压缩位置选择

压缩可以在MapReduce作用的任意阶段启用。

5 压缩配置参数

要在Hadoop中启用压缩,可以配置如下参数:

参数 默认值 阶段 建议
io.compression.codecs
(在core-site.xml中配置)
org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec 输入压缩 Hadoop使用文件扩展名判断是否支持某种编解码器
mapreduce.map.output.compress
(在mapred-site.xml中配置)
false mapper输出 这个参数设为true启用压缩
mapreduce.map.output.compress.codec
(在mapred-site.xml中配置)
org.apache.hadoop.io.compress.DefaultCodec mapper输出 使用LZO或snappy编解码器在此阶段压缩数据
mapreduce.output.fileoutputformat.compress
(在mapred-site.xml中配置)
false reducer输出 这个参数设为true启用压缩
mapreduce.output.fileoutputformat.compress.codec
(在mapred-site.xml中配置)
org.apache.hadoop.io.compress. DefaultCodec reducer输出 使用标准工具或者编解码器,如gzip和bzip2
mapreduce.output.fileoutputformat.compress.type
(在mapred-site.xml中配置)
RECORD reducer输出 SequenceFile输出使用的压缩类型:NONE和BLOCK

6 压缩实战

Yarn-HA配置

一.Yarn-HA配置

1.YARN-HA工作机制

(1).官方文档:

官方文档

(2).YARN-HA工作机制

image.png

2.配置YARN-HA集群

(0).环境准备

(a)修改IP
(b)修改主机名及主机名和IP地址的映射
(c )关闭防火墙
(d)ssh免密登录
(e)安装JDK,配置环境变量等
(f)配置Zookeeper集群
(g)配置过mapred-site.xml(和以前配置一样)

(1).规划集群
bigdata111 bigdata112 bigdata113
NameNode NameNode
JournalNode JournalNode JournalNode
DataNode DataNode DataNode
ZK ZK ZK
ResourceManager ResourceManager
NodeManager NodeManager NodeManager
(2).具体配置
(a)yarn-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
<configuration>

<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>

<!--启用resourcemanager ha-->
<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>

<!--声明两台resourcemanager的地址-->
<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>cluster-yarn1</value>
</property>

<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>

<property>
<name>yarn.resourcemanager.hostname.rm1</name>
<value>hadoop1</value>
</property>

<property>
<name>yarn.resourcemanager.hostname.rm2</name>
<value>hadoop2</value>
</property>

<!--指定zookeeper集群的地址-->
<property>
<name>yarn.resourcemanager.zk-address</name>
<value>hadoop1:2181,hadoop2:2181,hadoop3:2181</value>
</property>

<!--启用自动恢复-->
<property>
<name>yarn.resourcemanager.recovery.enabled</name>
<value>true</value>
</property>

<!--指定resourcemanager的状态信息存储在zookeeper集群-->
<property>
<name>yarn.resourcemanager.store.class</name> <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>
</configuration>
(b)同步更新其他节点的配置信息
(3).启动hdfs

(a)在各个JournalNode节点上,输入以下命令启动journalnode服务:

1
sbin/hadoop-daemon.sh start journalnode

(b)在[nn1]上,对其进行格式化,并启动:

1
2
bin/hdfs namenode -format
sbin/hadoop-daemon.sh start namenode

(c )在[nn2]上,同步nn1的元数据信息:

1
bin/hdfs namenode -bootstrapStandby

(d)启动[nn2]:

1
sbin/hadoop-daemon.sh start namenode

(e)启动所有datanode

1
sbin/hadoop-daemons.sh start datanode

(f)将[nn1]切换为Active

1
bin/hdfs haadmin -transitionToActive nn1
(4).启动yarn

(a)在hadoop1中执行:

1
sbin/start-yarn.sh

(b)在hadoop2中执行:

1
sbin/yarn-daemon.sh start resourcemanager

(c)查看服务状态

1
bin/yarn rmadmin -getServiceState rm1

image.png

(d)强制切换状态:

1
bin/yarn rmadmin -transitionToStandby rm2 --forcemanual

(e)关于sbin/start.all.sh
hadoop1:

1
sbin/start.all.sh

二.HDFS Federation架构设计

1.NameNode架构的局限性

(1)Namespace(命名空间)的限制

由于NameNode在内存中存储所有的元数据(metadata),因此单个namenode所能存储的对象(文件+块)数目受到namenode所在JVM的heap size的限制。50G的heap能够存储20亿(200million)个对象,这20亿个对象支持4000个datanode,12PB的存储(假设文件平均大小为40MB)。随着数据的飞速增长,存储的需求也随之增长。单个datanode从4T增长到36T,集群的尺寸增长到8000个datanode。存储的需求从12PB增长到大于100PB。

(2)隔离问题

由于HDFS仅有一个namenode,无法隔离各个程序,因此HDFS上的一个实验程序就很有可能影响整个HDFS上运行的程序。

(3)性能的瓶颈

由于是单个namenode的HDFS架构,因此整个HDFS文件系统的吞吐量受限于单个namenode的吞吐量。

2.HDFS Federation架构设计

能不能有多个NameNode

NameNode NameNode NameNode
元数据 元数据 元数据
Log machine 电商数据/话单数据

image.png

3.HDFS Federation应用思考

不同应用可以使用不同NameNode进行数据管理
图片业务、爬虫业务、日志审计业务
Hadoop生态系统中,不同的框架使用不同的namenode进行管理namespace。(隔离性)

  • Copyrights © 2015-2021 Movle
  • 访问人数: | 浏览次数:

请我喝杯咖啡吧~

支付宝
微信