Spark 数据分析
// 练习:使用 Spark Streaming 监听 socket 端口
// 手写 WordCount 的 Java 代码
package com.swust.streaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * Spark Streaming exercise: word count over lines read from a TCP text socket.
 *
 * <p>Every batch interval the lines received from the socket are split into
 * words, each word is paired with a count of 1, the counts are summed per word,
 * and every {@code (word, count)} pair of that batch is printed to standard
 * output. Counts are per batch; nothing is carried over between batches.
 */
public class TestSparkStreaming {

    /** Socket host used when no host argument is given. */
    private static final String DEFAULT_HOST = "data005";

    /** Socket port used when no port argument is given. */
    private static final int DEFAULT_PORT = 9999;

    /** Length of one streaming batch, in milliseconds. */
    private static final long BATCH_INTERVAL_MS = 5000L;

    /**
     * Builds the streaming word-count pipeline, starts it, and blocks until the
     * streaming context terminates.
     *
     * @param args optional; {@code args[0]} is the socket host (default
     *             {@code "data005"}) and {@code args[1]} is the socket port
     *             (default {@code 9999})
     * @throws NumberFormatException if {@code args[1]} is present but is not a
     *                               valid integer
     */
    public static void main(String[] args) {
        // Resolve the endpoint before any Spark resources are created, so a bad
        // port argument fails fast without leaving a context behind.
        String host = args.length > 0 ? args[0] : DEFAULT_HOST;
        int port = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;

        SparkConf conf = new SparkConf();
        // local[2]: the socket receiver occupies one thread, so at least one
        // more is needed to process the received batches.
        conf.setMaster("local[2]").setAppName("stream");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // jsc.setLogLevel("error");
        JavaStreamingContext ssc = new JavaStreamingContext(jsc, new Duration(BATCH_INTERVAL_MS));

        boolean interrupted = false;
        try {
            // Listen on the socket; each received line becomes one stream element.
            JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);

            // line -> words
            JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String line) throws Exception {
                    return splitWords(line).iterator();
                }
            });

            // word -> (word, 1)
            JavaPairDStream<String, Integer> pairs =
                    words.mapToPair(new PairFunction<String, String, Integer>() {
                        @Override
                        public Tuple2<String, Integer> call(String word) throws Exception {
                            return new Tuple2<>(word, 1);
                        }
                    });

            // (word, 1)... -> (word, total) within each batch
            JavaPairDStream<String, Integer> counts =
                    pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
                        @Override
                        public Integer call(Integer left, Integer right) throws Exception {
                            return left + right;
                        }
                    });

            // Print every (word, count) pair of each batch. The println runs in
            // the tasks that process the RDD; with the local master used here
            // those run inside this JVM, so the output shows up on this console.
            counts.foreachRDD(new VoidFunction2<JavaPairRDD<String, Integer>, Time>() {
                @Override
                public void call(JavaPairRDD<String, Integer> pairRDD, Time time) throws Exception {
                    pairRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                        @Override
                        public void call(Tuple2<String, Integer> tp) throws Exception {
                            System.out.println(tp._1 + "-----------------" + tp._2);
                        }
                    });
                }
            });

            ssc.start();
            ssc.awaitTermination();
        } catch (InterruptedException e) {
            // Remember the interrupt; it is restored after shutdown so that the
            // pending interrupt flag cannot disturb the stop sequence below.
            interrupted = true;
        } finally {
            // Always release resources, including when awaitTermination()
            // rethrows a streaming failure. Passing true also stops the
            // underlying SparkContext, which nothing else uses after this point.
            ssc.stop(true);
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Splits a line into words on single spaces, dropping empty tokens.
     *
     * <p>{@link String#split(String)} returns empty strings for blank lines and
     * for runs of consecutive spaces; without the filter those would be counted
     * as a "word" with an empty key.
     *
     * @param line one line of input text
     * @return the non-empty space-separated tokens of {@code line}, in order
     */
    private static List<String> splitWords(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.split(" ")) {
            if (!token.isEmpty()) {
                words.add(token);
            }
        }
        return words;
    }
}
相关推荐
hanwentan 2020-07-21
Lzs 2020-10-23
聚合室 2020-11-16
零 2020-09-18
Justhavefun 2020-10-22
jacktangj 2020-10-14
ChaITSimpleLove 2020-10-06
Andrea0 2020-09-18
周游列国之仕子 2020-09-15
afanti 2020-09-16
88234852 2020-09-15
YClimb 2020-09-15
风雨断肠人 2020-09-04
卖口粥湛蓝的天空 2020-09-15
stulen 2020-09-15
pythonxuexi 2020-09-06
abfdada 2020-08-26