sparkSQL学习
参考:
spark官方中文文档
1、综述1.1版本
SparkSQL允许在Spark中执行使用SQL,HiveQL或Scala表示的关系型查询。核心组件为一个新类型的RDD--SchemaRDD。SchemaRDDs由行对象以及用来描述每行中各列数据类型的模式组成。每个SchemaRDD类似于关系型数据库中的一个表。SchemaRDD的创建可以来自于已存在的RDD或Parquet文件,或JSON数据集或运行HiveQL而不将结果存于Hive。
---------------------------
参考:spark1.3.1版本
重点掌握啊:
http://www.infoq.com/cn/articles/apache-spark-sql
通过SparkSQL,可以针对不同格式的数据执行ETL操作(如JSON,Parquet,数据库)然后完成特定的查询操作。
许多新的功能特性:
数据框架(DataFrame):Spark新版本中提供了可以作为分布式SQL查询引擎的程序化抽象DataFrame。
数据源(DataSources):随着数据源API的增加,SparkSQL可以便捷地处理以多种不同格式存储的结构化数据,如Parquet,JSON以及ApacheAvro库。
JDBC服务器(JDBCServer):内置的JDBC服务器可以便捷地连接到存储在关系型数据库表中的结构化数据并利用传统的商业智能(BI)工具进行大数据分析。
SparkSQL组件
使用SparkSQL时,最主要的两个组件就是DataFrame和SQLContext。
DataFrame
DataFrame是一个分布式的,按照命名列的形式组织的数据集合。DataFrame基于R语言中的dataframe概念,与关系型数据库中的数据库表类似。
之前版本的SparkSQLAPI中的SchemaRDD已经更名为DataFrame。
通过调用将DataFrame的内容作为行RDD(RDDofRows)返回的rdd方法,可以将DataFrame转换成RDD。
可以通过如下数据源创建DataFrame:
已有的RDD
结构化数据文件
JSON数据集
Hive表
外部数据库
SQLContext
SparkSQL提供SQLContext封装Spark中的所有关系型功能。可以用之前的示例中的现有SparkContext创建SQLContext。下述代码片段展示了如何创建一个SQLContext对象。
valsqlContext=neworg.apache.spark.sql.SQLContext(sc)
此外,SparkSQL中的HiveContext可以提供SQLContext所提供功能的超集。可以在用HiveQL解析器编写查询语句以及从Hive表中读取数据时使用。
在Spark程序中使用HiveContext无需既有的Hive环境。
JDBC数据源
JDBC数据源可用于通过JDBCAPI读取关系型数据库中的数据。相比于使用JdbcRDD,应该将JDBC数据源的方式作为首选,因为JDBC数据源能够将结果作为DataFrame对象返回,直接用SparkSQL处理或与其他数据源连接。
SparkSQL示例应用
为了确保SparkShell程序有足够的内存,可以在运行spark-shell命令时,加入driver-memory命令行参数,如下所示:
spark-shell.cmd--driver-memory1G
SparkSQL应用
/ 首先用已有的Spark Context对象创建SQLContext对象 val sqlContext = new org.apache.spark.sql.SQLContext(sc) // 导入语句,可以隐式地将RDD转化成DataFrame import sqlContext.implicits._ // 创建一个表示客户的自定义类 case class Customer(customer_id: Int, name: String, city: String, state: String, zip_code: String) // 用数据集文本文件创建一个Customer对象的DataFrame val dfCustomers = sc.textFile("data/customers.txt").map(_.split(",")).map(p => Customer(p(0).trim.toInt, p(1), p(2), p(3), p(4))).toDF() // 将DataFrame注册为一个表 dfCustomers.registerTempTable("customers") // 显示DataFrame的内容 dfCustomers.show() // 打印DF模式 dfCustomers.printSchema() // 选择客户名称列 dfCustomers.select("name").show() // 选择客户名称和城市列 dfCustomers.select("name", "city").show() // 根据id选择客户 dfCustomers.filter(dfCustomers("customer_id").equalTo(500)).show() // 根据邮政编码统计客户数量 dfCustomers.groupBy("zip_code").count().show()
在上一示例中,模式是通过反射而得来的。
也可以通过编程的方式指定数据集的模式。这种方法在由于数据的结构以字符串的形式编码而无法提前定义定制类的情况下非常实用。
如下代码示例展示了如何使用新的数据类型类StructType,StringType和StructField指定模式。
// // 用编程的方式指定模式 // // 用已有的Spark Context对象创建SQLContext对象 val sqlContext = new org.apache.spark.sql.SQLContext(sc) // 创建RDD对象 val rddCustomers = sc.textFile("data/customers.txt") // 用字符串编码模式 val schemaString = "customer_id name city state zip_code" // 导入Spark SQL数据类型和Row import org.apache.spark.sql._ import org.apache.spark.sql.types._; // 用模式字符串生成模式对象 val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))) // 将RDD(rddCustomers)记录转化成Row。 val rowRDD = rddCustomers.map(_.split(",")).map(p => Row(p(0).trim,p(1),p(2),p(3),p(4))) // 将模式应用于RDD对象。 val dfCustomers = sqlContext.createDataFrame(rowRDD, schema) // 将DataFrame注册为表 dfCustomers.registerTempTable("customers") // 用sqlContext对象提供的sql方法执行SQL语句。 val custNames = sqlContext.sql("SELECT name FROM customers") // SQL查询的返回结果为DataFrame对象,支持所有通用的RDD操作。 // 可以按照顺序访问结果行的各个列。 custNames.map(t => "Name: " + t(0)).collect().foreach(println) // 用sqlContext对象提供的sql方法执行SQL语句。 val customersByCity = sqlContext.sql("SELECT name,zip_code FROM customers ORDER BY zip_code") // SQL查询的返回结果为DataFrame对象,支持所有通用的RDD操作。 // 可以按照顺序访问结果行的各个列。 customersByCity.map(t => t(0) + "," + t(1)).collect().foreach(println)
----------------------------------------------
sparksql中遇到的问题:
http://www.cnblogs.com/shishanyuan/p/4723604.html?utm_source=tuicool
http://www.aboutyun.com/forum.php?mod=viewthread&tid=12358&page=1
http://dataunion.org/13433.html
http://www.csdn.net/article/2015-07-10/2825184
https://spark.apache.org/docs/latest/sql-programming-guide.html
SparkSQL:notypetagavailableforxxxx
caseclass类要定义在Object类的上面
如果cassclass类放在了Object类里面,就会报标题的异常
------------------------------------------------
参考:Spark-1.3.1与Hive整合实现查询分析
http://shiyanjun.cn/archives/1113.html