Fork me on GitHub

HDFS-HA高可用集群配置

一. HDFS-HA集群配置

1 环境准备

(1)修改IP
(2)修改主机名及主机名和IP地址的映射
(3)关闭防火墙
(4)ssh免密登录
(5)安装JDK,配置环境变量等
(6)hadoop1,hadoop2,hadoop3集群已经配置好zookeeper集群

2 规划集群

hadoop1 hadoop2 hadoop3
NameNode NameNode
JournalNode JournalNode JournalNode
DataNode DataNode DataNode
ZK ZK ZK
zkfc zkfc
ResourceManager
NodeManager NodeManager NodeManager

3.具体步骤

(1).在opt目录下创建一个HA文件夹

1
2
3
cd /opt

mkdir HA

(2).将hadoop-2.8.4压缩包解压到/opt/HA目录下

1
tar -zxvf hadoop-2.8.4.tar.gz -C /opt/HA/

(3).配置hadoop-env.sh

1
export JAVA_HOME=/opt/module/jdk1.8.0_144

(4).配置core-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<configuration>
<!-- 把两个NameNode)的地址组装成一个集群mycluster -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://mycluster</value>
</property>
<!-- 指定hadoop运行时产生文件的存储目录 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/HA/hadoop-2.8.4/data/tmp</value>
</property>
<!--配置zookeeper-->
<property>
<name>ha.zookeeper.quorum</name>
<value>hadoop1:2181,hadoop2:2181,hadoop3:2181</value>
</property>
</configuration>

(5).配置hdfs-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
<configuration>
<!-- 完全分布式集群名称 -->
<property>
<name>dfs.nameservices</name>
<value>mycluster</value>
</property>
<!-- 集群中NameNode节点都有哪些 -->
<property>
<name>dfs.ha.namenodes.mycluster</name>
<value>nn1,nn2</value>
</property>
<!-- nn1的RPC通信地址 -->
<property>
<name>dfs.namenode.rpc-address.mycluster.nn1</name>
<value>hadoop1:9000</value>
</property>
<!-- nn2的RPC通信地址 -->
<property>
<name>dfs.namenode.rpc-address.mycluster.nn2</name>
<value>hadoop2:9000</value>
</property>
<!-- nn1的http通信地址 -->
<property>
<name>dfs.namenode.http-address.mycluster.nn1</name>
<value>hadoop1:50070</value>
</property>
<!-- nn2的http通信地址 -->
<property>
<name>dfs.namenode.http-address.mycluster.nn2</name>
<value>hadoop2:50070</value>
</property>
<!-- 指定NameNode元数据在JournalNode上的存放位置 -->
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://hadoop1:8485;hadoop2:8485;hadoop3:8485/mycluster</value>
</property>
<!-- 配置隔离机制,即同一时刻只能有一台服务器对外响应 -->
<property>
<name>dfs.ha.fencing.methods</name>
<value>sshfence</value>
</property>
<!-- 使用隔离机制时需要ssh无秘钥登录-->
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/root/.ssh/id_rsa</value>
<!--这里主要是看自己是什么用户,我是root用户-->
</property>
<!-- 声明journalnode服务器存储目录-->
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/opt/HA/hadoop-2.8.4/data/jn</value>
</property>
<!-- 关闭权限检查-->
<property>
<name>dfs.permissions.enable</name>
<value>false</value>
</property>
<!-- 访问代理类:client,mycluster,active配置失败自动切换实现方式-->
<property>
<name>dfs.client.failover.proxy.provider.mycluster</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>
</configuration>

(6).配置slaves文件:

1
2
3
cd /opt/HA/hadoop-2.8.4/etc/hadoop

vi slaves

修改内容:

1
2
3
hadoop1
hadoop2
hadoop3

(7).拷贝配置好的hadoop环境到其他节点

