Spark Basics: Three Ways to Create a DataFrame
1. Custom schema (RDD[Row] => Dataset[Row])
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val peopleRDD = spark.sparkContext.textFile("README.md")

// Build the schema programmatically from a string of field names
val schemaString = "name age"
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert each input line into a Row matching the schema
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))
rowRDD.collect().foreach(println)

val df = spark.createDataFrame(rowRDD, schema)
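Once created, this DataFrame behaves like any other; for example, it can be registered as a temporary view and queried with SQL. A minimal sketch (the view name people and the query are our own illustration):

// Hypothetical usage: expose the DataFrame to Spark SQL
df.createOrReplaceTempView("people")
val results = spark.sql("SELECT name, age FROM people")
results.show()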
2. Implicit conversion via a case class (RDD[Person] => Dataset[Row])
import org.apache.spark.sql.SparkSession

object DFTest {
  case class Person(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("DataFrame Application")
      .master("local")
      .getOrCreate()

    // Enables the .toDF() conversion on RDDs of case classes
    import spark.implicits._

    val peopleRDD = spark.sparkContext.textFile("README.md")
    val personRDD = peopleRDD
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).toInt))
    personRDD.collect().foreach(println)

    personRDD.toDF().show()
  }
}
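The same implicits also support typed Datasets: an RDD of case-class instances converts with .toDS(), and a DataFrame whose column names match the case class can be mapped back with .as[Person]. A minimal sketch, assuming the spark session and Person defined above:

import spark.implicits._

// RDD[Person] => Dataset[Person], keeping the static type
val personDS = personRDD.toDS()

// DataFrame => Dataset[Person], matched by column name
val typedAgain = personDS.toDF().as[Person]
typedAgain.show()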
3. Directly from a data source
val df = spark
  .read
  .option("header", value = true)
  .csv("/home/lg/Documents/data/1987.csv")
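By default the CSV reader treats every column as a string; the built-in inferSchema option asks Spark to sample the data and pick column types. A minimal sketch, reusing the same file path:

// Let Spark infer column types instead of defaulting to strings
val typedDf = spark
  .read
  .option("header", value = true)
  .option("inferSchema", value = true)
  .csv("/home/lg/Documents/data/1987.csv")
typedDf.printSchema()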
In addition, other built-in readers are available:
spark.read.jdbc
spark.read.json
spark.read.parquet
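For illustration, typical invocations look like the following; the JDBC URL, table name, credentials, and file paths are hypothetical:

import java.util.Properties

// JDBC: url, table, and connection properties (all hypothetical here)
val props = new Properties()
props.setProperty("user", "spark")
props.setProperty("password", "secret")
val jdbcDF = spark.read.jdbc("jdbc:mysql://localhost:3306/test", "people", props)

// The JSON and Parquet readers each take a path
val jsonDF = spark.read.json("/path/to/people.json")
val parquetDF = spark.read.parquet("/path/to/people.parquet")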