Fork me on GitHub

Spark实战-用Scala编写WordCount程序

一.添加pom依赖:

pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>

<build>
<plugins>

<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>2.11.4</scalaVersion>
</configuration>
</plugin>
</plugins>
</build>

二.编写代码:

1.本地模式:

WordCount.scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package WordCoutScala

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object WordCount {

//定义主方法
def main(args: Array[String]): Unit = {

//创建SparkConf对象
//如果Master是local,表示运行在本地模式上,即可以在开发工具中直接运行
//如果要提交到集群中运行,不需要设置Master
//集群模式
//val conf = new SparkConf().setAppName("My Scala Word Count")

//本地模式
val conf = new SparkConf().setAppName("My Scala Word Count").setMaster("local")

//创建SparkContext对象
val sc = new SparkContext(conf)

val result = sc.textFile("hdfs://192.168.1.120:9000/TestFile/test_WordCount.txt")
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)

result.foreach(println)

//集群模式
// val result = sc.textFile(args(0))
// .flatMap(_.split(" "))
// .map((_,1))
// .reduceByKey(_+_)
// .saveAsTextFile(args(1))

sc.stop()
}
}

结果

2.集群模式:

(1)编写WordCount.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object WordCount {

//定义主方法
def main(args: Array[String]): Unit = {

//创建SparkConf对象
//如果Master是local,表示运行在本地模式上,即可以在开发工具中直接运行
//如果要提交到集群中运行,不需要设置Master
//集群模式
val conf = new SparkConf().setAppName("My Scala Word Count")

//本地模式
//val conf = new SparkConf().setAppName("My Scala Word Count").setMaster("local")

//创建SparkContext对象
val sc = new SparkContext(conf)

// val result = sc.textFile("hdfs://192.168.1.120:9000/TestFile/test_WordCount.txt")
// .flatMap(_.split(" "))
// .map((_,1))
// .reduceByKey(_+_)
//
// result.foreach(println)

//集群模式
val result = sc.textFile(args(0))
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.saveAsTextFile(args(1))

sc.stop()
}
}
(2)打包

打包

(3)上传到Spark节点:

上传

(4)运行:
1
bin/spark-submit --master spark://hadoop:7077 --class WordCoutScala.WordCount /opt/TestFile/ScalaProject-1.0-SNAPSHOT.jar hdfs://hadoop:9000/TestFile/test_WordCount.txt hdfs://hadoop:9000/output/1209/demo1

运行

(5)结果:

image.png

Spark实战-在Spark Shell中开发一个wordcount程序

1.读取一个本地文件,将结果打印到屏幕上。

注意:示例必须只有一个worker 且本地文件与worker在同一台服务器上。

1
2
3
4
5
6
7
8
scala> sc.textFile("/opt/TestFile/test_WordCount.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
```
![](https://imgconvert.csdnimg.cn/aHR0cHM6Ly91cGxvYWQtaW1hZ2VzLmppYW5zaHUuaW8vdXBsb2FkX2ltYWdlcy80MzkxNDA3LWUwZmVmMTQwZDMzOTYzZjQucG5n?x-oss-process=image/format,png)
#### 2.读取一个hdfs文件,进行WordCount操作,并将结果写回hdfs
```shell
scala> sc.textFile("hdfs://hadoop:9000/TestFile/test_WordCount.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://hadoop:9000/output/1208")

[root@hadoop sbin]# hadoop dfs -ls /output/1208

输入命令
结果

3.单步运行WordCount —–> RDD

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
scala> val rdd1 = sc.textFile("/opt/TestFile/test_WordCount.txt")

scala> rdd1.collect

scala> val rdd2 = rdd1.flatMap(_.split(" "))

scala> rdd2.collect

scala> val rdd3 = rdd2.map((_,1))

scala> rdd3.collect

scala> val rdd4 = rdd3.reduceByKey(_+_)

scala> rdd4.collect

(1)RDD说明:RDD 弹性分布式数据集
(2)特性:
(a)依赖关系:rdd2依赖于rdd1
(b)算子
  • Transformation 延时计算: flatMap,map,reduceByKey
  • Action 触发计算:collect

Spark SQL:性能优化

1.在内存中缓存数据

    性能调优主要是将数据放入内存中操作。通过spark.cacheTable(“tableName”)或者dataFrame.cache()。使用spark.uncacheTable(“tableName”)来从内存中去除table。

Demo案例:

(1)从Oracle数据库中读取数据,生成DataFrame
1
2
3
4
5
val mysqlDF = spark.read.format("jdbc")
.option("url","jdbc:mysql://192.168.109.1:3306/company?serverTimezone=UTC&characterEncoding=utf-8")
.option("dbtable","company.emp")
.option("user","root")
.option("password","000000").load
1
scala> val mysqlDF = spark.read.format("jdbc").option("url","jdbc:mysql://192.168.109.1:3306/company?serverTimezone=UTC&characterEncoding=utf-8").option("user","root").option("password","000000").option("dbtable","emp").option("driver","com.mysql.jdbc.Driver").load
(2)将DataFrame注册成表:
1
mysqlDF.registerTempTable("emp")

注意:必须注册成一张表,才可以缓存

(3)执行查询,并通过Web Console监控执行的时间
1
spark.sql("select * from emp").show

(4)将表进行缓存,并查询两次,并通过Web Console监控执行的时间
1
2
3
spark.sqlContext.cacheTable("emp")

spark.sql("select * from emp").show

(5)清空缓存:
1
2
3
spark.sqlContext.cacheTable("emp")

spark.sqlContext.clearCache

2.性能优化相关参数

(1)将数据缓存到内存中的相关优化参数
1
2
3
4
5
6
7
spark.sql.inMemoryColumnarStorage.compressed
//默认为 true
//Spark SQL 将会基于统计信息自动地为每一列选择一种压缩编码方式。

spark.sql.inMemoryColumnarStorage.batchSize
//默认值:10000
//缓存批处理大小。缓存数据时, 较大的批处理大小可以提高内存利用率和压缩率,但同时也会带来 OOM(Out Of Memory)的风险。
(2)其他性能相关的配置选项(不过不推荐手动修改,可能在后续版本自动的自适应修改)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
spark.sql.files.maxPartitionBytes
//默认值:128 MB
//读取文件时单个分区可容纳的最大字节数

spark.sql.files.openCostInBytes
//默认值:4M
//打开文件的估算成本, 按照同一时间能够扫描的字节数来测量。当往一个分区写入多个文件的时候会使用。高估更好, 这样的话小文件分区将比大文件分区更快 (先被调度)。

spark.sql.autoBroadcastJoinThreshold
//默认值:10M
//用于配置一个表在执行 join 操作时能够广播给所有 worker 节点的最大字节大小。通过将这个值设置为 -1 可以禁用广播。注意,当前数据统计仅支持已经运行了 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan 命令的 Hive Metastore 表。

spark.sql.shuffle.partitions
//默认值:200
//用于配置 join 或聚合操作混洗(shuffle)数据时使用的分区数。

Spark SQL:使用数据源

    在Spark SQL中,可以使用各种各样的数据源进行操作。Spark SQL 用于处理结构化的数据

一.通用的Load/Save函数(load函数式加载数据,save函数式存储数据)

   注意:使用load或者save函数时,默认的数据源都是 Parquet文件。列式存储文件

1.通用的Load/Save函数

(1)读取Parquet文件
1
val usersDF = spark.read.load("/root/resources/users.parquet")
(2)查询Schema和数据
1
2
3
scala> usersDF.printSchema

scala> usersDF.show

(3)查询用户的name和喜爱颜色,并保存
1
usersDF.select($"name",$"favorite_color").write.save("/root/result/parquet")
(4)验证结果
1
2
3
4
5
scala >val testResult = spark.read.load("/root/result/parquet/part-00000-8ffaac2e-aa81-4e63-89aa-15a8e4948a37.snappy.parquet")

scala >testResult.printSchema

scala >testResult.show

2.显式指定文件格式:加载json格式

(1)直接加载:
1
val usersDF = spark.read.load("/root/resources/people.json")  //会出错

上面这个会出错

1
val usersDF = spark.read.format("json").load("/root/resources/people.json")

3.存储模式(Save Modes)

    可以采用SaveMode执行存储操作,SaveMode定义了对数据的处理模式。需要注意的是,这些保存模式不使用任何锁定,不是原子操作。此外,当使用Overwrite方式执行时,在输出新数据之前原数据就已经被删除。SaveMode详细介绍如下表:


Demo:

1
usersDF.select($"name").write.save("/root/result/parquet1")

上面出错:因为/root/result/parquet1已经存在

1
usersDF.select($"name").write.mode("overwrite").save("/root/result/parquet1")

4.将结果保存为表

1
usersDF.select($"name").write.saveAsTable("table1")

也可以进行分区、分桶等操作:partitionBy、bucketBy

二.Parquet文件(列式存储文件,是Spark SQL默认的数据源)

1.什么是parquet文件?

(1)Parquet是列式存储格式的一种文件类型,列式存储有以下的核心:
  • 可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量。
  • 压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如Run Length Encoding和Delta Encoding)进一步节约存储空间。
  • 只读取需要的列,支持向量运算,能够获取更好的扫描性能。
  • Parquet格式是Spark SQL的默认数据源,可通过spark.sql.sources.default配置
