【Project】基于spark-App端口懂车帝数据采集与可视化

文章目录

      • hadoop完全分布式部署
        • hdfs-site.xml
        • core-site.xml
        • marpred-site.xml
        • yarn-site.xml
      • spark集群部署
        • spark-env.sh
      • mongodb分片模式部署
        • config 服务器
          • 初始化config 副本集
        • shard 服务器
          • 初始化shard 副本集
        • mongos服务器
          • 添加shard
          • 设置chunk大小
        • 启动分片
          • 为集合 user 创建索引并进行分片
      • fillder抓包+数据采集
      • spark清洗数据
      • Flask+echarts进行可视化
      • 效果图

hadoop完全分布式部署

hdfs-site.xml
<configuration><property><name>dfs.namenode.name.dir</name><value>file:/usr/local/src/hadoop/dfs/name</value></property><property><name>dfs.datanode.data.dir</name><value>file:/usr/local/src/hadoop/dfs/data</value></property><property><name>dfs.replication</name><value>3</value></property></property><!-- Secondary NameNode 所在主机的ip和端口  --><property><name>dfs.namenode.secondary.http-address</name><value>master02:9890</value></property>
</configuration>
core-site.xml
<configuration><!--   --><!-- 设置hadoop的文件系统,由URI指定  --><property><!-- 指定namenode地址节点所在机器  --><name>fs.defaultFS</name><value>hdfs://master01:9000</value></property><!-- 配置hadoop临时目录,默认/tmp/hadoop-${user.name}  --><property><name>hadoop.tmp.dir</name><value>/usr/local/src/hadoop/tmp/</value></property></configuration>
marpred-site.xml
<configuration><property><name>mapreduce.framework.name</name><value>yarn</value></property><property><name>mapreduce.jobhistory.address</name><value>master01:10020</value></property>
</configuration>
yarn-site.xml
<configuration><property><name>mapreduce.framework.name</name><value>yarn</value></property><property><name>mapreduce.jobhistory.address</name><value>master01:10020</value></property>
</configuration>

spark集群部署

spark-env.sh
export JAVA_HOME=/usr/local/src/jdk1.8.0_152
export HADOOP_CONF_DIR=/usr/local/src/hadoop-3.2.2/etc/hadoop/
export SPARK_MASTER_IP=master01
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_MEMORY=512m
export SPARK_WORKER_CORES=1
export SPARK_EXECUTOR_MEMORY=512m
export SPARK_EXECUTOR_CORES=1
export SPARK_WORKER_INSTANCES=1

mongodb分片模式部署

服务器名称IP地址Shard1 (端口/角色)Shard2 (端口/角色)Shard3 (端口/角色)mongos (端口)Config Server (端口/角色)
master01192.168.121.13427018 (主节点)27020 (仲裁节点)27019 (副节点)2702127022 (主节点)
node01192.168.121.13527019 (副节点)27018 (主节点)27020 (仲裁节点)2702127022 (副节点)
node02192.168.121.13627020 (仲裁节点)27019 (副节点)27018 (主节点)-27022 (副节点)
config 服务器
dbpath=/usr/local/src/mongodb_demo/shardcluster/configServer/data
logpath=/usr/local/src/mongodb_demo/shardcluster/configServer/logs/config_server.log
port=27022
bind_ip=master01
logappend=true
fork=trues
maxConns=5000
replSet=configs
configsvr=true
初始化config 副本集
$ ./mongod -f /usr/local/src/mongodb_demo/shardcluster/configServer/configFile/mongodb_config.conf$ ./mongo --host master01 --port 27022
> rs.initiate()
configs:SECONDARY> rs.add('node01:27022')
configs:PRIMARY> rs.add('node02:27022')
shard 服务器
dbpath=/usr/local/src/mongodb_demo/shardcluster/shard/shard1_data
logpath=/usr/local/src/mongodb_demo/shardcluster/shard/logs/shard1.log
port=27018
bind_ip=master01
logappend=true
fork=true
maxConns=5000
replSet=shard1
shardsvr=true

类似地,编辑 shard2.conf 和 shard3.conf,分别修改 dbpath, logpath, port, bind_ip, 和 replSet 参数。

