Spark Streaming的核心功能及其示例PySpark代码

Spark Streaming是Apache Spark中用于实时流数据处理的模块。以下是一些常见功能的实用PySpark代码示例:

  1. 基础流处理:从TCP套接字读取数据并统计单词数量
from pyspark import `SparkContext
from pyspark.streaming import StreamingContext# 创建SparkContext和StreamingContext
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)  # 1秒的批处理间隔# 创建一个DStream,从TCP源读取数据
lines = ssc.socketTextStream("localhost", 9999)# 对每一行数据进行分词,映射为(word, 1)的键值对,然后按单词统计数量
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)# 打印每个RDD中的前10个元素
word_counts.pprint()# 启动流计算
ssc.start()
# 等待流计算结束
ssc.awaitTermination()

在上述代码中:

  • sc 是 SparkContext ,用于与Spark集群交互。
  • ssc 是 StreamingContext ,定义了批处理间隔。
  • lines 是一个 DStream ,从指定的TCP套接字读取数据。
  • words 对每行数据进行分词, word_counts 统计每个单词出现的次数。
  • pprint 方法打印每个批次的前10个元素。
  1. 使用窗口函数
from pyspark import SparkContext
from pyspark.streaming import StreamingContextsc = SparkContext("local[2]", "WindowedWordCount")
ssc = StreamingContext(sc, 1)lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1))# 使用窗口函数,窗口大小为3秒,滑动间隔为1秒
windowed_word_counts = word_counts.reduceByKeyAndWindow(lambda a, b: a + b, lambda a, b: a - b, 3, 1)windowed_word_counts.pprint()ssc.start()
ssc.awaitTermination()

在这个示例中:

  • reduceByKeyAndWindow 方法用于在窗口上进行聚合操作。
  • 第一个参数是用于合并窗口内元素的函数,第二个参数是用于移除窗口外元素的函数。
  1. 状态更新
from pyspark import SparkContext
from pyspark.streaming import StreamingContextsc = SparkContext("local[2]", "StatefulWordCount")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")  # 启用检查点def updateFunction(new_values, running_count):if running_count is None:running_count = 0return sum(new_values, running_count)lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1))# 使用updateStateByKey进行状态更新
stateful_word_counts = word_counts.updateStateByKey(updateFunction)stateful_word_counts.pprint()ssc.start()
ssc.awaitTermination()

在上述代码中:

  • updateStateByKey 方法用于维护每个键的状态。
  • updateFunction 定义了如何根据新值和现有状态更新状态。
  1. 与Kafka集成
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtilssc = SparkContext("local[2]", "KafkaWordCount")
ssc = StreamingContext(sc, 1)# Kafka参数
kafkaParams = {"metadata.broker.list": "localhost:9092"}
topics = ["test"]# 创建Kafka输入DStream
kvs = KafkaUtils.createDirectStream(ssc, topics, kafkaParams)
lines = kvs.map(lambda x: x[1])words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)word_counts.pprint()ssc.start()
ssc.awaitTermination()

在这个示例中:

  • KafkaUtils.createDirectStream 用于从Kafka主题读取数据。
  • kvs 是一个包含Kafka消息的DStream, lines 提取消息内容。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/66702.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习系列75:sql大模型工具vanna

1. 概述 vanna是一个可以将自然语言转为sql的工具。简单的demo如下: !pip install vanna import vanna from vanna.remote import VannaDefault vn VannaDefault(modelchinook, api_keyvanna.get_api_key(my-emailexample.com)) vn.connect_to_sqlite(https://va…

【线性代数】列主元法求矩阵的逆

列主元方法是一种用于求解矩阵逆的数值方法,特别适用于在计算机上实现。其基本思想是通过高斯消元法将矩阵转换为上三角矩阵,然后通过回代求解矩阵的逆。以下是列主元方法求解矩阵 A A A 的逆的步骤: [精确算法] 列主元高斯消元法 步骤 1&am…

[0242-06].第06节:SpringBoot对SpringMVC的自动配置

SpringBoot学习大纲 一、基于SpringBoot搭建Web工程: 1.1.编码实现步骤: a.创建SpringBoot项目 b.选中依赖:选中我们所需要的模块 1.2.SSM中的WEB开发配置与SpringBoot中WEB开发自动配置对比: a.SSM中的WEB开发: 1…

【21】Word:德国旅游业务❗

目录 题目 NO1.2.3 NO4 NO5.6 NO7 NO8.9.10.11 题目 NO1.2.3 F12:另存为布局→页面设置→页边距:上下左右选中“德国主要城市”→开始→字体对话框→字体/字号→文本效果:段落对话框→对齐方式/字符间距/段落间距 NO4 布局→表对话框…

蓝桥杯算法日常|c\c++常用竞赛函数总结备用

一、字符处理相关函数 大小写判断函数 islower和isupper:是C标准库中的字符分类函数,用于检查一个字符是否为小写字母或大写字母,需包含头文件cctype.h(也可用万能头文件包含)。返回布尔类型值。例如: #…

微服务知识——4大主流微服务架构方案

文章目录 1、微服务聚合模式2、微服务共享模式3、微服务代理模式4、微服务异步消息模式 微服务是大型架构的必经之路,也是大厂重点考察对象,下面我就重点详解4大主流微服务架构方案。 1、微服务聚合模式 微服务聚合设计模式,解决了如何从多个…

【HTML+CSS】使用HTML与后端技术连接数据库

目录 一、概述 1.1 HTML前端 1.2 后端技术 1.3 数据库 二、HTML表单示例 三、PHP后端示例 3.1 连接数据库 3.2 接收数据并插入数据库 四、安全性 4.1 防止SQL注入 4.2 数据验证与清洗 五、优化 5.1 索引优化 5.2 查询优化 六、现代Web开发中的最佳实践 6.1 使用…

cadence笔记--画PMU6050原理图和封装

简介 本文主要介绍使用Cadence自己画一个PMU6050的原理图PCB的实际用例,Cadence使用的是24.1版本。 原理图 首先获取PMU6050引脚参数,使用立创商城查询PMU6050型号,点击数据手册如下图所示: 如下图所示,左边是原理图&…

Text2SQL 智能报表方案介绍

0 背景 Text2SQL智能报表方案旨在通过自然语言处理(NLP)技术,使用户能够以自然语言的形式提出问题,并自动生成相应的SQL查询,从而获取所需的数据报表,用户可根据得到结果展示分析从而为结论提供支撑&#…

FFmpeg音视频采集

文章目录 音视频采集音频采集获取设备信息录制麦克风录制声卡 视频采集摄像机画面采集 音视频采集 DirectShow(简称DShow)是一个Windows平台上的流媒体框架,提供了高质量的多媒体流采集和回放功能,它支持多种多样的媒体文件格式&…

【漫话机器学习系列】056.F1值(F1 score)

F1值(F1 Score) 定义 F1值是机器学习中一种用于评估模型性能的指标,特别适合用于 不平衡数据集 的分类任务。它是 精确率(Precision) 和 召回率(Recall) 的调和平均值。通过综合考虑精确率和召…

Mac安装Homebrew

目录 安装修改homeBrew源常用命令安装卸载软件升级软件相关清理相关 安装 官网 https://brew.sh/不推荐官网安装方式(很慢很慢或者安装失败联网失败) 检测是否安装homebrewbrew -v执行安装命令 苹果电脑 常规安装脚本 (推荐 完全体 几分钟就…

一文大白话讲清楚webpack基本使用——9——预加载之prefetch和preload以及webpackChunkName的使用

文章目录 一文大白话讲清楚webpack基本使用——9——预加载之prefetch和preload1. 建议按文章顺序从头看,一看到底,豁然开朗2. preload和prefetch的区别2. prefetch的使用3. preload的使用4. webpackChunkName 一文大白话讲清楚webpack基本使用——9——…

【Elasticsearch 】 聚合分析:桶聚合

🧑 博主简介:CSDN博客专家,历代文学网(PC端可以访问:https://literature.sinhy.com/#/?__c1000,移动端可微信小程序搜索“历代文学”)总架构师,15年工作经验,精通Java编…

tensorflow源码编译在C++环境使用

https://tensorflow.google.cn/install/source?hlzh-cn查看tensorflow和其他需要下载软件对应的版本,最好一模一样 1、下载TensorFlow源码 https://github.com/tensorflow/tensorflow 2、安装编译protobuf(3.9.2) protobuf版本要和TensorFlo…

使用 F12 查看 Network 及数据格式

在浏览器中,F12 开发者工具的 “Network” 面板是用于查看网页在加载过程中发起的所有网络请求,包括 API 请求,以及查看这些请求的详细信息和响应数据的。以下以常见的 Chrome 浏览器为例,介绍如何使用 F12 控制台查看 Network 里…

Redis 2.6.12在Win10系统上的安装教程

诸神缄默不语-个人CSDN博文目录 这个版本的安装包是跟同事要的,em,如果真的需要这个版本的话可以跟我要: 解压后双击第一个bat文件,即可挂起Redis服务:

分布式数据库中间件(DDM)的使用场景

华为云分布式数据库中间件(DDM)是一款专注于解决数据库分布式扩展问题的中间件服务,突破了传统数据库的容量和性能瓶颈,能够实现海量数据的高并发访问。以下是九河云总结的DDM的典型使用场景: 1. 互联网应用 在电商、…

Ubuntu16.04 安装OpenCV4.5.4 避坑

Ubuntu16.04 安装C版OpenCV4.5.4 Ubuntu16.04 VSCode下cmakeclanglldb调试c 文章目录 Ubuntu16.04 安装C版OpenCV4.5.41. 下载Opencv压缩包2. 安装Opencv-4.5.43. 配置OpenCV的编译环境4.测试是否安装成功 1. 下载Opencv压缩包 下载Opencv压缩包,选择source版本。…

RabbitMQ集群安装rabbitmq_delayed_message_exchange

1、单节点安装rabbitmq安装延迟队列 安装延迟队列rabbitmq_delayed_message_exchange可以参考这个文章: rabbitmq安装延迟队列-CSDN博客 2、集群安装rabbitmq_delayed_message_exchange 在第二个节点 join_cluster 之后,start_app 就会报错了 (CaseC…