(2)Parquet是一个列格式而且用于多个数据处理系统中。

Spark SQL提供支持对于Parquet文件的读写,也就是自动保存原始数据的schema。当写Parquet文件时,所有的列被自动转化为nullable,因为兼容性的缘故。

2.把其他文件,转换成Parquet文件()

    读入json格式的数据,将其转换成parquet格式,并创建相应的表来使用SQL进行查询。

1
2
3
scala >val empDF = spark.read.json("/opt/module/datas/TestFile/emp.json")

scala >empDF.show

1
2
3
4
5
6
7
8
scala >empDF.write.mode("overwrite").save("/opt/module/datas/TestFile/myresult/parquet")

//save和parquet都可以写入,是一样的
scala >empDF.write.mode("overwrite").parquet("/opt/module/datas/TestFile/myresult/parquet")

scala >val emp1=spark.read.parquet("/opt/module/datas/TestFile/myresult/parquet")

scala >emp1.show

1
2
3
scala >emp1.createOrReplaceTempView("emptable")

scala >spark.sql("select * from emptable where deptno =10 and sal >1500").show

3.支持Schema的合并:

    Parquet支持Schema evolution(Schema演变,即:合并)。用户可以先定义一个简单的Schema,然后逐渐的向Schema中增加列描述。通过这种方式,用户可以获取多个有不同Schema但相互兼容的Parquet文件。
Demo:

1
2
3
4
5
6
7
8
9
val df1= sc.makeRDD(1 to 5).map(i => (i,i*2)).toDF("single","double")

df1.show

sc.makeRDD(1 to 5)

sc.makeRDD(1 to 5).collect

df1.write.parquet("/opt/module/datas/TestFile/test_table/key=1")

1
2
3
4
5
val df2 = sc.makeRDD(6 to 10).map(i=>(i,i*3)).toDF("single","triple")

df2.show

df2.write.parquet("/opt/module/datas/TestFile/test_table/key=2")

1
2
3
val df3 = spark.read.parquet("/opt/module/datas/TestFile/test_table")

df3.show

1
2
3
val df3 = spark.read.option("mergeSchema",true).parquet("/opt/module/datas/TestFile/test_table")

df3.show

4.JSON 文件

    Spark SQL能自动解析JSON数据集的Schema,读取JSON数据集为DataFrame格式。读取JSON数据集方法为SQLContext.read().json()。该方法将String格式的RDD或JSON文件转换为DataFrame。

    需要注意的是,这里的JSON文件不是常规的JSON格式。JSON文件每一行必须包含一个独立的、自满足有效的JSON对象。如果用多行描述一个JSON对象,会导致读取出错。读取JSON数据集示例如下:

(1)Demo1:使用Spark自带的示例文件 –> people.json 文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
//定义路径:
val path ="/opt/module/datas/TestFile/people.json"

//读取Json文件,生成DataFrame:
val peopleDF = spark.read.json(path)

//打印Schema结构信息:
peopleDF.printSchema()

//创建临时视图:
peopleDF.createOrReplaceTempView("people")

//执行查询
spark.sql("SELECT name FROM people WHERE age=18").show

5.使用JDBC(通过JDBC操作关系型数据库,mysql中的数据,通过JDBC加载到Spark中进行分析和处理)

    Spark SQL同样支持通过JDBC读取其他数据库的数据作为数据源。

(一)Demo演示:使用Spark SQL读取Oracle数据库中的表。

(1)启动Spark Shell的时候,指定Oracle数据库的驱动
1
bin/spark-shell --master spark://hadoop:7077 --jars /opt/soft/mysql-connector-java-5.1.27.jar --driver-class-path /opt/soft/mysql-connector-java-5.1.27.jar 
(2)读取mysql数据库中的数据
(a)方式一:
1
2
3
4
5
6
7
val mysqlDF = spark.read.format("jdbc").
option("url","jdbc:mysql://192.168.1.120:3306/company?serverTimezone=UTC&characterEncoding=utf-8").
option("dbtable","emp").
option("user","root").
option("password","000000").load

mysqlDF.show

(b)方式二:定义 Properities类
1
2
3
4
5
6
7
8
9
10
11
12
//导入需要的类:
import java.util.Properties

//定义属性:
val mysqlProps = new Properties()
mysqlProps.setProperty("user","root")
mysqlProps.setProperty("password","000000")
//读取数据:

