Fork me on GitHub

数据结构与算法(4)-栈与队列

一.栈的定义:

1.栈的定义:

(1).栈(stack)是限定仅在表尾进行插入和删除操作的线性表
(2).把允许插入和删除的一端称为栈顶(top),另一端称为栈底(bottom),不含任何数据元素的栈称为空栈。栈又称为后进先出(Last In First Out)的线性表,简称LIFO结构

2.进栈出栈变化形式

二.栈的抽象数据类型

三.栈的顺序存储结构及实现

1.栈的顺序存储结构
2.栈的顺序存储结构–进栈操作
3.栈的顺序存储结构–出栈操作

此时进栈出栈都没有涉及到任何循环语句,因此时间复杂度均是O(1)

四.两栈共享存储空间

五.栈的链式存储结构

1.栈的链式存储结构

(1)栈的链式存储结构,简称为链栈
(2)对于空栈来说,链表原定义是头指针指向空,那么链栈的空其实就是top=NULL的时候

2.栈的链式存储结构–进栈操作

3.栈的链式存储结构–出栈操作

(1)对比一下顺序栈与链栈,它们在时间复杂度上是一样的,均为O(1)

六.栈的应用–递归

1.递归定义:

(1)把一个直接调用自己或通过一系列的调用语句间接地调用自己的函数,称做递归函数
(2)迭代和递归的区别是:

迭代使用的是循环结构,递归使用的是选择结构。递归能使程序的结构更清晰、更简洁、更容易让人理解,从而减少读懂代码的时间。但是大量的递归调用会建立函数的副本,会耗费大量的时间和内存。迭代则不需要反复调用函数和占用额外的内存

七.队列的定义

(1)队列(queue)是只允许在一端进行插入操作,而在另一端进行删除操作的线性表
(2)队列是一种先进先出(First In First Out)的线性表,简称FIFO。允许插入的一端称为队尾,允许删除的一端称为队头

八.队列的抽象数据类型

九.循环队列

1.循环队列的定义:

把队列的这种头尾相接的顺序存储结构称为循环队列

十.队列的链式存储结构

1.含义

(1)队列的链式存储结构,其实就是线性表的单链表,只不过它只能尾进头出而已,我们把它简称为链队列

2.队列的链式存储结构–入队操作

3.队列的链式存储结构–出队操作

4.链队列与循环队列的比较:

(1)从时间上,其实它们的基本操作都是常数时间,即都为O(1)的,不过循环队列是事先申请好空间,使用期间不释放,而对于链队列,每次申请和释放结点也会存在一些时间开销

数据结构与算法(3)-线性表

线性表

一.线性表的定义

1.线性表是零个或多个具有相同类型的数据元素的有限序列

2.线性表的关键:

(1)首先线性表是一个序列,元素之间是有顺序的
(2)然后,线性表强调是有限的

二.线性表的顺序存储结构

1.顺序存储的定义:

线性表的顺序存储结构,指的是用一段地址连续的存储单元依次存储线性表的数据元素

顺序存储示意图

2.顺序存储方式:

(1)描述顺序存储结构需要三个属性:
  • 存储空间的起始位置:数组data,它的存储位置就是存储空间的存储位置
  • 线性表的最大存储容量:数组长度MaxSize
  • 线性表的当前长度:length

3.数组长度与线性表长度的区别

(1)数组的长度是存放线性表的存储空间的长度,存储分配后这个量一般是不变的
(2)线性表的长度是线性表中数据元素的个数,随着线性表插入和删除操作的进行,这个量是变化的
(3)线性表的长度应该小于等于数组的长度

4.地址计算方法

三.顺序存储结构的插入与删除

1.获得元素操作

2.插入操作

(1)插入算法的思路:
  • 如果插入位置不合理,抛出异常;
  • 如果线性表长度大于等于数组长度,则抛出异常或动态增加容量;
  • 从最后一个元素开始向前遍历到第i个位置,分别将它们都向后移动一个位置;
  • 将要插入元素填入位置i处,表长加1

3.删除操作

(1)删除算法的思路:
  • 如果删除位置不合理,抛出异常;
  • 取出删除元素;
  • 从删除元素位置开始遍历到最后一个元素位置,分别将它们都向前移动一个位置;
  • 表长减1

4.线性表顺序存储的优缺点

线性表顺序存储的优缺点

(1)线性表的顺序存储结构,在存、读数据时,不管是哪个位置,时间复杂度都是O(1);而插入或删除时,时间复杂度都是O(n)

四.线性表的链式存储结构

线性表的链式存储结构

1.线性表链式存储结构定义:

(1)线性表的链式存储结构的特点:

是用一组任意的存储单元存储线性表的数据元素,这组存储单元可以是连续的,也可以是不连续的。这就意味着,这些数据元素可以存在内存未被占用的任意位置

