Fork me on GitHub

Hive安装部署

1.前提:

  • mysql已安装
  • hadoop集群已配置

2.步骤

(1)把apache-hive-1.2.1-bin.tar.gz上传到linux的/opt/software目录下
(2)解压apache-hive-1.2.1-bin.tar.gz到/opt/module/目录下面
1
tar -zxvf apache-hive-1.2.1-bin.tar.gz -C /opt/module/
(3)修改apache-hive-1.2.1-bin的名称为hive-1.2.1
1
2
3
cd /opt/module

mv apache-hive-1.2.1-bin/ hive-1.2.1
(4)修改环境变量:
1
vi /etc/profile

添加内容:

1
2
export HIVE_HOME=/opt/module/hive-1.2.1
export PATH=$PATH:$HIVE_HOME/bin

使环境变量生效:

1
source /etc/profile
(5)修改/opt/module/hive/conf目录下的hive-env.sh.template名称为hive-env.sh
1
mv hive-env.sh.template hive-env.sh
(6)配置hive-env.sh文件
1
vi hive-env.sh

配置HADOOP_HOME路径

1
2
export HADOOP_HOME=/opt/module/hadoop-2.8.4
export HIVE_CONF_DIR=/opt/module/hive-1.2.1/conf

(7)注:Hive的log默认存放在/tmp/itstar/hive.log目录下(当前用户名下)。
(a)修改hive的log存放日志到/opt/module/hive/logs
(b)修改conf/hive-log4j.properties.template文件名称为hive-log4j.properties
1
2
3
pwd

mv hive-log4j.properties.template hive-log4j.properties

image.png

(c )在hive-log4j.properties文件中修改log存放位置
1
2
3
cd /opt/module/hive-1.2.1

mkdir logs

1
2
3
cd /opt/module/hive-1.2.1/conf

vi hive-log4j.properties

修改内容:

1
hive.log.dir=/opt/module/hive-1.2.1/logs

(8)Hadoop 配置
(a)必须启动hdfs和yarn
1
2
3
sbin/start-dfs.sh

sbin/start-yarn.sh
(b)在HDFS上创建/tmp和/user/hive/warehouse两个目录并修改他们的同组权限可写
1
2
3
4
5
hadoop fs -mkdir /tmp
hadoop fs -mkdir -p /user/hive/warehouse

hadoop fs -chmod g+w /tmp
hadoop fs -chmod g+w /user/hive/warehouse

image.png

(9).将hive元数据配置到mysql
(a)将mysql-connector-java-5.1.27.jar上传到/opt/module/hive-1.2.1/lib中:

(b)创建hive-site.xml
1
2
3
4
5
cd /opt/module/hive-1.2.1/conf

touch hive-site.xml

vi hive-site.xml

添加内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://hadoop1:3306/metastore1?createDatabaseIfNotExist=true&amp;characterEncoding=utf-8&amp;useSSL=false</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>

<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>

<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>username to use against metastore database</description>
</property>

<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>000000</value>
<description>password to use against metastore database</description>
</property>
</configuration>

注意:最好是在mysql所在的服务器中安装hive

(c ).配置完毕后,如果启动hive异常,可以重新启动虚拟机。(重启后,别忘了启动hadoop集群)
(d)在hive的bin目录下执行
1
./schematool -dbType mysql -initSchema


创建成功

(10).添加其他配置信息:

命令:

1
vi /opt/module/hive-1.2.1/conf/hive-site.xml

添加内容:

(a)修改default数据仓库原始位置
1
2
3
4
5
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
<description>location of default database for the warehouse</description>
</property>
(b)添加如下配置信息,就可以实现显示当前数据库,以及查询表的头信息配置
1
2
3
4
5
6
7
8
9
<property>
<name>hive.cli.print.header</name>
<value>true</value>
</property>

<property>
<name>hive.cli.print.current.db</name>
<value>true</value>
</property>

配置
配置后启动hive

Hive基本概念

一.Hive基本概念:

1. Hive

(1).官网:http://hive.apache.org/
(2).Apache HiveTM数据仓库软件有助于使用SQL读取,编写和管理驻留在分布式存储中的 大型数据集。可以将结构投影到已存储的数据中。提供了命令行工具和JDBC驱动程序以 将用户连接到Hive。
(3).hive提供了SQL查询功能 hdfs分布式存储
(4).hive本质HQL转化为MapReduce程序。
(5).环境前提:
  • 启动hdfs集群
  • 启动yarn集群

如果想用hive的话,需要提前安装部署好hadoop集群。

2.为什么要学习Hive