val mysqlDF1 = spark.read.jdbc("jdbc:mysql://192.168.1.120:3306/company?serverTimezone=UTC&characterEncoding=utf-8","emp",mysqlProps)

mysqlDF1.show

6.使用Hive Table(把Hive中的数据,读取到Spark SQL 中)

(1)首先,搭建好Hive的环境(需要Hadoop)
(a)搭建台Hive,一台Hive Server(hadoop2),一台Hive Client(hadoop1)

这两台hive中其他配置文件一样,知识hive-site.xml有区别
其中Hive Server的hive-site.xml配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
<configuration> 
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://hadoop1:3306/hive?serverTimezone=UTC</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
</property>
<property>
<name>hive.querylog.location</name>
<value>/data/hive/iotmp</value>
</property>
<property>
<name>hive.server2.logging.operation.log.location</name>
<value>/data/hive/operation_logs</value>
</property>
<property>
<name>datanucleus.readOnlyDatastore</name>
<value>false</value>
</property>
<property>
<name>datanucleus.fixedDatastore</name>
<value>false</value>
</property>
<property>
<name>datanucleus.autoCreateSchema</name>
<value>true</value>
</property>
<property>
<name>datanucleus.autoCreateTables</name>
<value>true</value>
</property>
<property>
<name>datanucleus.autoCreateColumns</name>
<value>true</value>
</property>
<property>
<name>datanucleus.schema.autoCreateAll</name>
<value>true</value>
</property>
</configuration>

Hive Client 中hive-site.xml配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<configuration>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<property>
<name>hive.metastore.local</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://192.168.1.122:9083</value>
</property>
</configuration>

Hive Client

(2)配置Spark SQL支持Hive
(a)只需要将以下文件拷贝到$SPARK_HOME/conf的目录下,即可
  • $HIVE_HOME/conf/hive-site.xml(拷贝Hive Client中的hive-site.xml)
  • $HADOOP_CONF_DIR/core-site.xml
  • $HADOOP_CONF_DIR/hdfs-site.xml
(3)启动hive:

启动Hive Server

1
2
3
cd /opt/module/hive-1.2.1

bin/hive --service metastore

Hive Server

启动Hive Client

1
2
3
cd /opt/module/hive-1.2.1

bin/hive

Hive Client

(4)使用Spark Shell操作Hive
(a)启动Spark Shell的时候,需要使用–jars指定mysql的驱动程序

启动Spark

1
2
3
4
5
cd /opt/module/spark-2.1.0-bin-hadoop2.7

bin/spark-shell --master://hadoop1:7077

spark.sql("select * from default.emp").show

查询Hive中的表

(b)创建表
1
spark.sql("create table movle.src (key INT, value STRING) row format 	delimited fields terminated by ','")

创建表1

创建表2

(c )导入数据
1
spark.sql("load data local path '/root/temp/data.txt' into table src")
(d)查询数据
1
spark.sql("select * from src").show
(4)使用spark-sql操作Hive
(a)启动spark-sql的时候,需要使用–jars指定mysql的驱动程序
(b)操作Hive
1
spark.sql("show tables").show

Spark SQL:基础

一.Spark SQL简介

    Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。
    为什么要学习Spark SQL?Hive,它将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所以Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!同时Spark SQL也支持从Hive中读取数据。

二.Spark SQL的特点:

1.容易整合(集成):

安装Spark的时候,已经集成好了。不需要单独安装

2.统一的数据访问方式

JDBC、JSON、Hive、parquet文件(一种列式存储文件,是SparkSQL默认的数据源)

3.兼容Hive:

可以将Hive中的数据,直接读取到Spark SQL中处理。

4.标准的数据连接:JDBC

三.基本概念:表:Datasets和DataFrames

1.表 = 表结构 + 数据

    DataFrame = Schema(表结构) + RDD(代表数据)

2.DataFrame

    DataFrame是组织成命名列的数据集。它在概念上等同于关系数据库中的表,但在底层具有更丰富的优化。DataFrames可以从各种来源构建,

例如:

  • 结构化数据文件
  • hive中的表
  • 外部数据库或现有RDDs

DataFrame API支持的语言有Scala,Java,Python和R

    从上图可以看出,DataFrame多了数据的结构信息,即schema。RDD是分布式的 Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化

3.Datasets

    Dataset是数据的分布式集合。Dataset是在Spark 1.6中添加的一个新接口,是DataFrame之上更高一级的抽象。它提供了RDD的优点(强类型化,使用强大的lambda函数的能力)以及Spark SQL优化后的执行引擎的优点。一个Dataset 可以从JVM对象构造,然后使用函数转换(map, flatMap,filter等)去操作。 Dataset API 支持Scala和Java。 Python不支持Dataset API。

四.创建DataFrames

1.第一种方式:使用case class样本类创建DataFrames

(1)定义表的Schema

注意:由于mgr和comm列中包含null值,简单起见,将对应的case class类型定义为String

1
scala> case class Emp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,depno:Int)
(2)读入数据
1
2
3
4
5
//从hdfs中读入
scala> val lines = sc.textFile("hdfs://hadoop1:9000/emp.csv").map(_.split(","))

//从本地读入
scala> val lines = sc.textFile("/opt/module/datas/TestFile/emp.csv").map(_.split(","))

/opt/module/datas/TestFile

(3)把每行数据映射到Emp中。把表结构和数据,关联。
1
scala> val allEmp = lines.map(x => Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))
(4)生成DataFrame
1
2
3
4
scala> val allEmpDF = allEmp.toDF

//展示
scala> allEmpDF.show

2.第二种方式:使用SparkSession

(1)什么是SparkSession

    Apache Spark 2.0引入了SparkSession,其为用户提供了一个统一的切入点来使用Spark的各项功能,并且允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序。最重要的是,它减少了用户需要了解的一些概念,使得我们可以很容易地与Spark交互。
    在2.0版本之前,与Spark交互之前必须先创建SparkConf和SparkContext。然而在Spark 2.0中,我们可以通过SparkSession来实现同样的功能,而不需要显式地创建SparkConf, SparkContext 以及 SQLContext,因为这些对象已经封装在SparkSession中。

(2)使用StructType,来创建Schema
1
2
3
4
5
6
7
8
9
10
11
12
import org.apache.spark.sql.types._

val myschema = StructType(
List(
StructField("empno", DataTypes.IntegerType),
StructField("ename", DataTypes.StringType),
StructField("job", DataTypes.StringType),
StructField("mgr", DataTypes.IntegerType),
StructField("hiredate", DataTypes.StringType),
StructField("sal", DataTypes.IntegerType),
StructField("comm", DataTypes.IntegerType),
StructField("deptno", DataTypes.IntegerType)))

注意,需要:import org.apache.spark.sql.types._

(3)读取文件:
1
val lines= sc.textFile("/opt/module/datas/TestFile/emp.csv").map(_.split(","))

