基于Spark 的零售交易数据挖掘分析与可视化

news/2024/10/9 12:34:19/

基于Spark 的零售交易数据挖掘分析与可视化

本文将带你通过 PySpark 进行电商数据的分析处理,并将结果保存为 JSON 文件,供前端展示。我们将从数据的读取、处理、分析到结果保存和网页展示,覆盖完整的数据流。项目结构如下:

1、Spark 分析数据
2、生成 JSON 文件
3、使用 Bottle 框架搭建简单 Web 服务器

项目简介

我们使用了 PySpark 来处理一个电商数据集,数据存储在 HDFS 上。通过 SQL 和 RDD 操作实现了多个业务需求分析,并最终将结果保存为 JSON 文件,用于前端展示。后端 Web 服务采用 Bottle 框架,提供静态文件服务和页面展示。

数据集介绍

数据集包括了以下字段:

InvoiceNo: 订单号
StockCode: 商品编码
Description: 商品描述
Quantity: 数量
InvoiceDate: 订单日期
UnitPrice: 商品单价
CustomerID: 客户编号
Country: 国家
在这里插入图片描述

1. 数据读取

首先,我们从 HDFS 中读取 CSV 文件作为 Spark 的 DataFrame,并通过 createOrReplaceTempView 创建 SQL 查询视图。代码如下:

# 从HDFS中读取数据集为DataFrame
df = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('../data/E_Commerce_Data.csv')
df.createOrReplaceTempView("data")

2. 分析任务

通过 SQL 查询和 RDD 操作,项目实现了以下 10 项数据分析任务:

  1. 客户数最多的 10 个国家
    通过 SQL 查询,统计每个国家的客户数,并选出客户数最多的 10 个国家:
def countryCustomer():countryCustomerDF = spark.sql("SELECT Country,COUNT(DISTINCT CustomerID) AS countOfCustomer FROM data GROUP BY Country ORDER BY countOfCustomer DESC LIMIT 10")return countryCustomerDF.collect()

在这里插入图片描述

  1. 销量最高的 10 个国家
    统计每个国家的商品销量,并选出销量最高的 10 个国家:
def countryQuantity():countryQuantityDF = spark.sql("SELECT Country,SUM(Quantity) AS sumOfQuantity FROM data GROUP BY Country ORDER BY sumOfQuantity DESC LIMIT 10")return countryQuantityDF.collect()

在这里插入图片描述

  1. 各国总销售额分布
    计算每个国家的销售额,结果按销售额大小进行排序:
def countrySumOfPrice():countrySumOfPriceDF = spark.sql("SELECT Country,SUM(UnitPrice*Quantity) AS sumOfPrice FROM data GROUP BY Country")return countrySumOfPriceDF.collect()

在这里插入图片描述

  1. 销量最高的 10 个商品
    统计商品的销量,按销量大小选出销量最高的 10 个商品:
def stockQuantity():stockQuantityDF = spark.sql("SELECT StockCode,SUM(Quantity) AS sumOfQuantity FROM data GROUP BY StockCode ORDER BY sumOfQuantity DESC LIMIT 10")return stockQuantityDF.collect()

在这里插入图片描述

  1. 商品描述的热门关键词 Top 300
    通过对商品描述字段进行分词和词频统计,得到最热门的 300 个关键词:
def wordCount():wordCount = spark.sql("SELECT LOWER(Description) as description from data").rdd.filter(lambda line:line['description'] is not None).flatMap(lambda line:line['description'].split(" ")).map(lambda word:(word,1)).reduceByKey(lambda a,b:a+b).repartition(1).sortBy(lambda x:x[1],False)wordCountSchema = StructType([StructField("word", StringType(), True),StructField("count", IntegerType(), True)])wordCountDF = spark.createDataFrame(wordCount, wordCountSchema)return wordCountDF.take(300)