初始化shard 副本集
# 所有节点都启动
$ ./mongod -f /usr/local/src/mongodb_demo/shardcluster/shard/configFile/mongodb_shard1.conf
$ ./mongod -f /usr/local/src/mongodb_demo/shardcluster/shard/configFile/mongodb_shard2.conf
$ ./mongod -f /usr/local/src/mongodb_demo/shardcluster/shard/configFile/mongodb_shard3.conf# master01节点
$ ./mongo --host master01 --port 27018
> rs.initiate()
configs:SECONDARY> rs.add('node01:27019')
configs:PRIMARY> rs.add('node02:27020')# node01节点
$ ./mongo --host node01--port 27018
> rs.initiate()
configs:SECONDARY> rs.add('node02:27019')
configs:PRIMARY> rs.add('master01 :27020')# node02节点
$ ./mongo --host node02 --port 27018
> rs.initiate()
configs:SECONDARY> rs.add('master01 :27019')
configs:PRIMARY> rs.add('node01:27020')
mongos服务器
logpath=/usr/local/src/mongodb_demo/shardcluster/mongos/logs/mongos.log
logappend=true
port=27021
bind_ip=master01
fork=true
configdb=configs/master01:27022,node01:27022,node02:27022
maxConns=20000

类似地,在node01节点配置

添加shard

$ ./mongo --host master01--port 27021mongos> use gateway
mongos> sh.addShard("shard1/master01:27018,node01:27019,node02:27020")
mongos> sh.addShard("shard2/master01:27020,node01:27018,node02:27019")
mongos> sh.addShard("shard3/master01:27019,node01:27020,node02:27018")
设置chunk大小
use config;
db.settings.insertOne({ _id: "chunksize", value: 64 });
启动分片
mongos> use gateway
mongos> sh.enableSharding("xxxdatabase")
为集合 user 创建索引并进行分片
mongos> use xxxdatabase
mongos> db.user.createIndex({"id":1})
mongos> sh.shardCollection("xxxdatabase.xxxcollection",{"id":1})

fillder抓包+数据采集

在这里插入图片描述

    def data_SalesVolume(self,allcity):'''获取全国和各城市车辆销售量排名数据结果存储到本地和hdfs:param allcity: 城市名称-->list:return:'''data_list = []for city in allcity:city_quote = quote(city)newurl = self.url+f'method=jsb.app.fetch&rank_data_type=116&month=202412&energy_type&price=0%2C-1&manufacturer&rank_city_name={city_quote}&market_time=0&offset=0&count=500&scm_version=1.0.0.2209&iid=3991681621300419&device_id=3991681621296323&ac=wifi&channel=home&aid=36&app_name=automobile&version_code=748&version_name=7.4.8&device_platform=android&os=android&ab_client=a1%2Cc2%2Ce1%2Cf2%2Cg2%2Cf7&ab_group=3167590%2C3577236&ssmix=a&device_type=PCLM10&device_brand=OPPO&language=zh&os_api=28&os_version=9&manifest_version_code=748&resolution=720*1280&dpi=320&update_version_code=7483&_rticket=1738335686380&cdid=637e94d4-1e98-49a9-9b9e-7f567951c149&rom_version=coloros__pq3b.190801.10161630+release-keys&longi_lati_type=0&longi_lati_time=0&content_sort_mode=0&total_memory=3.85&cpu_name=Qualcomm+Technologies%2C+Inc+MSM8998&overall_score=8.6995&cpu_score=7.9848&host_abi=armeabi-v7a'print(f'准备获取=={city}==的数据---{newurl}---')citynames = self.session.get(url=newurl, headers=self.headers).json()city_sales = {city:citynames}data_list.append(city_sales)# print(citynames)print('----------获取销量数据完成-----------')data_json = {'SalesValue':data_list}with open(f'data/data_salesvolume.json','w',encoding='utf-8') as f:json.dump(data_json,f,ensure_ascii=False,indent=4)hdfs_path = "/data_doncar/data_carkm.json"  # HDFS 目标路径self.hdfs_client.create(hdfs_path, data_json, overwrite=True)  # 上传到 HDFS

spark清洗数据

创建对象

package org.exampleimport org.apache.log4j.{Level, Logger}
import org.apache.spark._
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._object DonCar_spark {def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.ERROR)Logger.getLogger("akka").setLevel(Level.ERROR)val spark = SparkSession.builder().appName("car_analysis").master("local[*]").config("spark.mongodb.read.connection.uri", "mongodb://root:123456@192.168.121.134:27017").getOrCreate()

连接mongodb数据库

    def readmongocollection(db:String,collection:String) = {spark.read.format("mongodb").option("database",db).option("collection",collection).load()}

创建临时表

val car_info = readmongocollection("doncar_top","car_info")val citys_sv = readmongocollection("doncar_top","citys_sv")val nationwide_sv = readmongocollection("doncar_top","nationwide_sv")citys_sv.createOrReplaceTempView("citys_table")car_info.createOrReplaceTempView("car_table")nationwide_sv.createOrReplaceTempView("nationwide_table")

