Spark 2017 BigData Update (5): Spark Streaming in Java
Spark 2017 BigData Update (5): Spark Streaming in Java
The small streaming example class, WordCountStreamingApp.java:
packagecom.sillycat.sparkjava.app;
importjava.util.Arrays;
importjava.util.Collection;
importjava.util.HashMap;
importjava.util.List;
importjava.util.Map;
importorg.apache.kafka.clients.consumer.ConsumerRecord;
importorg.apache.kafka.common.serialization.StringDeserializer;
importorg.apache.spark.SparkConf;
importorg.apache.spark.api.java.JavaRDD;
importorg.apache.spark.api.java.function.Function;
importorg.apache.spark.streaming.Duration;
importorg.apache.spark.streaming.api.java.JavaInputDStream;
importorg.apache.spark.streaming.api.java.JavaStreamingContext;
importorg.apache.spark.streaming.kafka010.ConsumerStrategies;
importorg.apache.spark.streaming.kafka010.KafkaUtils;
importorg.apache.spark.streaming.kafka010.LocationStrategies;
importcom.sillycat.sparkjava.base.SparkBaseApp;
publicclassWordCountStreamingAppextendsSparkBaseApp{
privatestaticfinallongserialVersionUID=7401544141510431796L;
protectedStringgetAppName(){
return"WordCountStreamingApp";
}
publicvoidexecuteTask(List<String>params){
SparkConfconf=this.getSparkConf();
//Thetimeintervalatwhichstreamingdatawillbedividedinto
//batches
logger.info("Starttohavethestreaming");
JavaStreamingContextssc=newJavaStreamingContext(conf,newDuration(30000));
ssc.checkpoint(this.getAppName());
logger.info("Preparetheresourceforstreaming");
processStream(ssc,"carl");
logger.info("Streamingisworking");
try{
ssc.start();
ssc.awaitTermination();
}catch(InterruptedExceptione){
logger.error("InterruptedException:",e);
}
}
privatevoidprocessStream(JavaStreamingContextssc,Stringkeyword){
Map<String,Object>kafkaParams=newHashMap<>();
kafkaParams.put("bootstrap.servers","fr-stage-api:9092,fr-stage-consumer:9092,fr-perf1:9092");
kafkaParams.put("key.deserializer",StringDeserializer.class);
kafkaParams.put("value.deserializer",StringDeserializer.class);
kafkaParams.put("group.id","WordCountStreamingApp");
kafkaParams.put("auto.offset.reset","latest");
kafkaParams.put("enable.auto.commit",true);
Collection<String>topics=Arrays.asList("sillycat-topic");
logger.info("InittheKafkaClientstofetchlines");
JavaInputDStream<ConsumerRecord<String,String>>dStream=KafkaUtils.createDirectStream(ssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String,String>Subscribe(topics,kafkaParams));
dStream.foreachRDD(rdd->{
processRows(rdd,keyword);
});
}
privatevoidprocessRows(JavaRDD<ConsumerRecord<String,String>>rdds,Stringkeyword){
JavaRDD<String>rows=rdds.map(record->record.value());
JavaRDD<String>lines=rows.filter(newFunction<String,Boolean>(){
privatestaticfinallongserialVersionUID=1L;
publicBooleancall(Strings)throwsException{
if(s==null||s.trim().length()<1){
returnfalse;
}
if(!s.contains(keyword)){
returnfalse;
}
returntrue;
}
});
longcount=lines.count();
logger.info("Kafkareceived"+count+""+keyword);
}
}
Here is how to run all of these examples:
# Run locally #
> java -jar target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.CountLinesOfKeywordApp
> java -jar target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.WordCountStreamingApp
# Run with the Spark binary locally #
> bin/spark-submit --class com.sillycat.sparkjava.SparkJavaApp /Users/carl/work/sillycat/sillycat-spark-java/target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.CountLinesOfKeywordApp
> bin/spark-submit --class com.sillycat.sparkjava.SparkJavaApp /home/ec2-user/users/carl/sillycat-spark-java/target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.WordCountStreamingApp
# Run with the Spark binary on a remote YARN cluster #
> bin/spark-submit --class com.sillycat.sparkjava.SparkJavaApp --master yarn --deploy-mode client /home/ec2-user/users/carl/sillycat-spark-java/target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.CountLinesOfKeywordApp
> bin/spark-submit --class com.sillycat.sparkjava.SparkJavaApp --master yarn --deploy-mode client /home/ec2-user/users/carl/sillycat-spark-java/target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.WordCountStreamingApp
References: