Notes on Spark DataFrames

Relevant functions:

Operating on a DataFrame
1. data.printSchema: prints each column's name and type.
2. data.show(numRows: Int): displays the first numRows rows of data.
3. data.head(n: Int).foreach(println): also takes the number of rows to sample; it returns an Array[Row], so we iterate over it to print each row.
4. The select function picks the columns we need from a DataFrame and returns a brand-new DataFrame (multiple columns can be selected), e.g.:
   val age: DataFrame = data.select("age")
5. Filtering rows by a condition:
   data.filter("id > 2").show(2)
   data.filter("NAME = ''").show(2)
   data.filter("NAME = '' OR NAME = 'NULL'").show(2)  // multiple conditions
6. Sorting:
   data.sort(data("NAME").desc).show(7)
   data.sort("NAME", "id").show(10)
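
The operations above can be exercised together on a small hand-built DataFrame. The sketch below assumes a local Spark installation; the column names (id, NAME, age) and the sample rows are made up purely for illustration.

```scala
import org.apache.spark.sql.SparkSession

object DataFrameOpsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DataFrameOpsDemo").master("local").getOrCreate()
    import spark.implicits._

    // Hypothetical rows, just for illustration
    val data = Seq((1, "Alice", 30), (2, "", 25), (3, "Bob", 41)).toDF("id", "NAME", "age")

    data.printSchema()                                 // each column's name and type
    data.show(3)                                       // first 3 rows
    data.head(2).foreach(println)                      // returns Array[Row]; print each row
    val age = data.select("age")                       // brand-new single-column DataFrame
    age.show()
    data.filter("id > 2").show(2)                      // SQL-style condition string
    data.filter("NAME = '' OR NAME = 'NULL'").show(2)  // multiple conditions
    data.sort(data("NAME").desc).show()                // descending
    data.sort("NAME", "id").show()                     // ascending, multiple keys
    spark.stop()
  }
}
```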

Sample code:

// Store the data in a DataFrame (a tabular data structure); data1 holds the parsed records
val df = spark.createDataFrame(data1).toDF("日期", "经度", "纬度", "温度", "湿度", "大气压值")
df.show                        // print the DataFrame
df.head(1).foreach(println)    // print the first row
// Sorting (from the complete program below, where df1 has a 温度差 column):
val df2 = df1.orderBy(-df1("温度差"))  // descending by temperature range (温度差)
val df3 = df1.orderBy(df1("温度差"))   // ascending by temperature range

Sample data: cndctest.txt

0169501360999992018010100004+52970+122530FM-12+043399999V0201401N00101026001C9004700199-02041-02321102941ADDAA124001531AJ100003100000099GA1081+026001101GA2999+999999101GA3999+999999101GE19MSL   +99999+99999GF108991081999026001999999MA1999999097051MD1210061-0101REMSYN004BUFR
0165501360999992018010103004+52970+122530FM-12+043399999V0201101N0010122000199004900199-01651-02051102921ADDAJ100079100070099AY171031AY201031GA1021+026001101GA2999+999999101GA3999+999999101GE19MSL   +99999+99999GF102991021999026001999999MD1210021+9999MW1001REMSYN004BUFR
0187501360999992018010106004+52970+122530FM-12+043399999V0202701N00101026001C9027000199-01271-01781102841ADDAA106000131AY171061AY201061GA1081+026001101GA2999+999999101GA3999+999999101GE19MSL   +99999+99999GF108991081999026001999999KA1240N-02051MA1999999097021MD1710051+9999MW1001REMSYN004BUFR
0148501360999992018010109004+52970+122530FM-12+043399999V0202701N00101026001C9030000199-01341-01691102901ADDAA106999999AA224999999GA1081+026001101GA2999+999999101GA3999+999999101GE19MSL   +99999+99999GF108991081999026001999999MD1210061+9999REMSYN004BUFR
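
Before handing these records to Spark, it helps to see where the two fields of interest sit inside a record. The snippet below slices the first sample line above using the same fixed-width offsets the full program relies on: characters 15-22 hold the date (yyyyMMdd) and characters 87-91 hold the signed air temperature in tenths of a degree Celsius, with 9999 marking a missing reading.

```scala
object NcdcFieldDemo {
  def main(args: Array[String]): Unit = {
    // First sample record from cndctest.txt, truncated just after the temperature field
    val line = "0169501360999992018010100004+52970+122530FM-12+043399999V0201401N00101026001C9004700199-02041"
    val date = line.substring(15, 23)         // yyyyMMdd -> "20180101"
    val temp = line.substring(87, 92).toInt   // signed tenths of a degree -> -204 (i.e. -20.4 °C)
    println(s"date=$date temp=$temp")
  }
}
```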

Complete runnable code:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object RDD_Operator {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("WeatherTest").master("local").getOrCreate()
    val sc = spark.sparkContext

    val data: RDD[String] = sc.textFile("E:\\cndctest.txt")  // the data file cndctest.txt
    // Keep only the date and air-temperature fields, dropping missing readings.
    // The signed temperature occupies characters 87-91 of each record; 9999 marks a missing value.
    val data_temp = data.filter(line => {
      val airTemperature = line.substring(87, 92).toInt
      airTemperature != 9999
    })
      .map(line => {
        val date = line.substring(15, 23)                  // yyyyMMdd
        val airTemperature = line.substring(87, 92).toInt  // tenths of a degree Celsius
        (date, airTemperature)
      })
    // Aggregate the readings into (date, temperature range) pairs
    val group = data_temp.groupByKey()
    val group1 = group.map(t => (t._1, t._2.max - t._2.min))

    // Store the results in a DataFrame (a tabular data structure)
    val df1 = spark.createDataFrame(group1).toDF("日期", "温度差")
    val df2 = df1.orderBy(-df1("温度差"))  // descending by temperature range (温度差)
    df2.show
    println("------------------- first row ---------------------")
    df2.head(1).foreach(println)
  }
  // Suppress unnecessary log output on the console
  Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
  Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
}
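
A note on the aggregation step: groupByKey ships every temperature reading for a day across the network before max and min are taken. A common alternative is to keep only a running (min, max) pair per key with reduceByKey, which lets Spark combine values on each partition before shuffling. The sketch below is a hedged rewrite of the same pipeline under that approach, reusing the file path and the field offsets from the program above; it still requires a local Spark installation to run.

```scala
import org.apache.spark.sql.SparkSession

object TempRangeReduce {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("TempRangeReduce").master("local").getOrCreate()
    val sc = spark.sparkContext

    // Same parse as the full program: (date, signed temperature) pairs
    val data_temp = sc.textFile("E:\\cndctest.txt")
      .filter(line => line.substring(87, 92).toInt != 9999)
      .map(line => (line.substring(15, 23), line.substring(87, 92).toInt))

    // Track a running (min, max) per date instead of materialising all readings
    val ranges = data_temp
      .mapValues(t => (t, t))
      .reduceByKey((a, b) => (math.min(a._1, b._1), math.max(a._2, b._2)))
      .mapValues { case (lo, hi) => hi - lo }

    ranges.collect().foreach(println)
    spark.stop()
  }
}
```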