(4)数据与表结构匹配
1
2
3
import org.apache.spark.sql.Row

val allEmp = lines.map(x => Row(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt))

注意,需要:import org.apache.spark.sql.Row

(5)创建DataFrames
1
2
3
val df2 = spark.createDataFrame(allEmp,myschema)

df2.show

3.方式三,直接读取一个带格式的文件:Json

(1)读取文件:
1
2
3
val df3 = spark.read.json("/opt/module/datas/TestFile/emp.json")

df3.show

(2)另一种方式
1
2
3
val df4 = spark.read.format("json").load("/opt/module/datas/TestFile/emp.json")

df4.show

五.操作DataFrame

DataFrame操作也称为无类型的Dataset操作

1.DSL语句

(1)
1
2
3
df1.show

df1.printSchema

(2)
1
2
3
df1.select("ename","sal").show

df1.select($"ename",$"sal",$"sal"+100).show

(3)$代表 取出来以后,再做一些操作
1
2
3
df1.filter($"sal">2000).show

df1.groupBy($"depno").count.show

完整的例子,请参考:
http://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.sql.Dataset

2.SQL语句

注意:不能直接执行sql。需要生成一个视图,再执行SQL。

(1)将DataFrame注册成表(视图):
1
df1.createOrReplaceTempView("emp")
(2)执行查询:
1
2
3
spark.sql("select * from emp").show

spark.sql("select * from emp where sal > 2000").show

1
2
3
4
5
spark.sql("select * from emp where depno=10").show

spark.sql("select depno,count(1) from emp group by depno").show

spark.sql("select depno,sum(sal) from emp group by depno").show

image.png

1
2
3
df1.createOrReplaceTempView("emp12345")

spark.sql("select e.depno from emp12345 e").show

3.多表查询

1
2
3
4
5
6
7
8
9
10
11
12
13
case class Dept(deptno:Int,dname:String,loc:String)

val lines = sc.textFile("/opt/module/datas/TestFile/dept.csv").map(_.split(","))

val allDept = lines.map(x=>Dept(x(0).toInt,x(1),x(2)))

val df2 = allDept.toDF

df2.create

df2.createOrReplaceTempView("dept")

spark.sql("select dname,ename from emp12345,dept where emp12345.depno=dept.deptno").show

六.视图

1.视图是一个虚表,不存储数据

2.两种类型视图:

(1)普通视图(本地视图):只在当前Session有效
(2)全局视图:在不同Session中都有用。全局视图创建在命名空间中:global_temp 类似于一个库。

    上面使用的是一个在Session生命周期中的临时views。在Spark SQL中,如果你想拥有一个临时的view,并想在不同的Session中共享,而且在application的运行周期内可用,那么就需要创建一个全局的临时view。并记得使用的时候加上global_temp作为前缀来引用它,因为全局的临时view是绑定到系统保留的数据库global_temp上。

(a)创建一个普通的view和一个全局的view
1
2
3
df1.createOrReplaceTempView("emp1")

df1.createGlobalTempView("emp2")
(b)在当前会话中执行查询,均可查询出结果。
1
2
spark.sql("select * from emp1").show
spark.sql("select * from global_temp.emp2").show

(c )开启一个新的会话,执行同样的查询
1
2
spark.newSession.sql("select * from emp1").show     //(运行出错)
spark.newSession.sql("select * from global_temp.emp2").show

七.创建Datasets

    DataFrame的引入,可以让Spark更好的处理结构数据的计算,但其中一个主要的问题是:缺乏编译时类型安全。为了解决这个问题,Spark采用新的Dataset API (DataFrame API的类型扩展)。

    Dataset是一个分布式的数据收集器。这是在Spark1.6之后新加的一个接口,兼顾了RDD的优点(强类型,可以使用功能强大的lambda)以及Spark SQL的执行器高效性的优点。所以可以把DataFrames看成是一种特殊的Datasets,即:Dataset(Row)

1.方式一:使用序列

(1)定义case class
1
scala >case class MyData(a:Int,b:String)
(2).生成序列,并创建DataSet
1
scala >val ds = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS
(3).查看结果
1
2
3
scala >ds.show

ds.collect

2.方式二:使用JSON数据

(1)定义case class
1
case class Person(name: String, age: BigInt)
(2)通过JSON数据生成DataFrame
1
val df = spark.read.format("json").load("/opt/module/datas/TestFile/people.json")
(3)将DataFrame转成DataSet
1
2
df.as[Person].show
df.as[Person].collect

3.方式三:使用其他数据(RDD的操作和DataFrame操作结合)

(1)需求:分词;查询出长度大于3的单词
(a)读取数据,并创建DataSet
1
val linesDS = spark.read.text("/opt/module/datas/TestFile/test_WordCount.txt").as[String]
(b)对DataSet进行操作:分词后,查询长度大于3的单词
1
2
3
4
5
val words = linesDS.flatMap(_.split(" ")).filter(_.length > 3)

words.show

words.collect

(2)需求:执行WordCount程序
1
2
3
val result = linesDS.flatMap(_.split(" ")).map((_,1)).groupByKey(x => x._1).count

result.show

排序:

1
2
3
result.orderBy($"value").show

result.orderBy($"count(1)").show

八.Datasets的操作案例

1.使用emp.json 生成DataFrame

(1)数据:emp.json
(2)使用emp.json 生成DataFrame
1
2
3
val empDF = spark.read.json("/opt/module/datas/TestFile/emp.json")

emp.show

查询工资大于3000的员工

1
empDF.where($"sal" >= 3000).show

(3)创建case class,生成DataSets
1
2
3
case class Emp(empno:Long,ename:String,job:String,hiredate:String,mgr:String,sal:Long,comm:String,deptno:Long)

val empDS = empDF.as[Emp]
(4)查询数据
1
2
3
4
5
//查询工资大于3000的员工
empDS.filter(_.sal > 3000).show

//查看10号部门的员工
empDS.filter(_.deptno == 10).show

2.多表查询

(1)创建部门表
1
2
3
4
5
6
7
val deptRDD=sc.textFile("/opt/module/datas/TestFile/dept.csv").map(_.split(","))

case class Dept(deptno:Int,dname:String,loc:String)

val deptDS = deptRDD.map(x=>Dept(x(0).toInt,x(1),x(2))).toDS

deptDS.show

(2)创建员工表
1
2
3
4
5
6
7
case class Emp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)

val empRDD = sc.textFile("/opt/module/datas/TestFile/emp.csv").map(_.split(","))

val empDS = empRDD.map(x => Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt)).toDS

empDS.show

(3)执行多表查询:等值链接
1
2
3
val result = deptDS.join(empDS,"deptno")

result.show

(4)另一种写法:注意有三个等号
1
2
3
val result1 = deptDS.joinWith(empDS,deptDS("deptno")=== empDS("deptno"))

result1.show

joinWith和join的区别是连接后的新Dataset的schema会不一样

(5)多表条件查询:
1
2
3
val result = deptDS.join(empDS,"deptno").where("deptno==10") 

result.show

Spark Streaming:性能优化

1.减少批数据的执行时间

在Spark中有几个优化可以减少批处理的时间:

(1)数据接收的并行水平

    通过网络(如kafka,flume,socket等)接收数据需要这些数据反序列化并被保存到Spark中。如果数据接收成为系统的瓶颈,就要考虑并行地接收数据。注意,每个输入DStream创建一个receiver(运行在worker机器上)接收单个数据流。创建多个输入DStream并配置它们可以从源中接收不同分区的数据流,从而实现多数据流接收。例如,接收两个topic数据的单个输入DStream可以被切分为两个kafka输入流,每个接收一个topic。这将在两个worker上运行两个receiver,因此允许数据并行接收,提高整体的吞吐量。多个DStream可以被合并生成单个DStream,这样运用在单个输入DStream的transformation操作可以运用在合并的DStream上。

(2)数据处理的并行水平

    如果运行在计算stage上的并发任务数不足够大,就不会充分利用集群的资源。默认的并发任务数通过配置属性来确定spark.default.parallelism。

(3)数据序列化

    可以通过改变序列化格式来减少数据序列化的开销。在流式传输的情况下,有两种类型的数据会被序列化:

  • 输入数据
  • 由流操作生成的持久RDD

    在上述两种情况下,使用Kryo序列化格式可以减少CPU和内存开销。

2.设置正确的批容量

    为了Spark Streaming应用程序能够在集群中稳定运行,系统应该能够以足够的速度处理接收的数据(即处理速度应该大于或等于接收数据的速度)。这可以通过流的网络UI观察得到。批处理时间应该小于批间隔时间。

    根据流计算的性质,批间隔时间可能显著的影响数据处理速率,这个速率可以通过应用程序维持。可以考虑WordCountNetwork这个例子,对于一个特定的数据处理速率,系统可能可以每2秒打印一次单词计数(批间隔时间为2秒),但无法每500毫秒打印一次单词计数。所以,为了在生产环境中维持期望的数据处理速率,就应该设置合适的批间隔时间(即批数据的容量)。

    找出正确的批容量的一个好的办法是用一个保守的批间隔时间(5-10,秒)和低数据速率来测试你的应用程序。

3.内存调优

在这一节,重点介绍几个强烈推荐的自定义选项,它们可以减少Spark Streaming应用程序垃圾回收的相关暂停,获得更稳定的批处理时间。

(1)Default persistence level of DStreams:

    和RDDs不同的是,默认的持久化级别是序列化数据到内存中(DStream是StorageLevel.MEMORY_ONLY_SER,RDD是StorageLevel.MEMORY_ONLY)。即使保存数据为序列化形态会增加序列化/反序列化的开销,但是可以明显的减少垃圾回收的暂停。

(2)Clearing persistent RDDs:

    默认情况下,通过Spark内置策略(LUR),Spark Streaming生成的持久化RDD将会从内存中清理掉。如果spark.cleaner.ttl已经设置了,比这个时间存在更老的持久化RDD将会被定时的清理掉。正如前面提到的那样,这个值需要根据Spark Streaming应用程序的操作小心设置。然而,可以设置配置选项spark.streaming.unpersist为true来更智能的去持久化(unpersist)RDD。这个配置使系统找出那些不需要经常保有的RDD,然后去持久化它们。这可以减少Spark RDD的内存使用,也可能改善垃圾回收的行为。

(3)Concurrent garbage collector:

    使用并发的标记-清除垃圾回收可以进一步减少垃圾回收的暂停时间。尽管并发的垃圾回收会减少系统的整体吞吐量,但是仍然推荐使用它以获得更稳定的批处理时间。

Spark Streaming:高级数据源

一.Spark Streaming接收Flume数据

1.基于Flume的Push模式

    Flume被用于在Flume agents之间推送数据.在这种方式下,Spark Streaming可以很方便的建立一个receiver,起到一个Avro agent的作用.Flume可以将数据推送到改receiver.

(1)第一步:Flume的配置文件

(2)第二步:Spark Streaming程序

(3)第三步:注意除了需要使用Flume的lib的jar包以外,还需要以下jar包:
1
spark-streaming-flume_2.1.0.jar
(4)第四步:测试
  • 启动Spark Streaming程序
  • 启动Flume
  • 拷贝日志文件到/root/training/logs目录
  • 观察输出,采集到数据

2.基于Custom Sink的Pull模式

    不同于Flume直接将数据推送到Spark Streaming中,第二种模式通过以下条件运行一个正常的Flume sink。Flume将数据推送到sink中,并且数据保持buffered状态。Spark Streaming使用一个可靠的Flume接收器和转换器从sink拉取数据。只要当数据被接收并且被Spark Streaming备份后,转换器才运行成功。

    这样,与第一种模式相比,保证了很好的健壮性和容错能力。然而,这种模式需要为Flume配置一个正常的sink。

以下为配置步骤:

(1)第一步:Flume的配置文件

(2)第二步:Spark Streaming程序

(3)第三步:需要的jar包
  • 将Spark的jar包拷贝到Flume的lib目录下
  • 下面的这个jar包也需要拷贝到Flume的lib目录下,同时加入IDEA工程的classpath
1
spark-streaming-flume-sink_2.1.0.jar
(4)第四步:测试
  • 启动Flume
  • 在IDEA中启动FlumeLogPull
  • 将测试数据拷贝到/root/training/logs
  • 观察IDEA中的输出

二.Spark Streaming接收Kafka数据

Apache Kafka是一种高吞吐量的分布式发布订阅消息系统。

Kafka

1.搭建ZooKeeper(Standalone):

(1)配置/root/training/zookeeper-3.4.10/conf/zoo.cfg文件
1
2
dataDir=/root/training/zookeeper-3.4.10/tmp
server.1=spark81:2888:3888
(2)在/root/training/zookeeper-3.4.10/tmp目录下创建一个myid的空文件
1
echo 1 > /root/training/zookeeper-3.4.6/tmp/myid

2.搭建Kafka环境(单机单broker):

(1)修改server.properties文件

(2)启动Kafka
1
bin/kafka-server-start.sh config/server.properties &

出现以下错误:

需要修改bin/kafka-run-class.sh文件,将这个选项注释掉。

(3)测试Kafka
  • 创建Topic
    1
    bin/kafka-topics.sh --create --zookeeper spark81:2181 -replication-factor 1 --partitions 3 --topic mydemo1
  • 发送消息
    1
    bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1
  • 接收消息
    1
    bin/kafka-console-consumer.sh --zookeeper spark81:2181 --topic mydemo1

3.搭建Spark Streaming和Kafka的集成开发环境

    由于Spark Streaming和Kafka集成的时候,依赖的jar包比较多,而且还会产生冲突。强烈建议使用Maven的方式来搭建项目工程。
下面是依赖的pom.xml文件:

4.基于Receiver的方式

    这个方法使用了Receivers来接收数据。Receivers的实现使用到Kafka高层次的消费者API。对于所有的Receivers,接收到的数据将会保存在Spark executors中,然后由Spark Streaming启动的Job来处理这些数据。

(1)开发Spark Streaming的Kafka Receivers

(2)测试
  • 启动Kafka消息的生产者
    1
    bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1
  • 在IDEA中启动任务,接收Kafka消息

5.直接读取方式

    和基于Receiver接收数据不一样,这种方式定期地从Kafka的topic+partition中查询最新的偏移量,再根据定义的偏移量范围在每个batch里面处理数据。当作业需要处理的数据来临时,spark通过调用Kafka的简单消费者API读取一定范围的数据。

(1)开发Spark Streaming的程序

(2)测试
  • 启动Kafka消息的生产者
    1
    bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1
  • 在IDEA中启动任务,接收Kafka消息

Spark Streaming:进阶

一.StreamingContext对象详解

1.初始化StreamingContext

(1)方式一:从SparkConf对象中创建
1
2
3
val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")

val src = new StreamContext(conf, Second(5))

(2)方式二:从一个现有的SparkContext实例中创建
1
2
3
scala >import org.apache.spark.streaming.{Second,StreamingContext}

scala >val ssc=new StreamContext(sc,Second(1))

2.程序中的几点说明:

  • appName参数是应用程序在集群UI上显示的名称。

  • master是Spark,Mesos或YARN集群的URL,或者一个特殊的“local [*]”字符串来让程序以本地模式运行。

  • 当在集群上运行程序时,不需要在程序中硬编码master参数,而是使用spark-submit提交应用程序并将master的URL以脚本参数的形式传入。但是,对于本地测试和单元测试,您可以通过“local[*]”来运行Spark Streaming程序(请确保本地系统中的cpu核心数够用)

  • StreamingContext会内在的创建一个SparkContext的实例(所有Spark功能的起始点),你可以通过ssc.sparkContext访问到这个实例。

  • 批处理的时间窗口长度必须根据应用程序的延迟要求和可用的集群资源进行设置。

3.请务必记住以下几点:

  • 一旦一个StreamingContextt开始运作,就不能设置或添加新的流计算。

  • 一旦一个上下文被停止,它将无法重新启动。

  • 同一时刻,一个JVM中只能有一个StreamingContext处于活动状态。

  • StreamingContext上的stop()方法也会停止SparkContext。 要仅停止StreamingContext(保持SparkContext活跃),请将stop() 方法的可选参数stopSparkContext设置为false。

  • 只要前一个StreamingContext在下一个StreamingContext被创建之前停止(不停止SparkContext),SparkContext就可以被重用来创建多个StreamingContext。

二.离散流(DStreams):Discretized Streams

1.DiscretizedStream或DStream 是Spark Streaming对流式数据的基本抽象。

它表示连续的数据流,这些连续的数据流可以是从数据源接收的输入数据流,也可以是通过对输入数据流执行转换操作而生成的经处理的数据流。在内部,DStream由一系列连续的RDD表示,如下图:

2.举例分析:

在之前的NetworkWordCount的例子中,我们将一行行文本组成的流转换为单词流,具体做法为:将flatMap操作应用于名为lines的 DStream中的每个RDD上,以生成words DStream的RDD。如下图所示:

但是DStream和RDD也有区别,下面画图说明:

RDD的结构

DStream的结构

三.DStream中的转换操作(transformation)

最后两个transformation算子需要重点介绍一下:

1.transform(func)

(1)通过RDD-to-RDD函数作用于源DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD

(2)举例:在NetworkWordCount中,也可以使用transform来生成元组对

2.updateStateByKey(func)

(1)操作允许不断用新信息更新它的同时保持任意状态。
  • 定义状态-状态可以是任何的数据类型
  • 定义状态更新函数-怎样利用更新前的状态和从输入流里面获取的新值更新状态
(2)重写NetworkWordCount程序,累计每个单词出现的频率(注意:累计)

(3)输出结果:

3.注意:

如果在IDEA中,不想输出log4j的日志信息,可以将log4j.properties文件(放在src的目录下)的第一行改为:

1
log4j.rootCategory=ERROR,console

四.窗口操作

    Spark Streaming还提供了窗口计算功能,允许您在数据的滑动窗口上应用转换操作。下图说明了滑动窗口的工作方式:

    如图所示,每当窗口滑过originalDStream时,落在窗口内的源RDD被组合并被执行操作以产生windowed DStream的RDD。在上面的例子中,操作应用于最近3个时间单位的数据,并以2个时间单位滑动。这表明任何窗口操作都需要指定两个参数。

  • 窗口长度(windowlength) - 窗口的时间长度(上图的示例中为:3)。
  • 滑动间隔(slidinginterval) - 两次相邻的窗口操作的间隔(即每次滑动的时间长度)(上图示例中为:2)。

    这两个参数必须是源DStream的批间隔的倍数(上图示例中为:1)。

    我们以一个例子来说明窗口操作。 假设您希望对之前的单词计数的示例进行扩展,每10秒钟对过去30秒的数据进行wordcount。为此,我们必须在最近30秒的pairs DStream数据中对(word, 1)键值对应用reduceByKey操作。这是通过使用reduceByKeyAndWindow操作完成的。

    一些常见的窗口操作如下表所示。所有这些操作都用到了上述两个参数 - windowLength和slideInterval。

(1)window(windowLength, slideInterval)
  • 基于源DStream产生的窗口化的批数据计算一个新的DStream
(2)countByWindow(windowLength, slideInterval)
  • 返回流中元素的一个滑动窗口数
(3)reduceByWindow(func, windowLength, slideInterval)
  • 返回一个单元素流。利用函数func聚集滑动时间间隔的流的元素创建这个单元素流。函数必须是相关联的以使计算能够正确的并行计算。
(4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
  • 应用到一个(K,V)对组成的DStream上,返回一个由(K,V)对组成的新的DStream。每一个key的值均由给定的reduce函数聚集起来。注意:在默认情况下,这个算子利用了Spark默认的并发任务数去分组。你可以用numTasks参数设置不同的任务数
(5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
  • 上述reduceByKeyAndWindow() 的更高效的版本,其中使用前一窗口的reduce计算结果递增地计算每个窗口的reduce值。这是通过对进入滑动窗口的新数据进行reduce操作,以及“逆减(inverse reducing)”离开窗口的旧数据来完成的。一个例子是当窗口滑动时对键对应的值进行“一加一减”操作。但是,它仅适用于“可逆减函数(invertible reduce functions)”,即具有相应“反减”功能的减函数(作为参数invFunc)。 像reduceByKeyAndWindow一样,通过可选参数可以配置reduce任务的数量。 请注意,使用此操作必须启用检查点。
(6)countByValueAndWindow(windowLength, slideInterval, [numTasks])
  • 应用到一个(K,V)对组成的DStream上,返回一个由(K,V)对组成的新的DStream。每个key的值都是它们在滑动窗口中出现的频率。

五.输入DStreams和接收器

1.输入DStreams表示从数据源获取输入数据流的DStreams。

在NetworkWordCount例子中,lines表示输入DStream,它代表从netcat服务器获取的数据流。每一个输入流DStream和一个Receiver对象相关联,这个Receiver从源中获取数据,并将数据存入内存中用于处理。
输入DStreams表示从数据源获取的原始数据流

2.Spark Streaming拥有两类数据源:

(1)基本源(Basic sources):这些源在StreamingContext API中直接可用。例如文件系统、套接字连接、Akka的actor等
(2)高级源(Advanced sources):这些源包括Kafka,Flume,Kinesis,Twitter等等。

3.下面通过具体的案例,详细说明:

(1)文件流:通过监控文件系统的变化,若有新文件添加,则将它读入并作为数据流

需要注意的是:

  • 这些文件具有相同的格式
  • 这些文件通过原子移动或重命名文件的方式在dataDirectory创建
  • 如果在文件中追加内容,这些追加的新数据也不会被读取。

注意:要演示成功,需要在原文件中编辑,然后拷贝一份。

(2)RDD队列流

    使用streamingContext.queueStream(queueOfRDD)创建基于RDD队列的DStream,用于调试Spark Streaming应用程序。

1

(3)套接字流:通过监听Socket端口来接收数据

六.DStreams的输出操作

1.输出操作允许DStream的操作推到如数据库、文件系统等外部系统中。

因为输出操作实际上是允许外部系统消费转换后的数据,它们触发的实际操作是DStream转换。

2.目前,定义了下面几种输出操作:

(1)foreachRDD的设计模式

DStream.foreachRDD是一个强大的原语,发送数据到外部系统中。

(a)第一步:创建连接,将数据写入外部数据库(使用之前的NetworkWordCount,改写之前输出结果的部分,如下)

出现以下Exception:

    原因是:Connection对象不是一个可被序列化的对象,不能RDD的每个Worker上运行;即:Connection不能在RDD分布式环境中的每个分区上运行,因为不同的分区可能运行在不同的Worker上。所以需要在每个RDD分区上单独创建Connection对象。

(b)第二步:在每个RDD分区上单独创建Connection对象,如下:

七.DataFrame和SQL操作

    可以很方便地使用DataFrames和SQL操作来处理流数据。必须使用当前的StreamingContext对应的SparkContext创建一个SparkSession。此外,必须这样做的另一个原因是使得应用可以在driver程序故障时得以重新启动,这是通过创建一个可以延迟实例化的单例SparkSession来实现的。

    在下面的示例中,使用DataFrames和SQL来修改之前的wordcount示例并对单词进行计数。我们将每个RDD转换为DataFrame,并注册为临时表,然后在这张表上执行SQL查询。

八.缓存/持久化

    与RDD类似,DStreams还允许开发人员将流数据保留在内存中。也就是说,在DStream上调用persist() 方法会自动将该DStream的每个RDD保留在内存中。如果DStream中的数据将被多次计算(例如,相同数据上执行多个操作),这个操作就会很有用。对于基于窗口的操作,如reduceByWindow和reduceByKeyAndWindow以及基于状态的操作,如updateStateByKey,数据会默认进行持久化。 因此,基于窗口的操作生成的DStream会自动保存在内存中,而不需要开发人员调用persist()。

    对于通过网络接收数据(例如Kafka,Flume,sockets等)的输入流,默认持久化级别被设置为将数据复制到两个节点进行容错。

    请注意,与RDD不同,DStreams的默认持久化级别将数据序列化保存在内存中。

九.检查点支持

    流数据处理程序通常都是全天候运行,因此必须对应用中逻辑无关的故障(例如,系统故障,JVM崩溃等)具有弹性。为了实现这一特性,Spark Streaming需要checkpoint足够的信息到容错存储系统,以便可以从故障中恢复。

1.一般会对两种类型的数据使用检查点:

(1)元数据检查点(Metadatacheckpointing)

将定义流计算的信息保存到容错存储中(如HDFS)。这用于从运行streaming程序的driver程序的节点的故障中恢复。元数据包括以下几种:

  • 配置(Configuration) - 用于创建streaming应用程序的配置信息。
  • DStream操作(DStream operations) - 定义streaming应用程序的DStream操作集合。
  • 不完整的batch(Incomplete batches) - jobs还在队列中但尚未完成的batch。
(2)数据检查点(Datacheckpointing)

将生成的RDD保存到可靠的存储层。对于一些需要将多个批次之间的数据进行组合的stateful变换操作,设置数据检查点是必需的。在这些转换操作中,当前生成的RDD依赖于先前批次的RDD,这导致依赖链的长度随时间而不断增加,由此也会导致基于血统机制的恢复时间无限增加。为了避免这种情况,stateful转换的中间RDD将定期设置检查点并保存到到可靠的存储层(例如HDFS)以切断依赖关系链。

    总而言之,元数据检查点主要用于从driver程序故障中恢复,而数据或RDD检查点在任何使用stateful转换时是必须要有的。

2.何时启用检查点:

对于具有以下任一要求的应用程序,必须启用检查点:

(1)使用状态转:

如果在应用程序中使用updateStateByKey或reduceByKeyAndWindow(具有逆函数),则必须提供检查点目录以允许定期保存RDD检查点。

(2)从运行应用程序的driver程序的故障中恢复:

元数据检查点用于使用进度信息进行恢复。

3.如何配置检查点:

    可以通过在一些可容错、高可靠的文件系统(例如,HDFS,S3等)中设置保存检查点信息的目录来启用检查点。这是通过使用streamingContext.checkpoint(checkpointDirectory)完成的。设置检查点后,您就可以使用上述的有状态转换操作。此外,如果要使应用程序从驱动程序故障中恢复,您应该重写streaming应用程序以使程序具有以下行为:

(a)当程序第一次启动时,它将创建一个新的StreamingContext,设置好所有流数据源,然后调用start()方法。

(b)当程序在失败后重新启动时,它将从checkpoint目录中的检查点数据重新创建一个StreamingContext。
使用StreamingContext.getOrCreate可以简化此行为

4.改写之前的WordCount程序,使得每次计算的结果和状态都保存到检查点目录下

通过查看HDFS中的信息,可以看到相关的检查点信息,如下:

Spark Streaming:基础

1.Spark Streaming简介

    Spark Streaming是核心Spark API的扩展,可实现可扩展、高吞吐量、可容错的实时数据流处理。数据可以从诸如Kafka,Flume,Kinesis或TCP套接字等众多来源获取,并且可以使用由高级函数(如map,reduce,join和window)开发的复杂算法进行流数据处理。最后,处理后的数据可以被推送到文件系统,数据库和实时仪表板。而且,您还可以在数据流上应用Spark提供的机器学习和图处理算法。

2.Spark Streaming的特点

(1)易用:集成在Spark中
(2)容错性:底层RDD,RDD本身就具备容错机制。
(3)支持多种编程语言:Java Scala Python

3.Spark Streaming的内部结构

    在内部,它的工作原理如下。Spark Streaming接收实时输入数据流,并将数据切分成批,然后由Spark引擎对其进行处理,最后生成“批”形式的结果流。

    Spark Streaming将连续的数据流抽象为discretizedstream或DStream。在内部,DStream 由一个RDD序列表示。

Spark Core:Spark RDD的高级算子

1.mapPartitionsWithIndex

把每个partition中的分区号和对应的值拿出来

1
def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) ⇒ Iterator[U])
(1)参数说明:
  • f是一个函数参数,需要自定义。
  • f 接收两个参数,第一个参数是Int,代表分区号。第二个Iterator[T]代表分区中的元素。
