1.需求:
用 Spark Streaming 写一个 WordCount 程序，对发往端口号 1235 的文本数据进行累计单词计数
2.代码:
(1)pom.xml
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
  </dependency>
</dependencies>
(2)MyTotalNetworkWordCount.scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
package day1211

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * A Spark Streaming word count that keeps a running total across batches.
 *
 * Reads lines of text from a TCP socket (192.168.1.121:1235), splits them
 * into words, and maintains a cumulative per-word count using
 * `updateStateByKey`, printing the totals every batch interval.
 */
object MyTotalNetworkWordCount {

  def main(args: Array[String]): Unit = {
    // Point at a local Hadoop installation and silence the noisy loggers
    // so batch output is readable on the console.
    System.setProperty("hadoop.home.dir", "/Users/macbook/Documents/hadoop/hadoop-2.8.4")
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // local[2]: one thread for the socket receiver, one for processing.
    // Batch interval is 3 seconds.
    val conf = new SparkConf().setMaster("local[2]").setAppName("MyTotalNetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(3))

    // updateStateByKey requires a checkpoint directory to persist state.
    ssc.checkpoint("hdfs://192.168.1.121:9000/TestFile/chkp0826")

    // Stream of text lines arriving on port 1235.
    val lines = ssc.socketTextStream("192.168.1.121", 1235, StorageLevel.MEMORY_ONLY)

    // Tokenize on spaces and emit (word, 1) pairs for counting.
    val pairs = lines.flatMap(_.split(" ")).map(word => (word, 1))

    // Fold each batch's counts for a key into the running total that
    // Spark keeps for that key (None on first sight of the word).
    val updateTotals = (batchCounts: Seq[Int], runningTotal: Option[Int]) =>
      Some(batchCounts.sum + runningTotal.getOrElse(0))

    pairs.updateStateByKey(updateTotals).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
|
3.运行程序,并往端口号1235发送信息:

4.结果:
