spark超大数据批量写入redis

利用spark的分布式优势,一次性批量将7000多万的数据写入到redis中。

# 配置spark接口
import os
import findspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
os.environ["JAVA_HOME"] = "/usr/local/jdk1.8.0_192"
findspark.init("/usr/local/hadoop/spark-2.4.4-bin-hadoop2.6/")
# 设置配置信息
conf = SparkConf()
conf.set("spark.driver.memory", "16g")
conf.set("spark.executor.memory", "16g")
conf.set("spark.driver.maxResultSize","3g")
conf.set("spark.executor.maxResultSize", "3g")
conf.set("spark.ui.showConsoleProgress","false") # 取消进度条显示
spark = SparkSession.builder.appName("local_redis_spark").master("local[*]").enableHiveSupport().config(conf=conf).getOrCreate()
spark.sparkContext.setLogLevel("ERROR") # 提升日志级别
import redis
# 初始化一个全局函数来获取Redis连接池
def get_redis_connection_pool():# 配置redis参数host='127.0.0.1' # 替换为redis的服务地址即可port=6379password='123456' # 密码db=1 # db库如果不设置 默认为0max_connections=10  # 设置最大连接数redis_pool = redis.ConnectionPool(host=host, port=port, db=db, password=password, max_connections=max_connections)  return redis_pool# 清空旧数据
with redis.Redis(connection_pool=get_redis_connection_pool()) as r:r.flushdb() # 清空当前库的所有数据 而flushall()则情况所有库数据
%%time
# 并行处理函数serv_id
def servid_pfun(sdf_data):# 定义redis写入函数 以连接池的方式获取链接 及时释放def write_to_redis(data_dict):with redis.Redis(connection_pool=get_redis_connection_pool()) as r:r.mset(data_dict)# 构建一个空字典 批量写入dat = {}for rw in sdf_data:dat[rw.serv_id] = str((rw.r_inst_id, rw.avg_value))# 批量写入write_to_redis(dat)# 并行处理函数one_id
def oneid_pfun(sdf_data):# 定义redis写入函数 以连接池的方式获取链接 及时释放def write_to_redis(data_dict):with redis.Redis(connection_pool=get_redis_connection_pool()) as r:r.mset(data_dict)# 构建一个空字典 批量写入dat = {}for rw in sdf_data:dat[rw.r_inst_id] = str((rw.offer_list,rw.filter_prod_offer_inst_list,rw.fuka_serv_offer_list,rw.filter_list,rw.new_serv_id))# 批量写入write_to_redis(dat)# 加载缓存数据
oneid_sdf = spark.sql("""select * from database.table1""")servid_sdf = spark.sql("""select * from database.table2""")# 设置分区数 如果批量写入的内存大小以及最大链接数有限制
# servid_num_parts = 50000
# oneid_num_parts = 10000 # 使用repartition方法进行重新分区
# servid_sdf_part = servid_sdf.repartition(servid_num_parts)
# oneid_sdf_part = oneid_sdf.repartition(oneid_num_parts)# 分批写入redis
servid_sdf.foreachPartition(servid_pfun)
print(f"servid字典缓存成功")
oneid_sdf.foreachPartition(oneid_pfun)
print(f"oneid字典缓存成功")
# 关闭spark
spark.stop() 
print(f"redis缓存插入成功")

执行时间可能跟资源环境有关,测试整个过程大概只需要5分钟左右,非常快速。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/699055.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言中的大小写字母转换

引言 在C语言编程中,我们经常需要进行大小写字母的转换。在 ASCII 码中,大写字母和小写字母之间的差值是固定的,因此我们可以利用这一特性进行大小写转换。本文将详细介绍C语言中大小写字母转换的具体步骤。 大小写转换的原理 在ASCII码表…

【CMake】CMake 中引入 Qt Linguist 翻译功能

【CMake】CMake 中引入 Qt Linguist 翻译功能 文章目录 Qt Linguist 通常使用方法1 - 设置翻译路径2 - 查找 Qt 翻译工具3 - 应用 Qt 翻译工具4 - 参考链接 Qt Linguist 通常使用方法 在编写代码时,将需要翻译的字符串使用 tr() 函数包裹起来,如 this-…

【Web前端笔记12】运算符_数据类型和流程循环语句

12 运算符_数据类型和流程循环语句 一、数据类型 1、数据类型分类 二、基本运算符 1、typeof运算符 2、运算符 (1)加法运算符 (2)算术运算符 (3)赋值运算符(=) (4)比较运算符 (5)布尔运算符 (6)位运算符 3、运算符优先级 4、类型转换 (1)自动转换…

STM32F4XX - uart设置