在这里插入图片描述

  1. 退货订单数最多的 10 个国家
    统计退货订单数量最多的 10 个国家,退货订单的 InvoiceNo 以 ‘C’ 开头:
def countryReturnInvoice():countryReturnInvoiceDF = spark.sql("SELECT Country,COUNT(DISTINCT InvoiceNo) AS countOfReturnInvoice FROM data WHERE InvoiceNo LIKE 'C%' GROUP BY Country ORDER BY countOfReturnInvoice DESC LIMIT 10")return countryReturnInvoiceDF.collect()

在这里插入图片描述

  1. 月销售额随时间的变化趋势
    通过提取 InvoiceDate 中的年份和月份,计算每月的销售额:
def tradePrice():result3 = formatData()result4 = result3.map(lambda line:(line[0]+"-"+line[1],line[3]*line[4]))result5 = result4.reduceByKey(lambda a,b:a+b).sortByKey()schema = StructType([StructField("date", StringType(), True),StructField("tradePrice", DoubleType(), True)])tradePriceDF = spark.createDataFrame(result5, schema)return tradePriceDF.collect()

在这里插入图片描述

  1. 日销量随时间的变化趋势
    计算每天的销售量变化趋势,提取 InvoiceDate 的年、月、日,并进行汇总:
def saleQuantity():result3 = formatData()result4 = result3.map(lambda line:(line[0]+"-"+line[1]+"-"+line[2],line[3]))result5 = result4.reduceByKey(lambda a,b:a+b).sortByKey()schema = StructType([StructField("date", StringType(), True),StructField("saleQuantity", IntegerType(), True)])saleQuantityDF = spark.createDataFrame(result5, schema)return saleQuantityDF.collect()

在这里插入图片描述

  1. 各国购买订单量与退货订单量的关系
    通过联表查询,展示每个国家的购买订单量与退货订单量的关系:
def buyReturn():returnDF = spark.sql("SELECT Country AS Country,COUNT(DISTINCT InvoiceNo) AS countOfReturn FROM data WHERE InvoiceNo LIKE 'C%' GROUP BY Country")buyDF = spark.sql("SELECT Country AS Country2,COUNT(DISTINCT InvoiceNo) AS countOfBuy FROM data WHERE InvoiceNo NOT LIKE 'C%' GROUP BY Country2")buyReturnDF = returnDF.join(buyDF, returnDF["Country"] == buyDF["Country2"], "left_outer")buyReturnDF = buyReturnDF.select(buyReturnDF["Country"],buyReturnDF["countOfBuy"],buyReturnDF["countOfReturn"])return buyReturnDF.collect()

在这里插入图片描述

  1. 商品的平均单价与销量的关系
    通过计算每个商品的平均单价和总销量,展示二者的关系:
def unitPriceSales():unitPriceSalesDF = spark.sql("SELECT StockCode,AVG(DISTINCT UnitPrice) AS avgUnitPrice,SUM(Quantity) AS sumOfQuantity FROM data GROUP BY StockCode")return unitPriceSalesDF.collect()

在这里插入图片描述

3. 数据结果保存

所有分析结果都以 JSON 格式保存到 static/ 目录。我们定义了一个简单的 save() 函数来处理文件写入:

def save(path, data):with open(path, 'w') as f:f.write(data)

4. 使用 Bottle 框架搭建 Web 服务器

为了展示这些分析结果,我们使用了 Bottle 框架,提供静态文件服务。Web 服务器代码如下:

from bottle import route, run, static_file@route('/static/<filename>')
def server_static(filename):return static_file(filename, root="./static")@route("/<name:re:.*\.html>")
def server_page(name):return static_file(name, root=".")@route("/")
def index():return static_file("index.html", root=".")

通过访问 /static/filename 可以获取生成的 JSON 文件,访问 / 可以加载主页 index.html。

5. 运行项目

运行项目非常简单,只需启动 Python 脚本,它将自动生成分析结果,并启动 Bottle Web 服务器。

