Spark SQL 自定义函数（UDF）

/**
  * Counts the number of visits represented by a collection of timestamps.
  *
  * Every timestamp is parsed as a `Long` and the values are ordered numerically.
  * The first timestamp opens the first visit; each later timestamp whose distance
  * from its predecessor is strictly greater than `DEFAULT_VISIT_TIMEOUT` opens
  * another visit.
  *
  * The values are parsed before sorting on purpose: sorting the raw strings would
  * order them lexicographically ("10" before "9"), which mis-orders timestamps
  * that differ in digit count or sign.
  *
  * @param vtimes timestamps encoded as decimal strings, in any order
  * @return 0 when `vtimes` is empty; otherwise 1 plus the number of consecutive
  *         gaps larger than `DEFAULT_VISIT_TIMEOUT`
  * @throws NumberFormatException if any element cannot be parsed as a `Long`
  */
def visitview(vtimes : Iterable[String]): Long = {
  if (vtimes.isEmpty) {
    0L
  } else {
    // Vector gives cheap traversal; numeric ordering comes from the Long elements.
    val sortedTimes = vtimes.iterator.map(_.toLong).toVector.sorted
    // Pair each timestamp with its successor (non-empty here, so `tail` is safe).
    val timeoutGaps = sortedTimes.zip(sortedTimes.tail).count {
      case (previous, next) => next - previous > DEFAULT_VISIT_TIMEOUT
    }
    1L + timeoutGaps
  }
}
// Configure the application: named "ChexunHourCount", master set to "local[2]".
val sparkConf = new SparkConf()
  .setAppName("ChexunHourCount")
  .setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)

// Register ConstantUtil.visitview so SQL statements can call it as "visitview".
sqlContext.udf.register("visitview", ConstantUtil.visitview _)
/**
  * One parsed log record; its field names become the column names when a
  * collection of these records is converted to a DataFrame.
  *
  * @param vtime  numeric value stored in the `vtime` column
  * @param userIp numeric value stored in the `userIp` column
  * @param muid   string stored in the `muid` column
  * @param uref   string stored in the `uref` column
  * @param ucp    string stored in the `ucp` column
  */
case class Loging(
  vtime: Long,
  userIp: Long,
  muid: String,
  uref: String,
  ucp: String
)
// Split every input line on tabs, keep only rows whose fields 3 and 2 both pass
// ConstantUtil.isNotPromote, and turn the remaining rows into Loging records
// (fields 9 and 8 are parsed as Long; fields 1, 2 and 3 are kept as strings).
val df = file
  .map(_.split("\t"))
  .filter(fields => ConstantUtil.isNotPromote(fields(3)))
  .filter(fields => ConstantUtil.isNotPromote(fields(2)))
  .map(fields => Loging(fields(9).toLong, fields(8).toLong, fields(1), fields(2), fields(3)))
  .toDF()

// Make the DataFrame queryable from SQL under the table name "loging".
df.registerTempTable("loging")

// NOTE(review): visitview is declared with an Iterable[String] parameter, while
// vtime is a Long field of Loging and is passed here as a single column value —
// confirm the UDF signature matches this call.
val vvCount = sqlContext
  .sql("select sum(visitview(vtime)) from loging group by muid  limit 10 ")
  .collect()

    1、自定义函数

    2、注册函数

    3、在 Spark SQL 中使用自定义函数

相关推荐