(2)链表的每个结点中只包含一个指针域,所以叫做单链表
(3)链表中第一个结点的存储位置叫做头指针
(4)线性链表的最后一个结点指针为“空”

2.头指针与头节点的异同

头指针与头节点的异同

五.单链表的读取

(1).获得链表第i个数据的算法思路:
  • 声明一个指针p指向链表第一个结点,初始化j从1开始;
  • 当j<i时,就遍历链表,让p的指针向后移动,不断指向下一结点,j累加1;
  • 若到链表末尾p为空,则说明第i个结点不存在;
  • 否则查找成功,返回结点p的数据

六.单链表的插入与删除

1.单链表的插入:

(1)单链表第i个数据插入结点的算法思路:
  • 声明一指针p指向链表头结点,初始化j从1开始;
  • 当j<i时,就遍历链表,让p的指针向后移动,不断指向下一结点,j累加1;
  • 若到链表末尾p为空,则说明第i个结点不存在;
  • 否则查找成功,在系统中生成一个空结点s;
  • 将数据元素e赋值给s->data;
  • 单链表的插入标准语句s->next=p->next;p->next=s;
  • 返回成功

2.单链表的删除:

(1)单链表第i个数据删除结点的算法思路:
  • 声明一指针p指向链表头结点,初始化j从1开始;
  • 当j<i时,就遍历链表,让p的指针向后移动,不断指向下一个结点,j累加1;
  • 若到链表末尾p为空,则说明第i个结点不存在;
  • 否则查找成功,将欲删除的结点p->next赋值给q;
  • 单链表的删除标准语句p->next=q->next;
  • 将q结点中的数据赋值给e,作为返回;
  • 释放q结点;
  • 返回成功

七.单链表的整表创建

1.创建单链表的过程就是一个动态生成链表的过程

2.单链表整表创建的算法思路:

(1).声明一指针p和计数器变量i;
(2).初始化一空链表L;
(3).让L的头结点的指针指向NULL,即建立一个带头结点的单链表;
(4).循环:
  • 生成一新结点赋值给p;
  • 随机生成一数字赋值给p的数据域p->data;
  • 将p插入到头结点与前一新结点之间

八.单链表的整表删除

1.单链表整表删除的算法思路如下:

(1).声明一指针p和q;
(2).将第一个结点赋值给p;
(3).循环:
  • 将下一结点赋值给q;
  • 释放p;
  • 将q赋值给p

九.单链表结构与顺序存储结构优缺点

单链表结构与顺序存储结构优缺点

(1).若线性表需要频繁查找,很少进行插入和删除操作时,宜采用顺序存储结构。若需要频繁插入和删除时,宜采用单链表结构
(2).当线性表中的元素个数变化较大或者根本不知道有多大时,最好用单链表结构

十.静态链表

1.用数组描述的链表叫做静态链表,这种描述方法还有起名叫做游标实现法

2.静态链表的优缺点:

静态链表的优缺点

十一.循环链表

1.将单链表中终端结点的指针端由空指针改为指向头结点,就使整个单链表形成一个环,这种头尾相接的单链表称为单循环链表,简称循环链表

十二.双向链表

(1).双向链表(double linkedlist)是在单链表的每个结点中,再设置一个指向其前驱结点的指针域。所以在双向链表中的结点都有两个指针域,一个指向直接后继,另一个指向直接前驱。
(2).双向链表当然也可以是循环表

数据结构与算法(2)-算法

一.算法的定义

1.算法定义:

算法是解决特定问题求解步骤的描述,在计算机中表现为指令的有限序列,并且每条指令表示一个或多个操作

二.算法的特性

算法具有五个基本特性:输入、输出、有穷性、确定性和可行性

1.输入输出

算法具有零个或多个输入 ,至少有一个或多个输出

2.有穷性

有穷性:指算法在执行有限的步骤之后,自动结束而不会出现无限循环,并且每一个步骤在可接受的时间内完成

3.确定性

确定性:算法的每一步骤都具有确定的含义,不会出现二义性。算法在一定条件下,只有一条执行路径,相同的输入只能有唯一的输出结果。算法的每个步骤被精确定义而无歧义。

4.可行性

可行性:算法的每一步都必须是可行的,也就是说,每一步都能够通过执行有限次数完成。

三.算法设计的要求

1.正确性

(1)定义:

正确性,算法的正确性是指算法至少应该具有输入、输出和加工处理无歧义性、能正确反映问题的需求、能够得到问题的正确答案

(2)算法的“正确”大体分为四个层次:
  • 一是算法程序没有语法错误
  • 二是算法程序对于合法的输入数据能够产生满足要求的输出结果
  • 三是算法程序对于非法的输入数据能够得出满足规格说明的结果
  • 四是算法程序对于精心选择的,甚至刁难的测试数据都有满足要求的输出结果