(1).简化开发
(2).优势:
(a)操作接口采用类SQL语法,提供快速开发的能力(简单、容易上手)
(b)避免了去写MapReduce,减少开发人员的学习成本。
(c )Hive的执行延迟比较高,因此Hive常用于数据分析,对实时性要求不高的场合
(d)Hive优势在于处理大数据,对于处理小数据没有优势,因为Hive的执行延迟比较高
(e)Hive支持用户自定义函数,用户可以根据自己的需求来实现自己的函数。

3.劣势:

(1)Hive的HQL表达能力有限
  • 迭代式算法无法表达
  • 数据挖掘方面不擅长
(2)Hive的效率比较低
  • Hive自动生成的MapReduce作业,通常情况下不够智能化
  • Hive调优比较困难,粒度较粗

3.Hive架构原理:

sql->转换->mapreduce->job

Hive架构原理

Hive架构

    如图中所示,Hive通过给用户提供的一系列交互接口,接收到用户的指令(SQL),使用自己的Driver,结合元数据(MetaStore),将这些指令翻译成MapReduce,提交到Hadoop中执行,最后,将执行返回的结果输出到用户交互接口。

(1).用户接口:Client

CLI(hive shell)、JDBC/ODBC(java访问hive)、WEBUI(浏览器访问hive)

(2).元数据:Metastore

    元数据包括:表名、表所属的数据库(默认是default)、表的拥有者、列/分区字段、表的类型(是否是外部表)、表的数据所在目录等;
    默认存储在自带的derby数据库中,推荐使用MySQL存储Metastore

(3).Hadoop

    使用HDFS进行存储,使用MapReduce进行计算。

(4).驱动器:Driver
(a)解析器(SQL Parser):

将SQL字符串转换成抽象语法树AST,这一步一般都用第三方工具库完成,比如antlr;对AST进行语法分析,比如表是否存在、字段是否存在、SQL语义是否有误。

(b)编译器(Physical Plan):

将AST编译生成逻辑执行计划。

(c )优化器(Query Optimizer):

对逻辑执行计划进行优化。

(d)执行器(Execution):

把逻辑执行计划转换成可以运行的物理计划。对于Hive来说,就是MR/Spark。

4.Hive和数据库比较

    由于 Hive 采用了类似SQL 的查询语言 HQL(Hive Query Language),因此很容易将 Hive 理解为数据库。其实从结构上来看,Hive 和数据库除了拥有类似的查询语言,再无类似之处。本文将从多个方面来阐述 Hive 和数据库的差异。数据库可以用在 Online 的应用中,但是Hive 是为数据仓库而设计的,清楚这一点,有助于从应用角度理解 Hive 的特性。

(1).查询语言

    由于SQL被广泛的应用在数据仓库中,因此,专门针对Hive的特性设计了类SQL的查询语言HQL。熟悉SQL开发的开发者可以很方便的使用Hive进行开发。

(2).数据存储位置

    Hive 是建立在 Hadoop 之上的,所有 Hive 的数据都是存储在 HDFS 中的。而数据库则可以将数据保存在块设备或者本地文件系统中。

(3). 数据更新

    由于Hive是针对数据仓库应用设计的,而数据仓库的内容是读多写少的。因此,Hive中不支持对数据的改写和添加,所有的数据都是在加载的时候确定好的。而数据库中的数据通常是需要经常进行修改的,因此可以使用 INSERT INTO … VALUES 添加数据,使用 UPDATE … SET修改数据。

(4). 索引

    Hive在加载数据的过程中不会对数据进行任何处理,甚至不会对数据进行扫描,因此也没有对数据中的某些Key建立索引。Hive要访问数据中满足条件的特定值时,需要暴力扫描整个数据,因此访问延迟较高。由于 MapReduce 的引入, Hive 可以并行访问数据,因此即使没有索引,对于大数据量的访问,Hive 仍然可以体现出优势。数据库中,通常会针对一个或者几个列建立索引,因此对于少量的特定条件的数据的访问,数据库可以有很高的效率,较低的延迟。由于数据的访问延迟较高,决定了 Hive 不适合在线数据查询。

(5). 执行

    Hive中大多数查询的执行是通过 Hadoop 提供的 MapReduce 来实现的。而数据库通常有自己的执行引擎。

(6). 执行延迟

    Hive 在查询数据的时候,由于没有索引,需要扫描整个表,因此延迟较高。另外一个导致 Hive 执行延迟高的因素是 MapReduce框架。由于MapReduce 本身具有较高的延迟,因此在利用MapReduce 执行Hive查询时,也会有较高的延迟。相对的,数据库的执行延迟较低。当然,这个低是有条件的,即数据规模较小,当数据规模大到超过数据库的处理能力的时候,Hive的并行计算显然能体现出优势。

