DStream Exercises

DStream: counting data received from a socket

Install and start the producer

#Install the nc tool on Linux; it will be used to send data to a port
yum -y install nc

#Run the send command (replace port with the port the program listens on, e.g. nc -lk 9999)
nc -lk port

Add the Spark Streaming dependency

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>

Note: with provided scope the jar is excluded from the runtime classpath, so when running locally from the IDE you may need to remove the scope (or tell the IDE to include provided dependencies).
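If the project is built with sbt instead of Maven, the equivalent line (assuming a sparkVersion setting that mirrors ${spark.version}) would be:

libraryDependencies += "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided"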

1. Basic word count (WordCount)

Spark Streaming program

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}

object Spark_Socket {
  def main(args: Array[String]): Unit = {
    //Create the SparkConf; "local[2]" provides at least two threads: one for the receiver, one for processing
    val sparkConf: SparkConf = new SparkConf().setAppName("socket").setMaster("local[2]")
    //Create the SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")

    //A StreamingContext needs a SparkContext plus a batch interval; here data is processed every 5 seconds
    val ssc = new StreamingContext(sc, Seconds(5))

    //Receive socket data ("ip" is a placeholder for the host running nc)
    val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("ip", 9999)

    //Split each line into words
    val words:DStream[String] = socketTextStream.flatMap(_.split(" "))

    //Map each word to (word, 1)
    val pairs = words.map(word => (word, 1))

    //Sum the 1s for each distinct word
    val result = pairs.reduceByKey(_ + _)

    //Print the output
    result.print()

    //Start the streaming computation
    ssc.start()
    ssc.awaitTermination()


  }

}
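ssc.awaitTermination() blocks until the streaming job is stopped or fails. If a controlled shutdown is needed (for example from a shutdown hook), a minimal sketch using the standard stop API:

//Gracefully stop: let in-flight batches finish, then also stop the SparkContext
ssc.stop(stopSparkContext = true, stopGracefully = true)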

Data (all lines sent within one batch interval)

dada hello

hello helll

helll

Result

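The original result screenshot is not reproduced. If all three lines arrive within the same 5-second batch, the code above would print one batch like the following (tuple order may vary):

-------------------------------------------
Time: <batch time> ms
-------------------------------------------
(dada,1)
(hello,2)
(helll,2)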

2. Spark Streaming: cumulative WordCount

Spark Streaming program

package houpu.com.SparkStream

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object SparkStreaming_Socket_Total {
  def main(args: Array[String]): Unit = {
    //currentValues: all the 1s for a given word in the current batch, e.g. (wangan,1)(wangan,1)(wangan,1) ---> List(1,1,1)
    //historyValues: the total count of that word across all previous batches
    def updateFunc(currentValues:Seq[Int],historyValues:Option[Int]):Option[Int]={
      val newValue:Int = currentValues.sum + historyValues.getOrElse(0)
      Some(newValue)
    }

    //Create the SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("socket").setMaster("local[2]")
    //Create the SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")

    //A StreamingContext needs a SparkContext plus a batch interval; here data is processed every 5 seconds
    val ssc:StreamingContext = new StreamingContext(sc, Seconds(5))

    //Receive socket data ("ip" is a placeholder for the host running nc)
    val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("ip", 9999)


    //Split each line into words
    val words:DStream[String] = socketTextStream.flatMap(_.split(" "))

    //Map each word to (word, 1)
    val pairs = words.map(word => (word, 1))
    //updateStateByKey needs a checkpoint directory to persist state between batches
    ssc.checkpoint("hdfs://ip:9000/dict01/save/relt02")

    //Accumulate the count of each word across all batches
    val result = pairs.updateStateByKey(updateFunc)

    //Print the output
    result.print()

    //Start the computation and wait for incoming data
    ssc.start()
    ssc.awaitTermination()

  }

}
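updateStateByKey re-scans every key in each batch. Since Spark 1.6, mapWithState offers a more efficient alternative that only touches keys present in the current batch. A minimal sketch of the same running count (a drop-in replacement for updateFunc above; it still requires the checkpoint directory):

import org.apache.spark.streaming.{State, StateSpec}

//Combine the current batch's value with the stored running total
val spec = StateSpec.function((word: String, one: Option[Int], state: State[Int]) => {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum)   //persist the new total for this word
  (word, sum)         //emit the running total
})

val totals = pairs.mapWithState(spec)
totals.print()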

Data

(screenshot of the test input not reproduced)

Result

Output within the first batch interval:

(result screenshot not reproduced)

Output within the second batch interval:

(result screenshot not reproduced)

3. Spark Streaming: WordCount with window functions

Program

package houpu.com.SparkStream

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object SparkStreaming_SocketWindow {
  def main(args: Array[String]): Unit = {
    //Create the SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("socket").setMaster("local[2]")
    //Create the SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")

    //A StreamingContext needs a SparkContext plus a batch interval; here data is processed every 5 seconds
    val ssc:StreamingContext = new StreamingContext(sc, Seconds(5))

    //Receive socket data ("ip" is a placeholder for the host running nc)
    val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("ip", 9999)

    //Split each line into words
    val words:DStream[String] = socketTextStream.flatMap(_.split(" "))

    //Map each word to (word, 1)
    val pairs = words.map(word => (word, 1))

    //Sum the 1s for each distinct word within a window
    //reduceFunc: (V,V) => V, the reduce function
    //windowDuration: Duration, the window length
    //slideDuration: Duration, the slide interval, i.e. how often the window is evaluated
    val result:DStream[(String,Int)] = pairs.reduceByKeyAndWindow((x:Int,y:Int) => x+y,Seconds(10),Seconds(10))

    //Print the output
    result.print()

    //Start the computation and wait for incoming data
    ssc.start()
    ssc.awaitTermination()

  }

}
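Both the window length and the slide interval must be multiples of the batch interval. When windows overlap (slide shorter than window), reduceByKeyAndWindow also has a variant that takes an inverse function and incrementally subtracts the data sliding out of the window instead of recomputing everything; a sketch for the program above (it additionally requires a checkpoint directory; the path is a placeholder):

//10-second windows re-evaluated every 5 seconds; _ - _ removes the batch that left the window
ssc.checkpoint("hdfs://ip:9000/window/checkpoint")
val sliding: DStream[(String, Int)] = pairs.reduceByKeyAndWindow(_ + _, _ - _, Seconds(10), Seconds(5))
sliding.print()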

Data

The batch interval is 5s, the window length is 10s, and the slide interval is 10s, so each computation covers exactly the last 10 seconds of data.

This test data spans two batch intervals and was sent within 10 seconds.

(screenshot of the test input not reproduced)

Result

The outputs are 10s apart; only the second one reflects the complete window statistics.

(result screenshot not reproduced)

4. Transform Operation

Program

package houpu.com.SparkStream

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TransForm {
  def main(args: Array[String]) {

    val conf = new SparkConf().setMaster("local[2]").setAppName("TransForm")
    val ssc = new StreamingContext(conf,Seconds(5))

    //Blacklist: records from these users will be filtered out
    val blackList = ssc.sparkContext.parallelize(Array(("Mike",true),("Bob",true)))
    val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("master", 9999)

    //Key each line by the user name (the second field), so it can be joined against the blacklist
    val result = socketTextStream.map(line => {
      val userinfo = line.split(" ")
      (userinfo(1), userinfo(0))
    })

    //Blacklist filtering: keep only records whose key did NOT match the blacklist
    result.transform(rdd => {
      rdd.leftOuterJoin(blackList).filter(_._2._2.isEmpty)
    }).print()

    //Start the computation and wait for incoming data
    ssc.start()
    ssc.awaitTermination()


  }
}
//leftOuterJoin works like SQL's LEFT OUTER JOIN: the result is driven by the left-hand RDD, and records with no match on the right come back as None.
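A quick illustration of leftOuterJoin semantics on static RDDs (standalone sketch, assuming an existing SparkContext sc):

val users = sc.parallelize(Seq(("Mike", "hello"), ("Alice", "hi")))
val black = sc.parallelize(Seq(("Mike", true)))

//Every left record is kept; the right side becomes an Option
users.leftOuterJoin(black).collect().foreach(println)
//(Mike,(hello,Some(true)))  <- matched, removed by the isEmpty filter
//(Alice,(hi,None))          <- unmatched, survives the filter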

Data

Test data:

hello Mike

hello Bob

Result

(result screenshot not reproduced; note that with both test users on the blacklist, both records are filtered out, so the batch output is empty)

5. Cumulative TopN

Program

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object SparkStream_SocketTotal {
  def main(args: Array[String]): Unit = {
    //currentValues: all the 1s for a given word in the current batch, e.g. (wangan,1)(wangan,1)(wangan,1) ---> List(1,1,1)
    //historyValues: the total count of that word across all previous batches
    def updateFunc(currentValues:Seq[Int],historyValues:Option[Int]):Option[Int]={
      val newValue:Int = currentValues.sum + historyValues.getOrElse(0)
      Some(newValue)
    }

    //Create the SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("socket").setMaster("local[2]")
    //Create the SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")

    //A StreamingContext needs a SparkContext plus a batch interval; here data is processed every 5 seconds
    val ssc:StreamingContext = new StreamingContext(sc, Seconds(5))

    //Receive socket data ("ip" is a placeholder for the host running nc)
    val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("ip", 9999)


    //Split each line into words
    val words:DStream[String] = socketTextStream.flatMap(_.split(" "))

    //Map each word to (word, 1)
    val pairs = words.map(word => (word, 1))

    //updateStateByKey needs a checkpoint directory to persist state between batches
    ssc.checkpoint("hdfs://ip:9000/tets/data02/relt01")

    //Accumulate the count of each word across all batches
    val result = pairs.updateStateByKey(updateFunc)

    //Sort by word count in descending order
    val finalResult:DStream[(String,Int)] = result.transform(rdd =>{
      //Plain RDD operations can be used inside transform
      val sortRDD:RDD[(String,Int)] = rdd.sortBy(_._2,false)
      //Take the three most frequent words (take is an action, so this runs on the driver each batch)
      val top3:Array[(String,Int)] = sortRDD.take(3)
      //Print the Top3
      println("##############Top3 Start##############")
      top3.foreach(println)
      println("##############Top3 End##############")

      sortRDD
    })

    //Print the sorted counts
    finalResult.print()
    //Print the raw accumulated counts
    result.print()

    //Start the computation and wait for incoming data
    ssc.start()
    ssc.awaitTermination()

  }

}
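The printlns above work because transform's function (and the take(3) action inside it) runs on the driver for every batch. Per-batch output is more idiomatically expressed with the foreachRDD output operation; a sketch of the same Top3 (a hypothetical replacement for the transform/print pair above):

//Output operation: invoked on the driver once per batch
result.foreachRDD { rdd =>
  val top3 = rdd.sortBy(_._2, ascending = false).take(3)
  println("##############Top3 Start##############")
  top3.foreach(println)
  println("##############Top3 End##############")
}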

Data

(screenshot of the test input not reproduced)

Result

(result screenshot not reproduced)