1
2
3
scp -r /opt/HA/* root@hadoop2:/opt/HA/

scp -r /opt/HA/* root@hadoop3:/opt/HA/

4 初始化namenode

(1).在各个JournalNode节点上,输入以下命令启动journalnode服务:

1
sbin/hadoop-daemon.sh start journalnode

(2).在[nn1]上,对其进行格式化,并启动:
hadoop1:

1
2
3
bin/hdfs namenode -format

sbin/hadoop-daemon.sh start namenode

(3).在[nn2]上,同步nn1的元数据信息:

1
bin/hdfs namenode -bootstrapStandby

(4).启动[nn2]:
hadoop2:

1
sbin/hadoop-daemon.sh start namenode

image.png

(5).查看web页面显示

hadoop1
hadoop2

5.高可用HDFS集群启动

(1)关闭所有HDFS服务:
hadoop1

1
sbin/stop-dfs.sh

hadoop2:

1
sbin/hadoop-daemon.sh stop namenode  

(2)启动Zookeeper集群,每一台都启动zookeeper:

1
bin/zkServer.sh start

(3)初始化HA在Zookeeper中状态:
在hadoop1中:

1
bin/hdfs zkfc -formatZK

(4)启动HDFS服务:

1
sbin/start-dfs.sh

(5)在各个NameNode节点上启动DFSZK Failover Controller,先在哪台机器启动,哪个机器的NameNode就是Active NameNode

1
sbin/hadoop-daemin.sh start zkfc

(若是直接设置自动切换,DFSZK Failover Controller没有启动,则需要走这一步)

6.验证

(1)将Active NameNode进程kill

1
kill -9 namenode的进程id

(2)将Active NameNode机器断开网络

1
service network stop

HDFS之HA高可用概述

一.HA概述

1.所谓HA(2high available),即高可用(7*24小时不中断服务)

2.实现高可用最关键的策略是消除单点故障。HA严格来说应该分成各个组件的HA机制:HDFS的HA和YARN的HA。

3.Hadoop2.0之前,在HDFS集群中NameNode存在单点故障(SPOF)

4.NameNode主要在以下两个方面影响HDFS集群
    NameNode机器发生意外,如宕机,集群将无法使用,直到管理员重启
    NameNode机器需要升级,包括软件、硬件升级,此时集群也将无法使用
    HDFS HA功能通过配置Active/Standby两个nameNodes实现在集群中对NameNode的热备来解决上述问题。如果出现故障,如机器崩溃或机器需要升级维护,这时可通过此种方式将NameNode很快的切换到另外一台机器。

二. HDFS-HA工作机制

通过双namenode消除单点故障

1.HDFS-HA工作要点

(1).元数据管理方式需要改变:
  • 内存中各自保存一份元数据;
  • Edits日志只有Active状态的namenode节点可以做写操作;
  • 两个namenode都可以读取edits;
  • 共享的edits放在一个共享存储中管理(qjournal和NFS两个主流实现)
(2).需要一个状态管理功能模块

实现了一个zkfailover,常驻在每一个namenode所在的节点,每一个zkfailover负责监控自己所在namenode节点,利用zk进行状态标识,当需要进行状态切换时,由zkfailover来负责切换,切换时需要防止brain split现象的发生。

(3).必须保证两个NameNode之间能够ssh无密码登录。
(4).隔离(Fence),即同一时刻仅仅有一个NameNode对外提供服务

2.HDFS-HA自动故障转移工作机制

    可以使用命令hdfs haadmin -failover手动进行故障转移,在该模式下,即使现役NameNode已经失效,系统也不会自动从现役NameNode转移到待机NameNode。自动故障转移为HDFS部署增加了两个新组件:ZooKeeper和ZKFailoverController(ZKFC)进程。

(1)ZooKeeper是维护少量协调数据,通知客户端这些数据的改变和监视客户端故障的高可用服务。HA的自动故障转移依赖于ZooKeeper的以下功能:

  • (a)故障检测:集群中的每个NameNode在ZooKeeper中维护了一个持久会话,如果机器崩溃,ZooKeeper中的会话将终止,ZooKeeper通知另一个NameNode需要触发故障转移。
  • (b)现役NameNode选择:ZooKeeper提供了一个简单的机制用于唯一的选择一个节点为active状态。如果目前现役NameNode崩溃,另一个节点可能从ZooKeeper获得特殊的排外锁以表明它应该成为现役NameNode。

(2)ZKFC是自动故障转移中的另一个新组件,是ZooKeeper的客户端,也监视和管理NameNode的状态。每个运行NameNode的主机也运行了一个ZKFC进程,ZKFC负责:

  • (a)健康监测:ZKFC使用一个健康检查命令定期地ping与之在相同主机的NameNode,只要该NameNode及时地回复健康状态,ZKFC认为该节点是健康的。如果该节点崩溃,冻结或进入不健康状态,健康监测器标识该节点为非健康的。
  • (b)ZooKeeper会话管理:当本地NameNode是健康的,ZKFC保持一个在ZooKeeper中打开的会话。如果本地NameNode处于active状态,ZKFC也保持一个特殊的znode锁,该锁使用了ZooKeeper对短暂节点的支持,如果会话终止,锁节点将自动删除。
  • (c )基于ZooKeeper的选择:如果本地NameNode是健康的,且ZKFC发现没有其它的节点当前持有znode锁,它将为自己获取该锁。如果成功,则它已经赢得了选择,并负责运行故障转移进程以使它的本地NameNode为active。故障转移进程与前面描述的手动故障转移相似,首先如果必要保护之前的现役NameNode,然后本地NameNode转换为active状态。

HDFS-HA故障转移机制

MapReduce开发总结

在编写mapreduce程序时,需要考虑的几个方面:

1.输入数据接口:InputFormat

    默认使用的实现类是:TextInputFormat
    TextInputFormat的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为value返回。
    KeyValueTextInputFormat每一行均为一条记录,被分隔符分割为key,value。默认分隔符是tab(\t)。
    NlineInputFormat按照指定的行数N来划分切片。
    CombineTextInputFormat可以把多个小文件合并成一个切片处理,提高处理效率。
    用户还可以自定义InputFormat。

2.逻辑处理接口:Mapper

用户根据业务需求实现其中三个方法:map() setup() cleanup ()

3.Partitioner分区

    有默认实现 HashPartitioner,逻辑是根据key的哈希值和numReduces来返回一个分区号;key.hashCode()&Integer.MAXVALUE % numReduces
如果业务上有特别的需求,可以自定义分区。

4.Comparable排序

    当我们用自定义的对象作为key来输出时,就必须要实现WritableComparable接口,重写其中的compareTo()方法。

  • 部分排序:对最终输出的每一个文件进行内部排序。
  • 全排序:对所有数据进行排序,通常只有一个Reduce。
  • 二次排序:排序的条件有两个。

5.Combiner合并

    Combiner合并可以提高程序执行效率,减少io传输。但是使用时必须不能影响原有的业务处理结果。

6.reduce端分组:Groupingcomparator

    reduceTask拿到输入数据(一个partition的所有数据)后,首先需要对数据进行分组,其分组的默认原则是key相同,然后对每一组kv数据调用一次reduce()方法,并且将这一组kv中的第一个kv的key作为参数传给reduce的key,将这一组数据的value的迭代器传给reduce()的values参数。
    利用上述这个机制,我们可以实现一个高效的分组取最大值的逻辑。
    自定义一个bean对象用来封装我们的数据,然后改写其compareTo方法产生倒序排序的效果。然后自定义一个Groupingcomparator,将bean对象的分组逻辑改成按照我们的业务分组id来分组(比如订单号)。这样,我们要取的最大值就是reduce()方法中传进来key。

7.逻辑处理接口:Reducer

    用户根据业务需求实现其中三个方法:reduce() setup() cleanup ()

8.输出数据接口:OutputFormat

    默认实现类是TextOutputFormat,功能逻辑是:将每一个KV对向目标文本文件中输出为一行。
    SequenceFileOutputFormat将它的输出写为一个顺序文件。如果输出需要作为后续 MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩。
    用户还可以自定义OutputFormat。

MapRedude之Join,数据清洗,计数器应用

一. Join多种应用

1.Reduce join

(1).原理:

    Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
    Reduce端的主要工作:在reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行合并就ok了。

(2).该方法的缺点

这种方式的缺点很明显就是会造成map和reduce端也就是shuffle阶段出现大量的数据传输,效率很低。

(3).案例实操

2.Map join(Distributedcache分布式缓存)

(1).使用场景:一张表十分小、一张表很大。
(2).解决方案

在map端缓存多张表,提前处理业务逻辑,这样增加map端业务,减少reduce端数据的压力,尽可能的减少数据倾斜。

(3).具体办法:采用distributedcache

(a)在mapper的setup阶段,将文件读取到缓存集合中。
(b)在驱动函数中加载缓存。

1
job.addCacheFile(new URI("file:/e:/mapjoincache/pd.txt"));// 缓存普通文件到task运行节点
(4).实操案例:

二. 数据清洗(ETL)

1.概述

    在运行核心业务Mapreduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行mapper程序,不需要运行reduce程序。

2.实操案例

三.计数器应用

    Hadoop为每个作业维护若干内置计数器,以描述多项指标。例如,某些计数器记录已处理的字节数和记录数,使用户可监控已处理的输入数据量和已产生的输出数据量。

1.API

(1)采用枚举的方式统计计数

1
2
3
enum MyCounter{MALFORORMED,NORMAL}
//对枚举定义的自定义计数器加1
context.getCounter(MyCounter.MALFORORMED).increment(1);

(2)采用计数器组、计数器名称的方式统计

1
2
context.getCounter("counterGroup", "countera").increment(1);
//组名和计数器名称随便起,但最好有意义。

(3)计数结果在程序运行后的控制台上查看。

2.案例实操

MapReduce之OutputFormat数据输出

1.OutputFormat接口实现类

    OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了 OutputFormat接口。下面我们介绍几种常见的OutputFormat实现类。

(1).文本输出TextOutputFormat

    默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,因为TextOutputFormat调用toString()方法把它们转换为字符串。

(2).SequenceFileOutputFormat

    SequenceFileOutputFormat将它的输出写为一个顺序文件。如果输出需要作为后续 MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩。

(3).自定义OutputFormat

    根据用户需求,自定义实现输出。

2.自定义OutputFormat

    为了实现控制最终文件的输出路径,可以自定义OutputFormat。
    要在一个mapreduce程序中根据数据的不同输出两类结果到不同目录,这类灵活的输出需求可以通过自定义outputformat来实现。

(1).自定义OutputFormat步骤

(a)自定义一个类继承FileOutputFormat。
(b)改写recordwriter,具体改写输出数据的方法write()。

(2).实操案例:

MapReduce之ReduceTask工作机制

1.设置ReduceTask并行度(个数)

    reducetask的并行度同样影响整个job的执行并发度和执行效率,但与maptask的并发数由切片数决定不同,Reducetask数量的决定是可以直接手动设置:

1
2
//默认值是1,手动设置为4
job.setNumReduceTasks(4);

2.注意

(1)reducetask=0 ,表示没有reduce阶段,输出文件个数和map个数一致。

(2)reducetask默认值就是1,所以输出文件个数为一个。

(3)如果数据分布不均匀,就有可能在reduce阶段产生数据倾斜

(4)reducetask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个reducetask。

(5)具体多少个reducetask,需要根据集群性能而定。

(6)如果分区数不是1,但是reducetask为1,是否执行分区过程。答案是:不执行分区过程。因为在maptask的源码中,执行分区的前提是先判断reduceNum个数是否大于1。不大于1肯定不执行。

3.实验:测试reducetask多少合适。

(1)实验环境:1个master节点,16个slave节点:CPU:8GHZ,内存: 2G

(2)实验结论:

表1 改变reduce task (数据量为1GB)

maptask=16
Reduce task 1 5 10 15 16 20 25 30 45 60
总时间 892 146 110 92 88 100 128 101 145 104

4.ReduceTask工作机制

ReduceTask工作机制

(1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

(2)Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。

(3)Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。

(4)Reduce阶段:reduce()函数将计算结果写到HDFS上。

MapReduce之Shuffle机制

1. Shuffle机制

    Mapreduce确保每个reducer的输入都是按键排序的。系统执行排序的过程(即将map输出作为输入传给reducer)称为shuffle。


shuffle机制

2.Partition分区

(0).问题引出:

要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)

(1).默认partition分区
1
2
3
4
5
public class HashPartitioner<K, V> extends Partitioner<K, V> {
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}

默认分区是根据key的hashCode对reduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。

(2).自定义Partitioner步骤

(a)自定义类继承Partitioner,重写getPartition()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
// 1 获取电话号码的前三位
String preNum = key.toString().substring(0, 3);
partition = 4;

// 2 判断是哪个省
if ("136".equals(preNum)) {
partition = 0;
}else if ("137".equals(preNum)) {
partition = 1;
}else if ("138".equals(preNum)) {
partition = 2;
}else if ("139".equals(preNum)) {
partition = 3;
}
return partition;
}
}

(b)在job驱动中,设置自定义partitioner:

1
job.setPartitionerClass(CustomPartitioner.class);

(c )自定义partition后,要根据自定义partitioner的逻辑设置相应数量的reduce task

1
job.setNumReduceTasks(5);
(3).注意:
  • 如果reduceTask的数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;

  • 如果1<reduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception;

  • 如果reduceTask的数量=1,则不管mapTask端输出多少个分区文件,最终结果都交给这一个reduceTask,最终也就只会产生一个结果文件 part-r-00000;

例如:假设自定义分区数为5,则
(a)job.setNumReduceTasks(1);会正常运行,只不过会产生一个输出文件
(b)job.setNumReduceTasks(2);会报错
(c )job.setNumReduceTasks(6);大于5,程序会正常运行,会产生空文件

(4).案例实操

3.WritableComparable排序

    排序是MapReduce框架中最重要的操作之一。Map Task和Reduce Task均会对数据(按照key)进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。

    对于Map Task,它会将处理的结果暂时放到一个缓冲区中,当缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次排序,并将这些有序数据写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行一次合并,以将这些文件合并成一个大的有序文件。

    对于Reduce Task,它从每个Map Task上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则放到磁盘上,否则放到内存中。如果磁盘上文件数目达到一定阈值,则进行一次合并以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据写到磁盘上。当所有数据拷贝完毕后,Reduce Task统一对内存和磁盘上的所有数据进行一次合并。

每个阶段的默认排序

(1).排序的分类:
(a)部分排序:

MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部排序。

(b)全排序:

    如何用Hadoop产生一个全局排序的文件?最简单的方法是使用一个分区。但该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了MapReduce所提供的并行架构。

    替代方案:首先创建一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序的文件。主要思路是使用一个分区来描述输出的全局排序。例如:可以为上述文件创建3个分区,在第一分区中,记录的单词首字母a-g,第二分区记录单词首字母h-n, 第三分区记录单词首字母o-z。

(c )辅助排序:(GroupingComparator分组)

    Mapreduce框架在记录到达reducer之前按键对记录排序,但键所对应的值并没有被排序。甚至在不同的执行轮次中,这些值的排序也不固定,因为它们来自不同的map任务且这些map任务在不同轮次中完成时间各不相同。一般来说,大多数MapReduce程序会避免让reduce函数依赖于值的排序。但是,有时也需要通过特定的方法对键进行排序和分组等以实现对值的排序。

(d)二次排序:

    在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。

(2).自定义排序WritableComparable
(a)原理分析

    bean对象实现WritableComparable接口重写compareTo方法,就可以实现排序

1
2
3
4
5
@Override
public int compareTo(FlowBean o) {
// 倒序排列,从大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}
(b)案例实操

4.GroupingComparator分组(辅助排序)

(1).对reduce阶段的数据根据某一个或几个字段进行分组。

(2).案例实操

5.Combiner合并

(0).在分布式的架构中,分布式文件系统HDFS,和分布式运算程序编程框架mapreduce。
  • HDFS:不怕大文件,怕很多小文件
  • mapreduce :怕数据倾斜

那么mapreduce是如果解决多个小文件的问题呢?

(1).mapreduce关于大量小文件的优化策略

(a)默认情况下,TextInputFormat对任务的切片机制是按照文件规划切片,不管有多少个小文件,都会是单独的切片,都会交给一个maptask,这样,如果有大量的小文件
就会产生大量的maptask,处理效率极端底下

(b)优化策略

  • 最好的方法:在数据处理的最前端(预处理、采集),就将小文件合并成大文件,在上传到HDFS做后续的分析
  • 补救措施:如果已经是大量的小文件在HDFS中了,可以使用另一种inputformat来做切片(CombineFileInputformat),它的切片逻辑跟TextInputformat不同,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个maptask了
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    //如果不设置InputFormat,它默认的用的是TextInputFormat.class

    /*CombineTextInputFormat为系统自带的组件类

    * setMinInputSplitSize 中的2048是表示n个小文件之和不能大于2048

    * setMaxInputSplitSize 中的4096是 当满足setMinInputSplitSize中的2048情况下 在满足n+1个小文件之和不能大于4096

    */

    job.setInputFormatClass(CombineTextInputFormat.class);

    CombineTextInputFormat.setMinInputSplitSize(job, 2048);

    CombineTextInputFormat.setMaxInputSplitSize(job, 4096);
    (2).示例
    (a)输入数据:准备5个小文件
    (b)实现过程
  • (a)不做任何处理,运行需求1中的wordcount程序,观察切片个数为5
  • (b)在WordcountDriver中增加如下代码,运行程序,并观察运行的切片个数为1
    1
    2
    3
    4
    5
    6
    7
    // 如果不设置InputFormat,它默认用的是TextInputFormat.class

    job.setInputFormatClass(CombineTextInputFormat.**class**);

    CombineTextInputFormat.*setMaxInputSplitSize*(job, 4*1024*1024);// 4m

    CombineTextInputFormat.*setMinInputSplitSize*(job, 2*1024*1024);// 2m
    注:在看number of splits时,和最大值(MaxSplitSize)有关、总体规律就是和低于最大值是一片、高于最大值1.5倍+,则为两片;高于最大值2倍以上则向下取整,比如文件大小65MB,切片最大值为4MB,那么切片为16个.总体来说,切片差值不超过1个,不影响整体性能
