Notes on Real-Time Computation with Spark Streaming and Kafka
In stream processing, the most important concern is how messages are consumed.
When we use Spark for near-real-time computation, most scenarios involve reading from Kafka. This post summarizes the points to watch when consuming Kafka from Spark; the code follows below.
package com.aura.bigdata.spark.scala.streaming.p1

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.JavaConversions

/**
  * Use ZooKeeper to manage the offsets that the Spark driver reads from Kafka.
  * The offsets for each topic are stored under an agreed-upon path:
  *
  *   /xxxxx/offsets/topic/group/partition/
  *     0
  *     1
  *     2
  *
  * ZooKeeper connection string: bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka
  */
object _07SparkKafkaDriverHAZooKeeperOps {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.project-spark").setLevel(Level.WARN)

    if (args == null || args.length < 4) {
      println(
        """
          |Parameter Errors! Usage: <batchInterval> <zkQuorum> <groupId> <topics>
          |batchInterval : batch interval in seconds
          |zkQuorum      : ZooKeeper quorum address
          |groupId       : consumer group id
          |topics        : topic(s) to read
        """.stripMargin)
      System.exit(-1)
    }
    val Array(batchInterval, zkQuorum, group, topic) = args
    // Note: the broker list (and the ZooKeeper connection string further down)
    // is hard-coded here, so the zkQuorum argument is not actually used.
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "bigdata01:9092,bigdata02:9092,bigdata03:9092",
      "auto.offset.reset" -> "smallest"
    )
    val conf = new SparkConf().setMaster("local[2]").setAppName("_07SparkKafkaDriverHAZooKeeperOps")

    def createFunc(): StreamingContext = {
      val ssc = new StreamingContext(conf, Seconds(batchInterval.toLong))
      // Read data from Kafka, starting from the offsets stored in ZooKeeper (if any)
      val messages = createMessage(ssc, kafkaParams, topic, group)
      // Business logic
      messages.foreachRDD((rdd, bTime) => {
        if (!rdd.isEmpty()) {
          println("###########################->RDD count: " + rdd.count)
          println("###########################->batch time: " + bTime)
          // All business logic must be done here; processing works exactly like any other RDD operation
        }
        // After processing, write the offsets back to ZooKeeper
        // (at-least-once: a failure before this line means the batch is replayed)
        storeOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, topic, group)
      })
      ssc
    }

    // High-availability style startup: reuse the active context if one exists
    // (for real driver recovery you would pass a checkpoint directory to StreamingContext.getOrCreate)
    val ssc = StreamingContext.getActiveOrCreate(createFunc _)
    ssc.start()
    ssc.awaitTermination()
  }

  def storeOffsets(offsetRanges: Array[OffsetRange], topic: String, group: String): Unit = {
    val zkTopicPath = s"/offsets/${topic}/${group}"
    for (range <- offsetRanges) { // each range holds the offsets consumed by the current RDD for one partition
      val path = s"${zkTopicPath}/${range.partition}"
      ensureZKExists(path)
      client.setData().forPath(path, (range.untilOffset + "").getBytes())
    }
  }

  /*
   * Agreed-upon offset path in ZooKeeper:
   *   /xxxxx/offsets/topic/group/partition/
   *     0
   *     1
   *     2
   */
  def createMessage(ssc: StreamingContext, kafkaParams: Map[String, String], topic: String, group: String): InputDStream[(String, String)] = {
    // Read the stored offsets from ZooKeeper; returns fromOffsets plus a flag
    // indicating whether this is the first run (i.e. no offsets stored yet)
    val (fromOffsets, flag) = getFromOffsets(topic, group)
    var message: InputDStream[(String, String)] = null
    if (!flag) {
      // ZooKeeper already holds offsets for this Spark Streaming consumer group,
      // so every run after the first reads Kafka through this interface
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
      message = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
    } else {
      // First run: no stored offsets, so start according to auto.offset.reset
      message = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic.split(",").toSet)
    }
    message
  }

  // Read the Kafka offsets stored in ZooKeeper
  def getFromOffsets(topic: String, group: String): (Map[TopicAndPartition, Long], Boolean) = {
    // /xxxxx/offsets/topic/group/partition/
    val zkTopicPath = s"/offsets/${topic}/${group}"
    ensureZKExists(zkTopicPath)
    // If offsets exist, read them partition by partition
    val offsets = for {
      p <- JavaConversions.asScalaBuffer(client.getChildren.forPath(zkTopicPath))
    } yield {
      // p is the partition id; the node's data is the stored offset
      val offset = client.getData.forPath(s"${zkTopicPath}/${p}")
      (TopicAndPartition(topic, p.toInt), new String(offset).toLong)
    }
    if (!offsets.isEmpty) {
      (offsets.toMap, false)
    } else {
      (offsets.toMap, true)
    }
  }

  def ensureZKExists(zkTopicPath: String): Unit = {
    if (client.checkExists().forPath(zkTopicPath) == null) { // the node has never been written
      client.create().creatingParentsIfNeeded().forPath(zkTopicPath)
    }
  }

  // Curator client for ZooKeeper (Curator is to raw ZooKeeper roughly what Spring MVC/Struts2 is to Servlets)
  val client = {
    val client = CuratorFrameworkFactory.builder()
      .namespace("mykafka") // the namespace acts as a root directory for all paths
      .connectString("bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka")
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .build()
    client.start()
    client
  }
}
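For completeness, here is a small standalone snippet (not from the original post) that reads back the offsets the job above stored in ZooKeeper, which is handy for checking that commits are actually happening. It is a minimal sketch assuming the same Curator namespace ("mykafka"), connection string and /offsets/<topic>/<group>/<partition> layout as the code above; the topic and group names are placeholders.

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry

import scala.collection.JavaConversions._

object InspectStoredOffsets {
  def main(args: Array[String]): Unit = {
    // Same namespace and connection string as the job above (assumption)
    val client = CuratorFrameworkFactory.builder()
      .namespace("mykafka")
      .connectString("bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka")
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .build()
    client.start()

    val topic = "mytopic" // placeholder
    val group = "mygroup" // placeholder
    val base = s"/offsets/${topic}/${group}"

    // Each child node is a partition id; its data is the last stored offset
    for (partition <- client.getChildren.forPath(base)) {
      val offset = new String(client.getData.forPath(s"${base}/${partition}"))
      println(s"partition ${partition} -> offset ${offset}")
    }
    client.close()
  }
}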
Summary
When using Spark Streaming with Kafka, pay particular attention to how message offsets are handled: save them only after a batch has been processed, so that a restart replays data rather than losing it.
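As a closing note, newer Spark versions ship the spark-streaming-kafka-0-10 integration, which supports the same "process first, then commit offsets" pattern without managing ZooKeeper paths yourself, by committing offsets back to Kafka with commitAsync. The sketch below is not from the original post; the broker address, topic and group id are placeholders, and it assumes the 0-10 integration is on the classpath.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

object Kafka010OffsetSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("Kafka010OffsetSketch")
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker01:9092",             // placeholder
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "demo-group",                          // placeholder
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)   // commit manually after processing
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("demo-topic"), kafkaParams))

    stream.foreachRDD { rdd =>
      // Grab the offset ranges before any transformation changes the RDD type
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      if (!rdd.isEmpty()) {
        println("RDD count: " + rdd.count()) // business logic goes here
      }
      // Commit only after the batch has been processed: at-least-once semantics
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}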
---------------------
Author: 梧桐听夜雨
Original post: https://blog.csdn.net/qq_16457097/article/details/85341057
Copyright notice: this is an original article by the author; please include a link to the original post when reposting.