Spark RDD / Dataset / SQL / UDF / UDAF

Going forward, for ease of operation, wrap the logic into UDFs and UDAFs.

Write a wrapper interface for operating on a piece of data:

attach a meta description (metadesc) to HDFS data directly in the REPL,

write SQL directly in the REPL, and save the resulting output straight away.

Business logic can be accumulated this way and reused.
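A minimal spark-shell sketch of that workflow, assuming a hypothetical HDFS input path and output location (spark-shell pre-imports spark.implicits._, so the encoders below are available):

val df = spark.read.textFile("hdfs:///some/path/wc.txt")   // hypothetical input path
  .flatMap(_.split(" "))
  .map((_, 1))
  .toDF("word", "cnt")   // naming the columns is the "meta description" over the raw file
df.createOrReplaceTempView("wc")
spark.sql("SELECT word, sum(cnt) AS total FROM wc GROUP BY word")
  .write.mode("overwrite").parquet("hdfs:///some/path/wc_result")   // save the result directly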

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkContext, SparkConf}
import yunzhi.utils._

object wordCount {

  def main(args: Array[String]) {

    val spark = SparkSession
      .builder()
      .appName("SparkSQLDataSourcesExample")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    val data = spark.sparkContext.textFile("testdata/c01/wc.txt")  // read the file
      .flatMap(_.split(" "))
      .map((_, 10))  // pair each word with a count of 10

    data.cache()

    // RDD
    data.reduceByKey(_ + _)
      .collect()
      .foreach(println)  // word count

    // SQL: converting an RDD to a DataFrame requires the implicits import
    import spark.implicits._
    val dataDF = data.toDF()
    dataDF.createOrReplaceTempView("dataDF")
    spark.sql("SELECT _1, sum(_2) as cnt FROM dataDF group by _1").show()

    // register a UDF
    spark.sqlContext.udf.register("strLen", (s: String) => s.length())
    spark.sql("SELECT strLen(_1), sum(_2) as cnt FROM dataDF group by _1").show()

    // register a UDAF: wordCountUDAF (String)
    spark.sqlContext.udf.register("wordCountUDAF", new wordCountUDAF)
    spark.sql("SELECT strLen(_1), wordCountUDAF(_1) as cnt FROM dataDF group by _1").show()

    // register a UDAF: sum (Int)
    spark.sqlContext.udf.register("IntSumUDAF", new IntSumUDAF)
    spark.sql("SELECT _1, wordCountUDAF(_1) as countcnt, IntSumUDAF(_2) as sumcnt FROM dataDF group by _1").show()
  }
}
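For reference, the same UDAFs can also be invoked through the DataFrame API instead of SQL. A sketch, assuming it is placed inside main after the registrations above (UserDefinedAggregateFunction instances are callable on Columns):

    val wcUdaf = new wordCountUDAF
    val sumUdaf = new IntSumUDAF
    dataDF.groupBy($"_1")
      .agg(wcUdaf($"_1").as("countcnt"), sumUdaf($"_2").as("sumcnt"))
      .show()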

package yunzhi.utils

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

/**
 * Created by lon 16-10-13.
 */
class UDAFUtil {

}

class wordCountUDAF extends UserDefinedAggregateFunction {  // Ctrl+I to generate the methods to override

  /**
   * Specifies the type of the input data. The Array can declare several StructFields,
   * one per argument: if the SQL call passes several columns, they all arrive together
   * in the UDAF's input Row. Here the function is called with a single String column,
   * so the schema declares exactly one field.
   * @return
   */
  override def inputSchema: StructType = StructType(Array(StructField("input", StringType, true)))

  /**
   * The type of the intermediate result handled during aggregation; the Array can
   * likewise define several StructFields.
   * @return
   */
  override def bufferSchema: StructType = StructType(Array(StructField("count", IntegerType, true)))

  /**
   * The result type returned by the UDAF after the computation.
   * @return
   */
  override def dataType: DataType = IntegerType

  override def deterministic: Boolean = true

  /**
   * The initial value of each group's buffer before aggregation starts.
   * @param buffer
   */
  override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0 }

  /**
   * How to fold a new incoming value into the per-group aggregate.
   * This is the local aggregation step, analogous to the Combiner in the Hadoop
   * MapReduce model (this Row is unrelated to the DataFrame's Row); it runs first
   * inside each worker.
   * @param buffer
   * @param input
   */
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Int](0) + 1
  }

  /**
   * After the local reduce finishes on each distributed node, a global merge is
   * performed across the workers' partial results.
   * @param buffer1
   * @param buffer2
   */
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
  }

  /**
   * Returns the final result of the UDAF.
   * @param buffer
   * @return
   */
  override def evaluate(buffer: Row): Any = buffer.getAs[Int](0)
}

// The behavior is very similar to the arguments of the RDD aggregate function: initialize, update, merge ...
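To make that analogy concrete, a minimal RDD sketch (illustrative, not from the original code): counting elements with aggregate lines up with the three UDAF pieces, where the zero value plays the role of initialize, seqOp of update, and combOp of merge.

val rdd = spark.sparkContext.parallelize(Seq("a", "b", "a"))
val count = rdd.aggregate(0)((acc, _) => acc + 1, _ + _)   // => 3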

class IntSumUDAF extends UserDefinedAggregateFunction {  // Ctrl+I to generate the methods to override

  /**
   * Specifies the type of the input data.
   * @return
   */
  override def inputSchema: StructType = StructType(Array(StructField("input", IntegerType, true)))

  /**
   * The type of the intermediate result handled during aggregation.
   * @return
   */
  override def bufferSchema: StructType = StructType(Array(StructField("count", IntegerType, true)))

  /**
   * The result type returned by the UDAF after the computation.
   * @return
   */
  override def dataType: DataType = IntegerType

  override def deterministic: Boolean = true

  /**
   * The initial value of each group's buffer before aggregation starts.
   * @param buffer
   */
  override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0 }

  /**
   * How to fold a new incoming value into the per-group aggregate.
   * This is the local aggregation step, analogous to the Combiner in the Hadoop
   * MapReduce model (this Row is unrelated to the DataFrame's Row); it runs first
   * inside each worker.
   * @param buffer
   * @param input
   */
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Int](0) + input.getAs[Int](0)
  }

  /**
   * After the local reduce finishes on each distributed node, a global merge is
   * performed across the workers' partial results.
   * @param buffer1
   * @param buffer2
   */
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
  }

  /**
   * Returns the final result of the UDAF.
   * @param buffer
   * @return
   */
  override def evaluate(buffer: Row): Any = buffer.getAs[Int](0)
}
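A quick way to sanity-check IntSumUDAF is to compare it with the built-in sum on the same view; an illustrative query, not from the original post:

spark.sql("SELECT _1, IntSumUDAF(_2) AS udaf_sum, sum(_2) AS builtin_sum FROM dataDF GROUP BY _1").show()
// udaf_sum and builtin_sum should agree for every group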
