DataFrame的理解
DataFrame不是Spark SQL提出,而是在Pandas就有
DataSet:分布式的数据集
DataFrame:以列的形式构成的分布式数据集(RDD with schema)
可以从各种source转换成,如RDD、SQL、noSQL等
做了抽象的处理
DataFrame对比RDD
DataFrame有具体的列信息
运行效率上:
RDD:java/scala => jvm
Python 自己的运行环境
DataFrame:无论哪种语言都是同一个logic plan
DataFrame 的 API:
printschema() 输出一个树形结构
show() 输出内容。括号内可限制输出的条数
Select(COLUMN_NAME) 查询某一列所有的数据
综合应用:
peopleDF.select(peopleDF.col("name"), (peopleDF.col("age") + 5).as("age after 5 years")).show()
查找两列,并对其中一列进行运算后,更改其列名
过滤:
filter()
peopleDF.filter(peopleDF.col("age") > 24).show()
分组:
groupBy()
peopleDF.groupBy("age").count().show()
转成临时视图(进行SQL操作):
createOrReplaceTempView() 即可转成sql API进行操作
DataFrame 与 RDD 的相互操作:
两种
都是要首先导入SparkSession,作为入口
val spark = SparkSession.builder().appName("DataFrameRDD").master("local[2]").getOrCreate()
第一种:反射
代码简洁,前提是需要知道schema的构成
借助case class,在这个类里定义好schema对应的字段
- 创建case class,根据schema来写
- 生成RDD,借助SparkContext的textFile,获取文件然后转成RDD,String类型
- 导入Spark.Implicits._ 隐式转换包
- 分割RDD,split方法,分割后变成String数组,并和case class相对应起来(也就是把对应的变量传入class中,记得传入前进行类型转换)
- toDF方法生成DataFrame
代码:
//定义case class
case class Info(id: Int, name: String, age: Int) {}
//生成RDD
val rdd = spark.sparkContext.textFile("file:////usr/local/mycode/info.txt")
//切割,分类,转换
val infoDF = rdd.map(_.split(",")).map(line => Info(line(0).toInt, line(1), line(2).toInt)).toDF()
ps:若分隔符是|或者其他,有可能要加上转义字符\\
第二种:直接构建Dataset
不知道schema的条件下使用
先转成Rows,结合StructType,代码量大一点
- 生成RDD
- 分割RDD,和第一种方法的第4步一样,然后转换成RowsRDD
- 定义StructType,用一个数组Array来定义,每个变量的Type用StructField来定义
- 用createDataFrame方法关联RDD和StructType
代码:
//生成RDD
val rdd = spark.sparkContext.textFile("file:////usr/local/mycode/info.txt")
//分割,转成rowRDD
val rowRdd = rdd.map(_.split(",")).map(line => Row(line(0).toInt, line(1), line(2).toInt))
//定义StructType
val structType = StructType(Array(StructField("id", IntegerType,true),
StructField("name", StringType, true),
StructField("age", IntegerType,true)))
//关联rowRDD和StructType
val infoDF = spark.createDataFrame(rowRdd, structType)
DataFrame API详细:
Show方法:
默认只显示前20条,可指定更大
若信息太多,默认截取显示一部分,设置成false的话就不截取了
take方法:
take() 返回前面n行记录
take().foreach 分行显示
first、head方法:
头几行
select方法:
可以选择多列
filter方法:
条件里可以加其他字段,比如说substring,可搜索行值中某几个字符等于指定值的行
studentDF.filter("substr(name, 0, 1) = 'M'").show
sort方法:
有desc排序
studentDF.sort(studentDF.col("name").desc, studentDF.col("id").desc).show
As方法:
studentDF.select(studentDF.col("name").as("studentName")).show
Join方法:
studentDF.join(studentDF2, studentDF.col("id") === studentDF2.col("id”))
判断相等时用三个=号
Dataset:
初次出现在1.6版本 有Spark SQL优化 能使用lambda表达式,但不能用python语言使用Dataset的API
DF = DS[Row]
DS 强类型 typed case class
DF:弱类型 Row
读取csv文件变成DataFrame的方法:
val salesDF = spark.read.option("header", "true").option("inferSchema", "true”).csv(path)
header是指解析头文件,这样能知道列名
inferSchema是获取每一列的属性
DF转DS的方法:
- 创建case class
- as方法
val salesDS = salesDF.as[Sales]
case class Sales(transactionId: Int, customerId: Int, itemId: Int, amountPaid: Double)
选择某列输出:
salesDS.map(line => line.itemId).show()
SQL、DF、DS的区别
报错的时机不同,DS最敏感,能够更早发现错误,即使列名写错了也会马上发现
(编译时,SQL是命令和列名写错都不会报错;DF命令写错会报错,但列名写错不会报错。前面不报错的情况会在运行时报错)