(3).自定义Combiner实现步骤:

(a)自定义一个combiner继承Reducer,重写reduce方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {

// 1 汇总操作
int count = 0;
for(IntWritable v :values){
count = v.get();
}
// 2 写出
context.write(key, new IntWritable(count));
}
}

(b)在job驱动类中设置:

1
job.setCombinerClass(WordcountCombiner.class);

MapReduce之MapTask工作机制

1.并行度决定机制

(1).问题引出

    maptask的并行度决定map阶段的任务处理并发度,进而影响到整个job的处理速度。那么,mapTask并行任务是否越多越好呢?

(2).MapTask并行度决定机制

    一个job的map阶段MapTask并行度(个数),由客户端提交job时的切片个数决定。

数据切片及Maptask并行度决定机制

2.MapTask工作机制

MapTask工作机制

(1)Read阶段:Map Task通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。

(2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。

(3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。

(4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。

    溢写阶段详情:

  • 步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。

  • 步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。

  • 步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。

(5)Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。

    当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。
    在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor(默认100)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。
    让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

MapReduce之InputFormat数据输入

1.Job提交流程和切片源码详解

(1).job提交流程源码详解
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
waitForCompletion()

submit();
connect(); // 1建立连接

new Cluster(getConfiguration()); // 1)创建提交job的代理

initialize(jobTrackAddr, conf); // (1)判断是本地yarn还是远程

submitter.submitJobInternal(Job.this, cluster) // 2 提交job

Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf); // 1)创建给集群提交数据的Stag路径

JobID jobId = submitClient.getNewJobID(); // 2)获取jobid ,并创建job路径

copyAndConfigureFiles(job, submitJobDir); // 3)拷贝jar包到集群
rUploader.uploadFiles(job, jobSubmitDir);

writeSplits(job, submitJobDir);// 4)计算切片,生成切片规划文件
maps = writeNewSplits(job, jobSubmitDir);
input.getSplits(job);
// 5)向Stag路径写xml配置文件
writeConf(conf, submitJobFile);
conf.writeXml(out);
// 6)提交job,返回提交状态
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
(2).FileInputFormat源码解析(input.getSplits(job))(这里留一个坑)

(a)找到你数据存储的目录。
(b)开始遍历处理(规划切片)目录下的每一个文件
(c )遍历第一个文件ss.txt

1
2
3
4
5
6
7
8
(a)获取文件大小fs.sizeOf(ss.txt);
(b)计算切片大小computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M
(c)默认情况下,切片大小=blocksize
(d)开始切,形成第1个切片:ss.txt—0:128M 第2个切片ss.txt—128:256M 第3个切片ss.txt—256M:300M(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片)
(e)将切片信息写到一个切片规划文件中
(f)整个切片的核心过程在getSplit()方法中完成。
(g)数据切片只是在逻辑上对输入数据进行分片,并不会再磁盘上将其切分成分片进行存储。InputSplit只记录了分片的元数据信息,比如起始位置、长度以及所在的节点列表等。
(h)注意:block是HDFS物理上存储的数据,切片是对数据逻辑上的划分。

(d)提交切片规划文件到yarn上,yarn上的MrAppMaster就可以根据切片规划文件计算开启maptask个数。

2.FileInputFormat切片机制

(1).FileInputFormat中默认的切片机制:

(a)简单地按照文件的内容长度进行切片

(b)切片大小,默认等于block大小

(c)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片

比如待处理数据有两个文件:

1
2
3
file1.txt    320M

file2.txt 10M

经过FileInputFormat的切片机制运算后,形成的切片信息如下:

1
2
3
4
5
6
7
file1.txt.split1--  0~128

file1.txt.split2-- 128~256

file1.txt.split3-- 256~320

file2.txt.split1-- 0~10M
(2).FileInputFormat切片大小的参数配置

(a)通过分析源码,在FileInputFormat中,计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize));