(7).可扩展性

    由于Hive是建立在Hadoop之上的,因此Hive的可扩展性是和Hadoop的可扩展性是一致的(世界上最大的Hadoop 集群在 Yahoo!,2009年的规模在4000 台节点左右)。而数据库由于 ACID 语义的严格限制,扩展行非常有限。目前最先进的并行数据库Oracle在理论上的扩展能力也只有100台左右。

(8). 数据规模

    由于Hive建立在集群上并可以利用MapReduce进行并行计算,因此可以支持很大规模的数据;对应的,数据库可以支持的数据规模较小。

二.Hive数据类型

1.基本数据类型

Hive数据类型 Java数据类型 长度 例子
TINYINT byte 1byte有符号整数 20
SMALINT short 2byte有符号整数 20
INT int 4byte有符号整数 20
BIGINT long 8byte有符号整数 20
BOOLEAN boolean 布尔类型,true或者false TRUE FALSE
FLOAT float 单精度浮点数 3.14159
DOUBLE double 双精度浮点数 3.14159
STRING string 字符系列。可以指定字符集。可以使用单引号或者双引号 ‘now is the time’ “for all good men”
TIMESTAMP 时间类型
BINARY 字节数组

    对于Hive的String类型相当于数据库的varchar类型,该类型是一个可变的字符串,不过它不能 其中最多能存储多少个字符,理论上它可以存储2GB的字符数。

2.集合数据类型

数据类型 描述 语法示例
STRUCT 和c语言中的struct类似,都可以通过“点”符号访问元素内容。例如,如果某个列的数据类型是STRUCT{first STRING, last STRING},那么第1个元素可以通过字段.first来引用 struct()
MAP MAP是一组键-值对元组集合,使用数组表示法可以访问数据。例如,如果某个列的数据类型是MAP,其中键->值对是’first’->’John’和’last’->’Doe’,那么可以通过字段名[‘last’]获取最后一个元素 map()
ARRAY 数组是一组具有相同类型和名称的变量的集合。这些变量称为数组的元素,每个数组元素都有一个编号,编号从零开始。例如,数组值为[‘John’, ‘Doe’],那么第2个元素可以通过数组名[1]进行引用 Array()

    Hive有三种复杂数据类型ARRAY、MAP 和 STRUCT。ARRAY和MAP与Java中的Array和Map类似,而STRUCT与C语言中的Struct类似,它封装了一个命名字段集合,复杂数据类型允许任意层次的嵌套。

1
create table heheee(`来试试` string,`来试试1` string,`来试试2` string) row format delimited fields terminated by '\t';

案例实操

(1)假设某表有如下一行,我们用JSON格式来表示其数据结构。在Hive下访问的格式为
1
2
3
4
5
6
7
8
9
10
11
12
{
"name": "songsong",
"friends": ["bingbing" , "lili"] , //列表Array,
"children": { //键值Map,
"xiao song": 18 ,
"xiaoxiao song": 19
}
"address": { //结构Struct,
"street": "hai dian qu" ,
"city": "beijing"
}
}
(2)基于上述数据结构,我们在Hive里创建对应的表,并导入数据。

创建本地测试文件test.txt

1
2
3
cd /opt/module/datas/

touch text.txt

添加内容:

1
2
3
songsong,bingbing_lili,xiao song:18_xiaoxiao song:19,hai dian qu_beijing

yangyang,caicai_susu,xiao yang:18_xiaoxiao yang:19,chao yang_beijing

    注意,MAP,STRUCT和ARRAY里的元素间关系都可以用同一个字符表示,这里用“_”。

(3)Hive上创建测试表test
1
2
3
4
5
6
7
8
9
10
create table test(
name string,
friends array<string>,
children map<string, int>,
address struct<street:string, city:string>
)
row format delimited fields terminated by ','
collection items terminated by '_'
map keys terminated by ':'
lines terminated by '\n';

字段解释:

1
2
3
4
row format delimited fields terminated by ','     -- 列分隔符
collection items terminated by '_' --MAP STRUCT 和 ARRAY 的分隔符(数据分割符号)
map keys terminated by ':' -- MAP中的key与value的分隔符
lines terminated by '\n'; -- 行分隔符
(4)导入文本数据到测试表
1
load data local inpath '/opt/module/datas/text.txt' into table test;
(5)访问三种集合列里的数据,以下分别是ARRAY,MAP,STRUCT的访问方式
1
select friends[1],children['xiao song'],address.city from test where name="songsong";

