Notes on Spark DataFrames
Relevant functions:

Operating on a DataFrame:
1. data.printSchema: prints the name and type of every column.
2. data.show(numRows: Int): prints the first numRows rows of data.
3. data.head(n: Int).foreach(println): also takes the number of rows to sample, but it returns an Array[Row], so we have to iterate over it to print.
4. select picks the columns we need from a DataFrame (one or several) and returns a brand-new DataFrame, e.g.:
   val age: DataFrame = data.select("age")
5. Filtering rows by a condition:
   data.filter("id > 2").show(2)
   data.filter("NAME = ''").show(2)
   data.filter("NAME = '' OR NAME = 'NULL'").show(2)  (multiple conditions)
6. Sorting:
   data.sort(data("Name").desc).show(7)
   data.sort("NAME", "id").show(10)
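The select / filter / sort operations above have direct analogues on plain Scala collections, which is a handy way to check the row-level logic without starting a Spark session. A minimal sketch (the Person case class and sample rows are made up for illustration, not part of the original data):

```scala
// Hypothetical rows standing in for a small DataFrame.
case class Person(id: Int, name: String, age: Int)

val data = Seq(
  Person(1, "Ann", 31),
  Person(2, "", 25),
  Person(3, "Bob", 44)
)

// select("age")  ~  map over the rows, keeping one field
val ages = data.map(_.age)

// filter("id > 2")  ~  filter with a predicate
val filtered = data.filter(_.id > 2)

// sort(data("name").desc)  ~  sortBy with a reversed ordering
val sorted = data.sortBy(_.name)(Ordering[String].reverse)
```

The DataFrame versions run the same shapes, but lazily and distributed across the cluster.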
Example code:
// Store the data in a tabular DataFrame; data1 is the source dataset
val df = spark.createDataFrame(data1).toDF("日期", "经度", "纬度", "温度", "湿度", "大气压值")
df.show()                    // print the DataFrame
df.head(1).foreach(println)  // print the first row
// df1 below is a (日期, 温度差) DataFrame, as built in the complete program
var df2 = df1.orderBy(-df1("温度差"))  // sort descending by temperature difference
var df3 = df1.orderBy(df1("温度差"))   // sort ascending by temperature difference
Sample data (cndctest.txt):
0169501360999992018010100004+52970+122530FM-12+043399999V0201401N00101026001C9004700199-02041-02321102941ADDAA124001531AJ100003100000099GA1081+026001101GA2999+999999101GA3999+999999101GE19MSL +99999+99999GF108991081999026001999999MA1999999097051MD1210061-0101REMSYN004BUFR
0165501360999992018010103004+52970+122530FM-12+043399999V0201101N0010122000199004900199-01651-02051102921ADDAJ100079100070099AY171031AY201031GA1021+026001101GA2999+999999101GA3999+999999101GE19MSL +99999+99999GF102991021999026001999999MD1210021+9999MW1001REMSYN004BUFR
0187501360999992018010106004+52970+122530FM-12+043399999V0202701N00101026001C9027000199-01271-01781102841ADDAA106000131AY171061AY201061GA1081+026001101GA2999+999999101GA3999+999999101GE19MSL +99999+99999GF108991081999026001999999KA1240N-02051MA1999999097021MD1710051+9999MW1001REMSYN004BUFR
0148501360999992018010109004+52970+122530FM-12+043399999V0202701N00101026001C9030000199-01341-01691102901ADDAA106999999AA224999999GA1081+026001101GA2999+999999101GA3999+999999101GE19MSL +99999+99999GF108991081999026001999999MD1210061+9999REMSYN004BUFR
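Each record is a fixed-width NOAA ISD weather line: the observation date occupies characters 15-23 and the signed air temperature (in tenths of a degree Celsius) characters 87-92. A minimal sketch of the extraction in plain Scala, using the first sample record above (the offsets follow the ISD layout; nothing Spark-specific is involved):

```scala
// First sample record from cndctest.txt
val line = "0169501360999992018010100004+52970+122530FM-12+043399999V0201401N00101026001C9004700199-02041-02321102941ADDAA124001531AJ100003100000099GA1081+026001101GA2999+999999101GA3999+999999101GE19MSL +99999+99999GF108991081999026001999999MA1999999097051MD1210061-0101REMSYN004BUFR"

val date = line.substring(15, 23)        // observation date, yyyyMMdd
val temp = line.substring(87, 92).toInt  // signed temperature in tenths of a degree C
```

Here date is "20180101" and temp is -204, i.e. -20.4 °C; a value of +9999 in the temperature field marks a missing reading.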
Complete runnable code:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object RDD_Operator {
  def main(args: Array[String]): Unit = {
    // Silence unnecessary log output on the terminal
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val spark: SparkSession = SparkSession.builder().appName("WeatherTest").master("local").getOrCreate()
    val sc = spark.sparkContext

    val data: RDD[String] = sc.textFile("E:\\cndctest.txt") // data file cndctest.txt

    // Process the data: keep only the date and the temperature.
    // The signed air temperature occupies characters 87-92; +9999 marks a missing reading.
    val data_temp = data
      .filter(line => {
        val airTemperature = line.substring(87, 92).toInt
        airTemperature != 9999
      })
      .map(line => {
        val date = line.substring(15, 23)
        val airTemperature = line.substring(87, 92).toInt
        (date, airTemperature)
      })

    // Aggregate into (date, temperature difference)
    val group = data_temp.groupByKey()
    val group1 = group.map(t => (t._1, t._2.max - t._2.min))

    // Store the result in a tabular DataFrame
    val df1 = spark.createDataFrame(group1).toDF("日期", "温度差")
    val df2 = df1.orderBy(-df1("温度差")) // descending by temperature difference
    df2.show()
    println("-------------------first row---------------------")
    df2.head(1).foreach(println)
  }
}
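The groupByKey followed by max-minus-min step in the program above can be checked without Spark, since plain Scala collections offer the same shape. A small sketch using the temperatures that the four sample records actually carry:

```scala
// (date, temperature) pairs, standing in for the RDD contents
val pairs = Seq(
  ("20180101", -204), ("20180101", -165),
  ("20180101", -127), ("20180101", -134)
)

// groupByKey ~ groupBy on the key, keeping only the values
val grouped = pairs.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2)) }

// map(t => (t._1, t._2.max - t._2.min)) ~ temperature spread per date
val diffs = grouped.map { case (k, temps) => (k, temps.max - temps.min) }
```

For the sample day the spread is -127 - (-204) = 77 tenths of a degree, i.e. 7.7 °C; on an RDD, groupByKey shuffles all values per key to one node, so for large data a reduceByKey-based max/min would be cheaper.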