2020-11-11--Spark编程基础(Scala版)第6章 Spark SQL

第6章 Spark SQL

  • 6.1 Spark SQL简介
    • 6.1.1 从Shark说起
    • Shark的设计导致了两个问题
    • 6.1.2 Spark SQL设计
    • 6.1.3 为什么推出Spark SQL
  • 6.2 DataFrame概述
  • 6.3 DataFrame的创建
  • 6.4 DataFrame的保存
  • 6.5 DataFrame的常用操作
  • 6.6 从RDD转换得到DataFrame
    • 6.6.1 利用反射机制推断RDD模式
    • 6.6.2 使用编程方式定义RDD模式

6.1 Spark SQL简介

6.1.1 从Shark说起



Shark即Hive on Spark,为了实现与Hive兼容,Shark在HiveQL方面重用了Hive中HiveQL的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从MapReduce作业替换成了Spark作业,通过Hive的HiveQL解析,把HiveQL翻译成Spark上的RDD操作
Shark的出现,使得SQL-on-Hadoop的性能比Hive有了10-100倍的提高

Shark的设计导致了两个问题

一是执行计划优化完全依赖于Hive,不方便添加新的优化策略
二是因为Spark是线程级并行,而MapReduce是进程级并行,因此,Spark在兼容Hive的实现上存在线程安全问题,导致Shark不得不使用另外一套独立维护的打了补丁的Hive源码分支
2014年6月1日Shark项目和Spark SQL项目的主持人Reynold Xin宣布:停止对Shark的开发,团队将所有资源放在Spark SQL项目上,至此,Shark的发展画上了句话,但也因此发展出两个直线:Spark SQL和Hive on Spark

Spark SQL作为Spark生态的一员继续发展,而不再受限于Hive,只是兼容Hive
Hive on Spark是一个Hive的发展计划,该计划将Spark作为Hive的底层引擎之一,也就是说,Hive将不再受限于一个引擎,可以采用Map-Reduce、Tez、Spark等引擎

6.1.2 Spark SQL设计

Spark SQL在Hive兼容层面仅依赖HiveQL解析、Hive元数据,也就是说,从HQL被解析成抽象语法树(AST)起,就全部由Spark SQL接管了。Spark SQL执行计划生成和优化都由Catalyst(函数式关系查询优化框架)负责

Spark SQL增加了DataFrame(即带有Schema信息的RDD),使用户可以在Spark SQL中执行SQL语句,数据既可以来自RDD,也可以是Hive、HDFS、Cassandra等外部数据源,还可以是JSON格式的数据
Spark SQL目前支持Scala、Java、Python三种语言,支持SQL-92规范


6.1.3 为什么推出Spark SQL

关系数据库已经很流行
关系数据库在大数据时代已经不能满足要求
首先,用户需要从不同数据源执行各种操作,包括结构化、半结构化和非结构化数据
其次,用户需要执行高级分析,比如机器学习和图像处理
在实际大数据应用中,经常需要融合关系查询和复杂分析算法(比如机器学习或图像处理),但是,缺少这样的系统
Spark SQL填补了这个鸿沟:
首先,可以提供DataFrame API,可以对内部和外部各种数据源执行各种关系型操作
其次,可以支持大数据中的大量数据源和数据分析算法
Spark SQL可以融合:传统关系数据库的结构化数据管理能力和机器学习算法的数据处理能力

6.2 DataFrame概述

DataFrame的推出,让Spark具备了处理大规模结构化数据的能力,不仅比原有的RDD转化方式更加简单易用,而且获得了更高的计算性能
Spark能够轻松实现从MySQL到DataFrame的转化,并且支持SQL查询

图 DataFrame与RDD的区别
RDD是分布式的 Java对象的集合,但是,对象内部结构对于RDD而言却是不可知的
DataFrame是一种以RDD为基础的分布式数据集,提供了详细的结构信息

6.3 DataFrame的创建

从Spark2.0以上版本开始,Spark使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口来实现其对数据加载、转换、处理等功能。SparkSession实现了SQLContext及HiveContext所有功能
SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并且支持把DataFrame转换成SQLContext自身中的表,然后使用SQL语句来操作数据。SparkSession亦提供了HiveQL以及其他依赖于Hive的功能的支持
可以通过如下语句创建一个SparkSession对象:

// An highlighted block
scala> import org.apache.spark.sql.SparkSession
scala> val spark=SparkSession.builder().getOrCreate()