(b)切片主要由这几个值来运算决定

  • mapreduce.input.fileinputformat.split.minsize=1 默认值为1

  • mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默认值Long.MAXValue,因此,默认情况下,切片大小=blocksize。

  • maxsize(切片最大值):参数如果调得比blocksize小,则会让切片变小,而且就等于配置的这个参数的值。

  • minsize(切片最小值):参数调的比blockSize大,则可以让切片变得比blocksize还大。

(3).获取切片信息API
1
2
3
4
5
// 根据文件类型获取切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();

// 获取切片的文件名称
String name = inputSplit.getPath().getName();

3.CombineTextInputFormat切片机制

    关于大量小文件的优化策略

(1).默认情况下TextInputformat对任务的切片机制是按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个maptask,这样如果有大量小文件,就会产生大量的maptask,处理效率极其低下。
(2).优化策略
  • (a)最好的办法,在数据处理系统的最前端(预处理/采集),将小文件先合并成大文件,再上传到HDFS做后续分析。

  • (b)补救措施:如果已经是大量小文件在HDFS中了,可以使用另一种InputFormat来做切片(CombineTextInputFormat),它的切片逻辑跟TextFileInputFormat不同:它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个maptask。

  • (c)优先满足最小切片大小,不超过最大切片大小

    1
    2
    3
    CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
    CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
    举例:0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m