数据分析 并且把数据存到HDFS


//    各城市汽车销量val citys_sales = spark.sql("select city,sales from citys_table").groupBy("city").sum("sales")//    电车续航排名val brand_max_km = spark.sql("select brand,range_km from car_table").groupBy("brand").max()brand_max_km.limit(7).show()//    油电销量val car_type_sales =spark.sql("select sales,car_type from nationwide_table ").withColumn("category",when(col("car_type") === 0,0).otherwise(1)).groupBy("category").sum("sales")car_type_sales.show()// 能源占比
//    0 纯油
//    1 纯电
//    2 混动
//    3-4 增程val car_type = spark.sql("select car_type,sales from nationwide_table").groupBy("car_type").sum("sales")//    car_type.printSchema()val car_type2 = car_type.withColumn("sum(sales)",when(col("car_type") === 3,col("sum(sales)")+car_type.filter(col("car_type") === 4).select("sum(sales)").first().getLong(0)).otherwise(col("sum(sales)")))car_type2.show()//销量——价格关系val price_sales = spark.sql("SELECT min_price,max_price,sales from nationwide_table").withColumn("average_price",(col("min_price") + col("max_price")) / 2).drop("min_price","max_price")price_sales.show()//   品牌销量val brand_sales = spark.sql("SELECT brand_name,model,sales FROM nationwide_table").groupBy("brand_name").sum("sales").sort(col("sum(sales)").desc)brand_sales.show()//    城市销量排名val city_top = citys_sales.sort(col("sum(sales)").desc)city_top.show()//    车辆销售排名val car_top = spark.sql("SELECT model,sales FROM nationwide_table").sort(col("sales").desc)car_top.show()}
}

在这里插入图片描述

Flask+echarts进行可视化

