Spark Streaming (DStream) Exercises
Word count over socket data received by a DStream
Install and start the producer
# Install the nc tool on Linux; we use it to send data to a port
yum -y install nc
# Start the producer, listening on the given port
nc -lk port
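For example, the programs in these exercises all read from port 9999, so the producer can be started as below; every line typed into nc afterwards is sent as one line of input to the streaming job:

nc -lk 9999
dada hello
hello helll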
Add the Spark Streaming dependency
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
1. Implementing word count (WordCount)
Spark Streaming program
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}

object Spark_Socket {
  def main(args: Array[String]): Unit = {
    // Create the SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("socket").setMaster("local[2]")
    // Create the SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("warn")
    // A StreamingContext needs a SparkContext plus a batch interval; here data is processed every 5 seconds
    val ssc = new StreamingContext(sc, Seconds(5))
    // Receive socket data
    val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("ip", 9999)
    // Split each line into words
    val words: DStream[String] = socketTextStream.flatMap(_.split(" "))
    // Map each word to (word, 1)
    val pairs = words.map(word => (word, 1))
    // Sum the 1s for each word
    val result = pairs.reduceByKey(_ + _)
    // Print the output
    result.print()
    // Start the streaming computation
    ssc.start()
    ssc.awaitTermination()
  }
}
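The program can be run directly from the IDE, or packaged and submitted with spark-submit; the jar name below is only a placeholder for this example:

spark-submit --class Spark_Socket --master local[2] spark-streaming-demo.jar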
Data (sent within one batch interval)
dada hello
hello helll
helll
Result
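The original result screenshot is not reproduced here; based on the sample data above, the batch output should look roughly like the following (the order of the pairs may differ):

-------------------------------------------
Time: ... ms
-------------------------------------------
(dada,1)
(hello,2)
(helll,2)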
2. Spark Streaming: cumulative WordCount (updateStateByKey)
Spark Streaming program
package houpu.com.SparkStream

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object SparkStreaming_Socket_Total {
  def main(args: Array[String]): Unit = {
    // currentValues: all the 1s for a given word in the current batch, e.g. (wangan,1)(wangan,1)(wangan,1) ---> List(1,1,1)
    // historyValues: the total count of that word across all previous batches
    def updateFunc(currentValues: Seq[Int], historyValues: Option[Int]): Option[Int] = {
      val newValue: Int = currentValues.sum + historyValues.getOrElse(0)
      Some(newValue)
    }

    // Create the SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("socket").setMaster("local[2]")
    // Create the SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("warn")
    // A StreamingContext needs a SparkContext plus a batch interval; here data is processed every 5 seconds
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
    // Receive socket data
    val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("ip", 9999)
    // Split each line into words
    val words: DStream[String] = socketTextStream.flatMap(_.split(" "))
    // Map each word to (word, 1)
    val pairs = words.map(word => (word, 1))
    // Checkpoint directory where the state between batches is saved
    ssc.checkpoint("hdfs://ip:9000/dict01/save/relt02")
    // Accumulate the counts for each word across batches
    val result = pairs.updateStateByKey(updateFunc)
    // Print the output
    result.print()
    // Start and wait for data to arrive
    ssc.start()
    ssc.awaitTermination()
  }
}
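To make the state update concrete, here is a minimal, non-streaming sketch of how updateFunc accumulates counts for one word across two batches; the batch contents are made up purely for the illustration:

object UpdateFuncDemo {
  // Same logic as updateFunc in the program above
  def updateFunc(currentValues: Seq[Int], historyValues: Option[Int]): Option[Int] =
    Some(currentValues.sum + historyValues.getOrElse(0))

  def main(args: Array[String]): Unit = {
    // batch 1: the word appears twice, there is no history yet
    val afterBatch1 = updateFunc(Seq(1, 1), None)      // Some(2)
    // batch 2: the word appears once more, history is the result of batch 1
    val afterBatch2 = updateFunc(Seq(1), afterBatch1)  // Some(3)
    println(s"after batch 1: $afterBatch1, after batch 2: $afterBatch2")
  }
}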
Data
Result
Test during the first batch interval:
Test result during the second batch interval:
3. Spark Streaming: cumulative WordCount with a window function
Program
package houpu.com.SparkStream

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object SparkStreaming_SocketWindow {
  def main(args: Array[String]): Unit = {
    // Create the SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("socket").setMaster("local[2]")
    // Create the SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("warn")
    // A StreamingContext needs a SparkContext plus a batch interval; here data is processed every 5 seconds
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
    // Receive socket data
    val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("ip", 9999)
    // Split each line into words
    val words: DStream[String] = socketTextStream.flatMap(_.split(" "))
    // Map each word to (word, 1)
    val pairs = words.map(word => (word, 1))
    // Sum the 1s of the same word within a window
    // reduceFunc: (V, V) => V, the reduce function
    // windowDuration: Duration, the window length
    // slideDuration: Duration, the slide interval, i.e. how often the window is computed
    val result: DStream[(String, Int)] = pairs.reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(10), Seconds(10))
    // Print the output
    result.print()
    // Start and wait for data to arrive
    ssc.start()
    ssc.awaitTermination()
  }
}
Data
The batch interval is 5s, the window length is 10s, and the slide interval is also 10s, so each count covers only the data from the last 10 seconds.
The test data spans two batch intervals and is sent within 10 seconds.
Result
Since the window spans 10s, it is the second output that shows the correct aggregated counts.
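A rough timeline of how the batches fall into windows with these settings (batch interval 5s, window 10s, slide 10s):

t = 0s..5s    -> batch 1
t = 5s..10s   -> batch 2
t = 10s       -> window #1 fires, covering batches 1 and 2 (data from t = 0s..10s)
t = 10s..15s  -> batch 3
t = 15s..20s  -> batch 4
t = 20s       -> window #2 fires, covering batches 3 and 4 (data from t = 10s..20s)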
4. Transform Operation
Program
package houpu.com.SparkStream

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TransForm {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("TransForm")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Blacklist
    val blackList = ssc.sparkContext.parallelize(Array(("Mike", true), ("Bob", true)))
    val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("master", 9999)
    // Key each record by the user name so it can be joined against the blacklist
    val result = socketTextStream.map(line => {
      val userinfo = line.split(" ")
      (userinfo(1), userinfo(0))
    })
    // Blacklist filtering: keep only the records whose user is not in the blacklist
    result.transform(rdd => {
      rdd.leftOuterJoin(blackList).filter(_._2._2.isEmpty)
    }).print()
    // Start and wait for data to arrive
    ssc.start()
    ssc.awaitTermination()
  }
}
// leftOuterJoin behaves like SQL's left outer join: the result keeps every record of the left RDD,
// and records with no match on the right get None.
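The filter condition _._2._2.isEmpty can look cryptic, so here is a minimal, non-streaming sketch of the same leftOuterJoin + filter step on plain RDDs; the user "Tom" is invented just for the illustration:

import org.apache.spark.{SparkConf, SparkContext}

object BlackListJoinDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("BlackListJoinDemo"))
    sc.setLogLevel("warn")
    val blackList = sc.parallelize(Seq(("Mike", true), ("Bob", true)))
    val users     = sc.parallelize(Seq(("Mike", "hello"), ("Tom", "hello")))

    // leftOuterJoin keeps every record from `users`;
    // records without a blacklist match get None in the second slot:
    // ("Mike", ("hello", Some(true))), ("Tom", ("hello", None))
    val joined = users.leftOuterJoin(blackList)

    // keep only records whose blacklist side is empty, i.e. users not on the blacklist
    joined.filter(_._2._2.isEmpty).collect().foreach(println) // (Tom,(hello,None))

    sc.stop()
  }
}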
Data
Test data:
hello Mike
hello Bob
Result
5. Cumulative Top N
Program
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object SparkStream_SocketTotal {
  def main(args: Array[String]): Unit = {
    // currentValues: all the 1s for a given word in the current batch, e.g. (wangan,1)(wangan,1)(wangan,1) ---> List(1,1,1)
    // historyValues: the total count of that word across all previous batches
    def updateFunc(currentValues: Seq[Int], historyValues: Option[Int]): Option[Int] = {
      val newValue: Int = currentValues.sum + historyValues.getOrElse(0)
      Some(newValue)
    }

    // Create the SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("socket").setMaster("local[2]")
    // Create the SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("warn")
    // A StreamingContext needs a SparkContext plus a batch interval; here data is processed every 5 seconds
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
    // Receive socket data
    val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("ip", 9999)
    // Split each line into words
    val words: DStream[String] = socketTextStream.flatMap(_.split(" "))
    // Map each word to (word, 1)
    val pairs = words.map(word => (word, 1))
    // Checkpoint directory where the state between batches is saved
    ssc.checkpoint("hdfs://ip:9000/tets/data02/relt01")
    // Accumulate the counts for each word across batches
    val result = pairs.updateStateByKey(updateFunc)
    // Sort by word count in descending order
    val finalResult: DStream[(String, Int)] = result.transform(rdd => {
      // Inside transform we can use ordinary RDD operations
      val sortRDD: RDD[(String, Int)] = rdd.sortBy(_._2, false)
      // Take the three most frequent words
      val top3: Array[(String, Int)] = sortRDD.take(3)
      // Print the Top 3
      println("##############Top3 Start##############")
      top3.foreach(println)
      println("##############Top3 End##############")
      sortRDD
    })
    // Print the sorted result
    finalResult.print()
    // Print the accumulated counts
    result.print()
    // Start and wait for data to arrive
    ssc.start()
    ssc.awaitTermination()
  }
}
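A small stand-alone sketch of the sort-and-take step used inside transform above, on a plain RDD with made-up counts:

import org.apache.spark.{SparkConf, SparkContext}

object Top3Demo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("Top3Demo"))
    sc.setLogLevel("warn")
    // Made-up accumulated counts, just for the illustration
    val counts = sc.parallelize(Seq(("hello", 5), ("spark", 3), ("dada", 1), ("helll", 2)))
    // Sort by count in descending order, then take the first three
    val top3 = counts.sortBy(_._2, ascending = false).take(3)
    top3.foreach(println) // (hello,5) (spark,3) (helll,2)
    sc.stop()
  }
}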
Data
Result