2.可读性

可读性:算法设计的另一目的是为了便于阅读、理解和交流。

3.健壮性

健壮性:当输入数据不合法时,算法也能做出相关处理,而不是产生异常或莫名其妙的结果

4.时间效率高和存储量低

好的算法还应该具备时间效率高和存储量低的特点,设计算法应该尽量满足时间效率高和存储量低的需求

四.算法效率的度量方法

1.事后统计方法

事后统计方法:这种方法主要是通过设计好的测试程序和数据,利用计算机计时器对不同算法编制的程序的运行时间进行比较,从而确定算法效率的高低

这种方法有很大缺陷,通常不予采纳

2.事前估计分析方法

事前分析估算方法:在计算机程序编制前,依据统计方法对算法进行估算。

(1)一个用高级程序语言编写的程序在计算机上运行时所消耗的时间取决于下列因素:
  • 一是算法采用的策略、方法。
  • 二是编译产生的代码质量。
  • 三是问题的输入规模。
  • 四是机器执行指令的速度
(2)事前估算方法的理论依据,通过算法时间复杂度来估算算法时间效率

五.算法时间复杂度.

1.算法时间复杂度定义

(1)定义:

算法时间复杂度表示随问题规模n的增大,算法执行时间的增长率和f(n)的增长率相同,称作算法的渐近时间复杂度,简称为时间复杂度。其中f(n)是问题规模n的某个函数

(2)用大写O( )来体现算法时间复杂度的记法,我们称之为大O记法。

2.推导大O阶方法

推导大O阶方法:

(1)用常数1取代运行时间中的所有加法常数。
(2)在修改后的运行次数函数中,只保留最高阶项。
(3)如果最高阶项存在且不是1,则去除与这个项相乘的常数。

得到的结果就是大O阶。”

3.常数阶O(1)

1
2
3
int sum = 0,n = 100;      /* 执行一次 */
sum = (1 + n) * n / 2; /* 执行一次 */
printf("%d", sum); /* 执行一次 */

4.线性阶O(n)

1
2
3
4
5
int i;
for (i = 0; i < n; i++)
{
/* 时间复杂度为O(1)的程序步骤序列 */
}”
5.对数阶O(log2n)
1
2
3
4
5
6
int count = 1;
while (count < n)
{
count = count * 2;
/* 时间复杂度为O(1)的程序步骤序列 */
}

计算运算次数
由2x=n得到x=log2n

6.平方阶O(n2)

1
2
3
4
5
6
7
8
int i, j;
for (i = 0; i < n; i++)
{
for (j = 0; j < n; j++)
{
/* 时间复杂度为O(1)的程序步骤序列 */
}
}

六.常见的时间复杂度

执行次数 函数阶 非正式术语
15 O(1) 常数阶
2n+5 O(n) 线性阶
3n2+2n+1 O(n2) 平方阶
5log2n+20 O(logn) 对数阶
2n+3nlog2n+19 O(nlogn) nlog2n
6n3+5n2+2 O(n3) 立方阶
2n O(2n) 指数阶
(1)常用的时间复杂度所耗费的时间从小到大依次是:

O(1) < O(logn) < O(n) < O(nlogn) < O(n2) < O(n3) < O(2n) < O(n!)< O(nn)

七.最坏情况与平均情况

(1)平均运行时间是所有情况中最有意义的,因为它是期望的运行时间
(2)对算法的分析,一种方法是计算所有情况的平均值,这种时间复杂度的计算方法称为平均时间复杂度
(3)另一种方法是计算最坏情况下的时间复杂度,这种方法称为最坏时间复杂度。一般在没有特殊说明的情况下,都是指最坏时间复杂度

八.算法空间复杂度

(1).算法空间复杂度的计算公式记作:

S(n)=O(f(n))
其中,n为问题的规模,f(n)为语句关于n所占存储空间的函数

(2).通常,我们都使用“时间复杂度”来指运行时间的需求,使用“空间复杂度”指空间需求

数据结构与算法(1)-数据结构绪论

一.基本概念和术语

1.数据

(1)定义:

数据,是描述客观事物的符号,是计算机中可以操作的对象,是能被计算机识别,并输入给计算机处理的符号集合。数据不仅仅包括整型、实型等数值类型,还包括字符及声音、图像、视频等非数值类型

(2)这些符号必须具备两个前提:
  • 可以输入到计算机中
  • 能被计算机程序处理

2.数据元素

(1)定义:

数据元素,是组成数据的、有一定意义的基本单位,在计算机中通常作为整体处理。也被称为记录

3.数据项

(1)定义:

数据项,一个数据元素可以由若干个数据项组成

