SparkStreaming在实时处理的两个场景示例

简介

Spark Streaming是Apache Spark生态系统中的一个组件,用于实时流式数据处理。它提供了类似于Spark的API,使开发者可以使用相似的编程模型来处理实时数据流。

Spark Streaming的工作原理是将连续的数据流划分成小的批次,并将每个批次作为RDD(弹性分布式数据集)来处理。这样,开发者可以使用Spark的各种高级功能,如map、reduce、join等,来进行实时数据处理。Spark Streaming还提供了内置的窗口操作、状态管理、容错处理等功能,使得开发者能够轻松处理实时数据的复杂逻辑。

Spark Streaming支持多种数据源,包括Kafka、Flume、HDFS、S3等,因此可以轻松地集成到各种数据管道中。它还能够与Spark的批处理和SQL引擎进行无缝集成,从而实现流式处理与批处理的混合使用。
在这里插入图片描述

本文以 TCP、kafka场景讲解spark streaming的使用

消息队列下的信息铺抓

类似消息队列的有redis、kafka等核心组件。
本文以kafka为例,向kafka中实时抓取数据,

pom.xml中添加以下依赖

<dependencies><!-- Spark Core --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.2.0</version></dependency><!-- Spark Streaming --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.2.0</version></dependency><!-- Spark SQL --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.2.0</version></dependency><!-- Kafka --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version></dependency><!-- Spark Streaming Kafka Connector --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.2.0</version></dependency><!-- PostgreSQL JDBC --><dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId><version>42.2.24</version></dependency>
</dependencies>

创建项目编写以下代码实现功能

