Spark 2017 BigData Update (5) Spark Streaming in Java

Spark 2017 BigData Update (5) Spark Streaming in Java

The small streaming example class, WordCountStreamingApp.java:

packagecom.sillycat.sparkjava.app;

importjava.util.Arrays;

importjava.util.Collection;

importjava.util.HashMap;

importjava.util.List;

importjava.util.Map;

importorg.apache.kafka.clients.consumer.ConsumerRecord;

importorg.apache.kafka.common.serialization.StringDeserializer;

importorg.apache.spark.SparkConf;

importorg.apache.spark.api.java.JavaRDD;

importorg.apache.spark.api.java.function.Function;

importorg.apache.spark.streaming.Duration;

importorg.apache.spark.streaming.api.java.JavaInputDStream;

importorg.apache.spark.streaming.api.java.JavaStreamingContext;

importorg.apache.spark.streaming.kafka010.ConsumerStrategies;

importorg.apache.spark.streaming.kafka010.KafkaUtils;

importorg.apache.spark.streaming.kafka010.LocationStrategies;

importcom.sillycat.sparkjava.base.SparkBaseApp;

publicclassWordCountStreamingAppextendsSparkBaseApp{

privatestaticfinallongserialVersionUID=7401544141510431796L;

protectedStringgetAppName(){

return"WordCountStreamingApp";

}

publicvoidexecuteTask(List<String>params){

SparkConfconf=this.getSparkConf();

//Thetimeintervalatwhichstreamingdatawillbedividedinto

//batches

logger.info("Starttohavethestreaming");

JavaStreamingContextssc=newJavaStreamingContext(conf,newDuration(30000));

ssc.checkpoint(this.getAppName());

logger.info("Preparetheresourceforstreaming");

processStream(ssc,"carl");

logger.info("Streamingisworking");

try{

ssc.start();

ssc.awaitTermination();

}catch(InterruptedExceptione){

logger.error("InterruptedException:",e);

}

}

privatevoidprocessStream(JavaStreamingContextssc,Stringkeyword){

Map<String,Object>kafkaParams=newHashMap<>();

kafkaParams.put("bootstrap.servers","fr-stage-api:9092,fr-stage-consumer:9092,fr-perf1:9092");

kafkaParams.put("key.deserializer",StringDeserializer.class);

kafkaParams.put("value.deserializer",StringDeserializer.class);

kafkaParams.put("group.id","WordCountStreamingApp");

kafkaParams.put("auto.offset.reset","latest");

kafkaParams.put("enable.auto.commit",true);

Collection<String>topics=Arrays.asList("sillycat-topic");

logger.info("InittheKafkaClientstofetchlines");

JavaInputDStream<ConsumerRecord<String,String>>dStream=KafkaUtils.createDirectStream(ssc,

LocationStrategies.PreferConsistent(),

ConsumerStrategies.<String,String>Subscribe(topics,kafkaParams));

dStream.foreachRDD(rdd->{

processRows(rdd,keyword);

});

}

privatevoidprocessRows(JavaRDD<ConsumerRecord<String,String>>rdds,Stringkeyword){

JavaRDD<String>rows=rdds.map(record->record.value());

JavaRDD<String>lines=rows.filter(newFunction<String,Boolean>(){

privatestaticfinallongserialVersionUID=1L;

publicBooleancall(Strings)throwsException{

if(s==null||s.trim().length()<1){

returnfalse;

}

if(!s.contains(keyword)){

returnfalse;

}

returntrue;

}

});

longcount=lines.count();

logger.info("Kafkareceived"+count+""+keyword);

}

}

Here is how we should run all these tests:

# Run locally #

> java -jar target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.CountLinesOfKeywordApp

> java -jar target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.WordCountStreamingApp

# Run binary on local #

> bin/spark-submit --class com.sillycat.sparkjava.SparkJavaApp /Users/carl/work/sillycat/sillycat-spark-java/target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.CountLinesOfKeywordApp

> bin/spark-submit --class com.sillycat.sparkjava.SparkJavaApp /home/ec2-user/users/carl/sillycat-spark-java/target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.WordCountStreamingApp

# Run binary on Remote YARN Cluster #

> bin/spark-submit --class com.sillycat.sparkjava.SparkJavaApp --master yarn-client /home/ec2-user/users/carl/sillycat-spark-java/target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.CountLinesOfKeywordApp

> bin/spark-submit --class com.sillycat.sparkjava.SparkJavaApp --master yarn-client /home/ec2-user/users/carl/sillycat-spark-java/target/sillycat-spark-java-1.0-jar-with-dependencies.jar com.sillycat.sparkjava.app.WordCountStreamingApp

References: