Spark Streaming的核心功能及其示例PySpark代码

Spark Streaming是Apache Spark中用于实时流数据处理的模块。以下是一些常见功能的实用PySpark代码示例:

  1. 基础流处理:从TCP套接字读取数据并统计单词数量
from pyspark import `SparkContext
from pyspark.streaming import StreamingContext# 创建SparkContext和StreamingContext
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)  # 1秒的批处理间隔# 创建一个DStream,从TCP源读取数据
lines = ssc.socketTextStream("localhost", 9999)# 对每一行数据进行分词,映射为(word, 1)的键值对,然后按单词统计数量
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)# 打印每个RDD中的前10个元素
word_counts.pprint()# 启动流计算
ssc.start()
# 等待流计算结束
ssc.awaitTermination()

在上述代码中:

  • sc 是 SparkContext ,用于与Spark集群交互。
  • ssc 是 StreamingContext ,定义了批处理间隔。
  • lines 是一个 DStream ,从指定的TCP套接字读取数据。
  • words 对每行数据进行分词, word_counts 统计每个单词出现的次数。
  • pprint 方法打印每个批次的前10个元素。
  1. 使用窗口函数
from pyspark import SparkContext
from pyspark.streaming import StreamingContextsc = SparkContext("local[2]", "WindowedWordCount")
ssc = StreamingContext(sc, 1)lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1))# 使用窗口函数,窗口大小为3秒,滑动间隔为1秒
windowed_word_counts = word_counts.reduceByKeyAndWindow(lambda a, b: a + b, lambda a, b: a - b, 3, 1)windowed_word_counts.pprint()ssc.start()
ssc.awaitTermination()

在这个示例中:

  • reduceByKeyAndWindow 方法用于在窗口上进行聚合操作。
  • 第一个参数是用于合并窗口内元素的函数,第二个参数是用于移除窗口外元素的函数。
  1. 状态更新
from pyspark import SparkContext
from pyspark.streaming import StreamingContextsc = SparkContext("local[2]", "StatefulWordCount")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")  # 启用检查点def updateFunction(new_values, running_count):if running_count is None:running_count = 0return sum(new_values, running_count)lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1))# 使用updateStateByKey进行状态更新
stateful_word_counts = word_counts.updateStateByKey(updateFunction)stateful_word_counts.pprint()ssc.start()
ssc.awaitTermination()

在上述代码中:

  • updateStateByKey 方法用于维护每个键的状态。
  • updateFunction 定义了如何根据新值和现有状态更新状态。
  1. 与Kafka集成
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtilssc = SparkContext("local[2]", "KafkaWordCount")
ssc = StreamingContext(sc, 1)# Kafka参数
kafkaParams = {"metadata.broker.list": "localhost:9092"}
topics = ["test"]# 创建Kafka输入DStream
kvs = KafkaUtils.createDirectStream(ssc, topics, kafkaParams)
lines = kvs.map(lambda x: x[1])words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)word_counts.pprint()ssc.start()
ssc.awaitTermination()

在这个示例中:

  • KafkaUtils.createDirectStream 用于从Kafka主题读取数据。
  • kvs 是一个包含Kafka消息的DStream, lines 提取消息内容。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/66702.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习系列75:sql大模型工具vanna

1. 概述 vanna是一个可以将自然语言转为sql的工具。简单的demo如下: !pip install vanna import vanna from vanna.remote import VannaDefault vn VannaDefault(modelchinook, api_keyvanna.get_api_key(my-emailexample.com)) vn.connect_to_sqlite(https://va…

【线性代数】列主元法求矩阵的逆

列主元方法是一种用于求解矩阵逆的数值方法,特别适用于在计算机上实现。其基本思想是通过高斯消元法将矩阵转换为上三角矩阵,然后通过回代求解矩阵的逆。以下是列主元方法求解矩阵 A A A 的逆的步骤: [精确算法] 列主元高斯消元法 步骤 1&am…

[0242-06].第06节:SpringBoot对SpringMVC的自动配置

SpringBoot学习大纲 一、基于SpringBoot搭建Web工程: 1.1.编码实现步骤: a.创建SpringBoot项目 b.选中依赖:选中我们所需要的模块 1.2.SSM中的WEB开发配置与SpringBoot中WEB开发自动配置对比: a.SSM中的WEB开发: 1…

【21】Word:德国旅游业务❗

目录 题目 NO1.2.3 NO4 NO5.6 NO7 NO8.9.10.11 题目 NO1.2.3 F12:另存为布局→页面设置→页边距:上下左右选中“德国主要城市”→开始→字体对话框→字体/字号→文本效果:段落对话框→对齐方式/字符间距/段落间距 NO4 布局→表对话框…

什么是软件架构

什么是软件架构 程序员说,软件架构是要决定编写哪些C程序或OO类、使用哪些库和框架 程序经理说,软件架构就是模块的划分和接口的定义 系统分析员说,软件架构就是为业务领域对象的关系建模 配置管理员说,软件架构就是开发出来的…

1/20赛后总结

1/20赛后总结 T1『讨论区管理员』的旅行 - BBC编程训练营 算法:IDA* 分数:0 damn it! Ac_code走丢了~~(主要是没有写出来)~~ T2华强买瓜 - BBC编程训练营 算法:双向DFS或者DFS剪枝 分数:0 Ac_code…

大数据与AI驱动的商业查询平台:企业市场拓展的变革引擎​

在竞争白热化的商业环境里,企业对准确市场信息的高效获取能力,直接关系到业务拓展的成败。商业查询平台借助大数据和人工智能技术,为企业提供精准客户筛选、市场拓展分析以及风险评估服务,正逐渐成为企业市场开拓的得力助手。本文…

redis 各个模式的安装

一、Redis单机安装 1、安装gcc依赖 Redis是C语言编写的,编译需要GCC。 Redis6.x.x版本支持了多线程,需要gcc的版本大于4.9,但是CentOS7的默认版本是4.8.5。 升级gcc版本: yum -y install centos-release-scl yum -y install d…

TiDB 的优势与劣势

TiDB 的优势与劣势 TiDB 作为一款新兴的分布式数据库,在业界逐渐崭露头角。它兼具传统关系型数据库的特性,又充分利用分布式架构的优势。那么,TiDB 究竟有怎样的优缺点呢?今天我们来聊聊 TiDB 的优势与劣势,帮你全面了…

蓝桥杯算法日常|c\c++常用竞赛函数总结备用

一、字符处理相关函数 大小写判断函数 islower和isupper:是C标准库中的字符分类函数,用于检查一个字符是否为小写字母或大写字母,需包含头文件cctype.h(也可用万能头文件包含)。返回布尔类型值。例如: #…

微服务知识——4大主流微服务架构方案

文章目录 1、微服务聚合模式2、微服务共享模式3、微服务代理模式4、微服务异步消息模式 微服务是大型架构的必经之路,也是大厂重点考察对象,下面我就重点详解4大主流微服务架构方案。 1、微服务聚合模式 微服务聚合设计模式,解决了如何从多个…

【HTML+CSS】使用HTML与后端技术连接数据库

目录 一、概述 1.1 HTML前端 1.2 后端技术 1.3 数据库 二、HTML表单示例 三、PHP后端示例 3.1 连接数据库 3.2 接收数据并插入数据库 四、安全性 4.1 防止SQL注入 4.2 数据验证与清洗 五、优化 5.1 索引优化 5.2 查询优化 六、现代Web开发中的最佳实践 6.1 使用…

T-SQL语言的数据库编程

T-SQL语言的数据库编程 1. 引言 在信息化迅速发展的今天,数据库已经成为数据管理和使用的重要工具。其中,T-SQL(Transact-SQL)作为微软SQL Server的扩展SQL语言,不仅用于数据查询和管理,还能够进行复杂的…

通信协议—WebSocket

一、WebSocket编程概念 1.1 什么是WebSocket WebSocket 是一种全双工通信协议,允许在客户端(通常是浏览器)和服务器之间建立持久连接,以实现实时的双向通信。它是 HTML5 标准的一部分,相比传统的 HTTP 请求&#xff…

cadence笔记--画PMU6050原理图和封装

简介 本文主要介绍使用Cadence自己画一个PMU6050的原理图PCB的实际用例,Cadence使用的是24.1版本。 原理图 首先获取PMU6050引脚参数,使用立创商城查询PMU6050型号,点击数据手册如下图所示: 如下图所示,左边是原理图&…

CSS3 3D 转换介绍

CSS3 中的 3D 转换提供了一种在二维屏幕上呈现三维效果的方式,主要包括translate3d、rotate3d、scale3d等转换函数,下面来详细介绍: 1. 3D 转换的基本概念 坐标系 在 CSS3 的 3D 空间中,使用的是右手坐标系。X 轴是水平方向&…

Text2SQL 智能报表方案介绍

0 背景 Text2SQL智能报表方案旨在通过自然语言处理(NLP)技术,使用户能够以自然语言的形式提出问题,并自动生成相应的SQL查询,从而获取所需的数据报表,用户可根据得到结果展示分析从而为结论提供支撑&#…

FFmpeg音视频采集

文章目录 音视频采集音频采集获取设备信息录制麦克风录制声卡 视频采集摄像机画面采集 音视频采集 DirectShow(简称DShow)是一个Windows平台上的流媒体框架,提供了高质量的多媒体流采集和回放功能,它支持多种多样的媒体文件格式&…

【漫话机器学习系列】056.F1值(F1 score)

F1值(F1 Score) 定义 F1值是机器学习中一种用于评估模型性能的指标,特别适合用于 不平衡数据集 的分类任务。它是 精确率(Precision) 和 召回率(Recall) 的调和平均值。通过综合考虑精确率和召…

Mac安装Homebrew

目录 安装修改homeBrew源常用命令安装卸载软件升级软件相关清理相关 安装 官网 https://brew.sh/不推荐官网安装方式(很慢很慢或者安装失败联网失败) 检测是否安装homebrewbrew -v执行安装命令 苹果电脑 常规安装脚本 (推荐 完全体 几分钟就…