Stream整合Flume
package com.bawei.stream import java.net.InetSocketAddress import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} object StreamFlume { def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = { val newCount =runningCount.getOrElse(0)+newValues.sum Some(newCount) } def main(args: Array[String]): Unit = { //配置sparkConf参数 val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming_Flume_Poll").setMaster("local[2]") //构建sparkContext对象 val sc: SparkContext = new SparkContext(sparkConf) sc.setLogLevel("WARN") //构建StreamingContext对象,每个批处理的时间间隔 val scc: StreamingContext = new StreamingContext(sc, Seconds(5)) //设置checkpoint scc.checkpoint("C:\\Users\\Desktop\\checkpoint2") //设置flume的地址,可以设置多台 val address=Seq(new InetSocketAddress("192.168.182.147",8888)) // 从flume中拉取数据 val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(scc,address,StorageLevel.MEMORY_AND_DISK) //获取flume中数据,数据存在event的body中,转化为String val lineStream: DStream[String] = flumeStream.map(x=>new String(x.event.getBody.array())) //实现单词汇总 val result: DStream[(String, Int)] = lineStream.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunction) result.print() scc.start() scc.awaitTermination() } }
相关推荐
chenguangchun 2020-07-26
myt0 2020-07-18
IT影风 2020-07-18
chenguangchun 2020-06-28
jiaomrswang 2020-06-26
myt0 2020-06-16
xiaoxiaojavacsdn 2020-06-08
zzjmay 2020-06-07
strongyoung 2020-06-04
ErixHao 2020-05-20
啦啦啦啦啦 2020-05-15
wanfuchun 2020-05-14
xiaoxiaojavacsdn 2020-05-01
chenguangchun 2020-04-18
QAnyang 2020-03-14
wsong 2020-03-13
硅步至千里 2020-02-22