(3).具体实现步骤
1
2
3
4
//  如果不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class)
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
(4).案例实操

4.InputFormat接口实现类

    MapReduce任务的输入文件一般是存储在HDFS里面。输入的文件格式包括:基于行的日志文件、二进制格式文件等。这些文件一般会很大,达到数十GB,甚至更大。

    InputFormat常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat等。

(1).TextInputFormat

    TextInputFormat是默认的InputFormat。每条记录是一行输入。键是LongWritable类型,存储该行在整个文件中的字节偏移量。值是这行的内容,不包括任何行终止符(换行符和回车符)
    以下是一个示例,比如,一个分片包含了如下4条文本记录。

1
2
3
4
Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise

    每条记录表示为以下键/值对:

1
2
3
4
(0,Rich learning form)
(19,Intelligent learning engine)
(47,Learning more convenient)
(72,From the real demand for more close to the enterprise)

很明显,键并不是行号。一般情况下,很难取得行号,因为文件按字节而不是按行切分为分片。

(2).KeyValueTextInputFormat

    每一行均为一条记录,被分隔符分割为key,value。可以通过在驱动类中设置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, “ “);来设定分隔符。默认分隔符是tab(\t)。
    以下是一个示例,输入是一个包含4条记录的分片。其中——>表示一个(水平方向的)制表符