如果起别名:

1
select friends[1] as pengyou from test where name="songsong";

3.类型转化

    Hive的原子数据类型是可以进行隐式转换的,类似于Java的类型转换,例如某表达式使用INT类型,TINYINT会自动转换为INT类型,但是Hive不会进行反向转化,例如,某表达式使用TINYINT类型,INT不会自动转换为TINYINT类型,它会返回错误,除非使用CAST操作。

(1).隐式类型转换规则如下。
(a)任何整数类型都可以隐式地转换为一个范围更广的类型,如TINYINT可以转换成INT,INT可以转换成BIGINT。
(b)所有整数类型、FLOAT和STRING类型都可以隐式地转换成DOUBLE。
(c )TINYINT、SMALLINT、INT都可以转换为FLOAT。
(d)BOOLEAN类型不可以转换为任何其它的类型。
(2).可以使用CAST操作显示进行数据类型转换,

    例如CAST(‘1’ AS INT)将把字符串’1’ 转换成整数1;如果强制类型转换失败,如执行CAST(‘X’ AS INT),表达式返回空值 NULL。

Flume拦截器-自定义拦截器

1.写一个小写字母变成大写的拦截器;

2.编写代码:

(1)pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
<dependencies>
<!-- flume核心依赖 -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 打包插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<!-- 编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>utf-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
(2)拦截器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;