package org.example;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.*;public class SparkStreamingKafka {public static void main(String[] args) throws InterruptedException {// 创建 Spark 配置SparkConf sparkConf = new SparkConf().setAppName("spark_kafka").setMaster("local[*]").setExecutorEnv("setLogLevel", "ERROR");//设置日志等级为ERROR,避免日志增长导致的磁盘膨胀// 创建 Spark Streaming 上下文JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, new Duration(2000)); // 间隔两秒扑捉一次// 创建 Spark SQL 会话SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();// 设置 Kafka 相关参数Map<String, Object> kafkaParams = new HashMap<>();kafkaParams.put("bootstrap.servers", "10.0.0.105:9092,10.0.0.106:9092,10.0.0.107:9092");kafkaParams.put("key.deserializer", StringDeserializer.class);kafkaParams.put("value.deserializer", StringDeserializer.class);kafkaParams.put("auto.offset.reset", "earliest");// auto.offset.reset可指定参数有// latest:从分区的最新偏移量开始读取消息。// earliest:从分区的最早偏移量开始读取消息。// none:如果没有有效的偏移量,则抛出异常。kafkaParams.put("enable.auto.commit", true);  //采用自动提交offset 的模式kafkaParams.put("auto.commit.interval.ms",2000);//每隔离两秒提交一次commited-offsetkafkaParams.put("group.id", "spark_kafka"); //消费组名称// 创建 Kafka streamCollection<String> topics = Collections.singletonList("spark_kafka"); // Kafka 主题名称JavaDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(streamingContext,LocationStrategies.PreferConsistent(),ConsumerStrategies.Subscribe(topics, kafkaParams)  //订阅kafka);//定义数据结构StructType schema = new StructType().add("key", DataTypes.LongType).add("value", DataTypes.StringType);kafkaStream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<String, String>>>) rdd -> {// 转换为 DataFrameDataset<Row> df = sparkSession.createDataFrame(rdd.map(record -> {return RowFactory.create(record.offset(), record.value());  //将偏移量和value聚合}), schema);// 写入到 PostgreSQLdf.write()//选择写入数据库的模式.mode(SaveMode.Append)//采用追加的写入模式//协议.format("jdbc")//option 参数.option("url", "jdbc:postgresql://localhost:5432/postgres") // PostgreSQL 连接 URL//确定表名.option("dbtable", "public.spark_kafka")//指定表名.option("user", "postgres") // PostgreSQL 用户名.option("password", "postgres") // PostgreSQL 密码.save();});// 启动 Spark StreamingstreamingContext.start();// 等待 Spark Streaming 应用程序终止streamingContext.awaitTermination();}
}

在执行代码前,向创建名为spark_kafka的topic

kafka-topics.sh --create --topic spark_kafka --bootstrap-server 10.0.0.105:9092,10.0.0.106:9092,10.0.0.107:9092

向spark_kafka 主题进行随机推数

kafka-producer-perf-test.sh --topic spark_kafka --thrghput 10 --num-records 10000 --record-size 100000 --producer-props bootstrap.servers=10.0.0.105:9092,10.0.0.106:9092,10.0.0.107:9092

运行过程中消费的offset会一直被提交到每一个分区
在这里插入图片描述

此时在数据库中查看,数据已经实时落地到库中
在这里插入图片描述

TCP

TCP环境下,实时监控日志的输出,可用于监控设备状态、环境变化等。当监测到异常情况时,可以实时发出警报。

package org.example;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.*;public class SparkStreamingKafka {public static void main(String[] args) throws InterruptedException {// 创建 Spark 配置SparkConf sparkConf = new SparkConf().setAppName("spark_kafka") // 设置应用程序名称.setMaster("local[*]") // 设置 Spark master 为本地模式,[*]表示使用所有可用核心// 设置日志等级为ERROR,避免日志增长导致的磁盘膨胀.setExecutorEnv("setLogLevel", "ERROR");// 创建 Spark Streaming 上下文JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, new Duration(2000)); // 间隔两秒扑捉一次// 创建 Spark SQL 会话SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();// 设置 Kafka 相关参数Map<String, Object> kafkaParams = new HashMap<>();kafkaParams.put("bootstrap.servers", "10.0.0.105:9092,10.0.0.106:9092,10.0.0.107:9092"); // Kafka 服务器地址kafkaParams.put("key.deserializer", StringDeserializer.class); // key 反序列化器类kafkaParams.put("value.deserializer", StringDeserializer.class); // value 反序列化器类kafkaParams.put("auto.offset.reset", "earliest"); // 从最早的偏移量开始消费消息kafkaParams.put("enable.auto.commit", true);  // 采用自动提交 offset 的模式kafkaParams.put("auto.commit.interval.ms", 2000); // 每隔两秒提交一次 committed-offsetkafkaParams.put("group.id", "spark_kafka"); // 消费组名称// 创建 Kafka streamCollection<String> topics = Collections.singletonList("spark_kafka"); // Kafka 主题名称JavaDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(streamingContext,LocationStrategies.PreferConsistent(),ConsumerStrategies.Subscribe(topics, kafkaParams)  // 订阅 Kafka);// 定义数据结构StructType schema = new StructType().add("key", DataTypes.LongType).add("value", DataTypes.StringType);kafkaStream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<String, String>>>) rdd -> {// 转换为 DataFrameDataset<Row> df = sparkSession.createDataFrame(rdd.map(record -> {return RowFactory.create(record.offset(), record.value());  // 将偏移量和 value 聚合}), schema);// 写入到 PostgreSQLdf.write()// 选择写入数据库的模式.mode(SaveMode.Append) // 采用追加的写入模式// 协议.format("jdbc")// option 参数.option("url", "jdbc:postgresql://localhost:5432/postgres") // PostgreSQL 连接 URL// 确定表名.option("dbtable", "public.spark_kafka") // 指定表名.option("user", "postgres") // PostgreSQL 用户名.option("password", "postgres") // PostgreSQL 密码.save();});// 启动 Spark StreamingstreamingContext.start();// 等待 Spark Streaming 应用程序终止streamingContext.awaitTermination();}
}

在10.0.0.108 打开9999端口键入数值 ,使其被spark接收到并进行运算

nc -lk 9999

开启端口可以键入数值 此时会在IDEA的控制台显示其计算值
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/714359.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

适配器模式 详解 设计模式

适配器模式 适配器模式是一种结构型设计模式&#xff0c;其主要作用是解决两个不兼容接口之间的兼容性问题。适配器模式通过引入一个适配器来将一个类的接口转换成客户端所期望的另一个接口&#xff0c;从而让原本由于接口不匹配而无法协同工作的类能够协同工作。 结构 适配…

想要调用淘宝开放平台API,没有申请应用怎么办?

用淘宝自定义API接口可以访问淘宝开放平台API。 custom-自定义API操作 taobao.custom 公共参数 注册账号获取API请求地址 名称类型必须描述keyString是调用key&#xff08;必须以GET方式拼接在URL中&#xff09;secretString是调用密钥api_nameString是API接口名称&#xf…

Docker与虚拟机比较

在对比Docker和虚拟机前&#xff0c;先简单了解下虚拟化&#xff0c;明确Docker和虚拟机分别对应的虚拟化级别&#xff0c;然后对Docker和虚拟机进行比较。需要注意的是&#xff0c;Docker和虚拟机并没有什么可比性&#xff0c;而是Docker使用的容器技术和虚拟机使用的虚拟化技…

【K8S类型系统】一文梳理 K8S 各类型概念之间的关系(GVK/GVR/Object/Schema/RestMapper)

参考 k8s 官方文档 https://kubernetes.io/zh-cn/docs/reference/https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.29/ 重点 Kubernetes源码学习-kubernetes基础数据结构 - 知乎 重点 Kubernetes类型系统 | 李乾坤的博客 重点 k8s源码学习-三大核心数…

前端学习第二天-html提升

达标要求 了解列表的分类 熟练掌握列表的用法 熟练掌握表格的结构构成 合并单元格 表单的组成 熟练掌握表单控件分类的使用 1.列表 1.1 无序列表 <ul>&#xff1a;定义无序列表&#xff0c;并且只能包含<li>子元素。 <li>&#xff1a;定义列表项&a…

LZO索引文件失效说明

在hive中创建lzo文件和索引时&#xff0c;进行查询时会出现问题.hive的默认输入格式是开启小文件合并的&#xff0c;会把索引也合并进来。所以要关闭hive小文件合并功能&#xff01;

Matlab:元胞自动机

元胞自动机是一种基于离散空间的动态系统&#xff0c;由许多简单单元按照某些规则进行相互作用和演化而形成的复杂结构。元胞自动机可以用于模拟物理、生物、社会等领域的现象&#xff0c;以及进行优化、图像处理、噪声生成等方面的应用。 例1&#xff1a;生命游戏 nextState…

maven项目报错Cannot resolve plugin org.apache.maven.plugins:maven-war-plugin:2.2

如果IDEA整合maven没有问题&#xff0c;还是报这个错误&#xff0c;很大可能是由于在下载过程中存在网络问题&#xff0c;导致文件下载一半而停止&#xff0c;但是已经在仓库中存在这个文件夹&#xff0c;解决方法是删除文件夹重新下载即可。 删除本地仓库下的\org\apache\mav…

(算法)位运算

常见的位运算符&#xff1a; 给定一个数n判断他的二进制第x位是0还是1 把第x位修改为1 因为是只是修改n的某个位置&#xff0c;所以不应该移动改变n 既然修改为1&#xff0c;那么就要想到 | 运算符 把第x位修改为0 因为修改为0,所以要用&运算符 位图思想 判定字符串…

二维码门楼牌管理系统技术服务的深度解析

文章目录 前言一、标准地址名称的定义与重要性二、二维码门楼牌管理系统的核心技术三、标准地址名称在二维码门楼牌管理中的应用四、二维码门楼牌管理系统的优势与挑战五、展望未来 前言 在数字化浪潮中&#xff0c;二维码门楼牌管理系统以其高效、便捷的特性&#xff0c;正逐…

【一】【算法分析与设计】基础测试

排列式 题目描述 7254是一个不寻常的数&#xff0c;因为它可以表示为7254 39 x 186&#xff0c;这个式子中1~9每个数字正好出现一次 输出所有这样的不同的式子&#xff08;乘数交换被认为是相同的式子&#xff09; 结果小的先输出&#xff1b;结果相同的&#xff0c;较小的乘…

js 实战小案例

实战 时间 js 格式化时间 <script type"text/javascript">function formatDate(date) { let year date.getFullYear(); let month String(date.getMonth() 1).padStart(2, 0); // getMonth() 返回的月份是从0开始的&#xff0c;所以要加1&#xff0c;并…

【go从入门到精通】go包,内置类型和初始化顺序

大家好&#xff0c;这是我给大家准备的新的一期专栏&#xff0c;专门讲golang&#xff0c;从入门到精通各种框架和中间件&#xff0c;工具类库&#xff0c;希望对go有兴趣的同学可以订阅此专栏。 go基础 。 Go文件名&#xff1a; 所有的go源码都是以 ".go" 结尾&…

Mamba 环境安装:causal-conv1d和mamba-ssm报错解决办法

问题描述&#xff1a; 在执行命令 pip install causal_conv1d 和 mamba_ssm 出错&#xff1a; 解决方案&#xff1a; 1、使用网友配置好的Docker环境&#xff0c;参考&#xff1a;解决causal_conv1d和mamba_ssm无法安装 -&#xff1e; 直接使用Mamba基础环境docker镜像 DockH…

java实现图片转pdf,并通过流的方式进行下载(前后端分离)

首先需要导入相关依赖&#xff0c;由于具体依赖本人也不是记得很清楚了&#xff0c;所以简短的说一下。 iText&#xff1a;PDF 操作库&#xff0c;用于创建和操作 PDF 文件。可通过 Maven 或 Gradle 引入 iText 依赖。 MultipartFile&#xff1a;Spring 框架中处理文件上传的类…

一台工控机的能量

使用Docker搭建EPICS的IOC记录 Zstack EPICS Archiver在小课题组的使用经验 以前电子枪调试&#xff0c;用一台工控机跑起束测后台&#xff0c;这次新光源用的电子枪加工回来又是测试&#xff0c;又是用一台工控机做起重复的事&#xff0c;不过生命在于折腾&#xff0c;重复的…

stm32——hal库学习笔记(IIC)

一、IIC总线协议介绍&#xff08;掌握&#xff09; 二、AT24C02介绍&#xff08;了解&#xff09; 三、AT24C02读写时序&#xff08;掌握&#xff09; 四、AT24C02驱动步骤&#xff08;掌握&#xff09; 五、编程实战&#xff08;掌握&#xff09; myiic.c #include "./B…

汽车虚拟仿真技术的实现、应用和未来

汽车虚拟仿真技术是一种利用计算机模拟汽车运行的技术&#xff0c;以实现对汽车行为的分析、评估和改进。汽车虚拟仿真技术是汽车工业中重要的开发设计和测试工具&#xff0c;可以大大缩短产品研发周期、降低研发成本和提高产品质量。本文将从汽车虚拟仿真技术的实现过程、应用…

Ubuntu18.04 系统上配置并运行SuperGluePretrainedNetwork(仅使用CPU)

SuperGlue是Magic Leap在CVPR 2020上展示的研究项目&#xff0c;它是一个图神经网络&#xff08;Graph Neural Network&#xff09;和最优匹配层&#xff08;Optimal Matching layer&#xff09;的结合&#xff0c;训练用于对两组稀疏图像特征进行匹配。这个项目提供了PyTorch代…

前端的文字的字体应该如何设置

要设置文字的字体&#xff0c;在CSS中使用font-family属性。这个属性可以接受一个或多个字体名称作为其值&#xff0c;浏览器会按照列表中的顺序尝试使用这些字体渲染文本。如果第一个字体不可用&#xff0c;浏览器会尝试使用列表中的下一个字体&#xff0c;依此类推。 字体设…