from flask import Flask, render_template, redirect, url_for, request, flash, session,jsonify
from SparkAnalysis import analysis
from SparkAnalysis import Get_HdfsData
import pymysqlapp = Flask(__name__)#重定向路由
@app.route('/')
def default():return redirect(url_for('login'))@app.route('/login', methods=['GET', 'POST'])
def login():if request.method == 'POST':username = request.form['username']password = request.form['password']conn = pymysql.connect(host='localhost',  # 数据库主机地址user='root',  # 数据库用户名password='123456',  # 数据库密码database='users'  # 数据库名称)with conn.cursor() as cursor:cursor.execute("SELECT * FROM account WHERE username = %s AND password = %s", (username, password))account = cursor.fetchone()  # 获取第一行记录conn.close()if account:return redirect(url_for('index'))else:return redirect(url_for('login'))return render_template('login.html')@app.route('/register', methods=['GET', 'POST'])
def register():if request.method == 'POST':username = request.form['username']password = request.form['password']conn = pymysql.connect(host='localhost',user='root',password='123456',database='users')with conn.cursor() as cursor:cursor.execute("SELECT * FROM account WHERE username = %s", (username,))if cursor.fetchone():return redirect(url_for('register'))# 如果用户名不存在,插入新用户cursor.execute("INSERT INTO account (username, password) VALUES (%s, %s)", (username, password))conn.commit()conn.close()return redirect(url_for('login'))return render_template('register.html')@app.route('/index')
def index():return render_template('index.html',map_data=Get_HdfsData.get_map_data(),bar1_x = analysis.get_bar1_data()['brand'].values.tolist(),# bar1_y=Get_HdfsData.get_bar1_data()['range_km'].values.tolist(),bar1_y=[float(num) for num in analysis.get_bar1_data()['range_km'].values.tolist()],type_0 = Get_HdfsData.get_num_data()[0][2:],type_n0 = Get_HdfsData.get_num_data()[1][2:],pie_data = Get_HdfsData.get_pie_data(),scatter_data = Get_HdfsData.get_scatter_data(),bar2_x=analysis.get_bar2_data()['brand_name'].values.tolist()[:5],bar2_y=analysis.get_bar2_data()['sales'].values.tolist()[:5],line1_x_top=Get_HdfsData.get_line1_data()['city'].values.tolist()[:12],line1_y_top=Get_HdfsData.get_line1_data()['value'].values.tolist()[:12],line1_x_last=analysis.get_line1_data()['city'].values.tolist()[-12:],line1_y_last=analysis.get_line1_data()['value'].values.tolist()[-12:],line2_x_top=Get_HdfsData.get_line2_data()['model'].values.tolist()[:10],line2_y_top=Get_HdfsData.get_line2_data()['sales'].values.tolist()[:10],)if __name__ == '__main__':app.run(host='127.0.0.1', port=5001)

echarts部分代码


(function() {var myChart = echarts.init(document.querySelector(".map .chart"));echarts.registerMap('china', chinaData);option = {tooltip: {trigger: 'item',formatter: '{b}<br/>{c} (辆)'},visualMap: {min: 1,max: 40000,text: ['High', 'Low'],realtime: false,calculable: true,inRange: {color: ['lightskyblue', 'yellow', 'orangered']},textStyle: {color: 'red' // 设置字体颜色为红色}},series: [{name: 'city_map',type: 'map',map: 'china',data: map_data,roam: true // 在这里添加 roam 属性}]};myChart.setOption(option);// 监听浏览器缩放,图表对象调用缩放resize函数window.addEventListener("resize", function() {myChart.resize();});}
)();

效果图

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/80617.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

brew 安装openjdk查看其版本

使用brew&#xff08;如果你使用Homebrew安装&#xff09; 如果你通过Homebrew安装了OpenJDK&#xff0c;可以使用以下命令来查看安装的版本,&#xff1a; brew list --versions openjdk8 这将会列出所有通过Homebrew安装的OpenJDK版本及其版本号。 3. 查看/usr/libexec/ja…

【Linux网络】构建与优化HTTP请求处理 - HttpRequest从理解到实现

&#x1f4e2;博客主页&#xff1a;https://blog.csdn.net/2301_779549673 &#x1f4e2;博客仓库&#xff1a;https://gitee.com/JohnKingW/linux_test/tree/master/lesson &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01; &…

Day12(回溯法)——LeetCode51.N皇后39.组合总和

1 前言 今天刷了三道回溯法和一道每日推荐&#xff0c;三道回溯法也迷迷糊糊的&#xff0c;每日推荐把自己绕进去了&#xff0c;虽然是一道之前做过的题的变种。刷的脑子疼。。。今天挑两道回溯题写一下吧&#xff0c;其中有一道是之前做过的N皇后&#xff0c;今天在详细写一写…

初阶数据结构:二叉搜索树

目录 概念 性能 效率分析 二分缺陷 功能 插入 查找 删除 实现 应用 概念 二叉搜索树&#xff08;又称&#xff1a;二叉排序树&#xff09;&#xff0c;是由一些具有特别性质的二叉树衍变而来。 只要一棵二叉树具备以下性质&#xff0c;即可称作二叉搜索树。 【1】若…

详解springcloud gateway工作原理、断言、filter、uri、id、全局跨域、globalfilter等以及关键源码实现

1.gateway概念 网关就是当前微服务项目的"统一入口"程序中的网关就是当前微服务项目对外界开放的统一入口所有外界的请求都需要先经过网关才能访问到我们的程序提供了统一入口之后,方便对所有请求进行统一的检查和管理 2. 网关的主要功能 将所有请求统一经过网关网…

C#中的弱引用使用

弱引用&#xff08;Weak Reference&#xff09;是一种特殊的引用类型&#xff0c;它允许你引用一个对象&#xff0c;但不会阻止该对象被垃圾回收器&#xff08;GC&#xff09;回收。弱引用通常用于需要缓存或跟踪对象&#xff0c;但又不希望因保留引用而导致内存泄漏的场景。弱…

spring响应式编程系列:异步生产数据

目录 示例 大致流程 create new MonoCreate subscribe new LambdaMonoSubscriber monoCreate.subscribe accept success onNext 时序图 类图 数据发布者 MonoCreate 数据订阅者 LambdaMonoSubscriber 订阅的消息体 DefaultMonoSink 本篇文章我们来研究如何将…

MCP Python SDK构建的**SQLite浏览器**的完整操作指南

以下是使用MCP Python SDK构建的SQLite浏览器的完整操作指南&#xff1a; 一、环境准备 安装依赖 # 安装MCP SDK及SQLite支持 pip install mcp sqlite3创建测试数据库 sqlite3 test.db <<EOF CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT); IN…

【Python爬虫基础篇】--3.cookie和session

目录 1.cookie 1.1.定义 1.2.参数 1.3.分类 2.session 3.使用cookie登录微博 4.使用session登录 1.cookie 由于http是一个无状态的协议&#xff0c;请求与请求之间无法相互传递或者记录一些信息&#xff0c;cookie和session正是为了解决这个问题而产生。 例子&#xff1…

风车邮箱系统详细使用指南:Windows与Ubuntu双平台解析

风车邮箱系统V1.2使用手册 风车邮箱系统详细使用指南&#xff1a;Windows与Ubuntu双平台解析 前言 在日常网络活动中&#xff0c;我们经常需要一个临时邮箱来注册各类网站或接收验证码&#xff0c;但不想使用自己的真实邮箱。「风车无线邮箱系统」作为一款优秀的临时邮箱工具…

同样的接口用postman/apifox能跑通,用jmeter跑就报错500

之前没用过jmeter,第一次用调试压测脚本遇到了问题 一样的接口用postman能跑通&#xff0c;用jmeter跑就报错500&#xff0c;百度很多文章都说是该接口需要加一个‘内容编码’改成utf-8,我加了还是不行 后来我就想到apifox好像有隐藏的header&#xff0c;然后开始比较apifox的…

1656打印路径-Floyd回溯/图论-链表/数据结构

蓝桥账户中心 1.税收&#xff1a; “城市的税收”&#xff1a;所以是中介点的税收&#xff0c;经过该点后加上 2.路径&#xff1a; 用数组存储前驱节点从而串成链表 pre[ i ][ j ]代表的是从 i 到 j 的最短路径上 j 的前驱节点是什么 那么便可以pre[ i ][ j ]k 把k加入pa…

Eigen矩阵操作类 (Map, Block, 视图类)

1. Map 类&#xff1a;内存映射&#xff08;零拷贝操作&#xff09; 核心功能 将现有的 C/C 数组或缓冲区映射为 Eigen 矩阵/向量&#xff0c;不复制数据&#xff0c;直接操作原内存。 模板参数 cpp Map<Matrix<Scalar, Rows, Cols, Options, MaxRows, MaxCols>&…

多系统安装经验,移动硬盘,ubuntu grub修改/etc/fstab 移动硬盘需要改成nfts格式才能放steam游戏

总结&#xff1a;我硬盘会自动挂载&#xff0c;直接格式化nfts&#xff0c;steam就能装里面了 机械硬盘装系统真的不行&#xff0c;超级慢游戏还跑不了 --------------------------------------------------------------------底下都不用看 笔记本一个系统&#xff0c;移动硬盘…

JFLAP SOFTWARE 编译原理用(自动机绘图)

csdn全是蛆虫&#xff0c;2mb的软件&#xff0c;都在那里搞收费&#xff0c;我就看不惯&#xff0c;我就放出来&#xff0c;那咋了&#xff01;&#xff01;&#xff01; https://pan.baidu.com/s/1IuEfHScynjCCUF5ScF26KA 通过网盘分享的文件&#xff1a;JFLAP7.1.jar 链接: h…

[Windows] Disk Sorter文件分类管理软件 v16.7.18

[Windows] Disk Sorter文件分类管理 链接&#xff1a;https://pan.xunlei.com/s/VOOl0sDntAdHvlMkc7N0ZOD-A1?pwd966n# Disk Sorter是一个功能强大的文件分类管理软件&#xff0c;允许对本地磁盘、网络共享、NAS设备和企业存储系统中的文件进行分类&#xff0c;并且支持生成…

STM32提高篇: 蓝牙通讯

STM32提高篇: 蓝牙通讯 一.蓝牙通讯介绍1.蓝牙技术类型 二.蓝牙协议栈1.蓝牙芯片架构2.BLE低功耗蓝牙协议栈框架 三.ESP32-C3中的蓝牙功能1.广播2.扫描3.通讯 四.发送和接收 一.蓝牙通讯介绍 蓝牙&#xff0c;是一种利用低功率无线电&#xff0c;支持设备短距离通信的无线电技…

6.1.多级缓存架构

目录 一、多级缓存基础与核心概念 缓存的定义与价值 • 缓存的应用场景&#xff08;高并发、低延迟、减轻数据库压力&#xff09; • 多级缓存 vs 单级缓存的优劣对比 多级缓存核心组件 • 本地缓存&#xff08;Caffeine、Guava Cache&#xff09; • 分布式缓存&#xff08;…

MySQL的MVCC【学习笔记】

MVCC 事务的隔离级别分为四种&#xff0c;其中Read Committed和Repeatable Read隔离级别&#xff0c;部分实现就是通过MVCC&#xff08;Multi-Version Concurrency Control&#xff0c;多版本并发控制&#xff09; 版本链 版本链是通过undo日志实现的&#xff0c; 事务每次修改…

基于OpenMV+STM32+OLED与YOLOv11+PaddleOCR的嵌入式车牌识别系统开发笔记

基于OpenMV、STM32与OLED的嵌入式车牌识别系统开发笔记 基于OpenMV、STM32与OLED的嵌入式车牌识别系统开发笔记系统架构全景 一、实物演示二、OpenMV端设计要点1. 硬件配置优化2. 智能帧率控制算法3. 数据传输协议设计 三、PyTorch后端核心实现&#xff1a;YOLOv11与PaddleOCR的…