Understanding and Using Window Functions in Spark Streaming

Spark Streaming's window functions are not as rich as Flink's, but they are still very useful. Take the following example:


kafkaStream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.map(_._2).map((_, 1)).reduceByKeyAndWindow((v1: Int, v2: Int) => {
  v1 + v2
}, Seconds(8), Seconds(4))

This means that every 4 seconds (the second duration), the job computes over the most recent 8 seconds of data (the first duration).

The first duration is called the window length and the second the slide interval: every 4 seconds, compute over the most recent 8 seconds of data. This maps onto several business scenarios. For website logs, for example, you might compute the page-view (PV) count of the last two hours once every hour. Another pattern is to aggregate in memory first and only then accumulate into Redis: say, every 5 seconds sum up the last 5 seconds of data and flush that sum to Redis, because updating Redis on every single record would cause problems. There is one more time setting involved:

val ssc = new StreamingContext(sparkConf, Seconds(4))

The batch interval set here is the basic unit of time at which Spark Streaming generates jobs. The window length and the slide interval must both be integer multiples of this batch interval. If all you need is a simple in-memory aggregation, just set the window length equal to the slide interval.
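As a minimal sketch of the PV scenario above, tying the batch interval, window length, and slide interval together (the object name, socket source, host, port, and checkpoint path are assumptions for illustration, not part of the original example):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Minutes, StreamingContext }

object HourlyPv {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("HourlyPv").setMaster("local[*]")
    // batch interval of 1 hour; the window and slide below are integer multiples of it
    val ssc = new StreamingContext(conf, Minutes(60))
    ssc.checkpoint("/tmp/hourly-pv-checkpoint") // see the checkpointing note below
    // hypothetical source: one visited URL per line arriving on a local socket
    val urls = ssc.socketTextStream("localhost", 9998)
    val pv = urls.map((_, 1))
      // window length = 2 hours, slide interval = 1 hour: every hour, PV of the last two hours
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Minutes(120), Minutes(60))
    pv.print()
    ssc.start()
    ssc.awaitTermination()
  }
}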

Persistence (checkpointing):

Because a window function needs the RDDs from earlier batches, checkpointing is mandatory here.

Here is a complete example:


package com.jingde.sparkstreamlast

import kafka.serializer.StringDecoder
import kafka.api.{ OffsetRequest, PartitionOffsetRequestInfo, TopicMetadataRequest }
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import kafka.message.MessageAndMetadata
import kafka.utils.{ ZKGroupTopicDirs, ZkUtils }
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkMarshallingError
import org.I0Itec.zkclient.serialize.ZkSerializer
import org.apache.log4j.{ Level, Logger }
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{ Seconds, StreamingContext }

object StreamingFromKafka {

  val groupId = "logs"
  val topic = "streaming"

  // ZkClient with a simple UTF-8 string serializer for reading and writing offsets
  val zkClient = new ZkClient("localhost:9999", 60000, 60000, new ZkSerializer {
    override def serialize(data: Object): Array[Byte] = {
      try {
        data.toString().getBytes("UTF-8")
      } catch {
        case e: ZkMarshallingError => null
      }
    }
    override def deserialize(bytes: Array[Byte]): Object = {
      try {
        new String(bytes, "UTF-8")
      } catch {
        case e: ZkMarshallingError => null
      }
    }
  })

  val topicDirs = new ZKGroupTopicDirs("spark_streaming_test", topic)
  val zkTopicPath = s"${topicDirs.consumerOffsetDir}"

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    sparkConf.setMaster("local[*]")
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "2")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val kafkaParams = Map(
      "metadata.broker.list" -> "localhost:9092",
      "group.id" -> groupId,
      "zookeeper.connect" -> "localhost:9999",
      "auto.offset.reset" -> OffsetRequest.SmallestTimeString)
    val topics = Set(topic)

    val children = zkClient.countChildren(s"${topicDirs.consumerOffsetDir}")
    var kafkaStream: InputDStream[(String, String)] = null
    var fromOffsets: Map[TopicAndPartition, Long] = Map()

    // Checkpoint directory (an HDFS path in production, a local path while testing).
    // The window operation needs RDDs from earlier batches, so checkpointing is required here.
    ssc.checkpoint("D:\\tmp\\storm-hdfs")

    if (children > 0) {
      // --- find the leader broker of each partition ---
      val topicList = List(topic)
      val req = new TopicMetadataRequest(topicList, 0) // topic metadata: brokers and partition layout
      // broker host, broker port, socket timeout, buffer size, client id
      val getLeaderConsumer = new SimpleConsumer("localhost", 9092, 10000, 10000, "OffsetLookup")
      val res = getLeaderConsumer.send(req)
      val topicMetaOption = res.topicsMetadata.headOption
      val partitions = topicMetaOption match {
        case Some(tm) =>
          tm.partitionsMetadata.map(pm => (pm.partitionId, pm.leader.get.host)).toMap[Int, String]
        case None =>
          Map[Int, String]()
      }

      for (i <- 0 until children) {
        val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}")
        val tp = TopicAndPartition(topic, i)
        // ask the partition leader for its earliest available offset (EarliestTime = -2, maxNumOffsets = 1)
        val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
        val consumerMin = new SimpleConsumer(partitions(i), 9092, 10000, 10000, "getMinOffset")
        val curOffsets = consumerMin.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tp).offsets
        var nextOffset = partitionOffset.toLong
        if (curOffsets.length > 0 && nextOffset < curOffsets.head) {
          // the stored offset is older than anything still retained by Kafka, so fall back to the earliest available offset
          nextOffset = curOffsets.head
        }
        fromOffsets += (tp -> nextOffset) // record the starting offset for each partition
      }

      // turn every Kafka message into a (topic_name, message) tuple
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
    } else {
      // no offsets stored yet: start according to auto.offset.reset
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    }

    var offsetRanges = Array[OffsetRange]()
    kafkaStream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.map(_._2).map((_, 1)).reduceByKeyAndWindow((v1: Int, v2: Int) => {
      v1 + v2
    }, Seconds(5), // window length: 5 seconds
      Seconds(5)   // slide interval: 5 seconds -> every 5 seconds, compute over the most recent 5 seconds of data
    ).foreachRDD { rdd =>
      rdd.foreachPartition { partition => partition.foreach(println) }
      for (o <- offsetRanges) {
        // record the end offset of each processed range so that a restart resumes right after it
        ZkUtils.updatePersistentPath(zkClient, s"${topicDirs.consumerOffsetDir}/${o.partition}", o.untilOffset.toString)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
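For the Redis scenario mentioned earlier (every 5 seconds, sum the last 5 seconds in memory, then accumulate into Redis), a minimal sketch could look like the following; the socket source, Redis address, key prefix, and checkpoint path are assumptions for illustration only:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import redis.clients.jedis.Jedis

object WindowToRedis {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WindowToRedis").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("/tmp/window-to-redis") // checkpoint directory for the window operation, as noted above
    // hypothetical source: one word per line arriving on a local socket
    val words = ssc.socketTextStream("localhost", 9998)
    val counts = words.map((_, 1))
      // window length == slide interval: a plain per-5-second in-memory sum
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(5), Seconds(5))
    counts.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // one connection per partition and one INCRBY per key, instead of hitting Redis once per record
        val jedis = new Jedis("localhost", 6379)
        partition.foreach { case (word, sum) => jedis.incrBy("wc:" + word, sum) }
        jedis.close()
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}

Doing the per-interval sum in the window first and issuing a single INCRBY per key keeps the load on Redis low, which is the point of this pattern.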