1
2
3
4
line1 ——>Rich learning form
line2 ——>Intelligent learning engine
line3 ——>Learning more convenient
line4 ——>From the real demand for more close to the enterprise

    每条记录表示为以下键/值对:

1
2
3
4
(line1,Rich learning form)
(line2,Intelligent learning engine)
(line3,Learning more convenient)
(line4,From the real demand for more close to the enterprise)

    此时的键是每行排在制表符之前的Text序列。

(3).NLineInputFormat

    如果使用NlineInputFormat,代表每个map进程处理的InputSplit不再按block块去划分,而是按NlineInputFormat指定的行数N来划分。即输入文件的总行数/N=切片数(20),如果不整除,切片数=商+1。
    以下是一个示例,仍然以上面的4行输入为例。

1
2
3
4
Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise

    例如,如果N是2,则每个输入分片包含两行。开启2个maptask。

1
2
(0,Rich learning form)
(19,Intelligent learning engine)

    另一个 mapper 则收到后两行:

1
2
(47,Learning more convenient)
(72,From the real demand for more close to the enterprise)

    这里的键和值与TextInputFormat生成的一样。

5.自定义InputFormat

(1).概述

(a)自定义一个类继承FileInputFormat

(b)改写RecordReader,实现一次读取一个完整文件封装为KV。

