Spark SQL I

 

快速入门

什么是 SparkSQL

Spark SQL is Spark’s module for working with structured data.

限定:结构化数据处理

SparkSQL 的特点

  1. 融合性:SQL 可以无缝集成在代码中,随时用 SQL 处理数据
  2. 统一数据访问:一套标准 API 可读写不同数据源
  3. Hive 兼容:可以使用 SparkSQL 直接计算并生成 Hive 数据表
  4. 标准化连接:支持标准化 JDBC\ODBC 连接,方便和各种数据库进行数据交互

SparkSQL 概述

SparkSQL 和 Hive 的异同

SparkSQL和Hive的异同

SparkSQL 和 Hive 都是”分布式 SQL 计算引擎“

SparkSQL 的数据抽象

Pandas - DataFrame SparkCore - RDD SparkSQL - DataFrame
二维表数据结构 不限定标准数据结构 二维表数据结构
本地集合 分布式集合 分布式集合

SparkSQL 其实有3类数据抽象对象:

  • SchemaRDD 对象(已废弃):引入 Schema 的 RDD
  • Dataset 对象:加入泛型特性,可用于 Java、Scala 语言
  • DataFrame 对象:可用于 Java、Scala、Python、R

DataFrame 概述

DataFrame v.s. RDD:

  DataFrame RDD
存储能力 只能存储二维表结构数据 存储任意结构的数据
示例 DFvsRDD1 DFvsRDD2

SparkSession 对象

在 Spark 2.0 后,推出了 SparkSession 对象,作为 Spark 编程的统一入口对象。

SparkSession 对象可以:

  • 用于 SparkSQL 编程作为入口对象
  • 用于 SparkCore 编程,可以通过 SparkSession 对象中获取到 SparkContext
from pyspark.sql import SparkSession

if __name__ == '__main__':
	spark = SparkSession.builder.\
		appName('myApp').\
		master('local').\
		config(...).\
		getOrCreate()

DataFrame 入门

DataFrame 的组成

DataFrame 是一个二维表结构,其表格结构(表、行、列)的组成如下:

在结构层面:

  • StructType 对象描述整个 DataFrame 的表结构
  • StructField 对象描述一个列的信息

在数据层面:

  • Row 对象记录一行数据
  • Column 对象记录一列数据并包含列的信息

StructType 的构建方法如下:

struct_type = StructType().\
	add("id", IntegerType(), False).\
	add("subject", StringType(), True).\
	add("age", IntegerType(), False)
  • StructField 包含:列名、列类型、是否可为空
  • 一个 StructType 对象由多个 StructField 组成

基于 RDD 构建

DataFrame 对象可以从 RDD 转换而来

使用列表指定列名,列类型推断得到

# coding:utf8
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.\
        appName("myApp").\
        master("local").\
        getOrCreate()
    sc = spark.sparkContext

    rdd = sc.textFile("./data/input/sql/people.txt").\
        map(lambda x: x.split(",")).\
        map(lambda x: (x[0], int(x[1])))

    df = spark.createDataFrame(rdd, schema=['name','age'])
    df.printSchema()
    df.show()

使用 StructType 对象定义表结构

    schema = StructType().add("name", StringType(), False).\
        add("age", IntegerType(), True)
    df = spark.createDataFrame(rdd, schema=schema)

使用 rdd.toDF() 方法

	df = rdd.toDF(["name", "age"])
	df = rdd.toDF(schema=schema)

基于 Pandas 的 DataFrame

	df = spark.createDataFrame(pandas_df, schema=...)

读取外部数据

通过 SparkSQL 的统一 API 进行数据读取构建 DataFrame

spark.read.format("text|csv|json|parquet|orc|avro|jdbc|...")
	.option("Key", "Value")  # 可选
	.schema(StructType|String)
	.load("/path/to/file")  # 支持本地文件和HDFS
  • format(“text”):只有一个列,列明默认为 value
  • format(“json”):一般不用指明 schema,json 自带列名和列类型(字符串和数字)
  • format(“csc”)
  • format(“parquet”):一种列存储文件格式,内置 schema,经过序列化