python app.py

在浏览器中访问 http://localhost:9999 即可查看分析结果。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

总结

通过 PySpark 处理海量电商数据,并将结果可视化,是数据分析和数据工程领域的典型场景。本项目展示了如何通过 Spark 进行数据的处理和分析,结合 Bottle 框架实现简单的 Web 服务,将结果供用户查看。

如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于MapReduce, MySQL, python,java,大数据,模型训练等。 hadoop hdfs yarn spark Django flask flink kafka flume datax sqoop seatunnel echart可视化 机器学习等
在这里插入图片描述


http://www.ppmy.cn/news/1527140.html

相关文章

Python | Leetcode Python题解之第414题第三大的数

题目&#xff1a; 题解&#xff1a; class Solution:def thirdMax(self, nums: List[int]) -> int:a, b, c None, None, Nonefor num in nums:if a is None or num > a:a, b, c num, a, belif a > num and (b is None or num > b):b, c num, belif b is not No…

java--JDBC-连接池----JDBC小总结

一.连接池 1.连接池概述 目的&#xff1a;为了解决建立数据库连接耗费资源和时间很多的问题&#xff0c;提高性能。 Connection对象在JDBC使用的时候就会去创建一个对象,使用结束以后就会将这个对象给销毁了(close).每次创建和销毁对象都是耗时操作.需要使用连接池对其进行优…

音频评价指标

第一个是主观评价指标&#xff0c;后面几个是客观评价指标 1.MOS (Mean Opinion Score, 平均意见得分) 评价方法 MOS 是一种主观评估方法&#xff0c;通过让一组听众对合成的语音质量进行评分来衡量语音的自然度或质量。评分通常在 1 到 5 的范围内&#xff0c;1 表示“非常…

速盾:文件下载开cdn消耗流量大吗?

CDN&#xff08;内容分发网络&#xff09;是一种用于提高网站性能和用户体验的技术。它通过将静态文件和动态内容分发到位于世界各地的服务器节点&#xff0c;从而实现更快的加载速度和更高的可靠性。 在文件下载方面&#xff0c;CDN可以帮助提供更快速和可靠的下载体验。当用…

【乐吾乐大屏可视化组态编辑器】API接口文档(pgsql)

API接口文档&#xff08;pgsql&#xff09; 在线使用&#xff1a;https://v.le5le.com/ 采用前后端分离架构&#xff0c;乐吾乐后端服务提供一整套完整的web组态编辑器的所有数据接口&#xff0c;包含2D/3D图纸接口服务、文件接口服务和用户接口服务等&#xff0c;安装包版本…

智能机巢+无人机:自动化巡检技术详解

智能机巢与无人机的结合&#xff0c;在自动化巡检领域展现出了巨大的潜力和优势。以下是对这一技术的详细解析&#xff1a; 一、智能机巢概述 智能机巢&#xff0c;也被称为无人机机场或无人机机巢&#xff0c;是专门为无人机提供停靠、充电、维护等服务的智能化设施。它不仅…

【计算机网络】TCP 协议——详解三次握手与四次挥手

文章目录 一、引言二、TCP1、TCP 的数据格式2、TCP 的三次握手3、TCP 的四次挥手4、TCP 的全双工通信 三、TCP 的状态转换1、TCP 连接的建立&#xff08;三次握手&#xff09;状态2、TCP 连接的终止&#xff08;四次挥手&#xff09;状态3、TCP 异常情况 一、引言 TCP与UDP的区…

如何在win10Docker安装Mysql数据库?

1.拉取镜像 docker pull mysql 2.查看镜像 使用以下命令来查看是否已安装了 mysql镜像。 3.运行镜像 命令&#xff1a; docker run -p 3306:3306 --name mysql --restartalways --privilegedtrue \ -v /usr/local/mysql/log:/var/log/mysql \ -v /usr/local/mysql/data:/var…