在创建DataFrame之前,为了支持RDD转换为DataFrame及后续的SQL操作,需要通过import语句(即import spark.implicits._)导入相应的包,启用隐式转换。
在创建DataFrame时,可以使用spark.read操作,从不同类型的文件中加载数据创建DataFrame,例如:
spark.read.json(“people.json”):读取people.json文件创建DataFrame;在读取本地文件或HDFS文件时,要注意给出正确的文件路径;
spark.read.parquet(“people.parquet”):读取people.parquet文件创建DataFrame;
spark.read.csv(“people.csv”):读取people.csv文件创建DataFrame。
一个实例
在“/usr/local/spark/examples/src/main/resources/”这个目录下,这个目录下有两个样例数据people.json和people.txt。people.json文件的内容如下:
{“name”:“Michael”}
{“name”:“Andy”, “age”:30}
{“name”:“Justin”, “age”:19}
people.txt文件的内容如下
Michael, 29
Andy, 30
Justin, 19

// An highlighted block
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSessionscala> val spark=SparkSession.builder().getOrCreate()
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@2bdab835//使支持RDDs转换为DataFrames及后续sql操作
scala> import spark.implicits._
import spark.implicits._scala> val df = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]scala> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

6.4 DataFrame的保存

可以使用spark.write操作,把一个DataFrame保存成不同格式的文件,例如,把一个名称为df的DataFrame保存到不同格式文件中,方法如下:
df.write.json("people.json“)
df.write.parquet("people.parquet“)
df.write.csv(“people.csv”)
下面从示例文件people.json中创建一个DataFrame,然后保存成csv格式文件,代码如下:

// An highlighted block
scala> val peopleDF = spark.read.format("json").
| load("file:///usr/local/spark/examples/src/main/resources/people.json")
scala> peopleDF.select("name", "age").write.format("csv").
| save("file:///usr/local/spark/mycode/sql/newpeople.csv")

6.5 DataFrame的常用操作

可以执行一些常用的DataFrame操作



6.6 从RDD转换得到DataFrame

6.6.1 利用反射机制推断RDD模式

“/usr/local/spark/examples/src/main/resources/”目录下,有个Spark安装时自带的样例数据people.txt,其内容如下
Michael, 29
Andy, 30
Justin, 19
现在要把people.txt加载到内存中生成一个DataFrame,并查询其中的数据
在利用反射机制推断RDD模式时,需要首先定义一个case class,因为,只有case class才能被Spark隐式地转换为DataFrame

// An highlighted block
scala> import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder 
scala> import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoder 
scala> import spark.implicits._  //导入包,支持把一个RDD隐式转换为一个DataFrame
import spark.implicits._
scala> case class Person(name: String, age: Long)  //定义一个case class
defined class Person
scala> val peopleDF = spark.sparkContext.
| textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").
| map(_.split(",")).
| map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()
peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: bigint] 
scala> peopleDF.createOrReplaceTempView("people") //必须注册为临时表才能供下面的查询使用
scala> val personsRDD = spark.sql("select name,age from people where age > 20")
//最终生成一个DataFrame,下面是系统执行返回的信息
personsRDD: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
scala> personsRDD.map(t => "Name: "+t(0)+ ","+"Age: "+t(1)).show()  //DataFrame中的每个元素都是一行记录,包含name和age两个字段,分别用t(0)和t(1)来获取值
//下面是系统执行返回的信息
+------------------+ 
| value|
+------------------+
|Name:Michael,Age:29|
| Name:Andy,Age:30|
+------------------+

6.6.2 使用编程方式定义RDD模式

当无法提前定义case class时,就需要采用编程方式定义RDD模式。
比如,现在需要通过编程方式把people.txt加载进来生成DataFrame,并完成SQL查询。

通过编程方式定义RDD模式的实现过程

// An highlighted block
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
//生成字段
scala> val fields = Array(StructField("name",StringType,true), StructField("age",IntegerType,true))
fields: Array[org.apache.spark.sql.types.StructField] = Array(StructField(name,StringType,true), StructField(age,IntegerType,true))
scala> val schema = StructType(fields)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age, IntegerType,true))
//从上面信息可以看出,schema描述了模式信息,模式中包含name和age两个字段
//shcema就是“表头”
//下面加载文件生成RDD
scala> val peopleRDD = spark.sparkContext.
| textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/examples/src/main/resources/people.txt MapPartitionsRDD[1] at textFile at <console>:26 
//对peopleRDD 这个RDD中的每一行元素都进行解析
scala> val rowRDD = peopleRDD.map(_.split(",")).
|  map(attributes => Row(attributes(0), attributes(1).trim.toInt))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at map at <console>:29
//上面得到的rowRDD就是“表中的记录”
//下面把“表头”和“表中的记录”拼装起来
scala> val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: int]
//必须注册为临时表才能供下面查询使用
scala> peopleDF.createOrReplaceTempView("people")
scala> val results = spark.sql("SELECT name,age FROM people")
results: org.apache.spark.sql.DataFrame = [name: string, age: int] 
scala> results.
|  map(attributes => "name: " + attributes(0)+","+"age:"+attributes(1)).
|  show()
+--------------------+
| value|
+--------------------+
|name: Michael,age:29|
| name: Andy,age:30|
| name: Justin,age:19|