DataFrame 的入门操作

DataFrame 支持两种编程风格:

  • DSL 风格
  • SQL 风格

DSL 风格

  • show:展示 DataFrame 中的数据,默认展示20条
  • printSchema:打印输出 df 的 schema 信息
  • select:选择 DataFrame 中的指定列
  • where(filter):过滤 DataFrame 内的数据,返回一个过滤后的 DataFrame
  • groupBy:按照指定的列进行数据分组,返回值是 GroupedData 对象

SQL 风格

如果想要使用 SQL 风格的语法,需要将 DataFrame 注册成表,采用如下的方式之一:

  • df.createTempView(“table_name”) # 注册一个临时视图
  • df.createOrReplace(“table_name”) # 注册一个临时试图,如果存在则进行替换
  • df.createGlobalTempView(“table_name”) # 注册一个全局视图
  • df.createOrReplaceGlobalTempView(“table_name”) # 注册一个全局视图,如果存在则进行替换

临时表和全局表

  • 临时表:只在当前 SparkSession 中可用
  • 全局表:跨 SparkSession 对象使用,在一个程序内的多个 SparkSession 中均可调用,查询前带上前缀 global_temp

表注册完成后,可以通过以下语法进行查询:

spark.sql("""
	sql_query
""").show()

pyspark.sql.functions 包

PySpark 提供了一个包:pyspark.sql.functions
包内提供了一系列的计算函数,供 SparkSQL 使用

WordCount 案例练习

# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode

if __name__ == '__main__':
    spark = SparkSession.builder.\
        appName("myApp").\
        master("local").\
        getOrCreate()
    sc = spark.sparkContext

	# SQL
    rdd = sc.textFile("./data/input/words.txt").\
        flatMap(lambda x: x.split(" ")).\
        map(lambda x: [x])
    df = rdd.toDF(["word"])
    df.createOrReplaceTempView("words")

    spark.sql("""
        select word, count(*)
        from words
        group by word
    """).show()

	# DSL
    df = spark.read.format("text").\
        schema("word string").\
        load("./data/input/words.txt")
    df.select(explode(split("word", " ")).alias("word")).\
        groupBy("word").\
        count().\
        show()

电影评分数据分析案例

# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql.functions import avg, count

if __name__ == '__main__':
    spark = SparkSession.builder.\
        appName("myApp").\
        master("local").\
        getOrCreate()
    sc = spark.sparkContext

    schema = StructType().add("user_id", StringType(), False).\
        add("movie_id", StringType(), False).\
        add("score", IntegerType(), False).\
        add("ts", StringType(), True)
    df = spark.read.format("csv").\
        option("sep", "\t").\
        option("header", False).\
        option("encoding", "utf-8").\
        schema(schema).\
        load("./data/input/sql/u.data")

    # 1. 求用户平均分
    df.groupBy("user_id").\
        avg("score").\
        show()

    # 2. 求电影平均分
    df.groupBy("movie_id").\
        avg("score").\
        show()

    # 3. 大于平均分的评分的数量
    score_avg = df.select(avg("score")).first()[0]
    print(df.where(f"score > {score_avg}").count())

    # 4. 找出打分次数最多的用户,并计算它的打分平均分
    max_user_id = df.groupBy("user_id").\
        count().\
        orderBy("count", ascending=False).\
        first()["user_id"]
    # print(max_user_id)
    print(df.where(f"user_id={max_user_id}").select(avg("score")).first()[0])

    # 5. 查询评分次数超过100次的电影,按平均分展示TOP10
    df.groupBy("movie_id").\
        agg(
            count("score").alias("cnt"),
            avg("score").alias("score_avg")).\
        where("cnt>100").\
        orderBy("score_avg", ascending=False).\
        show(10)

SparkSQL Shuffle 分区数据

原因:在 SparkSQL 中当 Job 中产生 Shuffle 时,默认的分区数(spark.sql.shuffle.partitions)为200,在实际项目中要合理的设置。

