↓推荐关注↓
一、Spark SQL的概念理解
Spark SQL是spark套件中一个模板,它将数据的计算任务通过SQL的形式转换成了RDD的计算,类似于Hive通过SQL的形式将数据的计算任务转换成了MapReduce。
Spark SQL的特点:
Spark SQL数据抽象:
Spark SQL客户端查询:
DataFrame查询方式
(1)、DSL风格:
需要引入import spark.implicit. _ 这个隐式转换,可以将DataFrame隐式转换成RDD
(2)、SQL风格:
a、需要将DataFrame注册成一张表格,如果通过CreateTempView这种方式来创建,那么该表格Session有效,如果通过CreateGlobalTempView来创建,那么该表格跨Session有效,但是SQL语句访问该表格的时候需要加上前缀global_temp
b、需要通过sparkSession.sql方法来运行你的SQL语句
DataSet查询方式
RDD->DataFrame
#通过反射设置schema,数据集是spark自带的people.txt,路径在下面的代码中
case class Person(name:String,age:Int)
val peopleDF=spark.sparkContext.textFile("file:///root/spark/spark2.4.1/examples/src/main/resources/people.txt").map(_.split(",")).map(para=>Person(para(0).trim,para(1).trim.toInt)).toDF
peopleDF.show
#注册成一张临时表
peopleDF.createOrReplaceTempView("persons")
val teen=spark.sql("select name,age from persons where age between 13 and 29")
teen.show
这时teen是一张表,每一行是一个row对象,如果需要访问Row对象中的每一个元素,可以通过下标 row(0);你也可以通过列名 row.getAs[String]("name")
也可以使用getAs方法:
3、通过编程的方式来设置schema,适用于编译器不能确定列的情况
val peopleRDD=spark.sparkContext.textFile("file:///root/spark/spark2.4.1/examples/src/main/resources/people.txt")
val schemaString="name age"
val filed=schemaString.split(" ").map(filename=> org.apache.spark.sql.types.StructField(filename,org.apache.spark.sql.types.StringType,nullable = true))
val schema=org.apache.spark.sql.types.StructType(filed)
peopleRDD.map(_.split(",")).map(para=>org.apache.spark.sql.Row(para(0).trim,para(1).trim))
val peopleDF=spark.createDataFrame(res6,schema)
peopleDF.show
DataFrame->RDD
dataFrame.rdd
RDD->DataSet
rdd.map(para=> Person(para(0).trim(),para(1).trim().toInt)).toDS
DataSet->DataSet
dataSet.rdd
DataFrame -> DataSet
dataFrame.to[Person]
DataSet -> DataFrame
dataSet.toDF
用户自定义UDF函数
通过spark.udf功能用户可以自定义函数
自定义udf函数:
用户自定义聚合函数
1. 弱类型用户自定义聚合函数
//聚合函数需要输入参数的数据类型
override def inputSchema: StructType = ???
//可以理解为保存聚合函数业务逻辑数据的一个数据结构
override def bufferSchema: StructType = ???
// 返回值的数据类型
override def dataType: DataType = ???
// 对于相同的输入一直有相同的输出
override def deterministic: Boolean = true
//用于初始化你的数据结构
override def initialize(buffer: MutableAggregationBuffer): Unit = ???
//用于同分区内Row对聚合函数的更新操作
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = ???
//用于不同分区对聚合结果的聚合。
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = ???
//计算最终结果
override def evaluate(buffer: Row): Any = ???
2、强类型用户自定义聚合函数
//用于定义一个聚合函数内部需要的数据结构
override def zero: Average = ???
//针对每个分区内部每一个输入来更新你的数据结构
override def reduce(b: Average, a: Employee): Average = ???
//用于对于不同分区的结构进行聚合
override def merge(b1: Average, b2: Average): Average = ???
//计算输出
override def finish(reduction: Average): Double = ???
//用于数据结构他的转换
override def bufferEncoder: Encoder[Average] = ???
//用于最终结果的转换
override def outputEncoder: Encoder[Double] = ???
1、内置Hive
2、外部Hive(这里主要使用这个方法)
这就是hive里面的表
1、输入
对于Spark SQL的输入需要使用sparkSession.read方法
2、输出
对于Spark SQL的输出需要使用 sparkSession.write方法
通用模式 dataFrame.write.format("json").save("path") 支持类型:parquet、json、text、csv、orc
专业模式 dataFrame.write.csv("path") 直接指定类型
如果你使用通用模式,spark默认parquet是默认格式、sparkSession.read.load 加载的默认是parquet格式dataFrame.write.save也是默认保存成parquet格式。
如果需要保存成一个text文件,那么需要dataFrame里面只有一列(只需要一列即可)。
1、数据说明
这里有三个数据集,合起来大概有几十万条数据,是关于货品交易的数据集。
2、任务
这里有三个需求:
3、步骤
1. 加载数据
tbStock.txt
#代码
case class tbStock(ordernumber:String,locationid:String,dateid:String) extends Serializable
val tbStockRdd=spark.sparkContext.textFile("file:///root/dataset/tbStock.txt")
val tbStockDS=tbStockRdd.map(_.split(",")).map(attr=>tbStock(attr(0),attr(1),attr(2))).toDS
tbStockDS.show()
tbStockDetail.txt
case class tbStockDetail(ordernumber:String,rownum:Int,itemid:String,number:Int,price:Double,amount:Double) extends Serializable
val tbStockDetailRdd=spark.sparkContext.textFile("file:///root/dataset/tbStockDetail.txt")
val tbStockDetailDS=tbStockDetailRdd.map(_.split(",")).map(attr=>tbStockDetail(attr(0),attr(1).trim().toInt,attr(2),attr(3).trim().toInt,attr(4).trim().toDouble,attr(5).trim().toDouble)).toDS
tbStockDetailDS.show()
tbDate.txt
case class tbDate(dateid:String,years:Int,theyear:Int,month:Int,day:Int,weekday:Int,week:Int,quarter:Int,period:Int,halfmonth:Int) extends Serializable
val tbDateRdd=spark.sparkContext.textFile("file:///root/dataset/tbDate.txt")
val tbDateDS=tbDateRdd.map(_.split(",")).map(attr=>tbDate(attr(0),attr(1).trim().toInt,attr(2).trim().toInt,attr(3).trim().toInt,attr(4).trim().toInt,attr(5).trim().toInt,attr(6).trim().toInt,attr(7).trim().toInt,attr(8).trim().toInt,attr(9).trim().toInt)).toDS
tbDateDS.show()
2. 注册表
tbStockDS.createOrReplaceTempView("tbStock")
tbDateDS.createOrReplaceTempView("tbDate")
tbStockDetailDS.createOrReplaceTempView("tbStockDetail")
3. 解析表
#sql语句
select c.theyear,count(distinct a.ordernumber),sum(b.amount)
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
join tbDate c on a.dateid=c.dateid
group by c.theyear
order by c.theyear
a、先统计每年每个订单的销售额
select a.dateid,a.ordernumber,sum(b.amount) as SumOfAmount
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
group by a.dateid,a.ordernumber
b、计算最大金额订单的销售额
select d.theyear,c.SumOfAmount as SumOfAmount
from
(select a.dateid,a.ordernumber,sum(b.amount) as SumOfAmount
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
group by a.dateid,a.ordernumber) c
join tbDate d on c.dateid=d.dateid
group by d.theyear
order by theyear desc
a、求出每年每个货品的销售额
select c.theyear,b.itemid,sum(b.amount) as SumOfAmount
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
join tbDate c on a.dateid=c.dateid
group by c.theyear,b.itemid
b、在a的基础上,统计每年单个货品的最大金额
select d.theyear,max(d.SumOfAmount) as MaxOfAmount
from
(select c.theyear,b.itemid,sum(b.amount) as SumOfAmount
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
join tbDate c on a.dateid=c.dateid
group by c.theyear,b.itemid) d
group by theyear
c、用最大销售额和统计好的每个货品的销售额join,以及用年join,集合得到最畅销货品那一行信息
select distinct e.theyear,e.itemid,f.maxofamount
from
(select c.theyear,b.itemid,sum(b.amount) as sumofamount
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
join tbDate c on a.dateid=c.dateid
group by c.theyear,b.itemid) e
join
(select d.theyear,max(d.sumofamount) as maxofamount
from
(select c.theyear,b.itemid,sum(b.amount) as sumofamount
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
join tbDate c on a.dateid=c.dateid
group by c.theyear,b.itemid) d
group by d.theyear) f on e.theyear=f.theyear
and e.sumofamount=f.maxofamount order by e.theyear
转自:大数据真好玩