(c )在输出时使用SequenceFileOutPutFormat输出合并文件。

(2).案例实操

Hadoop序列化

1 为什么要序列化?

    一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。

2 什么是序列化?

    序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储(持久化)和网络传输。
反序列化就是将收到字节序列(或其他数据传输协议)或者是硬盘的持久化数据,转换成内存中的对象。

3 为什么不用Java的序列化?

    Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系等),不便于在网络中高效传输。所以,hadoop自己开发了一套序列化机制(Writable),精简、高效。

4 为什么序列化对Hadoop很重要?

    因为Hadoop在集群之间进行通讯或者RPC调用的时候,需要序列化,而且要求序列化要快,且体积要小,占用带宽要小。所以必须理解Hadoop的序列化机制。
    序列化和反序列化在分布式数据处理领域经常出现:进程通信和永久存储。然而Hadoop中各个节点的通信是通过远程调用(RPC)实现的,那么RPC序列化要求具有以下特点:

  • 紧凑:紧凑的格式能让我们充分利用网络带宽,而带宽是数据中心最稀缺的资
  • 快速:进程通信形成了分布式系统的骨架,所以需要尽量减少序列化和反序列化的性能开销,这是基本的;
  • 可扩展:协议为了满足新的需求变化,所以控制客户端和服务器过程中,需要直接引进相应的协议,这些是新协议,原序列化方式能支持新的协议报文;
  • 互操作:能支持不同语言写的客户端和服务端进行交互;