(2)通过这两个参数,可以定义处理分区的函数。

Iterator[U] : 操作完成后,返回的结果。

(3)示例:将每个分区中的元素和分区号打印出来。
(a)
1
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
(b)创建一个函数返回RDD中的每个分区号和元素:
1
2
3
def func1(index:Int, iter:Iterator[Int]):Iterator[String] ={
iter.toList.map( x => "[PartID:" + index + ", value=" + x + "]" ).iterator
}
(c )调用:
1
rdd1.mapPartitionsWithIndex(func1).collect

2.aggregate

含义:先对局部聚合,再对全局聚合

1
def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)

举例

(1)第一个例子:
1
2
3
4
5
6
7
8
val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)

def func1(index:Int, iter:Iterator[Int]):Iterator[String] ={
iter.toList.map( x => "[PartID:" + index + ", value=" + x + "]" ).iterator
}

//查看每个分区中的元素:
rdd1.mapPartitionsWithIndex(func1).collect

(a)需求:将每个分区中的最大值求和,注意:初始值是0;

分析过程

1
2
3
4
5
//如果初始值为0,结果为7
rdd1.aggregate(0)(max(_,_),_+_)

//如果初始值为10,则结果为:30
rdd1.aggregate(10)(max(_,_),_+_)

(b)需求:如果是求和,注意:初始值是0:
1
2
3
4
5
//如果初始值是0
rdd1.aggregate(0)(_+_,_+_)

//如果初始值是10,则结果是:45
rdd1.aggregate(10)(_+_,_+_)

(2)第二个例子:一个字符串的例子:
1
2
3
4
5
6
7
8
9
val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)

//修改一下刚才的查看分区元素的函数
def func2(index: Int, iter: Iterator[(String)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}

//查看两个分区中的元素:
rdd2.mapPartitionsWithIndex(func2).collect

运行结果:

1
2
3
rdd2.aggregate("")(_+_,_+_)

rdd2.aggregate("*")(_+_,_+_)

结果

(3)例三:更复杂一点的例子
1
2
3
4
5
val rdd3 = sc.parallelize(List("12","23","345","4567"),2)

rdd3.mapPartitionsWithIndex(func2).collect

rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)

结果

程序执行分析:

1
2
3
4
5
6
7
第一个分区: "12"  "23"
第一次比较:"" 和 "12" 比,求长度的最大值: 2 。 2 ---> "2"
第二次比较:"2" 和 "23" 比,求长度的最大值: 2。 2 ---> "2"

第二个分区:"345" "4567"
第一次比较:"" 和 "345" 比,求长度的最大值: 3 。 3 ---> "3"
第二次比较:"3" 和 "4567" 比,求长度的最大值: 4。 4 ---> "4"

结果可能是:”24”,也可能是:”42”

(4)例四:
1
2
3
4
5
val rdd4 = sc.parallelize(List("12","23","345",""),2)

rdd4.mapPartitionsWithIndex(func2).collect

rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)

结果

程序执行分析:

1
2
3
4
5
6
7
第一个分区: "12"  "23"
第一次比较:"" 和 "12" 比长度,求长度的最小值。 0 。 0 ---> "0"
第二次比较:"0" 和 "23" 比长度,求长度的最小值。 1。 1 ---> "1"

第二个分区:"345" ""
第一次比较:"" 和 "345" 比,求长度的最小值。 0 。 0 ---> "0"
第二次比较:"0" 和 "" 比,求长度的最小值。 0。 0 ---> "0"

结果是:”10”,也可能是”01”,

原因:注意有个初始值””,其长度0,然后0.toString变成字符串

(5)例5:
1
2
3
4
5
val rdd5 = sc.parallelize(List("12","23","","345"),2)

rdd5.mapPartitionsWithIndex(func2).collect

rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)

结果

结果是:”11”,原因同上

程序执行分析:

1
2
3
4
5
6
7
第一个分区: "12"  "23"
第一次比较:"" 和 "12" 比长度,求长度的最小值。 0 。 0 ---> "0"
第二次比较:"0" 和 "23" 比长度,求长度的最小值。 1。 1 ---> "1"

第二个分区:"" "345"
第一次比较:"" 和 "" 比,求长度的最小值。 0 。 0 ---> "0"
第二次比较:"0" 和 "345" 比,求长度的最小值。 1。 1 ---> "1"

3.aggregateByKey:类似于aggregate操作,区别:操作的 的数据

(1)准备数据:
1
2
3
4
5
6
7
val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)

def func3(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}

pairRDD.mapPartitionsWithIndex(func3).collect

准备数据

(2)两个分区中的元素:

(3) 示例:
(a)将每个分区中的动物最多的个数求和
1
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect

结果

(b)将每种动物个数求和
1
pairRDD.aggregateByKey(0)(_+_, _ + _).collect

结果

(c )这个例子也可以使用:reduceByKey
1
pairRDD.reduceByKey(_+_).collect

结果

与reduceByKey相比,aggregateByKey 效率更高

4.coalesce与repartition

(1)都是将RDD中的分区进行重分区。
(2)区别是:coalesce默认不会进行shuffle(false);而repartition会进行shuffle(true),即:会将数据真正通过网络进行重分区。
(3)示例:
1
2
3
4
5
6
7
def func4(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}

val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)

rdd1.mapPartitionsWithIndex(func4).collect

数据准备

下面两句话是等价的:

1
2
3
val rdd2 = rdd1.repartition(3)

val rdd3 = rdd1.coalesce(3,true) //--->如果是false,查看RDD的length依然是2

结果

5、其他高级算子

参考:http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html

  • Copyrights © 2015-2021 Movle
  • 访问人数: | 浏览次数:

请我喝杯咖啡吧~

支付宝
微信