Spark RDD, Dataset, SQL, UDF, UDAF
Going forward, for ease of operation, wrap the business logic into UDFs and UDAFs.
Write a wrapper interface for the operations on a dataset,
attach a meta description (schema) to the HDFS data directly in the REPL,
write SQL directly in the REPL, and save the result straight away.
This way business logic can be accumulated and reused. A minimal sketch of that workflow follows below.
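The sketch below is not from the original code; the wrapper object, column names and paths are hypothetical. It only illustrates the REPL workflow described above: register reusable UDFs once, give the raw HDFS data a queryable description, query it with SQL, and save the result directly.

import org.apache.spark.sql.SparkSession

// Hypothetical wrapper object that accumulates reusable business logic.
object EtlUdfs {
  def register(spark: SparkSession): Unit = {
    spark.udf.register("strLen", (s: String) => s.length)   // reusable UDF lives in one place
  }
}

// In the spark-shell (REPL), where `spark` is already defined:
EtlUdfs.register(spark)
val raw = spark.read.option("header", "true").csv("hdfs:///data/raw/events")   // hypothetical path
raw.createOrReplaceTempView("events")                                          // attach a queryable description
spark.sql("SELECT strLen(name) AS len, count(*) AS cnt FROM events GROUP BY strLen(name)")
  .write.mode("overwrite").parquet("hdfs:///data/out/events_by_len")           // save the result directly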
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkContext, SparkConf}
import yunzhi.utils._

object wordCount {

  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("SparkSQLDataSouresExample")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    val data = spark.sparkContext.textFile("testdata/c01/wc.txt")   // read the input file
      .flatMap(_.split(" "))                                        // split each line into words
      .map((_, 10))                                                  // pair each word with the value 10

    data.cache()

    // RDD: plain word counting
    data.reduceByKey(_ + _)
      .collect()
      .foreach(println)

    // SQL: converting the RDD to a DataFrame requires the implicits
    import spark.implicits._
    val dataDF = data.toDF()
    dataDF.createOrReplaceTempView("dataDF")
    spark.sql("SELECT _1, sum(_2) as cnt FROM dataDF group by _1").show()

    // register a UDF
    spark.sqlContext.udf.register("strLen", (s: String) => s.length())
    spark.sql("SELECT strLen(_1), sum(_2) as cnt FROM dataDF group by _1").show()

    // register the UDAF wordCountUDAF (String)
    spark.sqlContext.udf.register("wordCountUDAF", new wordCountUDAF)
    spark.sql("SELECT strLen(_1), wordCountUDAF(_1) as cnt FROM dataDF group by _1").show()

    // register the UDAF IntSumUDAF (Int)
    spark.sqlContext.udf.register("IntSumUDAF", new IntSumUDAF)
    spark.sql("SELECT _1, wordCountUDAF(_1) as countcnt, IntSumUDAF(_2) as sumcnt FROM dataDF group by _1").show()
  }
}
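The title also mentions Dataset; for reference, here is a minimal sketch (not in the original) of the same aggregation through the typed Dataset API, assuming the same `data` RDD of (word, count) pairs from the main method above:

import spark.implicits._

val ds = data.toDS()                                        // Dataset[(String, Int)]
ds.groupByKey(_._1)                                         // group by the word
  .mapGroups((word, pairs) => (word, pairs.map(_._2).sum))  // sum the counts per word
  .show()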
package yunzhi.utils

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

/**
 * Created by l on 16-10-13.
 */
class UDAFUtil {
}
class wordCountUDAF extends UserDefinedAggregateFunction {   // Ctrl+I generates the override stubs

  /**
   * Specifies the type of the input data. The Array can hold several StructFields, one per
   * column passed to the UDAF in SQL; here a single String column is all the SQL calls pass in.
   * @return
   */
  override def inputSchema: StructType = StructType(Array(StructField("input", StringType, true)))

  /**
   * The type of the intermediate buffer used while aggregating; again an Array, so several
   * StructFields can be defined.
   * @return
   */
  override def bufferSchema: StructType = StructType(Array(StructField("count", IntegerType, true)))

  /**
   * The result type the UDAF returns after the computation.
   * @return
   */
  override def dataType: DataType = IntegerType

  override def deterministic: Boolean = true

  /**
   * The initial value of each group's buffer before the aggregation starts.
   * @param buffer
   */
  override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0 }

  /**
   * How to fold a newly arriving value into the per-group aggregate.
   * This is the local aggregation, comparable to the Combiner in the Hadoop MapReduce model
   * (this Row is unrelated to the DataFrame Row); it runs first inside each worker.
   * @param buffer
   * @param input
   */
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Int](0) + 1
  }

  /**
   * After the local reduce on each distributed node finishes, a global-level merge combines
   * the partial results the workers computed.
   * @param buffer1
   * @param buffer2
   */
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
  }

  /**
   * Returns the final result of the UDAF.
   * @param buffer
   * @return
   */
  override def evaluate(buffer: Row): Any = buffer.getAs[Int](0)
}
// The initialize / update / merge trio behaves much like the arguments of RDD.aggregate (see the sketch below).
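// A minimal sketch of that analogy (hypothetical, not part of this file), for some rdd: RDD[Int]:
//   rdd.aggregate(0)(                     // zero value ~ initialize
//     (acc, x) => acc + x,                // seqOp      ~ update (local, per partition)
//     (acc1, acc2) => acc1 + acc2)        // combOp     ~ merge (global, across partitions)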
class IntSumUDAF extends UserDefinedAggregateFunction {   // Ctrl+I generates the override stubs

  /**
   * Specifies the type of the input data.
   * @return
   */
  override def inputSchema: StructType = StructType(Array(StructField("input", IntegerType, true)))

  /**
   * The type of the intermediate buffer used while aggregating.
   * @return
   */
  override def bufferSchema: StructType = StructType(Array(StructField("count", IntegerType, true)))

  /**
   * The result type the UDAF returns after the computation.
   * @return
   */
  override def dataType: DataType = IntegerType

  override def deterministic: Boolean = true

  /**
   * The initial value of each group's buffer before the aggregation starts.
   * @param buffer
   */
  override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0 }

  /**
   * How to fold a newly arriving value into the per-group aggregate.
   * This is the local aggregation, comparable to the Combiner in the Hadoop MapReduce model
   * (this Row is unrelated to the DataFrame Row); it runs first inside each worker.
   * @param buffer
   * @param input
   */
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Int](0) + input.getAs[Int](0)
  }

  /**
   * After the local reduce on each distributed node finishes, a global-level merge combines
   * the partial results the workers computed.
   * @param buffer1
   * @param buffer2
   */
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
  }

  /**
   * Returns the final result of the UDAF.
   * @param buffer
   * @return
   */
  override def evaluate(buffer: Row): Any = buffer.getAs[Int](0)
}
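For completeness, the same UDAFs can also be applied through the DataFrame API instead of SQL. A minimal sketch (not from the original), assuming the dataDF built in wordCount above:

import org.apache.spark.sql.functions.col

val wc   = new wordCountUDAF   // UDAF instances can be applied to Columns directly
val isum = new IntSumUDAF

dataDF.groupBy(col("_1"))
  .agg(wc(col("_1")).as("countcnt"), isum(col("_2")).as("sumcnt"))
  .show()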