(2)数据项是数据不可分割的最小单位

4.数据对象

(1)定义:

数据对象,是性质相同的数据元素的集合,是数据的子集

(2)数据对象是数据的子集

在实际应用中,处理的数据元素通常具有相同性质,在不产生混淆的情况下,都将数据对象简称为数据

5.数据结构

(1)定义:

数据结构,是相互之间存在一种或多种特定关系的数据元素的集合

关系

二.逻辑结构与物理结构

    按照视点的不同,我们把数据结构分为逻辑结构和物理结构

1.逻辑结构

(1)定义:

逻辑结构,是指数据对象中数据元素之间的相互关系

(2)集合结构:

集合结构:集合结构中的数据元素除了同属于一个集合外,它们之间没有其他关系。各个数据元素是“平等”的,它们的共同属性是“同属于一个集合”。数据结构中的集合关系就类似于数学中的集合(如图所示)

集合结构

(3)线性结构:

线性结构:线性结构中的数据元素之间是一对一的关系(如图所示)

线性结构

(4)树形结构:

树形结构:树形结构中的数据元素之间存在一种一对多的层次关系(如图所示)

树形结构

(5)图形结构:

图形结构:图形结构的数据元素是多对多的关系(如图所示)

图形结构

2.物理结构

    物理结构:是指数据的逻辑结构在计算机中的存储形式

    数据元素的存储结构形式有两种:顺序存储和链式存储

(1)顺序存储结构

顺序存储结构:是把数据元素存放在地址连续的存储单元里,其数据间的逻辑关系和物理关系是一致的(如图所示)

顺序存储结构

(2)链式存储结构

链式存储结构:是把数据元素存放在任意的存储单元里,这组存储单元可以是连续的,也可以是不连续的。数据元素的存储关系并不能反映其逻辑关系,因此需要用一个指针存放数据元素的地址,这样通过地址就可以找到相关联数据元素的位置

链式存储结构

3.结构关系图

逻辑结构与物理结构

三.数据类型

1.数据类型

(1)定义:

数据类型,是指一组性质相同的值的集合及定义在此集合上的一些操作的总称

(2)数据类型是按照值的不同进行划分的。

在高级语言中,每个变量、常量和表达式都有各自的取值范围。类型就用来说明变量或表达式的取值范围和所能进行的操作

2.抽象数据类型

(1)定义:

抽象数据类型(Abstract Data Type,ADT),是指一个数学模型及定义在该模型上的一组操作。抽象数据类型的定义仅取决于它的一组逻辑特性,而与其在计算机内部如何表示和实现无关。

(2)一个抽象数据类型定义了:一个数据对象、数据对象中各数据元素之间的关系及对数据元素的操作

MapReduce实战:将统计结果按照手机归属地不同省份输出到不同文件中(Partitioner)

1.需求:

  • 将上次实战(统计手机号耗费的总上行流量和下行流量)的统计结果按照手机归属地不同省份输出到不同文件中(分区)

2.分析:

(1)Mapreduce中会将map输出的kv对,按照相同key分组,然后分发给不同的reducetask。默认的分发规则为:根据key的hashcode%reducetask数来分发

(2)如果要按照我们自己的需求进行分组,则需要改写数据分发(分组)组件Partitioner

    自定义一个CustomPartitioner继承抽象类:Partitioner
(3)在job驱动中,设置自定义partitioner:

3.编写代码:

(1)在实战(统计手机号耗费的总上行流量和下行流量)需求的基础上,增加一个分区类

ProvincePartitioner.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package phoneData;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
// 1 获取电话号码的前三位
String preNum = key.toString().substring(0, 3);

//注:如果设置的分区数小于下面的分区数,如3、则最后一个分区是混数据分区
//注:如何设置的分区数大于下面的分区数,如3
int partition = 4;

// 2 判断是哪个省
if ("136".equals(preNum)) {
partition = 0;
}else if ("137".equals(preNum)) {
partition = 1;
}else if ("138".equals(preNum)) {
partition = 2;
}else {
partition = 3;
}
return partition;
}
}
(2)编写Mapper,Reducer,和实战(统计手机号耗费的总上行流量、下行流量、总流量(序列化))的Mapper,Reducer一样。
(3)编写驱动:

FlowsumDriver.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package phoneData;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowsumDriver {

public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {

//args = new String[]{"/Users/macbook/TestInfo/phone_data.txt", "/Users/macbook/TestInfo/MovlePhone2"};

// 1 获取配置信息,或者job对象实例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);

// 6 指定本程序的jar包所在的本地路径
job.setJarByClass(FlowsumDriver.class);

// 2 指定本业务job要使用的mapper/Reducer业务类
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);

// 3 指定mapper输出数据的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);

// 4 指定最终输出的数据的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);

job.setPartitionerClass(ProvincePartitioner.class);
job.setNumReduceTasks(6);

// 5 指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
job.waitForCompletion(true);
}
}