可以设置在:

  1. 配置文件:conf/spark-defaults.conf - spark.sql.shuffle.partitions 2
  2. 在客户端提交参数中:bin/spark-submit --conf "spark.sql.shuffle.partitions=2"
  3. 在代码中可以设置:
    spark = SparkSession.builder.\
     appName("myApp").\
     master("local").\
     config("spark.sql.shuffle.partitions", "2").\
     getOrCreate()
    

SparkSQL 数据清洗 API

在大数据完整生产过程中,需要对数据进行数据清洗,将杂乱无章的数据整理为符合处理要求的规整数据。

dropDuplicates:数据去重

API:

DataFrame.dropDuplicates(subset=None)

dropna:删除有缺失值的行

API:

DataFrame.dropna(how='any', thresh=None, subset=None)

fillna:填充缺失值数据

API:

DataFrame.fillna(value)

代码:

# 填充所有缺失值
df.fillna("loss").show()
# 仅填充指定列
df.fillna("loss", subset=['job']).show()
# 给定字典,设定各个列的填充规则
df.fillna({"name":"未知姓名", "age":20, "job":"worker"}).show()

DataFrame 数据写出

使用 SparkSQL 统一 API 写出 DataFrame 数据

df.write.mode().\
	format().\
	option("Key", "Value").\
	save("path/to/file")
  • mode:传入模式,可选:append(追加)、overwrite(覆盖)、ignore(忽略)、error(重复就报异常,默认)
  • format:写出文件格式,可选:text、csv、json、parquet、ocr、avro、jdbc
  • save:写出的路径,支持本地文件和 HDFS

DataFrame 通过 JDBC 读写数据库(MySQL 示例)

SparkSQL 函数定义

回顾 Hive 中自定义函数有三种类型:

  • UDF(User Defined Function)函数
    • 一对一关系,输入一个值经过函数以后输出一个值;
    • 在 Hive 中继承 UDF 类,方法名称为 evaluate,返回值不能为 void,其实就是实现一个方法
  • UDAF(User Defined Aggregation Function)聚合函数
    • 多对一的关系,输入多个值输出一个值,通常与 groupBy 联合使用
  • UDTF(User Defined Table Generating Functions)函数
    • 一对多关系,输入一个值输出多个值(一行变多行);
    • 用户自定义表生成函数

SparkSQL 定义 UDF 函数

在 SparkSQL 中,目前仅仅支持 UDF 函数和 UDAF 函数,目前 Python 仅支持 UDF。
但可以通过 mapPatitions 算子模拟实现 UDAF;通过返回 array 或者 dict 来模拟实现 UDTF。

定义方式有两种:

sparksession.udf.register()

注册的 UDF 可用于 DSL 和 SQL
返回值用于 DSL 风格,传参内给的名字用于 SQL 风格

dsl_udf = sparksession.udf.register("sql_udf", f, returnType)
# 参数1:UDF名称,可用于SQL风格
# 参数2:被注册成UDF的方法名
# 参数3:声明UDF的返回值类型
# 返回值:UDF对象,可用于DSL风格

pyspark.sql.functions.udf

仅能用于 DSL 风格

dsl_udf = pyspark.sql.functions.udf(f, returnType)
# 参数1:被注册成UDF的方法名
# 参数2:声明UDF的返回值类型
# 返回值:UDF对象,可用于DSL风格

代码:

rdd = sc.parallelize([1,2,3,4,5,6,7,8,9], 3).map(lambda x:[x])
df = rdd.toDF(["num"])

dsl_udf = spark.udf.register('sql_udf', lambda x: x*2, IntegerType())
df.select(dsl_udf("num")).show()
df.selectExpr("sql_udf(num)").show()

dsl_udf2 = udf(lambda x: x*2, IntegerType())
df.select(dsl_udf2("num")).show()

SparkSQL 使用窗口函数

This section describes nonaggregate window functions that, for each row from a query, perform a calculation using rows related to that row.

对每一行,对其相关行(即窗口)数据进行聚合。窗口函数包含分区、排序和框架这3个核心元素。

function over (
    [partition by partition_expression]
    [order by order_expression]
    [frame]
)

将 DataFrame 注册成表,在 SQL 语句中使用窗口函数。