Spark 综合案例

 

案例背景

需求:对某零售公司各省店铺的销售数据,进行统计分析。

开发需求:

  1. 每个省份的销售额统计
  2. 销售额TOP3的省份中,统计有多少家店铺日均销售额1000+
  3. 销售额TOP3的省份中,各个省份的平均每单销售额
  4. 销售额TOP3的省份中,各个省份的支付类型比例

案例数据:下载

样例数据(Json):

样例json

相关需求字段:

  • storeProvince(店铺所在省份)
  • storeID(店铺ID)
  • dateTS(订单日期)
  • receivable(收款金额)
  • payType(付款类型)

实现代码:

# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_unixtime

if __name__ == '__main__':
    spark = SparkSession.builder.\
        appName("myApp").\
        master("local").\
        config("spark.sql.shuffle.partitions", "4").\
        config("spark.sql.warehouse.dir", "...").\
        config("hive.metastore.uris", "...").\
        enableHiveSupport().\
        getOrCreate()
    
    # 导入数据
    # 省份信息缺失处理
    # 处理订单金额超过10000
    # 只取所需字段
    df = spark.read.format("json").\
        load("./data/input/mini.json").\
        dropna(thresh=1, subset=["storeProvince"]).\
        filter("storeProvince <> 'null'").\
        filter("receivable < 10000").\
        select(["storeProvince", "storeID", "dateTS", "receivable", "payType"])

    # 1.每个省份的销售额统计
    rec_sum_df = df.groupBy("storeProvince").sum("receivable")
    rec_sum_df.show()

    # 2.销售额TOP3的省份中,统计有多少家店铺日均销售额1000+
    top3_prov = rec_sum_df.orderBy("sum(receivable)", ascending=False).limit(3)

    top3_df = top3_prov.join(df, on="storeProvince", how="inner")
    top3_daily_df = top3_df.groupBy("storeProvince", "storeID", from_unixtime(top3_df.dateTS.substr(0,10), "yyyy-MM-dd").alias("date")).\
        sum("receivable")
    top3_daily_avg_df = top3_daily_df.groupBy("storeProvince", "storeID").\
        avg("sum(receivable)").\
        withColumnRenamed("avg(sum(receivable))", "daily_avg")
    top3_daily_avg_df.where("daily_avg > 1000").show()

    # 3.销售额TOP3的省份中,各个省份的平均每单销售额
    top3_df.groupBy("storeProvince").\
        avg("receivable").\
        show()
    
    # 4.销售额TOP3的省份中,各个省份的支付类型比例
    top3_pay_count = top3_df.groupBy("storeProvince", "payType").\
        count().withColumnRenamed("count", "pay_count")
    top3_prov_count = top3_df.groupBy("storeProvince").\
        count().withColumnRenamed("count", "prov_count")
    top3_pay_count.join(top3_prov_count, on="storeProvince", how="inner").\
        withColumn("payRate", top3_pay_count.pay_count/top3_prov_count.prov_count).\
        show()

输出结果:

+--------------+------------------+                                                                                     
| storeProvince|   sum(receivable)|
+--------------+------------------+
|        广东省|1713207.9233400004|
|        北京市|10926.909999999993|
|        浙江省|            4568.1|
|        湖南省|1701303.5340000016|
|广西壮族自治区|          37828.22|
|        江苏省|            6357.9|
|        上海市|            7358.5|
|        江西省|             553.5|
|        山东省|             664.0|
+--------------+------------------+

+-------------+-------+------------------+                                                                              
|storeProvince|storeID|         daily_avg|
+-------------+-------+------------------+
|       广东省|   1595|            1696.0|
|       湖南省|   2177|            1242.0|
|       广东省|   2118| 1209.321739130435|
|       湖南省|   1926|1763.7142857142858|
|       湖南省|    915|            1850.0|
|       广东省|   3654|            4409.0|
|       湖南省|    769|1499.3333333333333|
|       广东省|    571|            3927.0|
|       广东省|    407|          1298.125|
|       广东省|   3629|            9540.0|
|       广东省|   2653|            1178.5|
|       广东省|   1865|          2701.098|
|       广东省|   1475|            1027.0|
|       广东省|   2645|            1947.5|
|       广东省|   3725|           1175.25|
|       广东省|   2091|1038.2857142857142|
|       湖南省|   3454|            2258.0|
|       湖南省|   1057|            1860.0|
|       广东省|   3247|            1650.5|
|       湖南省|   2552|            1020.0|
+-------------+-------+------------------+
only showing top 20 rows

+--------------+------------------+                                                                                     
| storeProvince|   avg(receivable)|
+--------------+------------------+
|        广东省| 32.80939010935137|
|        湖南省| 36.85506550842688|
| 广西壮族自治区|40.029862433862434|
+--------------+------------------+

+--------------+--------+---------+----------+--------------------+                                                     
| storeProvince| payType|pay_count|prov_count|             payRate|
+--------------+--------+---------+----------+--------------------+
|        广东省|  alipay|     3595|     52217| 0.06884731026294119|
|        广东省|bankcard|      378|     52217|0.007239021774517877|
|        广东省|    cash|    27627|     52217|  0.5290805676312312|
|        广东省|  wechat|    20617|     52217| 0.39483310033130975|
|        湖南省|  wechat|    11487|     46162| 0.24884103808327196|
|        湖南省|  alipay|     1958|     46162| 0.04241583986828994|
|        湖南省|bankcard|       29|     46162|6.282223473852952E-4|
|        湖南省|    cash|    32688|     46162|  0.7081148997010528|
|广西壮族自治区|  alipay|       40|       945|0.042328042328042326|
|广西壮族自治区|bankcard|        9|       945|0.009523809523809525|
|广西壮族自治区|    cash|      692|       945|  0.7322751322751323|
|广西壮族自治区|  wechat|      204|       945| 0.21587301587301588|
+--------------+--------+---------+----------+--------------------+