public class MyInterceptor implements Interceptor {
@Override
public void initialize() {
}

@Override
public void close() {
}

/**
* 拦截source发送到通道channel中的消息
*
* @param event 接收过滤的event
* @return event 根据业务处理后的event
*/
@Override
public Event intercept(Event event) {
// 获取事件对象中的字节数据
byte[] arr = event.getBody();
// 将获取的数据转换成大写
event.setBody(new String(arr).toUpperCase().getBytes());
// 返回到消息中
return event;
}
// 接收被过滤事件集合
@Override
public List<Event> intercept(List<Event> events) {
List<Event> list = new ArrayList<>();
for (Event event : events) {
list.add(intercept(event));
}
return list;
}

public static class Builder implements Interceptor.Builder {
// 获取配置文件的属性
@Override
public Interceptor build() {
return new MyInterceptor();
}

@Override
public void configure(Context context) {

}
}

3.使用Maven做成Jar包,在flume的目录下mkdir jar,上传此jar到jar目录中

4.新建flume配置文件:

(1)ToUpCase.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#1.agent 
a1.sources = r1
a1.sinks =k1
a1.channels = c1


# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/plus
a1.sources.r1.interceptors = i1
#全类名$Builder
a1.sources.r1.interceptors.i1.type = ToUpCase.MyInterceptor$Builder

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop2:9000/ToUpCase1
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件类型,默认是 Sequencefile,可用 DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

5.运行:

1
bin/flume-ng agent -c conf/ -n a1 -f jar/ToUpCase.conf -C jar/Flume-1.0-SNAPSHOT.jar -Dflume.root.logger=DEBUG,console

Flume拦截器-正则抽取拦截器

1.extractor.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#2 source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/plus
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
# hostname is bigdata111 ip is 192.168.20.111
a1.sources.r1.interceptors.i1.regex = hostname is (.*?) ip is (.*)
a1.sources.r1.interceptors.i1.serializers = s1 s2
#hostname(自定义)= (.*?)->bigdata111
a1.sources.r1.interceptors.i1.serializers.s1.name = hostname
#ip(自定义) = (.*)->192.168.20.111
a1.sources.r1.interceptors.i1.serializers.s2.name = ip

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2.启动命令:

1
bin/flume-ng agent -c conf/ -f jobconf/extractor.conf -n a1 -Dflume.root.logger=INFO,console

Flume拦截器-正则过滤拦截器

1.filter.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#2 source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/plus
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^A.*
#如果excludeEvents设为false,表示过滤掉不是以A开头的events。如果excludeEvents设为true,则表示过滤掉以A开头的events。
a1.sources.r1.interceptors.i1.excludeEvents = true

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2.启动命令:

1
bin/flume-ng agent -c conf/ -f jobconf/filter.conf -n a1 -Dflume.root.logger=INFO,console

Flume拦截器-查询替换拦截器

1.search.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#2 source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/plus
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = search_replace

#遇到数字改成itstar,A123会替换为Aitstar
a1.sources.r1.interceptors.i1.searchPattern = [0-9]+
a1.sources.r1.interceptors.i1.replaceString = ***
a1.sources.r1.interceptors.i1.charset = UTF-8

#3 sink
a1.sinks.k1.type = logger

#4 Chanel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#5 bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2.启动命令:

1
bin/flume-ng agent -c conf/ -f jobconf/search.conf -n a1 -Dflume.root.logger=INFO,console

Flume拦截器-UUID拦截器

1.uuid.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/plus
a1.sources.r1.interceptors = i1
#type的参数不能写成uuid,得写具体,否则找不到类
a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
#如果UUID头已经存在,它应该保存
a1.sources.r1.interceptors.i1.preserveExisting = true
a1.sources.r1.interceptors.i1.prefix = UUID_

#如果sink类型改为HDFS,那么在HDFS的文本中没有headers的信息数据
a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2.启动命令:

1
bin/flume-ng agent -c conf/ -f jobconf/uuid.conf -n a1 -Dflume.root.logger==INFO,console

Flume拦截器-主机名拦截器

0.功能作用:

将时间戳放到event的header(Map<key,value>)

1.Host.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#1.定义agent
a1.sources= r1
a1.sinks = k1
a1.channels = c1

#2.定义source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/plus
#拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host

#参数为true时用IP192.168.1.111,参数为false时用主机名,默认为true
a1.sources.r1.interceptors.i1.useIP = false
a1.sources.r1.interceptors.i1.hostHeader = agentHost

#3.定义sinks
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop2:9000/flumehost/%{agentHost}
a1.sinks.k1.hdfs.filePrefix = plus_%{agentHost}
#往生成的文件加后缀名.log
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.useLocalTimeStamp = true

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2.启动命令:

1
bin/flume-ng agent -c conf/ -f jobconf/host.conf -n a1 -Dflume.root.logger=INFO,console

Flume拦截器-时间戳拦截器

0.功能作用:

将时间戳放到event的header(Map<key,value>)

1.Timestamp.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#1.定义agent名, source、channel、sink的名称
a4.sources = r1
a4.channels = c1
a4.sinks = k1

#2.具体定义source
a4.sources.r1.type = spooldir
a4.sources.r1.spoolDir = /opt/module/flume-1.8.0/upload

#定义拦截器,为文件最后添加时间戳
a4.sources.r1.interceptors = timestamp
a4.sources.r1.interceptors.timestamp.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

#具体定义channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactionCapacity = 100


#具体定义sink
a4.sinks.k1.type = hdfs
a4.sinks.k1.hdfs.path = hdfs://hadoop2:9000/flume-interceptors/%H
a4.sinks.k1.hdfs.filePrefix = events-
a4.sinks.k1.hdfs.fileType = DataStream

#不按照条数生成文件
a4.sinks.k1.hdfs.rollCount = 0
#HDFS上的文件达到128M时生成一个文件
a4.sinks.k1.hdfs.rollSize = 134217728
#HDFS上的文件达到60秒生成一个文件
a4.sinks.k1.hdfs.rollInterval = 60

#组装source、channel、sink
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1

2.启动命令:

1
bin/flume-ng agent -c conf/ -f jobconf/uuid.conf -n a4 -Dflume.root.logger==INFO,console

Flume的使用:扇入-flume与flume之间数据传递,多flume汇总数据到单flume

1.图示

扇入

2.新建配置文件:

(1)flume-1.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 2 source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop2
a1.sources.r1.port = 55555

#3 sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop4
a1.sinks.k1.port = 4141

# 4 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 5 Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(2)flume-2.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 1 agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# 2 source
a2.sources.r1.type = exec
a2.sources.r1.command = tail -F /opt/plus
a2.sources.r1.shell = /bin/bash -c

# 3 sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop4
a2.sinks.k1.port = 4141

# 4 channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# 5. Bind
a2.sources.tail-file.channels = c1
a2.sinks.avro4.channel = c1
(3)flume-3.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# 1 agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# 2 source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop4
a3.sources.r1.port = 4141

# 3 sink
a3.sinks.k1.type = hdfs
a3.sinks.k1.hdfs.path = hdfs://hadoop2:9000/flume3/%H
#上传文件的前缀
a3.sinks.k1.hdfs.filePrefix = flume3-
#是否按照时间滚动文件夹
a3.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k1.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是128M
a3.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k1.hdfs.rollCount = 0
#最小冗余数
a3.sinks.k1.hdfs.minBlockReplicas = 1

# 4 channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# 5 Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

3.测试

(1)依次启动flume-3.conf,flume-2.conf,flume-1.conf
(2)改变文件,查看结果
  • Copyrights © 2015-2021 Movle
  • 访问人数: | 浏览次数:

请我喝杯咖啡吧~

支付宝
微信