初始化一个波特率为115200的串口。下面函数参数为115200. 代码如下: void uart1_init(u32 bound) {GPIO_InitTypeDef GPIO_InitStructure;USART_InitTypeDef USART_InitStructure;NVIC_InitTypeDef NVIC_InitStructure;RCC_AHB1PeriphClockCmd(RCC_AHB1Periph_GPIO…

LINUX读取RTC实时时钟时间

linux 读写RTC时间_linux rtc 读写-CSDN博客

shutil.copyfileobj()和BaseHTTPRequestHandler self.wfile在Web 服务器中的应用

shutil.copyfileobj() 是 Python 的 shutil 模块中用于复制文件对象内容的一个函数。它可以将一个文件对象的内容复制到另一个文件对象中。 shutil.copyfileobj(fsrc, fdst, length16*1024) fsrc: 源文件对象,即要从中复制内容的文件对象。fdst: 目标文件对象&…

Java知识点一

hello,大家好!我们今天开启Java语言的学习之路,与C语言的学习内容有些许异同,今天我们来简单了解一下Java的基础知识。 一、数据类型 分两种:基本数据类型 引用数据类型 (1)整型 八种基本数…

mysql进阶学习 | DAY 14

存储引擎 体系结构 连接层 服务层 引擎层 存储层 存储引擎 表类型 查看引擎 查看建表语句 指定存储引擎 ENGINE SHOW engins InnoDB 默认存储引擎 遵循ACID模型 支持事务 行级锁 提高并发访问性能 支持外键 FOREIGN KEY约束 保证数据完整性和正确性 对应文件 xx…

Rust: reqwest库示例

一、异步处理单任务 1、cargo.toml [dependencies] tokio { version "1.0.0", features ["full", "tracing"] } tokio-util { version "0.7.0", features ["full"] } tokio-stream { version "0.1" }…

抖音爬虫批量视频提取功能介绍|抖音评论提取工具

抖音爬虫是指通过编程技术从抖音平台上获取视频数据的程序。在进行抖音爬虫时,需要注意遵守相关法律法规和平台规定,以确保数据的合法获取和使用。 一般来说,抖音爬虫可以实现以下功能之一:批量视频提取。这个功能可以用于自动化地…

大数据-数据可视化-环境部署vue+echarts+显示案例

文章目录 一、安装node.js1 打开火狐浏览器,下载Node.js2 进行解压3 配置环境变量4 配置生效二、安装vue脚手架1 下载vue脚手架,耐心等待。三、创建vue项目并启动1 创建2 启动四、下载echarts.js与axios.js到本地。五、图表显示demo【以下所有操作均在centos上进行】 一、安…

使用C#+NPOI进行Excel处理,实现多个Excel文件的求和统计

一个简易的控制台程序,使用C#NPOI进行Excel处理,实现多个Excel文件的求和统计。 前提: 待统计的Excel格式相同统计结果表与待统计的表格格式一致 引入如下四个动态库: 1. NPOI.dll 2. NPOI.OOXML.dll 3. NPOI.OpenXml4Net.dll …

Python爬虫技术详解:从基础到高级应用,实战与应对反爬虫策略【第93篇—Python爬虫】

前言 随着互联网的快速发展,网络上的信息爆炸式增长,而爬虫技术成为了获取和处理大量数据的重要手段之一。在Python中,requests模块是一个强大而灵活的工具,用于发送HTTP请求,获取网页内容。本文将介绍requests模块的…

Java设计模式 | 简介

设计模式的重要性: 软件工程中,设计模式(design pattern)是对软件设计中普遍存在(反复出现)的各种问题,所提出的解决方案。 这个术语由埃里希 伽玛(Erich Gamma)等人在1…

在项目中应用设计模式的实践指南

目录 ✨✨ 祝屏幕前的您天天开心,每天都有好运相伴。我们一起加油!✨✨ 🎈🎈作者主页: 喔的嘛呀🎈🎈 引言 一. 单例模式(Singleton Pattern) 1、实现单例模式的方式 1…

Python集合详细教程

Python集合是一种无序、可变的数据类型,它是由一组不重复的元素组成的。集合中的元素必须是可哈希的,即不可变的,例如数字、字符串、元组等。 创建集合 可以使用花括号{}或set()函数来创建集合。 复制代码# 使用花括号创建集合 set1 {1, …

【Leetcode】2583. 二叉树中的第 K 大层和

文章目录 题目思路代码结果 题目 题目链接 给你一棵二叉树的根节点 root 和一个正整数 k 。 树中的 层和 是指 同一层 上节点值的总和。 返回树中第 k 大的层和(不一定不同)。如果树少于 k 层,则返回 -1 。 注意,如果两个节点与根…

东南亚印度越南印尼菲律宾媒体海外宣发稿公司怎么找?跨境出海推广新闻营销资源渠道一览

【本篇由言同数字科技有限公司原创】随着全球化和互联网的普及,品牌跨境海外推广在东南亚地区变得越来越重要。印度、越南、印尼和菲律宾作为东南亚地区的重要经济国家,其崛起的中产阶级和不断增长的消费者市场成为国际品牌实现成功的理想目标。本文将探…

eureka 简介和基本使用

Eureka 是Netflix开发的服务发现框架,是Spring Cloud微服务架构中的一部分。它主要用于微服务架构中的服务注册与发现。Eureka由两部分组成:Eureka Server 和 Eureka Client。获取更详细的信息可以访问官网,如下图: Eureka Server…

【Docker 的安装:centos】

文章目录 1 :peach:各版本平台支持情况:peach:2 :peach:CentOS 安装:peach:2.1 :apple:安装依赖:apple:2.2 :apple:安装 Docker:apple:2.3 :apple:实战经验:apple:2.3.1 :lemon:Docker 镜像源修改:lemon:2.3.2 :lemon:Docker 目录修改:lemon: 1 🍑各版本平台支持情况…