5.常用数据序列化类型

常用的数据类型对应的hadoop数据序列化类型

Java类型 Hadoop Writable类型
boolean BooleanWritable
byte ByteWritable
int IntWritable
float FloatWritable
long LongWritable
double DoubleWritable
string Text
map MapWritable
array ArrayWritable

6 自定义bean对象实现序列化接口(Writable)

(1)自定义bean对象要想序列化传输,必须实现序列化接口,需要注意以下7项

(a)必须实现Writable接口

(b)反序列化时,需要反射调用空参构造函数,所以必须有空参构造

1
2
3
public FlowBean() {
super();
}

(c )重写序列化方法

1
2
3
4
5
6
7
8
9
@Override
public void write(DataOutput out) throws IOException {

out.writeLong(upFlow);

out.writeLong(downFlow);

out.writeLong(sumFlow);
}

(d)重写反序列化方法

1
2
3
4
5
6
7
8
@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();

downFlow = in.readLong();

sumFlow = in.readLong();
}

(e)注意反序列化的顺序和序列化的顺序完全一致

(f)要想把结果显示在文件中,需要重写toString(),可用”\t”分开,方便后续用。

(g)如果需要将自定义的bean放在key中传输,则还需要实现comparable接口,因为mapreduce框中的shuffle过程一定会对key进行排序。

1
2
3
4
5
@Override
public int compareTo(FlowBean o) {
// 倒序排列,从大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}
  • Copyrights © 2015-2021 Movle
  • 访问人数: | 浏览次数:

请我喝杯咖啡吧~

支付宝
微信