Spark Core - Part II

 

第3章 RDD 持久化

RDD 的数据是过程数据

RDD 之间进行相互迭代计算(Transformation),当执行开启后,新 RDD 的生成,代表老 RDD 的消失

RDD 的数据是过程数据,只在处理的过程中存在,一旦处理完成,就不见了。

RDD 的缓存

基于 [[2021-12-15-SparkCore#RDD 的数据是过程数据|RDD 的数据是过程数据]],如果需要重复使用中间 RDD,那么需要重复生成这个中间 RDD。

因此,Spark 提供了缓存 API,可以让我们通过调用 API,将指定的 RDD 数据保留在内存或硬盘上。

API:

RDD.cache()
RDD.persist(storageLevel=StorageLevel(False, True, False, False, 1))
# 可选 storageLevel
# DISK_ONLY
# DISK_ONLY_2
# DISK_ONLY_3
# MEMORY_AND_DISK
# MEMORY_AND_DISK_2
# MEMORY_AND_DISK_DESER
# MEMORY_ONLY
# MEMORY_ONLY_2
# OFF_HEAP
  • 缓存技术可以将过程 RDD 数据,持久化保存到内存或者硬盘上
  • 在设计上存在丢失风险,因此,Spark 会保留 RDD 之间的血缘关系

示例代码:

from pyspark import SparkConf, SparkContext
from defs import context_jieba, filter_words, append_words, extract_user_and_words
from operator import add

if __name__ == '__main__':
    conf = SparkConf().setAppName('myApp').setMaster('local')
    sc = SparkContext(conf=conf)

    file_rdd = sc.textFile('./data/input/SogouQ.txt')
    split_rdd = file_rdd.map(lambda x: x.split("\t"))
    split_rdd.cache()  # 写入缓存

    # 搜索关键词统计
    context_rdd = split_rdd.map(lambda x: x[2])
    words_rdd = context_rdd.flatMap(context_jieba)
    filter_rdd = words_rdd.filter(filter_words)
    append_rdd = filter_rdd.map(append_words)
    # 聚合
    result1 = append_rdd.reduceByKey(lambda a, b: a + b).\
        sortBy(lambda x: x[1], ascending=False).\
        take(5)
    print(result1)

    # 用户搜索点击统计
    user_content_rdd = split_rdd.map(lambda x: (x[1], x[2]))
    user_words_rdd = user_content_rdd.flatMap(extract_user_and_words)
    # 聚合
    result2 = user_words_rdd.reduceByKey(lambda a,b: a+b).\
        sortBy(lambda x: x[1], ascending=False).\
        take(5)
    print(result2)

    # 搜索时间段统计
    time_rdd = split_rdd.map(lambda x: x[0])
    hour_rdd = time_rdd.map(lambda x: (x.split(":")[0], 1))
    #聚合
    result3 = hour_rdd.reduceByKey(add).\
        sortBy(lambda x: x[1], ascending=False).\
        take(5)
    print(result3)

RDD 的 CheckPoint

也是将 RDD 的数据保存起来,但是它仅支持硬盘存储

API:

SparkContext.setCheckpointDir(dirName)  # 指定checkpoint存储目录,支持本地文件、hdfs
RDD.checkpoint()
  • 在设计上认为是安全的,因此,不会保留血缘关系

缓存和 CheckPoint 的对比

  缓存 CheckPoint
形式 分散存储 集中存储
风险 缓存分区越多,风险越高 不管分区数量多少,风险是一样的
HDFS 不支持 支持
内存 支持 不支持
设计安全 不安全(保留血缘关系) 安全(不保留血缘关系)

注意

CheckPoint 是一种重量级的使用,也就是 RDD 的重新计算成本很高的时候,或者数据量很大时,我们采用 CheckPoint 比较合适。

如果数据量小,或者 RDD 重新计算是非常快的,用 CheckPoint 没有必要,直接缓存即可。

第4章 RDD 案例练习

搜索引擎日志分析案例

源数据:

用户查询日志(SogouQ)

业务需求:

搜索引擎日志分析-业务需求

jieba 库使用入门

jieba是优秀的中文分词第三方库

三种模式:

  • 精确模式(jieba.cut(s)):把文本精确的切分开,不存在冗余单词
  • 全模式(jieba.cut(s,cut_all=True)):把文本中所有可能的词语都扫描出来,有冗余
  • 搜索引擎模式(jieba.cut_for_search(s)):在精确模式基础上,对长词再次切分

案例实现代码

from pyspark import SparkConf, SparkContext
from defs import context_jieba, filter_words, append_words, extract_user_and_words
from operator import add

if __name__ == '__main__':
    conf = SparkConf().setAppName('myApp').setMaster('local')
    sc = SparkContext(conf=conf)

    file_rdd = sc.textFile('./data/input/SogouQ.txt')
    split_rdd = file_rdd.map(lambda x: x.split("\t"))
    split_rdd.cache()  # 写入缓存

    # 搜索关键词统计
    context_rdd = split_rdd.map(lambda x: x[2])
    words_rdd = context_rdd.flatMap(context_jieba)
    filter_rdd = words_rdd.filter(filter_words)
    append_rdd = filter_rdd.map(append_words)
    # 聚合
    result1 = append_rdd.reduceByKey(lambda a, b: a + b).\
        sortBy(lambda x: x[1], ascending=False).\
        take(5)
    print(result1)

    # 用户搜索点击统计
    user_content_rdd = split_rdd.map(lambda x: (x[1], x[2]))
    user_words_rdd = user_content_rdd.flatMap(extract_user_and_words)
    # 聚合
    result2 = user_words_rdd.reduceByKey(lambda a,b: a+b).\
        sortBy(lambda x: x[1], ascending=False).\
        take(5)
    print(result2)

    # 搜索时间段统计
    time_rdd = split_rdd.map(lambda x: x[0])
    hour_rdd = time_rdd.map(lambda x: (x.split(":")[0], 1))
    #聚合
    result3 = hour_rdd.reduceByKey(add).\
        sortBy(lambda x: x[1], ascending=False).\
        take(5)
    print(result3)
print(result3)

defs.py

import jieba

def context_jieba(data):
    l = []
    seq = jieba.cut_for_search(data)
    for x in seq:
        l.append(x)
    return l

def filter_words(data):
    return data not in ['谷', '帮', '客']

def append_words(data):
    if data == '传智播': data = '传智播客'
    if data == '博学': data = '博学谷'
    if data == '院校': data = '院校帮'
    return (data, 1)

def extract_user_and_words(data):
    user_id = data[0]
    content = data[1]
    
    l = []
    seq = jieba.cut_for_search(content)
    for x in seq:
        if filter_words(x):
            l.append((user_id + '_' + append_words(x)[0], 1))
    
    return l

提交到集群运行

spark-submit --master yarn --py-files /path/to/defs.py /path/to/main.py

注意

  • 删除 setMater 部分
  • 读取的文件路径改为 hdfs 路径

榨干集群性能

指定内存和 CPU 核心

spark-submit --master yarn --py-files /path/to/defs.py \
--executor-memory 2g \  # executor内存
--executor-cores 1 \  # executor核心数
--num-executors 6 \  # 总executor数量
/path/to/main.py

第5章 RDD 共享变量

广播变量

引出问题

案例代码:

import time

from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    stu_info_list = [(1, '张大仙', 11), (2, '王晓晓', 13), (3, '张甜甜', 11),
                     (4, '王大力', 11)]

    score_info_rdd = sc.parallelize([
        (1, '语文', 99), (2, '数学', 99), (3, '英语', 99), (4, '编程', 99),
        (1, '语文', 99), (2, '编程', 99), (3, '语文', 99), (4, '英语', 99),
        (1, '语文', 99), (3, '英语', 99), (2, '编程', 99)
    ])

    def map_func(data):
        id = data[0]
        name = ""
        for stu_info in stu_info_list:
            stu_id = stu_info[0]
            if id == stu_id:
                name = stu_info[1]

        return (name, data[1], data[2])

    print(score_info_rdd.map(map_func).collect())

本地 list 对象,被发送到每个分区的处理线程上使用,也就是每个 executor 内,其实存放了2份一样的数据。
executor 是进程,进程内资源共享,这2份数据没有必要,造成了内存浪费。

广播变量-引出问题.

解决方案 - 广播变量

如果将本地 list 对象标记为广播变量,那么 Spark 会:
只给每个 executor 一份数据,而不是像原本那样,每一个分区的处理线程各一份,节省了内存。

广播变量

使用方法

# 1. 将本地Python List对象标记为广播变量
broadcast = sc.broadcast(stu_info_list)

# 2. 在使用到本地集合对象的地方, 从广播变量中取出来用即可
broadcast.value

累加器

引出问题

案例代码:

# coding:utf8
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    def map_func(date):
        global count
        count += 1
        print(count)

    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)
    count = 0
    # count = sc.accumulator(0)

    rdd.map(map_func).collect()
    print(count)  # 0
    # print(count)  # 10

driver 将 count 对象复制发送给每个 executor,此时,driver 和每个 executor 各有一个 count,互不相干。

解决方案 - 累加器

使用累加器对象(sc.accumulator(init_value)),这个对象可以从各个 executor 中收集它们的执行结果,作用回自己身上。

注意事项

    rdd2 = rdd1.map(map_func)
    rdd2.collect()
    rdd3 = rdd2.map(lambda x: x)
    rdd3.collect()
    print(count)  # 20

原因:构建 rdd3 时,rdd2 不存在,需要重新再次构建 rdd2,所有 map_func 被调用两次。

综合案例

源数据:

   hadoop spark # hadoop spark spark
mapreduce ! spark spark hive !
hive spark hadoop mapreduce spark %
   spark hive sql sql spark hive , hive spark !
!  hdfs hdfs mapreduce mapreduce spark hive

  #

需求:

  1. 正常单词统计计数
  2. 特殊字符统计总数

代码:

# coding:utf8
from pyspark import SparkConf, SparkContext
import re
from operator import add

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    file_rdd = sc.textFile("./data/input/accumulator_broadcast_data.txt")
    abnormal_char = [",", ".", "!", "#", "$", "%"]

    # 广播特殊字符变量
    broadcast = sc.broadcast(abnormal_char)
    # 定义累加器
    acmlt = sc.accumulator(0)

    # 处理数据
    # 1. 删除空行
    # 2. 删除每行两端空格
    # 3. 正则表达式、切分
    words_rdd = file_rdd.filter(lambda l: l.strip()).\
        map(lambda l: l.strip()).\
        flatMap(lambda l: re.split("\s+", l))

    # 过滤特殊字符,同时计数
    def filter_func(data):
        global acmlt
        if data in broadcast.value:
            acmlt += 1
            return False
        else:
            return True

    normal_words_rdd = words_rdd.filter(filter_func)

    # 正常单词计数
    result_rdd = normal_words_rdd.map(lambda x: (x, 1)).\
        reduceByKey(add)

    # 打印结果
    print(result_rdd.collect())
	# [('hadoop', 3), ('spark', 11), ('mapreduce', 4), ('hive', 6), ('sql', 2), ('hdfs', 2)]
    print(acmlt)  # 8