4.运行结果:

运行结果

MapReduce实战-多表案例:Map端表合并(Distributedcache)

1.分析

    适用于关联表中有小表的情形;
    可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行合并并输出最终结果,可以大大提高合并操作的并发度,加快处理速度。

2.实战案例:

(1)先在驱动模块中添加缓存文件

DistributedCacheDriver.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package MapJoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.net.URI;

public class DistributedCacheDriver {

public static void main(String[] args) throws Exception {

//args = new String[]{"/Users/macbook/TestInfo/mapjoin/order.txt","/Users/macbook/TestInfo/mapjoin3"};

// 1 获取job信息
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);

// 2 设置加载jar包路径
job.setJarByClass(DistributedCacheDriver.class);

// 3 关联map
job.setMapperClass(DistributedCacheMapper.class);

// 4 设置最终输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);

// 5 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

//这个地方添加的是小表的数据,不是setup

//job.addCacheFile(new URI("file:///f:/date/A/mapjoin/lov/pd.txt"));
//这里是hdfs上的pd.txt地址
job.addCacheFile(new URI("/pd.txt"));

//无reduce,设置为0
job.setNumReduceTasks(0);

// 8 提交
job.waitForCompletion(true);

}
}
(2)读取缓存的文件数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package MapJoin;


import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.*;
import java.util.HashMap;
import java.util.Map;

/**
* 学习点1:两个表join
* 学习点2:小表加缓存
* 学习点3:setup方法的使用
*/
public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

Map<String, String> pdMap = new HashMap<>();

//ctrl + o 选中两个方法

/**
* 初始化方法
* 把pd.txt加载进来
*
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//把pd.txt文件加载进来,这里是本地pd.txt地址
BufferedReader reader = new BufferedReader(new InputStreamReader(
new FileInputStream(new File("/Users/macbook/TestInfo/lov/pd.txt")), "UTF-8"));

String line;
//import org.apache.commons.lang.StringUtils; 导包注意,是common包,不是hadoop的
while (StringUtils.isNotEmpty(line = reader.readLine())) {
String[] fields = line.split("\t");
//产品id
String pid = fields[0];
//产品名字
String pname = fields[1];
pdMap.put(pid, pname);
}
reader.close();
}

Text k = new Text();

/**
* order.txt的处理
*
* @param key
* @param value
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key,
Text value,
Context context) throws IOException, InterruptedException {
//转类型
String line = value.toString();
//切分
String[] fields = line.split("\t");

//订单id,产品id,产品数量
String orderId = fields[0];
String pid = fields[1];
String amount = fields[2];

//通过pid(key),拿到pname(value)
String pname = pdMap.get(pid);

//数据字段拼接
k.set(orderId + "\t" + pname + "\t" + amount);

context.write(k,NullWritable.get());
}
}

3.结果:

结果

MapReduce实战:将统计结果按照总流量倒序排序(全排序)

1.需求:

  • 根据需求1产生的结果再次对总流量进行排序

2.数据准备:

3.分析

(1)把程序分两步走,第一步正常统计总流量,第二步再把结果进行排序
(2)context.write(总流量,手机号)
(3)FlowBean实现WritableComparable接口重写compareTo方法
1
2
3
4
5
@Override
public int compareTo(FlowBean o) {
// 倒序排列,从大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}

4.编写代码:

(1)编写FlowBean

FlowBean.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package phoneDataComparable;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// 1 实现writable接口,因为要排序,所以这里使用的是WritableComparable
@Setter
@Getter
@AllArgsConstructor
@NoArgsConstructor
public class FlowBean implements WritableComparable<FlowBean> {

private long upFlow;
private long downFlow;
private long sumFlow;

public void set(long upFlow, long downFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow + downFlow;
}

/**
* 比较方法
*
* @param o
* @return
*/
@Override
public int compareTo(FlowBean o) {
//降序是-1,升序是1
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}

/**
* 序列化
*
* @param out
* @throws IOException
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}

/**
* 反序列化
*
* @param in
* @throws IOException
*/
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.sumFlow = in.readLong();
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
}
(2)编写Mapper:

FlowCountMapper.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package phoneDataComparable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

//<sum,phoneNum> => <FlowBean,Text>
public class FlowCountMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

FlowBean k = new FlowBean();
Text v = new Text();

@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {

// 1 获取一行
String line = value.toString();

// 2 切割字段
String[] fields = line.split("\t");

// 3 封装对象
// 取出手机号码
String phoneNum = fields[0];
// 取出上行流量和下行流量
long upFlow = Long.parseLong(fields[1]);
long downFlow = Long.parseLong(fields[2]);

k.set(upFlow, downFlow);
v.set(phoneNum);

// 4 写出
context.write(k, v);
}
}
(3)编写Reducer:

FlowCountReducer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package phoneDataComparable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
//<Sum,电话号> ===> <电话号,Sum>
public class FlowCountReducer extends Reducer<FlowBean,Text , Text, FlowBean> {
@Override
protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text text : values) {
context.write(text, key);
}
}
}
(4)编写驱动:

FlowsumDriver.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package phoneDataComparable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowsumDriver {

public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {

//args = new String[]{"/Users/macbook/TestInfo/MovlePhone1/","/Users/macbook/TestInfo/MovlePhoneCompare"};

// 1 获取配置信息,或者job对象实例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);

// 6 指定本程序的jar包所在的本地路径
job.setJarByClass(FlowsumDriver.class);

// 2 指定本业务job要使用的mapper/Reducer业务类
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);

// 3 指定mapper输出数据的kv类型
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);

// 4 指定最终输出的数据的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);

// 5 指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
job.waitForCompletion(true);
}
}

5.运行结果:

运行结果

MapReduce实战:统计手机号耗费的总上行流量和下行流量

1.需求:

  • 统计每一个手机号耗费的总上行流量、下行流量、总流量

2.数据准备:

(1)输入数据格式:
1
时间戳、电话号码、基站的物理地址、访问网址的ip、网站域名、数据包、接包数、上行/传流量、下行/载流量、响应码

输入的数据

(2)最终输出的数据格式:
1
手机号码		上行流量        下行流量		总流量

输出的数据

3.分析:

基本思路:

(1)Map阶段:
  • (a)读取一行数据,切分字段
  • (b)抽取手机号、上行流量、下行流量
  • (c)以手机号为key,bean对象为value输出,即context.write(手机号,bean);
(2)Reduce阶段:
  • (a)累加上行流量和下行流量得到总流量。
  • (b)实现自定义的bean来封装流量信息,并将bean作为map输出的key来传输
  • (c)MR程序在处理数据的过程中会对数据排序(map输出的kv对传输到reduce之前,会排序),排序的依据是map输出的key

所以,我们如果要实现自己需要的排序规则,则可以考虑将排序因素放到key中,让key实现接口:WritableComparable。
然后重写key的compareTo方法。

4.程序代码:

(1)编写流量统计的bean对象FlowBean

FlowBean.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package phoneData;

import lombok.Getter;
import lombok.Setter;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// 1 实现writable接口
@Setter
@Getter
public class FlowBean implements Writable {
//上传流量
private long upFlow;
//下载流量
private long downFlow;
//流量总和
private long sumFlow;

//必须要有,反序列化要调用空参构造器
public FlowBean() {
}

public FlowBean(long upFlow, long downFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow + downFlow;
}

public void set(long upFlow, long downFlow){
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow + downFlow;
}


/**
* 序列化
*
* @param out
* @throws IOException
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}

/**
* 反序列化
* 注:字段属性顺序必须一致
*
* @param in
* @throws IOException
*/
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.sumFlow = in.readLong();
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
}
(2)编写Mapper

FlowCountMapper.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package phoneData;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
* LongWritable, Text ===> Map输入 <偏移量,手机号>
* Text, FlowBean ======> Map的输出:<手机号、流量上传下载总和>
*/
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
Text k = new Text();
FlowBean v = new FlowBean();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//获取每一行数据
String line = value.toString();

//切割字段
//1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
String[] fields = line.split("\t");
//手机号
String phoneNum = fields[1];

//上传和下载 upFlow downFlow
long upFlow = Long.parseLong(fields[fields.length - 3]);
long downFlow = Long.parseLong(fields[fields.length - 2]);

k.set(phoneNum);

context.write(k,new FlowBean(upFlow,downFlow));
}
}
(3)编写Reducer

FlowCountReducer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package phoneData;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
//上传和下载的总和初始化
long sum_upFlow = 0;
long sum_downFlow = 0;

// 1 遍历所用bean,将其中的上行流量,下行流量分别累加
for (FlowBean flowBean : values) {
//所有的上传的流量加在一起
sum_upFlow += flowBean.getUpFlow();
//所有的下载的流量加在一起
sum_downFlow += flowBean.getDownFlow();
}
// 2 封装对象
FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);
// 3 写出
context.write(key, resultBean);
}
}
(4)编写驱动

