DataFrame的理解

DataFrame不是Spark SQL提出,而是在Pandas就有

DataSet:分布式的数据集

DataFrame:以列的形式构成的分布式数据集(RDD with schema)

可以从各种source转换成,如RDD、SQL、noSQL等

做了抽象的处理

DataFrame对比RDD

DataFrame有具体的列信息

运行效率上:

RDD:java/scala => jvm

Python 自己的运行环境

DataFrame:无论哪种语言都是同一个logic plan

DataFrame 的 API:

printschema() 输出一个树形结构

show() 输出内容。括号内可限制输出的条数

Select(COLUMN_NAME) 查询某一列所有的数据

综合应用:

peopleDF.select(peopleDF.col("name"), (peopleDF.col("age") + 5).as("age after 5 years")).show()

查找两列,并对其中一列进行运算后,更改其列名

过滤:

filter()

peopleDF.filter(peopleDF.col("age") > 24).show()

分组:

groupBy()

peopleDF.groupBy("age").count().show()

转成临时视图(进行SQL操作):

createOrReplaceTempView() 即可转成sql API进行操作

DataFrame 与 RDD 的相互操作:

两种

都是要首先导入SparkSession,作为入口

val spark = SparkSession.builder().appName("DataFrameRDD").master("local[2]").getOrCreate()

第一种:反射

代码简洁,前提是需要知道schema的构成

借助case class,在这个类里定义好schema对应的字段

  1. 创建case class,根据schema来写
  2. 生成RDD,借助SparkContext的textFile,获取文件然后转成RDD,String类型
  3. 导入Spark.Implicits._ 隐式转换包
  4. 分割RDD,split方法,分割后变成String数组,并和case class相对应起来(也就是把对应的变量传入class中,记得传入前进行类型转换)
  5. toDF方法生成DataFrame

代码:

//定义case class

case class Info(id: Int, name: String, age: Int) {}

//生成RDD

val rdd = spark.sparkContext.textFile("file:////usr/local/mycode/info.txt")

//切割,分类,转换

val infoDF = rdd.map(_.split(",")).map(line => Info(line(0).toInt, line(1), line(2).toInt)).toDF()

ps:若分隔符是|或者其他,有可能要加上转义字符\\

第二种:直接构建Dataset

不知道schema的条件下使用

先转成Rows,结合StructType,代码量大一点

  1. 生成RDD
  2. 分割RDD,和第一种方法的第4步一样,然后转换成RowsRDD
  3. 定义StructType,用一个数组Array来定义,每个变量的Type用StructField来定义
  4. 用createDataFrame方法关联RDD和StructType

代码:

//生成RDD

val rdd = spark.sparkContext.textFile("file:////usr/local/mycode/info.txt")

//分割,转成rowRDD

val rowRdd = rdd.map(_.split(",")).map(line => Row(line(0).toInt, line(1), line(2).toInt))

//定义StructType

val structType = StructType(Array(StructField("id", IntegerType,true),

StructField("name", StringType, true),

StructField("age", IntegerType,true)))

//关联rowRDD和StructType

val infoDF = spark.createDataFrame(rowRdd, structType)

DataFrame API详细:

Show方法:

默认只显示前20条,可指定更大

若信息太多,默认截取显示一部分,设置成false的话就不截取了

take方法:

take() 返回前面n行记录

take().foreach 分行显示

first、head方法:

头几行

select方法:

可以选择多列

filter方法:

条件里可以加其他字段,比如说substring,可搜索行值中某几个字符等于指定值的行

studentDF.filter("substr(name, 0, 1) = 'M'").show

sort方法:

有desc排序

studentDF.sort(studentDF.col("name").desc, studentDF.col("id").desc).show

As方法:

studentDF.select(studentDF.col("name").as("studentName")).show

Join方法:

studentDF.join(studentDF2, studentDF.col("id") === studentDF2.col("id”))

判断相等时用三个=号

Dataset:

初次出现在1.6版本 有Spark SQL优化 能使用lambda表达式,但不能用python语言使用Dataset的API

DF = DS[Row]

DS 强类型 typed case class

DF:弱类型 Row

读取csv文件变成DataFrame的方法:

val salesDF = spark.read.option("header", "true").option("inferSchema", "true”).csv(path)

header是指解析头文件,这样能知道列名

inferSchema是获取每一列的属性

DF转DS的方法:

  1. 创建case class
  2. as方法

val salesDS = salesDF.as[Sales]

case class Sales(transactionId: Int, customerId: Int, itemId: Int, amountPaid: Double)

选择某列输出:

salesDS.map(line => line.itemId).show()

SQL、DF、DS的区别

报错的时机不同,DS最敏感,能够更早发现错误,即使列名写错了也会马上发现

(编译时,SQL是命令和列名写错都不会报错;DF命令写错会报错,但列名写错不会报错。前面不报错的情况会在运行时报错)

相关推荐