
Spark SQL in Practice: Using UDFs and UDAFs

1. Concepts:

A UDF is a user-defined function: it is applied to each row and returns one value per row.
A UDAF is a user-defined aggregate function: it folds all the rows of a group into a single value.
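As a minimal sketch (assuming a SparkSession called spark and a temp view t with a string column word already exist; neither is part of the original example), the difference looks like this:

// A UDF maps each input row to one output value
spark.udf.register("toUpper", (s: String) => s.toUpperCase)
spark.sql("select toUpper(word) from t").show()

// An aggregate (built-in here, custom with a UDAF) collapses all rows of a group into one value
spark.sql("select word, count(*) from t group by word").show()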

2. Code:

(1)pom.xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
(2)SparkSQLUDFUDAF.scala
package spark.sqlshizhan

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, IntegerType, StringType, StructField, StructType}

/**
 * @ClassName SparkSQLUDFUDAF
 * @MethodDesc: Using UDFs and UDAFs with Spark SQL
 * @Author Movle
 * @Date 5/18/20 10:44 PM
 * @Version 1.0
 * @Email movle_xjk@foxmail.com
 **/
object SparkSQLUDFUDAF {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local").setAppName("SparkSQLUDFUDAF")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val bigData = Array("Spark", "Spark", "Hadoop", "spark", "Hadoop", "spark", "Hadoop", "Hadoop", "spark", "spark")

    // Create a DataFrame from the array
    val bigDataRDD = sc.parallelize(bigData)
    val bigDataRDDRow = bigDataRDD.map(item => Row(item))
    val structType = StructType(Array(StructField("word", StringType)))
    val bigDataDF = sqlContext.createDataFrame(bigDataRDDRow, structType)

    bigDataDF.createOrReplaceTempView("bigDataTable")

    // A UDF can take up to 22 input parameters; this one takes two and returns the length of the first
    sqlContext.udf.register("computeLength", (input: String, input2: String) => input.length())
    sqlContext.sql("select word, computeLength(word, word) from bigDataTable").show()

    // Register the custom UDAF and use it in a group-by query
    sqlContext.udf.register("wordcount", new MyUDAF)
    sqlContext.sql("select word, wordcount(word) as count from bigDataTable group by word").show()

    sc.stop()
  }
}

class MyUDAF extends UserDefinedAggregateFunction {

  /**
   * Schema of the UDAF's input arguments
   */
  override def inputSchema: StructType = StructType(Array(StructField("input", StringType, true)))

  /**
   * Schema of the intermediate aggregation buffer
   */
  override def bufferSchema: StructType = StructType(Array(StructField("count", IntegerType, true)))

  /**
   * Data type of the value returned by the UDAF
   */
  override def dataType: DataType = IntegerType

  /**
   * Whether the function always returns the same output for the same input; usually true
   */
  override def deterministic: Boolean = true

  /**
   * Initial value of each group's buffer before aggregation starts
   */
  override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0 }

  /**
   * Called for every new input value of a group: the partition-local aggregation,
   * comparable to a Combiner in the Hadoop MapReduce model
   */
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Int](0) + 1
  }

  /**
   * Merges the partition-local buffers into the global result after the local reduce
   */
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
  }

  /**
   * Returns the UDAF's final result
   */
  override def evaluate(buffer: Row): Any = buffer.getAs[Int](0)
}
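As a side note, a hedged sketch (not part of the original example): the UDAF registered above under the name wordcount could also be invoked through the DataFrame API inside the same main method, after the register call:

// Call the registered UDAF from the DataFrame API instead of SQL
import org.apache.spark.sql.functions.callUDF

bigDataDF.groupBy("word")
  .agg(callUDF("wordcount", bigDataDF("word")).as("count"))
  .show()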

Spark SQL in Practice: A Spark SQL Example

1. Requirement:

Use Spark SQL to read several files and query them as tables.

2. Code:

(1)pom.xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.0</version>
</dependency>

(2)SparkSQLExample.scala
package spark.sqlshizhan

import java.text.SimpleDateFormat

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

/**
 * @ClassName SparkSQLExample
 * @MethodDesc: Load CSV files into temp views and query them with Spark SQL
 * @Author Movle
 * @Date 5/18/20 9:34 PM
 * @Version 1.0
 * @Email movle_xjk@foxmail.com
 **/
case class Student1(sno: String, sname: String, ssex: String, sbirthday: String, sclass: String)

case class Course(cno: String, cname: String, tno: String)

case class Score(sno: String, cno: String, degree: String)

case class Teacher(tno: String, tname: String, tsex: String, tbirthday: String, tprof: String, tdepart: String)

object SparkSQLExample {

  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "/Users/macbook/Documents/hadoop/hadoop-2.8.4")

    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    //Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // Format the current time with the given pattern, e.g. getDate("yyyy") returns the current year
    def getDate(time: String) = {
      val now: Long = System.currentTimeMillis()
      val df: SimpleDateFormat = new SimpleDateFormat(time)
      df.format(now)
    }

    val spark = SparkSession.builder().master("local").appName("SparkSQLExample").getOrCreate()

    import spark.implicits._

    // Load each CSV file, map it onto its case class, and register it as a temp view
    // (registered under the name used by the queries below)
    spark.sparkContext.textFile("/users/macbook/TestInfo/Student.csv")
      .map(_.split(","))
      .map(x => Student1(x(0), x(1), x(2), x(3), x(4)))
      .toDF
      .createOrReplaceTempView("student")

    spark.sparkContext.textFile("/users/macbook/TestInfo/Course.csv")
      .map(_.split(","))
      .map(x => Course(x(0), x(1), x(2)))
      .toDF
      .createOrReplaceTempView("Course")

    spark.sparkContext.textFile("/users/macbook/TestInfo/Score.csv")
      .map(_.split(","))
      .map(x => Score(x(0), x(1), x(2)))
      .toDF
      .createOrReplaceTempView("Score")

    spark.sparkContext.textFile("/users/macbook/TestInfo/Teacher.csv")
      .map(_.split(","))
      .map(x => Teacher(x(0), x(1), x(2), x(3), x(4), x(5)))
      .toDF
      .createOrReplaceTempView("Teacher")

    // Query the whole teacher table
    spark.sql("select * from Teacher").show()

    // Query the sname, ssex and sclass columns of every record in the student table
    spark.sql("select sname,ssex,sclass from student").show()

    // Query the distinct tdepart values in the teacher table
    spark.sql("select distinct tdepart from teacher").show(false)
    spark.sql("select tdepart from teacher group by tdepart").show(false)

    // Query all score records with a degree between 60 and 80
    spark.sql("select * from score where degree >= 60 and degree <= 80").show()
    spark.sql("select * from score where degree between 60 and 80").show()

    // Query the score records whose degree is 85, 86 or 88
    spark.sql("select * from score where degree = '85' or degree='86' OR degree='88'").show()
    spark.sql("select * from score where degree =85 or degree=86 OR degree=88").show()

    // Order the student table by sclass, descending and ascending
    spark.sql("select * from student order by sclass desc").show()
    spark.sql("select * from student order by sclass").show()

    // Order the score table by sno ascending and degree descending
    spark.sql("select * from score t order by t.sno asc, t.degree desc").show()

    // Find the student number and course of the highest score in the score table
    spark.sql("select * from score order by cast(degree as int) desc limit 1").show()
    spark.sql("select * from score order by cast(degree as int) desc").show()

    // Average score of each course
    spark.sql("select cno,avg(degree) from score group by cno").show()

    // Average score of courses whose course number starts with 3 and that at least 5 students took
    spark.sql("select cno,avg(degree) from score where cno like '3%' group by cno having count(cno) >= 5").show()

    // Query sname, cname and degree for all students
    spark.sql("select s.sname, t.degree,c.cname from score t " +
      "join student s on t.sno=s.sno " +
      "join course c on c.cno=t.cno").show(false)

    // Among students who took more than one course, the score records that are not the overall highest score
    spark.sql("select * from score where " +
      "sno in (select sno from score t group by t.sno having count(1) > 1) " +
      " and degree != (select max(degree) from score)").show()

    // sno, sname and sbirthday of all students born in the same year as student 108
    spark.sql("select sno,sname,sbirthday from student where substring(sbirthday,0,4) = (" +
      " select substring(t.sbirthday,0,4) from student t where sno='108')").show()

    // Names of teachers whose course was taken by more than 5 students
    spark.sql("select tname from teacher e " +
      " join course c on e.tno = c.tno " +
      " join (select cno from score group by cno having count(cno) > 5) t on c.cno = t.cno").show()

    // Score records lower than the average score of the corresponding course
    spark.sql("select s.* from score s where s.degree < (select avg(degree) from score c where s.cno = c.cno)").show()

    // tname and tdepart of all teachers who do not teach any course
    spark.sql("select tname , tdepart from teacher t where t.tno not in (select tno from course c where c.cno in (select cno from score))").show(false)

    // Class numbers with at least 2 male students
    spark.sql("select sclass from student t where ssex='male' group by sclass having count(ssex) >= 2").show()

    // Student records whose name does not start with Wang
    spark.sql("select * from student t where sname not like('Wang%')").show()

    // Name and age of every student
    spark.sql("select sname, (cast(" + getDate("yyyy") + " as int) - cast(substring(sbirthday,0,4) as int)) as age from student t").show()

    spark.close()
  }

}
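A hedged alternative to the textFile-plus-split mapping above: Spark 2.x can read the same files with the built-in CSV reader and rename the columns afterwards (the column names are taken from the Student1 case class; that the files have no header row is an assumption):

// Sketch: load Student.csv with the DataFrame CSV reader instead of textFile + split
val studentDF = spark.read
  .option("header", "false")
  .csv("/users/macbook/TestInfo/Student.csv")
  .toDF("sno", "sname", "ssex", "sbirthday", "sclass")
studentDF.createOrReplaceTempView("student")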

3. Result:

Spark SQL in Practice: Connecting Spark SQL to Hive and Storing the Results in MySQL

1. Requirement:

Use Spark SQL to connect to Hive, read data, and store the result in MySQL.

2. Package the code, upload it to the cluster, and submit it to Spark; Hive and HDFS must already be running.

3. Code:

(1)pom.xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
(2)demo4.scala
package day1209

import java.util.Properties

import org.apache.spark.sql.SparkSession

/**
 * Connect Spark SQL to Hive and store the result in MySQL.
 *
 * Example submit command:
 * ./spark-submit --master spark://hadoop1:7077 --jars /usr/local/tmp_files/mysql-connector-java-8.0.11.jar --driver-class-path /usr/local/tmp_files/mysql-connector-java-8.0.11.jar --class day0628.Demo4 /usr/local/tmp_files/Demo1209.jar
 */
object Demo4 {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("Hive2Mysql").enableHiveSupport().getOrCreate()
    //.config("spark.sql.inMemoryColumnarStorage.batchSize", 10)

    // Run the SQL against the Hive table
    val result = spark.sql("select deptno,mgr from default.emp")

    // Save the result to MySQL
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "000000")

    result.write.mode("append").jdbc(
      "jdbc:mysql://hadoop2:3306/company?serverTimezone=UTC&characterEncoding=utf-8",
      "emp_stat", props)

    // Stop Spark
    spark.stop()

  }
}
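To verify the write, a hedged sketch that could go just before spark.stop(), reusing the same props and URL, reads the emp_stat table back through the JDBC data source:

// Read the rows back from MySQL to confirm the append succeeded
val written = spark.read.jdbc(
  "jdbc:mysql://hadoop2:3306/company?serverTimezone=UTC&characterEncoding=utf-8",
  "emp_stat", props)
written.show()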

4. Run:

(1) Submit the job to the Spark cluster
cd /opt/module/spark-2.1.1

./bin/spark-submit --master spark://hadoop2:7077 --jars /opt/TestFolder/mysql-connector-java-5.1.27.jar --driver-class-path /opt/TestFolder/mysql-connector-java-5.1.27.jar --class spark.sqlshizhan.Demo4 /opt/TestFolder/Scala-1.0-SNAPSHOT.jar

5. Result:

Spark SQL in Practice: Writing Results to MySQL

1. Requirement:

Read the local student.txt, create a DataFrame, and write the result into a MySQL database.

2. Data source:

student.txt

1	tom	15
2	lucy	20
3	mike	18

3. Code:

(1) Add dependencies:

pom.xml

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
(2)Demo3.scala
package day1209

import java.util.Properties

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

/**
 * Write the query result to MySQL
 */
object Demo3 {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession.builder().master("local").appName("Save to Mysql").getOrCreate()

    // Read the file from the given path and split each line on tabs
    val personRDD = spark.sparkContext.textFile("/users/macbook/TestInfo/student.txt").map(_.split("\t"))

    // Define the schema
    val schema = StructType(
      List(
        StructField("id", IntegerType),
        StructField("sname", StringType),
        StructField("age", IntegerType)))

    // Convert the RDD into an RDD[Row]
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim(), p(2).toInt))

    // Create the DataFrame by combining the rows with the schema
    val personDataFrame = spark.createDataFrame(rowRDD, schema)

    // Register a temp view
    personDataFrame.createOrReplaceTempView("t_person")

    // Run the SQL
    val result = spark.sql("select * from t_person order by age desc")

    // Show the result
    //result.show()

    // Save the result to MySQL
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "000000")

    result.write.mode("append").jdbc("jdbc:mysql://192.168.1.121:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "student", props)

    spark.stop()

  }
}
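The string "append" passed to mode() is one of several save modes; a hedged sketch of the same write with the explicit SaveMode enum (overwrite, ignore and errorIfExists are the other options; with the default JDBC writer, overwrite drops and recreates the target table, so append is the safer choice for an existing table):

import org.apache.spark.sql.SaveMode

// Equivalent write using the enum instead of the string "append"
result.write
  .mode(SaveMode.Append)
  .jdbc("jdbc:mysql://192.168.1.121:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "student", props)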

4. Result:

Query of the student table before the run
Query of the student table after the run

Spark SQL in Practice: Creating a DataFrame with a Case Class

1. Requirement:

Create a DataFrame from a case class.

2. Data source:

(1)student.txt
1	tom	15
2	lucy	20
3	mike	18

3. Code

(1) Add dependencies:

pom.xml

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
(2)Demo2.scala
package day1209

import org.apache.spark.sql.SparkSession

/**
 * Create a DataFrame from a case class
 */
object Demo2 {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().master("local").appName("CaseClassDemo").getOrCreate()

    val lineRDD = spark.sparkContext.textFile("/users/macbook/TestInfo/student.txt").map(_.split("\t"))

    // Associate the RDD with the table structure by mapping each line onto the case class
    val studentRDD = lineRDD.map(x => Student(x(0).toInt, x(1), x(2).toInt))

    // Build the DataFrame
    import spark.implicits._
    val studentDF = studentRDD.toDF

    studentDF.createOrReplaceTempView("student")

    spark.sql("select * from student").show

    spark.stop()

  }
}

// The case class plays the role of the schema
case class Student(stuId: Int, stuName: String, stuAge: Int)
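Because Student is a case class, the same RDD can also be turned into a typed Dataset; a hedged sketch that would fit inside main after the implicits import (the age filter is only an illustration, not part of the original example):

// Typed Dataset variant: keeps the Student type instead of generic Rows
val studentDS = studentRDD.toDS()
studentDS.filter(_.stuAge >= 18).show()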

4. Result:

Spark SQL in Practice: Creating a DataFrame with SparkSession and Running SQL

1. Requirement:

Write code in IDEA that creates a DataFrame and runs SQL commands against it.

2. Data source:

(1)student.txt
1	tom	15
2	lucy	20
3	mike	18

3. Code:

(1) Add dependencies:

pom.xml

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
(2)Demo1.scala
package day1209

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

/**
 * Create a DataFrame and run SQL against it
 */
object Demo1 {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession.builder().master("local").appName("UnderstandSparkSession").getOrCreate()

    // Read the file from the given path and split each line on tabs
    val personRDD = spark.sparkContext.textFile("/users/macbook/TestInfo/student.txt").map(_.split("\t"))

    // Define the schema
    val schema = StructType(
      List(
        StructField("id", IntegerType),
        StructField("name", StringType),
        StructField("age", IntegerType)))

    // Convert the RDD into an RDD[Row]
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim(), p(2).toInt))

    // Create the DataFrame by combining the rows with the schema
    val personDataFrame = spark.createDataFrame(rowRDD, schema)

    // Register a temp view
    personDataFrame.createOrReplaceTempView("t_person")

    // Run the SQL
    val df = spark.sql("select * from t_person order by age desc")

    // Show the result
    df.show()

    // Stop Spark
    spark.stop()
  }
}
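For comparison, a hedged sketch of the same query expressed with the DataFrame API instead of SQL (it could replace the spark.sql call inside main):

// DataFrame API equivalent of: select * from t_person order by age desc
import org.apache.spark.sql.functions.col

personDataFrame.orderBy(col("age").desc).show()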

4. Result:

Spark Core in Practice: Accessing a Database with JdbcRDD

1. Requirement:

Use JdbcRDD to query a database.

2. Create a table in the database and insert some data:

create table emp(
    id int(11),
    ename varchar(20),
    deptno int(11),
    sal int(11));

insert into emp values(1,"Tom",10,2500);
insert into emp values(2,"Movle",11,1000);
insert into emp values(3,"Mike",10,1500);
insert into emp values(4,"jack",11,500);

3. Add the JDBC driver
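One way to put the driver on the classpath is a Maven dependency; a hedged example (the coordinates are the standard ones for MySQL Connector/J, and 5.1.27 matches the jar used in the spark-submit command earlier in this post, but pick the version that matches your MySQL server):

<!-- MySQL JDBC driver; the version here is an example, not taken from this project's pom -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>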

4. Code:

(1)MyJDBCRddDemo.scala
import java.sql.DriverManager

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD

/**
 * Use JdbcRDD to query a database
 */
object MyJDBCRddDemo {

  // Function that opens a new JDBC connection; JdbcRDD calls it on the executors
  val connection = () => {
    Class.forName("com.mysql.jdbc.Driver").newInstance()
    DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "123456")
  }

  def main(args: Array[String]): Unit = {

    // Create the SparkContext
    val conf = new SparkConf().setAppName("My JDBC Rdd Demo").setMaster("local")
    val sc = new SparkContext(conf)

    // The two ? placeholders are filled from the bounds 900 and 2000, split across 2 partitions
    val mysqlRDD = new JdbcRDD(sc, connection, "select * from emp where sal > ? and sal <= ?", 900, 2000, 2, r => {
      // Extract the employee's name and salary from the result set
      val ename = r.getString(2)
      val sal = r.getInt(4)
      (ename, sal)
    })

    val result = mysqlRDD.collect()
    println(result.toBuffer)
    sc.stop
  }
}

5. Result:

Spark Core in Practice: Writing Tomcat Log Analysis Results to MySQL

1. The Tomcat log format is the same as before.

2. Requirement:

Count the number of hits per jsp page in the Tomcat log and insert the (name, count) results into MySQL.

3. Create the database and table in MySQL (locally; mine runs on macOS):

create database company;

create table mydata(
    jsname varchar(50),
    countNumber int(11));

4. Code:

(1) Add the pom dependency:
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
(2) Add the JDBC driver jar to the project

JDBC driver jar

(3)MyTomcatLogCountToMysql.scala
import java.sql.Connection
import java.sql.DriverManager
import java.sql.PreparedStatement

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object MyTomcatLogCountToMysql {

  def main(args: Array[String]): Unit = {

    // Create the SparkContext
    val conf = new SparkConf().setMaster("local").setAppName("My Tomcat Log Count To Mysql")
    val sc = new SparkContext(conf)

    /**
     * Read the log and parse out the jsp page that was accessed, e.g.
     * 192.168.88.1 - - [30/Jul/2017:12:54:42 +0800] "GET /MyDemoWeb/web.jsp HTTP/1.1" 200 239
     */
    val rdd1 = sc.textFile("/users/macbook/TestInfo/localhost_access_log.txt")
      .map(
        // line is one raw log line; extract the jsp name and emit (jspName, 1)
        line => {
          // Take the part between the two double quotes
          val index1 = line.indexOf("\"")
          val index2 = line.lastIndexOf("\"")
          val line1 = line.substring(index1 + 1, index2) // GET /MyDemoWeb/web.jsp HTTP/1.1

          // Take the part between the two spaces
          val index3 = line1.indexOf(" ")
          val index4 = line1.lastIndexOf(" ")
          val line2 = line1.substring(index3 + 1, index4) // /MyDemoWeb/web.jsp

          // Keep only the jsp name
          val jspName = line2.substring(line2.lastIndexOf("/") + 1)

          (jspName, 1)
        })

    // Sum the hits per jsp name, then write each partition to MySQL
    val rdd2 = rdd1.reduceByKey(_ + _)

    rdd2.foreachPartition(saveToMysql)

    sc.stop()
  }

  // Runs once per partition so that one connection is shared by all records of that partition
  // (a java.sql.Connection is not serializable, so it cannot be created on the driver and shipped out)
  def saveToMysql(it: Iterator[(String, Int)]) = {

    var conn: Connection = null
    var pst: PreparedStatement = null

    try {
      // Open the connection
      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "123456")

      // Insert the data into MySQL
      pst = conn.prepareStatement("insert into mydata values (?,?) ")

      it.foreach(data => {
        pst.setString(1, data._1)
        pst.setInt(2, data._2)
        pst.executeUpdate()
      })
    } finally {
      // Release the JDBC resources
      if (pst != null) pst.close()
      if (conn != null) conn.close()
    }
  }

}

5. Result:


Spark Core in Practice: Creating a Custom Partitioner

1. Tomcat log format:

192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] "GET /MyDemoWeb/ HTTP/1.1" 200 259
192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] "GET /MyDemoWeb/head.jsp HTTP/1.1" 200 713
192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] "GET /MyDemoWeb/body.jsp HTTP/1.1" 200 240
192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:38 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:38 +0800] "GET /MyDemoWeb/java.jsp HTTP/1.1" 200 240
192.168.88.1 - - [30/Jul/2017:12:54:40 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:40 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:41 +0800] "GET /MyDemoWeb/mysql.jsp HTTP/1.1" 200 241
192.168.88.1 - - [30/Jul/2017:12:54:41 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:42 +0800] "GET /MyDemoWeb/web.jsp HTTP/1.1" 200 239
192.168.88.1 - - [30/Jul/2017:12:54:42 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:52 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:52 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:53 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:54 +0800] "GET /MyDemoWeb/mysql.jsp HTTP/1.1" 200 241
192.168.88.1 - - [30/Jul/2017:12:54:54 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:54 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:56 +0800] "GET /MyDemoWeb/web.jsp HTTP/1.1" 200 239
192.168.88.1 - - [30/Jul/2017:12:54:56 +0800] "GET /MyDemoWeb/java.jsp HTTP/1.1" 200 240
192.168.88.1 - - [30/Jul/2017:12:54:57 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:57 +0800] "GET /MyDemoWeb/java.jsp HTTP/1.1" 200 240
192.168.88.1 - - [30/Jul/2017:12:54:58 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:58 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:59 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:59 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:55:00 +0800] "GET /MyDemoWeb/mysql.jsp HTTP/1.1" 200 241
192.168.88.1 - - [30/Jul/2017:12:55:00 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:55:02 +0800] "GET /MyDemoWeb/web.jsp HTTP/1.1" 200 239
192.168.88.1 - - [30/Jul/2017:12:55:02 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242

2. Requirement: partition the access log by jsp name.

3. Analysis:

Each output file is one partition, and each file contains the log lines of only one jsp name.
Treat the jsp name as the key and the log line as the value.

4. Code:

(1) Add dependencies:

pom.xml

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
(2)MyTomcatLogPartitioner.scala
package day1208

import org.apache.spark.Partitioner
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import scala.collection.mutable.HashMap

/**
 * Creating a custom partitioner
 *
 * Requirement: partition the access log by jsp name.
 *
 * Each output file is one partition and contains only one jsp name.
 *
 * The jsp name is the key, the log line is the value.
 */
object MyTomcatLogPartitioner {

  def main(args: Array[String]): Unit = {
    // Create the SparkContext
    System.setProperty("hadoop.home.dir", "/Users/macbook/Documents/hadoop/hadoop-2.8.4")

    val conf = new SparkConf().setMaster("local").setAppName("My Tomcat Log Partitioner")
    val sc = new SparkContext(conf)

    /**
     * Read and parse the log, e.g.
     * 192.168.88.1 - - [30/Jul/2017:12:54:40 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
     */
    val rdd1 = sc.textFile("/users/macbook/TestInfo/localhost_access_log.txt")
      .map(
        line => {
          // Take the part between the two double quotes
          val index1 = line.indexOf("\"")
          val index2 = line.lastIndexOf("\"")
          val line1 = line.substring(index1 + 1, index2) // GET /MyDemoWeb/web.jsp HTTP/1.1

          // Take the part between the two spaces
          val index3 = line1.indexOf(" ")
          val index4 = line1.lastIndexOf(" ")
          val line2 = line1.substring(index3 + 1, index4) // /MyDemoWeb/web.jsp

          // Keep only the jsp name
          val jspName = line2.substring(line2.lastIndexOf("/") + 1)

          (jspName, line)
        })

    // Collect the distinct jsp names; they define the partitioning rule
    val rdd2 = rdd1.map(_._1).distinct().collect()

    // Create the partitioner
    val myPartitioner = new MyWebPartitioner(rdd2)

    // Repartition rdd1 with it
    val rdd3 = rdd1.partitionBy(myPartitioner)

    // Write the partitioned output
    rdd3.saveAsTextFile("/users/macbook/TestInfo/1208/test_partition")

    sc.stop()

  }

}

class MyWebPartitioner(jspList: Array[String]) extends Partitioner {

  // Map each jsp name to the partition it belongs to
  val partitionMap = new HashMap[String, Int]()

  var partId = 0 // partition number

  for (jsp <- jspList) {
    partitionMap.put(jsp, partId)
    partId += 1 // next partition number
  }

  // Total number of partitions
  def numPartitions: Int = partitionMap.size

  // Return the partition for a given jsp name
  def getPartition(key: Any): Int = partitionMap.getOrElse(key.toString(), 0)

}
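A hedged way to check the partitioning before (or instead of) saveAsTextFile: print, for each partition index, the set of jsp names that landed in it; every set should contain exactly one name:

// Inspect which jsp names ended up in which partition
rdd3.mapPartitionsWithIndex { (idx, it) =>
  Iterator((idx, it.map(_._1).toSet))
}.collect().foreach(println)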

5. Result:

Spark Core in Practice: Parsing Tomcat Logs

1. Tomcat log format:

localhost_access_log.txt

192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] "GET /MyDemoWeb/ HTTP/1.1" 200 259
192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] "GET /MyDemoWeb/head.jsp HTTP/1.1" 200 713
192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] "GET /MyDemoWeb/body.jsp HTTP/1.1" 200 240
192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:38 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:38 +0800] "GET /MyDemoWeb/java.jsp HTTP/1.1" 200 240
192.168.88.1 - - [30/Jul/2017:12:54:40 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:40 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:41 +0800] "GET /MyDemoWeb/mysql.jsp HTTP/1.1" 200 241
192.168.88.1 - - [30/Jul/2017:12:54:41 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:42 +0800] "GET /MyDemoWeb/web.jsp HTTP/1.1" 200 239
192.168.88.1 - - [30/Jul/2017:12:54:42 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:52 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:52 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:53 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:54 +0800] "GET /MyDemoWeb/mysql.jsp HTTP/1.1" 200 241
192.168.88.1 - - [30/Jul/2017:12:54:54 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:54 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:56 +0800] "GET /MyDemoWeb/web.jsp HTTP/1.1" 200 239
192.168.88.1 - - [30/Jul/2017:12:54:56 +0800] "GET /MyDemoWeb/java.jsp HTTP/1.1" 200 240
192.168.88.1 - - [30/Jul/2017:12:54:57 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:57 +0800] "GET /MyDemoWeb/java.jsp HTTP/1.1" 200 240
192.168.88.1 - - [30/Jul/2017:12:54:58 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:58 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:59 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:59 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:55:00 +0800] "GET /MyDemoWeb/mysql.jsp HTTP/1.1" 200 241
192.168.88.1 - - [30/Jul/2017:12:55:00 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:55:02 +0800] "GET /MyDemoWeb/web.jsp HTTP/1.1" 200 239
192.168.88.1 - - [30/Jul/2017:12:55:02 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242

2. Requirement:

Find the two most visited pages.

3. Analysis:

  • Step 1: sum the hits per page, similar to WordCount
  • Step 2: sort the counts in descending order

4. Code:

(1) Add dependencies:

pom.xml

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
(2)MyTomcatLogCount.scala
package day1208

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * Parse Tomcat logs such as
 * 192.168.88.1 - - [30/Jul/2017:12:54:41 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
 * 192.168.88.1 - - [30/Jul/2017:12:54:42 +0800] "GET /MyDemoWeb/web.jsp HTTP/1.1" 200 239
 *
 * Requirement: find the two most visited pages.
 *
 * Step 1: sum the hits per page, similar to WordCount
 * Step 2: sort the counts in descending order
 */
object MyTomcatLogCount {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local").setAppName("My Tomcat Log Count")
    val sc = new SparkContext(conf)

    /**
     * Read the log and parse out the jsp page that was accessed, e.g.
     * 192.168.88.1 - - [30/Jul/2017:12:54:42 +0800] "GET /MyDemoWeb/web.jsp HTTP/1.1" 200 239
     */
    val rdd1 = sc.textFile("/users/macbook/TestInfo/localhost_access_log.txt")
      .map(
        // line is one raw log line; extract the jsp name and emit (jspName, 1)
        line => {
          // Take the part between the two double quotes
          val index1 = line.indexOf("\"")
          val index2 = line.lastIndexOf("\"")
          val line1 = line.substring(index1 + 1, index2) // GET /MyDemoWeb/web.jsp HTTP/1.1

          // Take the part between the two spaces
          val index3 = line1.indexOf(" ")
          val index4 = line1.lastIndexOf(" ")
          val line2 = line1.substring(index3 + 1, index4) // /MyDemoWeb/web.jsp

          // Keep only the jsp name
          val jspName = line2.substring(line2.lastIndexOf("/") + 1)

          (jspName, 1)
        }
      )

    // Aggregate by jsp name to get the hit count of each page
    val rdd2 = rdd1.reduceByKey(_ + _)

    // Sort by the count, descending
    val rdd3 = rdd2.sortBy(_._2, false)

    // Take the two most visited pages
    rdd3.take(2).foreach(println)

    sc.stop()
  }

}
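A hedged alternative for the last two steps: RDD.top with an ordering on the count returns the two most visited pages without sorting the whole RDD first:

// Take the two most visited pages directly, skipping the full sortBy
val top2 = rdd2.top(2)(Ordering.by[(String, Int), Int](_._2))
top2.foreach(println)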

5. Result:

Run result
