快速入门
什么是 SparkSQL
Spark SQL is Spark’s module for working with structured data.
限定:结构化数据处理
SparkSQL 的特点
- 融合性:SQL 可以无缝集成在代码中,随时用 SQL 处理数据
- 统一数据访问:一套标准 API 可读写不同数据源
- Hive 兼容:可以使用 SparkSQL 直接计算并生成 Hive 数据表
- 标准化连接:支持标准化 JDBC\ODBC 连接,方便和各种数据库进行数据交互
SparkSQL 概述
SparkSQL 和 Hive 的异同
SparkSQL 和 Hive 都是”分布式 SQL 计算引擎“
SparkSQL 的数据抽象
Pandas - DataFrame | SparkCore - RDD | SparkSQL - DataFrame |
---|---|---|
二维表数据结构 | 不限定标准数据结构 | 二维表数据结构 |
本地集合 | 分布式集合 | 分布式集合 |
SparkSQL 其实有3类数据抽象对象:
- SchemaRDD 对象(已废弃):引入 Schema 的 RDD
- Dataset 对象:加入泛型特性,可用于 Java、Scala 语言
- DataFrame 对象:可用于 Java、Scala、Python、R
DataFrame 概述
DataFrame v.s. RDD:
DataFrame | RDD | |
---|---|---|
存储能力 | 只能存储二维表结构数据 | 存储任意结构的数据 |
示例 |
SparkSession 对象
在 Spark 2.0 后,推出了 SparkSession
对象,作为 Spark 编程的统一入口对象。
SparkSession 对象可以:
- 用于 SparkSQL 编程作为入口对象
- 用于 SparkCore 编程,可以通过 SparkSession 对象中获取到 SparkContext
from pyspark.sql import SparkSession
if __name__ == '__main__':
spark = SparkSession.builder.\
appName('myApp').\
master('local').\
config(...).\
getOrCreate()
DataFrame 入门
DataFrame 的组成
DataFrame 是一个二维表结构,其表格结构(表、行、列)的组成如下:
在结构层面:
- StructType 对象描述整个 DataFrame 的表结构
- StructField 对象描述一个列的信息
在数据层面:
- Row 对象记录一行数据
- Column 对象记录一列数据并包含列的信息
StructType 的构建方法如下:
struct_type = StructType().\
add("id", IntegerType(), False).\
add("subject", StringType(), True).\
add("age", IntegerType(), False)
- StructField 包含:列名、列类型、是否可为空
- 一个 StructType 对象由多个 StructField 组成
基于 RDD 构建
DataFrame 对象可以从 RDD 转换而来
使用列表指定列名,列类型推断得到
# coding:utf8
from pyspark.sql import SparkSession
if __name__ == '__main__':
spark = SparkSession.builder.\
appName("myApp").\
master("local").\
getOrCreate()
sc = spark.sparkContext
rdd = sc.textFile("./data/input/sql/people.txt").\
map(lambda x: x.split(",")).\
map(lambda x: (x[0], int(x[1])))
df = spark.createDataFrame(rdd, schema=['name','age'])
df.printSchema()
df.show()
使用 StructType 对象定义表结构
schema = StructType().add("name", StringType(), False).\
add("age", IntegerType(), True)
df = spark.createDataFrame(rdd, schema=schema)
使用 rdd.toDF() 方法
df = rdd.toDF(["name", "age"])
df = rdd.toDF(schema=schema)
基于 Pandas 的 DataFrame
df = spark.createDataFrame(pandas_df, schema=...)
读取外部数据
通过 SparkSQL 的统一 API 进行数据读取构建 DataFrame
spark.read.format("text|csv|json|parquet|orc|avro|jdbc|...")
.option("Key", "Value") # 可选
.schema(StructType|String)
.load("/path/to/file") # 支持本地文件和HDFS
- format(“text”):只有一个列,列明默认为 value
- format(“json”):一般不用指明 schema,json 自带列名和列类型(字符串和数字)
- format(“csc”)
- format(“parquet”):一种列存储文件格式,内置 schema,经过序列化
- …
DataFrame 的入门操作
DataFrame 支持两种编程风格:
- DSL 风格
- SQL 风格
DSL 风格
- show:展示 DataFrame 中的数据,默认展示20条
- printSchema:打印输出 df 的 schema 信息
- select:选择 DataFrame 中的指定列
- where(filter):过滤 DataFrame 内的数据,返回一个过滤后的 DataFrame
- groupBy:按照指定的列进行数据分组,返回值是
GroupedData
对象
SQL 风格
如果想要使用 SQL 风格的语法,需要将 DataFrame 注册成表,采用如下的方式之一:
- df.createTempView(“table_name”) # 注册一个临时视图
- df.createOrReplace(“table_name”) # 注册一个临时试图,如果存在则进行替换
- df.createGlobalTempView(“table_name”) # 注册一个全局视图
- df.createOrReplaceGlobalTempView(“table_name”) # 注册一个全局视图,如果存在则进行替换
临时表和全局表
- 临时表:只在当前 SparkSession 中可用
- 全局表:跨 SparkSession 对象使用,在一个程序内的多个 SparkSession 中均可调用,查询前带上前缀
global_temp
表注册完成后,可以通过以下语法进行查询:
spark.sql("""
sql_query
""").show()
pyspark.sql.functions 包
PySpark 提供了一个包:pyspark.sql.functions
包内提供了一系列的计算函数,供 SparkSQL 使用
WordCount 案例练习
# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode
if __name__ == '__main__':
spark = SparkSession.builder.\
appName("myApp").\
master("local").\
getOrCreate()
sc = spark.sparkContext
# SQL
rdd = sc.textFile("./data/input/words.txt").\
flatMap(lambda x: x.split(" ")).\
map(lambda x: [x])
df = rdd.toDF(["word"])
df.createOrReplaceTempView("words")
spark.sql("""
select word, count(*)
from words
group by word
""").show()
# DSL
df = spark.read.format("text").\
schema("word string").\
load("./data/input/words.txt")
df.select(explode(split("word", " ")).alias("word")).\
groupBy("word").\
count().\
show()
电影评分数据分析案例
# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql.functions import avg, count
if __name__ == '__main__':
spark = SparkSession.builder.\
appName("myApp").\
master("local").\
getOrCreate()
sc = spark.sparkContext
schema = StructType().add("user_id", StringType(), False).\
add("movie_id", StringType(), False).\
add("score", IntegerType(), False).\
add("ts", StringType(), True)
df = spark.read.format("csv").\
option("sep", "\t").\
option("header", False).\
option("encoding", "utf-8").\
schema(schema).\
load("./data/input/sql/u.data")
# 1. 求用户平均分
df.groupBy("user_id").\
avg("score").\
show()
# 2. 求电影平均分
df.groupBy("movie_id").\
avg("score").\
show()
# 3. 大于平均分的评分的数量
score_avg = df.select(avg("score")).first()[0]
print(df.where(f"score > {score_avg}").count())
# 4. 找出打分次数最多的用户,并计算它的打分平均分
max_user_id = df.groupBy("user_id").\
count().\
orderBy("count", ascending=False).\
first()["user_id"]
# print(max_user_id)
print(df.where(f"user_id={max_user_id}").select(avg("score")).first()[0])
# 5. 查询评分次数超过100次的电影,按平均分展示TOP10
df.groupBy("movie_id").\
agg(
count("score").alias("cnt"),
avg("score").alias("score_avg")).\
where("cnt>100").\
orderBy("score_avg", ascending=False).\
show(10)
SparkSQL Shuffle 分区数据
原因:在 SparkSQL 中当 Job 中产生 Shuffle 时,默认的分区数(spark.sql.shuffle.partitions)为200,在实际项目中要合理的设置。
可以设置在:
- 配置文件:
conf/spark-defaults.conf
-spark.sql.shuffle.partitions 2
- 在客户端提交参数中:
bin/spark-submit --conf "spark.sql.shuffle.partitions=2"
- 在代码中可以设置:
spark = SparkSession.builder.\ appName("myApp").\ master("local").\ config("spark.sql.shuffle.partitions", "2").\ getOrCreate()
SparkSQL 数据清洗 API
在大数据完整生产过程中,需要对数据进行数据清洗,将杂乱无章的数据整理为符合处理要求的规整数据。
dropDuplicates:数据去重
API:
DataFrame.dropDuplicates(subset=None)
dropna:删除有缺失值的行
API:
DataFrame.dropna(how='any', thresh=None, subset=None)
fillna:填充缺失值数据
API:
DataFrame.fillna(value)
代码:
# 填充所有缺失值
df.fillna("loss").show()
# 仅填充指定列
df.fillna("loss", subset=['job']).show()
# 给定字典,设定各个列的填充规则
df.fillna({"name":"未知姓名", "age":20, "job":"worker"}).show()
DataFrame 数据写出
使用 SparkSQL 统一 API 写出 DataFrame 数据
df.write.mode().\
format().\
option("Key", "Value").\
save("path/to/file")
- mode:传入模式,可选:append(追加)、overwrite(覆盖)、ignore(忽略)、error(重复就报异常,默认)
- format:写出文件格式,可选:text、csv、json、parquet、ocr、avro、jdbc
- save:写出的路径,支持本地文件和 HDFS
DataFrame 通过 JDBC 读写数据库(MySQL 示例)
略
SparkSQL 函数定义
回顾 Hive 中自定义函数有三种类型:
- UDF(User Defined Function)函数
- 一对一关系,输入一个值经过函数以后输出一个值;
- 在 Hive 中继承 UDF 类,方法名称为 evaluate,返回值不能为 void,其实就是实现一个方法
- UDAF(User Defined Aggregation Function)聚合函数
- 多对一的关系,输入多个值输出一个值,通常与 groupBy 联合使用
- UDTF(User Defined Table Generating Functions)函数
- 一对多关系,输入一个值输出多个值(一行变多行);
- 用户自定义表生成函数
SparkSQL 定义 UDF 函数
在 SparkSQL 中,目前仅仅支持 UDF 函数和 UDAF 函数,目前 Python 仅支持 UDF。
但可以通过 mapPatitions
算子模拟实现 UDAF;通过返回 array 或者 dict 来模拟实现 UDTF。
定义方式有两种:
sparksession.udf.register()
注册的 UDF 可用于 DSL 和 SQL
返回值用于 DSL 风格,传参内给的名字用于 SQL 风格
dsl_udf = sparksession.udf.register("sql_udf", f, returnType)
# 参数1:UDF名称,可用于SQL风格
# 参数2:被注册成UDF的方法名
# 参数3:声明UDF的返回值类型
# 返回值:UDF对象,可用于DSL风格
pyspark.sql.functions.udf
仅能用于 DSL 风格
dsl_udf = pyspark.sql.functions.udf(f, returnType)
# 参数1:被注册成UDF的方法名
# 参数2:声明UDF的返回值类型
# 返回值:UDF对象,可用于DSL风格
代码:
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9], 3).map(lambda x:[x])
df = rdd.toDF(["num"])
dsl_udf = spark.udf.register('sql_udf', lambda x: x*2, IntegerType())
df.select(dsl_udf("num")).show()
df.selectExpr("sql_udf(num)").show()
dsl_udf2 = udf(lambda x: x*2, IntegerType())
df.select(dsl_udf2("num")).show()
SparkSQL 使用窗口函数
对每一行,对其相关行(即窗口)数据进行聚合。窗口函数包含分区、排序和框架这3个核心元素。
function over (
[partition by partition_expression]
[order by order_expression]
[frame]
)
将 DataFrame 注册成表,在 SQL 语句中使用窗口函数。