FlowsumDriver.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package phoneData;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowsumDriver {

public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {

//args = new String[]{"/Users/macbook/TestInfo/phone_data.txt", "/Users/macbook/TestInfo/MovlePhone1"};

// 1 获取配置信息,或者job对象实例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);

// 6 指定本程序的jar包所在的本地路径
job.setJarByClass(FlowsumDriver.class);

// 2 指定本业务job要使用的mapper/Reducer业务类
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);

// 3 指定mapper输出数据的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);

// 4 指定最终输出的数据的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);

// job.setPartitionerClass(ProvincePartitioner.class);
// job.setNumReduceTasks(6);

// 5 指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
job.waitForCompletion(true);
}
}

5.运行结果:

(1)打包上传到hadoop集群:
(2)运行:
1
hadoop jar Hadoop-1.0-SNAPSHOT.jar phoneData.FlowSumDriver /phone_data.txt /out2

运行结果

MapReduce实战,简单清洗日志

1.情况,利用MapReduce将web.txt文件进行简单的清洗,去除脏数据

2.日志文件web.txt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
183.49.46.228 - - [18/Sep/2013:06:49:23 +0000] "-" 400 0 "-" "-"
163.177.71.12 - - [18/Sep/2013:06:49:33 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
163.177.71.12 - - [18/Sep/2013:06:49:36 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
101.226.68.137 - - [18/Sep/2013:06:49:42 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
101.226.68.137 - - [18/Sep/2013:06:49:45 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
60.208.6.156 - - [18/Sep/2013:06:49:48 +0000] "GET /wp-content/uploads/2013/07/rcassandra.png HTTP/1.0" 200 185524 "http://cos.name/category/software/packages/" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939 "http://www.angularjs.cn/A00n" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
222.68.172.190 - - [18/Sep/2013:06:50:08 +0000] "-" 400 0 "-" "-"
183.195.232.138 - - [18/Sep/2013:06:50:16 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
183.195.232.138 - - [18/Sep/2013:06:50:16 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
66.249.66.84 - - [18/Sep/2013:06:50:28 +0000] "GET /page/6/ HTTP/1.1" 200 27777 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
221.130.41.168 - - [18/Sep/2013:06:50:37 +0000] "GET /feed/ HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
157.55.35.40 - - [18/Sep/2013:06:51:13 +0000] "GET /robots.txt HTTP/1.1" 200 150 "-" "Mozilla/5.0 (compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm)"
50.116.27.194 - - [18/Sep/2013:06:51:35 +0000] "POST /wp-cron.php?doing_wp_cron=1379487095.2510800361633300781250 HTTP/1.0" 200 0 "-" "WordPress/3.6; http://blog.fens.me"
58.215.204.118 - - [18/Sep/2013:06:51:35 +0000] "GET /nodejs-socketio-chat/ HTTP/1.1" 200 10818 "http://www.google.com/url?sa=t&rct=j&q=nodejs%20%E5%BC%82%E6%AD%A5%E5%B9%BF%E6%92%AD&source=web&cd=1&cad=rja&ved=0CCgQFjAA&url=%68%74%74%70%3a%2f%2f%62%6c%6f%67%2e%66%65%6e%73%2e%6d%65%2f%6e%6f%64%65%6a%73%2d%73%6f%63%6b%65%74%69%6f%2d%63%68%61%74%2f&ei=rko5UrylAefOiAe7_IGQBw&usg=AFQjCNG6YWoZsJ_bSj8kTnMHcH51hYQkAA&bvm=bv.52288139,d.aGc" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"

web.txt

2.代码

(1)pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.8.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.8.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.8.4</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
</dependency>

<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.7</version>
</dependency>

<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.5.1</version>
<configuration>
<encoding>UTF-8</encoding>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
(2).LogBean代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package ETLLOG;

import lombok.*;

@Setter
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class LogBean {
//客户端IP
private String remote_addr;
//用户名称 忽略<->
private String remote_user;
//时间
private String time_local;
//URL请求
private String request;
//返回状态码
private String status;
//文件内容大小
private String body_bytes_sent;
//链接页面
private String http_referer;
//浏览的相关信息
private String http_user_agent;

//判断数据是否合法
private boolean valid = true;

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(this.valid);
sb.append("\001").append(this.remote_addr);
sb.append("\001").append(this.remote_user);
sb.append("\001").append(this.time_local);
sb.append("\001").append(this.request);
sb.append("\001").append(this.status);
sb.append("\001").append(this.body_bytes_sent);
sb.append("\001").append(this.http_referer);
sb.append("\001").append(this.http_user_agent);

return sb.toString();
}
}
(3).LogDriver代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package ETLLOG;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogDriver {
public static void main(String[] args) throws Exception {

//获取配置信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

//加载反射类
job.setJarByClass(LogDriver.class);
job.setMapperClass(LoggerMapper.class);
//job.setReducerClass();

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);

//输入数据和输出数据的路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

//job提交
job.waitForCompletion(true);
}
}
(4).LogMapper代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package ETLLOG2;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class LoggerMapper extends Mapper<LongWritable,Text,Text,NullWritable>{

Text k = new Text();
@Override
protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException {
//1.Text ===> String
String line = value.toString();

//2.解析日志
LogBean bean = parseLog(line);

//3.判断非法数据
if (!bean.isValid()) {
return;
}

k.set(bean.toString());

//4.输出
context.write(k,NullWritable.get());
}

/**
* 解析日志
*
* @param line
* @return
*/
private LogBean parseLog(String line) {
LogBean logBean = new LogBean();

//1.截取后的字段属性
String[] fields = line.split(" ");
//筛选条件1:字段长度大于11
if (fields.length > 11) {
//2.封装数据
logBean.setRemote_addr(fields[0]);
logBean.setRemote_user(fields[1]);
logBean.setTime_local(fields[3].substring(1));
logBean.setRequest(fields[6]);
logBean.setStatus(fields[8]);
logBean.setBody_bytes_sent(fields[9]);
logBean.setHttp_referer(fields[10]);

//如果字段长度大于12,就拼接浏览器来源
//筛选条件2:如果有浏览器来源就拼接上,看是否大于12
if (fields.length > 12) {
logBean.setHttp_user_agent(fields[11] + " " + fields[12]);
}else {
//浏览的相关信息
logBean.setHttp_user_agent(fields[11]);
}

//判断状态码
//筛选条件3:状态码大于等于400,是非法数据
if (Integer.parseInt(logBean.getStatus()) >= 400) {
logBean.setValid(false);
}
}else {
logBean.setValid(false);
}
return logBean;
}
}

3.运行

(1)将程序打包,上传到hadoop集群,将web.txt发送到hdfs
(2)运行:
1
hadoop jar Hadoop-1.0-SNAPSHOT.jar ETLLOG.LogDriver /web.txt /out

在这里插入图片描述

(3)运行结果

在这里插入图片描述

MapReduce实战:第一个wordcount程序

1.运行环境:

  • linux Hadoop集群
  • IDEA
  • jdk8

2.在IDEA中创建项目

(1)创建项目,添加依赖:

pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.8.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.8.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.8.4</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
</dependency>

<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.7</version>
</dependency>

<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.5.1</version>
<configuration>
<encoding>UTF-8</encoding>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
(2)map代码:

WordCountMapper.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package WordCount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {

@Override
protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException {

//1 将maptask传给我们的文本内容先转换成String
String line = value.toString();

// 2 根据空格将这一行切分成单词
// new String[][I,wish,to,wish]
String[] words = line.split(" ");

// 3 将单词输出为<单词,1>
for(String word:words){
context.write(new Text(word),new IntWritable(1));
}

}
}

(3)reduce代码

WordCountReducer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package WordCount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {

@Override
protected void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
int count =0;

// 1 汇总各个key的个数
for(IntWritable value:values){
count+= value.get();
}

// 2输出该key的总次数
context.write(key,new IntWritable(count));
}
}
(4)partitioner代码

WordCoutPartitioner.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package WordCount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class WordCountPartitioner extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text key, IntWritable value, int numPartitions) {

// 1.获取单词key
String word = key.toString();
// 2.计算单词长度
int length=word.getLength();
// 3.按照规则自定义分区
if(length%2==0){
return 1;
}else{
return 0;
}
}
}
(5)driver代码:

WordCountDriver.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package WordCount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
* @ClassName WordCountDriver
* @MethodDesc: TODO WordCountDriver功能介绍
* @Author Movle
* @Date 5/6/20 10:07 上午
* @Version 1.0
* @Email movle_xjk@foxmail.com
**/


public class WordCountDriver {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//args = new String[]{"/Users/macbook/TestInfo/a.txt","/Users/macbook/TestInfo/WC"};

//1.获取配置信息,或job对象实例
Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

job.setJarByClass(WordCountDriver.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);

//map输出的k,v,reduce的输入k,v
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

//reduce输出的k,v
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

//设置分区,也要设置运算分区的reduce 的task数,因为分区数是2,所以这里设置为2
job.setPartitionerClass(WordCountPartitioner.class);
job.setNumReduceTasks(2);

//指定job的输入原始文件所在目录,以及输出目录
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));

//提交job,waitForCompletion包含job.submit
job.waitForCompletion(true);

}
}

3.运行:

(1).打包:
(2).上传到hadoop集群:

在这里插入图片描述

(3)执行

注意:下面的路径是hdfs路径

1
hadoop jar Hadoop-1.0-SNAPSHOT.jar WordCount.WordCountDriver /a.txt /out

运行

(4)结果:

在这里插入图片描述
结果

  • Copyrights © 2015-2021 Movle
  • 访问人数: | 浏览次数:

请我喝杯咖啡吧~

支付宝
微信