Spark Streaming: how to modify closure objects and update the computation after start()
After a Spark Streaming context has been started, I wanted to change the computation rules, but the system reports that this is not supported. The exception is as follows:
Exception in thread "Thread-14" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:223)
at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:65)
at org.apache.spark.streaming.dstream.FlatMappedDStream.<init>(FlatMappedDStream.scala:29)
at org.apache.spark.streaming.dstream.DStream$$anonfun$flatMap$1.apply(DStream.scala:554)
My version is Spark 2.0, but our business requirement is to compute reports according to user-defined report rules. After some analysis, the report calculation rules can be wrapped in an object that is updated in the driver, so the requirement can be met in a data-driven way.
However, once Spark Streaming has been started, the functions and objects inside the closures can no longer be modified, so the source code has to be patched by hand. After the changes, the rebuilt package passed testing.
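For context, here is a minimal sketch of the kind of code that hits this check in vanilla Spark (hypothetical socket source and word count, not our actual TT stream job):
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object AddOutputAfterStart {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("repro"), Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.count().print()
    ssc.start()
    // Creating a new transformation on an ACTIVE context calls DStream.validateAtInit
    // in the DStream constructor and throws the IllegalStateException shown above.
    lines.flatMap(_.split(" ")).print()
    ssc.awaitTermination()
  }
}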
The modifications are as follows:
**************************************************************************************************************
org.apache.spark.streaming.DStreamGraph
//yunzhi.lyz val -> var
private var outputStreams = new ArrayBuffer[DStream[_]]()
def start(time: Time) {
  this.synchronized {
    // yunzhi.lyz delete
    // require(zeroTime == null, "DStream graph computation already started")
    zeroTime = time
    startTime = time
    outputStreams.foreach(_.initialize(zeroTime))
    outputStreams.foreach(_.remember(rememberDuration))
    outputStreams.foreach(_.validateAtStart)
    inputStreams.par.foreach(_.start())
  }
}
// yunzhi.lyz + def
def clearOutputStreams() = this.synchronized {
  outputStreams = new ArrayBuffer[DStream[_]]()
}
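With clearOutputStreams() plus the relaxed start(), the driver can drop the registered output operations and re-initialize the graph at the next batch boundary. Below is a sketch of the intended call sequence, assuming ssc is the running StreamingContext, lines the existing input DStream, and applyNewRules a user-supplied function (getGraph / getScheduler / getjobGenerator / getTimer / getNextTime are the accessors added in the sections that follow):
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.DStream

def refreshOutputOps(ssc: StreamingContext, lines: DStream[String],
                     applyNewRules: String => String): Unit = {
  ssc.getGraph.clearOutputStreams()        // 1. drop the previously registered output DStreams
  lines.map(applyNewRules).print()         // 2. register output operations built from the new rules
  ssc.getGraph.start(new Time(             // 3. restart the graph at the next batch time
    ssc.getScheduler.getjobGenerator.getTimer.getNextTime))
}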
**************************************************************************************************************
org.apache.spark.streaming.StreamingContext
// yunzhi.lyz +def
def getGraph = graph
def getScheduler() = scheduler
**************************************************************************************************************
org.apache.spark.streaming.dstream.DStream
// yunzhi.lyz + def
def getGraph() = graph
private[streaming] def initialize(time: Time) {
  // yunzhi.lyz delete
  // if (zeroTime != null && zeroTime != time) {
  //   throw new SparkException("ZeroTime is already initialized to " + zeroTime
  //     + ", cannot initialize it again to " + time)
  // }
  zeroTime = time
  ......
private def validateAtInit(): Unit = {
  ssc.getState() match {
    case StreamingContextState.INITIALIZED =>
      // good to go
    // yunzhi.lyz --
    case StreamingContextState.ACTIVE =>
      // throw new IllegalStateException(
      //   "Adding new inputs, transformations, and output operations after " +
      //   "starting a context is not supported")
    case StreamingContextState.STOPPED =>
      throw new IllegalStateException(
        "Adding new inputs, transformations, and output operations after " +
        "stopping a context is not supported")
  }
}
/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    // yunzhi.lyz --
    // if (isTimeValid(time)) {
    val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
      // Disable checks for existing output directories in jobs launched by the streaming
      // scheduler, since we may need to write output to an existing directory during checkpoint
      // recovery; see SPARK-4835 for more details. We need to have this call here because
      // compute() might cause Spark jobs to be launched.
      PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
        compute(time)
      }
    }
    rddOption.foreach { case newRDD =>
      // Register the generated RDD for caching and checkpointing
      if (storageLevel != StorageLevel.NONE) {
        newRDD.persist(storageLevel)
        logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
      }
      if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
        newRDD.checkpoint()
        logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
      }
      generatedRDDs.put(time, newRDD)
    }
    rddOption
    // yunzhi.lyz --
    // } else {
    //   None
    // }
  }
}
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    // yunzhi.lyz --
    // case None => None
  }
}
private[streaming] def clearMetadata(time: Time) {
  val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
  // yunzhi.lyz change
  val oldRDDs = generatedRDDs
    .filter(_._1 != null)
    .filter(_._2 != null)
    .filter(_._1 <= (time - rememberDuration))
  // val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
  logDebug("Clearing references to old RDDs: [
  ......
**************************************************************************************************************
org.apache.spark.streaming.receiver.ReceiverSupervisorImpl
/** Report error to the receiver tracker */
def reportError(message: String, error: Throwable) {
  // yunzhi.lyz --
  // val errorString = Option(error).map(Throwables.getStackTraceAsString).getOrElse("")
  // trackerEndpoint.send(ReportError(streamId, message, errorString))
  logWarning("Reported error " + message + " - " + error)
}

override protected def onReceiverStop(message: String, error: Option[Throwable]) {
  logInfo("Deregistering receiver " + streamId)
  // yunzhi.lyz --
  // val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
  trackerEndpoint.askWithRetry[Boolean](DeregisterReceiver(streamId, message, ""))
  logInfo("Stopped receiver " + streamId)
}
**************************************************************************************************************
org.apache.spark.streaming.scheduler.JobGenerator
// yunzhi.lyz ++
private var nextTime = -1L
def getTimer() = timer

// yunzhi.lyz -- private
def restart() {
  // If manual clock is being used for testing, then
  // either set the manual clock to the last checkpointed time,
  // or if the property is defined set it to that time
  if (clock.isInstanceOf[ManualClock]) {
    val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
    val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
    clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
  }
  ......
**************************************************************************************************************
org.apache.spark.streaming.scheduler.JobScheduler
// yunzhi.lyz +
def getjobGenerator() = jobGenerator
**************************************************************************************************************
org.apache.spark.streaming.util.RecurringTimer
// yunzhi.lyz change val -> var
private var thread = new Thread("RecurringTimer - " + name) {
  setDaemon(true)
  override def run() { loop }
}

// yunzhi.lyz ++ def
def getReStartTime(t: Int): Long = {
  (math.floor(clock.getTimeMillis().toDouble / period) + 1).toLong * period
}

def getNextTime(): Long = { nextTime }

// yunzhi.lyz ++ def
def resume(startTime: Long): Long = synchronized {
  nextTime = startTime
  thread.resume()
  logInfo("ReStarted timer for " + name + " at time " + nextTime)
  nextTime
}

def stop(interruptTimer: Boolean): Long = synchronized {
  if (!stopped) {
    stopped = true
    if (interruptTimer) {
      thread.interrupt()
    }
    // yunzhi.lyz
    // thread.join()
    thread.suspend()
    logInfo("Stopped timer for " + name + " after time " + prevTime)
  }
  // prevTime
  nextTime
}
After the modifications, recompile and repackage.
Test code (fragment that swaps the output operations at runtime):
def exchangeworkbatch(tp: String, st: String) {
  if (tp.equals("+")) workbatch += (st)
  if (tp.equals("-")) workbatch += (st)
}

cnt = cnt + 1
val cntn = cnt
val wbArray = workbatch.toArray
workbatch.foreach(x => println("*************" + x))
TTObject.ssc.getGraph.clearOutputStreams
wordCounts5 = TTObject.lines.flatMap(x => flatmapchange(wbArray, x))
  .map(x => ("wordCounts4** " + cntn + " **" + x, 1))
  .reduceByKey(_ + _)
wordCounts5.print
wordCounts3 = TTObject.lines.flatMap(x => flatmapchange(wbArray, x))
  .map(x => ("wordCounts3** " + cntn + " **" + x, 1))
  .reduceByKey(_ + _)
wordCounts3.print
TTObject.ssc.getGraph.start(new Time(TTObject.ssc.getScheduler.getjobGenerator.getTimer.getNextTime))
Full test program:
package scalasrc
import org.apache.spark.streaming.ttstream.TTUtils
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
import scala.collection._

object TTWordCountFreshRule {
  val splitTag = new mutable.ArrayBuffer[String]

  def main(args: Array[String]) {
    val tagName = ""
    val subId = ""
    val accessKey = ""
    val sparkConf = new SparkConf().setAppName("TTWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    //ssc.checkpoint("checkpoint")
    val numStreams = 3
    var offSet = 1478448000l
    if (offSet == 0) offSet = System.currentTimeMillis() / 1000
    val ttStreams = (1 to numStreams).map { i =>
      TTUtils.createStream(ssc, tagName, subId, accessKey, offSet, i - 1).map(_._2)
    }
    val lines = ssc.union(ttStreams).filter(x => x != null)

    splitTag += "dd"
    val splitTagarray = splitTag.toArray
    val wordCounts5 = lines
      .flatMap(x => splitString(x, splitTagarray))
      .map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts5.count().print
    wordCounts5.print

    runJObThread
    ssc.start()
    ssc.awaitTermination()

    def runJObThread() {
      new Thread() {
        override def run {
          while (true) {
            try {
              println("runJObThread begin.....")
              Thread.sleep(20000)
              splitTag += "cc" // re-submitted on every iteration
              var cnt = 0
              splitTag.foreach(x => { println(s" cnt: $cnt ,value : $x "); cnt += 1 })
              ssc.getGraph.clearOutputStreams
              val splitTagarray = splitTag.toArray
              val wordCounts5 = lines.flatMap(x => splitString(x, splitTagarray)).map(x => (x, 1)).reduceByKey(_ + _)
              wordCounts5.count().print
              wordCounts5.print
              ssc.getGraph.start(new Time(ssc.getScheduler.getjobGenerator.getTimer.getNextTime))
            } catch {
              case ex: Exception => println(ex.getMessage)
            }
          }
        }
      }.start
    }
  }

  def splitString(st: String, splitTagarray: Array[String]) = {
    var splitArray = new mutable.ArrayBuffer[String]
    splitTagarray.foreach(x => splitArray ++= st.split(x))
    splitArray.toArray
  }
}
For customized reports, the streaming job no longer has to be stopped: the refresh thread just reads the rule definitions from storage, and this data-driven approach satisfies the requirement of dynamic calculation rules.
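For example, the thread in the test program above could read the rules from external storage instead of hard-coding them. A sketch, where loadRulesFromStore is a hypothetical helper that fetches the user-defined report rules (from a database, an HDFS file, etc.) and splitString is the function from the test program:
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical sketch: poll external storage and re-register the output operations.
def startRuleRefresher(ssc: StreamingContext, lines: DStream[String],
                       loadRulesFromStore: () => Array[String]): Unit = {
  new Thread("rule-refresher") {
    override def run(): Unit = {
      while (true) {
        try {
          Thread.sleep(20000)
          val rules = loadRulesFromStore()   // data-driven: rules come from storage, not code
          ssc.getGraph.clearOutputStreams()
          lines.flatMap(x => splitString(x, rules)).map(x => (x, 1)).reduceByKey(_ + _).print()
          ssc.getGraph.start(new Time(ssc.getScheduler.getjobGenerator.getTimer.getNextTime))
        } catch {
          case ex: Exception => println(ex.getMessage)
        }
      }
    }
  }.start()
}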
Problem: with this change, checkpointing and stateful computations no longer work correctly.
************************************************************************************
Thinking about it now, there was no need for such painful source changes: simply reference an object on the worker side, start a thread inside it, and have that thread periodically refresh the configuration from the database. The only potential problem is that the workers may fetch the metadata at slightly different times, so accuracy can suffer briefly. But a newly created report has a debugging period anyway, so this does not affect the business.
Although Spark's map and reduce operators have no setup hook, worker-side code can call methods on an object (effectively static methods), and that object can start a thread. The functions used inside map, reduce, and other operators can be functions of an object; the first time one of the object's functions is called on a worker, every statement outside the method definitions (the object's constructor body) is executed, as the example below shows.
package streaming

import scala.collection._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, _}

object workerObjectGetMeta {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    val lines = ssc.socketTextStream("192.168.0.140", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(x => CalUtils.splitS(x))
    val wordCounts = words.map(x => (x, 1))
    wordCounts.reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }
}

object CalUtils {
  val splitArray = new mutable.ArrayBuffer[String]
  splitArray += "aa"

  // new Thread() {}.start() is a plain statement in the object body, so the thread starts
  // automatically the first time the object is referenced
  new Thread() {
    override def run {
      while (true) {
        try {
          println("runJObThread begin.....")
          Thread.sleep(5000)
          splitArray += "cc" // re-added on every iteration (simulates refreshing the rules)
          var cnt = 0
          splitArray.foreach(x => { println(s" cnt: $cnt ,value : $x "); cnt += 1 })
        } catch {
          case ex: Exception => println(ex.getMessage)
        }
      }
    }
  }.start()

  def splitS(s: String): Array[String] = {
    val rArray = new mutable.ArrayBuffer[String]
    splitArray.foreach(x => { rArray ++= s.split(x) })
    println(s"splitArray: ${splitArray.mkString(",")} ")
    println(s"aArray: ${rArray.mkString(",")} ")
    rArray.toArray
  }
}