【大数据测试spark+kafka-详细教程(附带实例)】

大数据测试:Spark + Kafka 实时数据处理与窗口计算教程

  • 1. 概述
    • 1.1 大数据技术概述
    • 1.2 Apache Kafka 与 Spark 的结合
  • 2. 技术原理与流程
    • 2.1 Kafka 简介
    • 2.2 Spark Streaming 简介
    • 2.3 数据流动与处理流程
  • 3. 环境配置
    • 3.1 安装依赖项
  • 4. 实例:实时数据处理与窗口计算
    • 4.1 Kafka 生产者代码
    • 4.2 Spark Streaming 消费者代码
    • 4.3 解释与操作
  • 5. 运行与测试
    • 5.1 创建 Kafka Topic
    • 5.2 启动 Kafka 生产者
    • 5.3 启动 Spark Streaming 程序
    • 5.4 输出结果
  • 6. 总结

1. 概述

1.1 大数据技术概述

大数据(Big Data)指的是无法用传统数据库技术和工具进行处理和分析的超大规模数据集合。在大数据技术中,实时数据流的处理尤为重要,尤其是如何高效地对海量的实时数据进行采集、存储、处理与分析。

在这方面,Apache KafkaApache Spark 是两个关键技术。Kafka 作为分布式流处理平台,可以高效地进行实时数据流的生产和消费,而 Spark 提供了强大的分布式计算能力,尤其是其扩展的流式计算模块 Spark Streaming,非常适合处理实时数据流。

1.2 Apache Kafka 与 Spark 的结合

  • Kafka 是一个分布式消息队列,可以处理高吞吐量、低延迟的实时数据流。Kafka 被广泛用于日志收集、监控系统、实时数据传输等场景。
  • Spark 是一个统一的分析引擎,支持批量处理、流式处理和图计算。Spark Streaming 是 Spark 的一个流式处理组件,用于实时处理流数据。

通过结合 Kafka 和 Spark,我们可以实现大规模数据的实时处理、聚合和窗口计算。Spark 可以从 Kafka 消费数据流,并进行实时计算与分析,适用于诸如实时日志分析、用户行为分析、实时推荐等场景。


2. 技术原理与流程

2.1 Kafka 简介

Kafka 是一个分布式的消息队列系统,能够实现高吞吐量、可扩展性、容错性。它的基本组成包括:

  • Producer(生产者):负责向 Kafka 发送数据。
  • Consumer(消费者):从 Kafka 中消费数据。
  • Broker(代理):Kafka 的节点,每个节点负责存储消息。
  • Topic(主题):消息被组织在 Topic 中,生产者向 Topic 发送数据,消费者从 Topic 中读取数据。
  • Partition(分区):Kafka 支持水平分区,使得数据可以分布在多个 Broker 上。

2.2 Spark Streaming 简介

Spark Streaming 是 Spark 的流处理模块,它以 DStream(离散流)为基本数据结构,能够实时地处理数据流。DStream 是一个连续的 RDD(弹性分布式数据集),Spark Streaming 将实时流数据划分成一个个小的批次,使用批处理模型对这些小批次进行处理。

2.3 数据流动与处理流程

  1. Kafka Producer:将数据发送到 Kafka Topic。
  2. Kafka Broker:Kafka 集群负责存储和转发数据。
  3. Spark Streaming:通过 Kafka 的消费者接口从 Topic 中消费数据。
  4. 数据处理与计算:在 Spark Streaming 中进行数据聚合、过滤、窗口计算等操作。
  5. 输出结果:将处理后的数据输出到外部系统,如 HDFS、数据库或控制台。

3. 环境配置

3.1 安装依赖项

  1. 安装 Java:确保安装了 Java 8 或更高版本。

    检查版本:

    java -version
    
  2. 安装 Apache Spark:从 Apache Spark 官网 下载并安装 Spark。

  3. 安装 Apache Kafka:从 Kafka 官网 下载并安装 Kafka。

  4. Maven 配置:在 Java 项目中使用 Maven 作为构建工具,添加必要的 Spark 和 Kafka 依赖。

pom.xml 文件中添加 Spark 和 Kafka 的 Maven 依赖:

<dependencies><!-- Spark Core --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.3.1</version></dependency><!-- Spark Streaming --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.3.1</version></dependency><!-- Spark Streaming Kafka --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.3.1</version></dependency><!-- Kafka Consumer --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency>
</dependencies>

4. 实例:实时数据处理与窗口计算

4.1 Kafka 生产者代码

以下是一个简单的 Kafka 生产者,用于生成模拟的用户行为日志(如点击事件)并发送到 Kafka Topic logs

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 模拟用户点击日志数据String[] actions = {"click", "view", "scroll"};String[] users = {"user1", "user2", "user3"};// 向 Kafka 发送模拟数据for (int i = 0; i < 100; i++) {String user = users[i % 3];String action = actions[i % 3];String timestamp = String.valueOf(System.currentTimeMillis() / 1000);String value = user + "," + action + "," + timestamp;producer.send(new ProducerRecord<>("logs", null, value));try {Thread.sleep(1000); // 每秒发送一条数据} catch (InterruptedException e) {e.printStackTrace();}}producer.close();}
}

4.2 Spark Streaming 消费者代码

以下是一个 Spark Streaming 程序,它从 Kafka Topic logs 中消费数据并进行窗口计算,统计每个用户在过去 10 秒内的点击次数。

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import org.apache.kafka.clients.consumer.ConsumerRecord;import java.util.HashMap;
import java.util.Map;
import java.util.Arrays;
import java.util.List;public class SparkKafkaWindowExample {public static void main(String[] args) throws InterruptedException {// 初始化 Spark StreamingContextJavaStreamingContext jssc = new JavaStreamingContext("local[2]", "SparkKafkaWindowExample", new Duration(2000));// Kafka 配置参数String brokers = "localhost:9092";String groupId = "spark-consumer-group";String topic = "logs";// Kafka 参数设置Map<String, Object> kafkaParams = new HashMap<>();kafkaParams.put("bootstrap.servers", brokers);kafkaParams.put("key.deserializer", StringDeserializer.class);kafkaParams.put("value.deserializer", StringDeserializer.class);kafkaParams.put("group.id", groupId);kafkaParams.put("auto.offset.reset", "latest");kafkaParams.put("enable.auto.commit", "false");List<String> topics = Arrays.asList(topic);// 从 Kafka 获取数据流JavaReceiverInputDStream<ConsumerRecord<String, String>> stream =KafkaUtils.createDirectStream(jssc,LocationStrategies.PreferConsistent(),ConsumerStrategies.Subscribe(topics, kafkaParams));// 处理每条记录:解析用户、动作和时间戳JavaPairRDD<String, String> userActions = stream.mapToPair(record -> {String[] fields = record.value().split(",");return new Tuple2<>(fields[0], fields[1]); // userId, action});// 定义窗口大小为 10 秒,滑动间隔为 5 秒JavaPairRDD<String, Integer> userClickCounts = userActions.window(new Duration(10000), new Duration(5000)) // 滑动窗口.reduceByKeyAndWindow((Function2<Integer, Integer, Integer>) Integer::sum,new Duration(10000), // 窗口大小:10秒new Duration(5000)   // 滑动间隔5);// 输出每个窗口的用户点击次数userClickCounts.foreachRDD(rdd -> {rdd.collect().forEach(record -> {System.out.println("User: " + record._1() + ", Click Count: " + record._2());});});// 启动流式处理jssc.start();jssc.awaitTermination();}
}

4.3 解释与操作

  • Kafka 配置:配置 Kafka 参数,连接到 Kafka 服务,订阅 Topic logs
  • 数据解析:从 Kafka 消费数据后,解析每条日志(如 user1,click,1609459200)。
  • 窗口计算:使用 window() 定义一个窗口,窗口大小为 10 秒,滑动间隔为 5 秒。使用 reduceByKeyAndWindow() 聚合每个窗口内的用户点击次数。
  • 输出结果:每 5 秒统计一次过去 10 秒内的用户点击次数,输出到控制台。

5. 运行与测试

5.1 创建 Kafka Topic

在 Kafka 中创建 Topic logs

kafka-topics.sh --create --topic logs --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

5.2 启动 Kafka 生产者

运行 Kafka 生产者代码,模拟数据发送到 Kafka:

java KafkaProducerExample

5.3 启动 Spark Streaming 程序

运行 Spark Streaming 程序,消费 Kafka 数据并执行窗口计算:

java SparkKafkaWindowExample

5.4 输出结果

每隔 5 秒输出用户的点击次数,如:

User: user1, Click Count: 3
User: user2, Click Count: 5

6. 总结

通过结合使用 Apache KafkaApache Spark,我们可以高效地处理大规模的实时数据流。Kafka 负责消息的可靠传输,而 Spark Streaming 负责实时计算和分析。使用窗口计算(如 window()reduceByKeyAndWindow()),我们可以在不同时间段内对数据进行聚合,适用于实时监控、推荐系统、用户行为分析等场景。

此架构适用于需要处理大数据、实时响应的应用程序,并能满足高吞吐量、低延迟的要求。


推荐阅读:《大数据 ETL + Flume 数据清洗》,《大数据测试 Elasticsearch》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/58874.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

其他教程:如何设计一个App扫码登录功能,来实现免账号密码登录功能,仅供参考

–本次教程是给大家列举一下常见的扫码登录功能&#xff0c;小程序或app均可参考&#xff0c;如需更安全的模式可多次设计加工&#xff0c;本次只列举概念层面的实现方式

ChatGPT键盘快捷键(按ctrl + /呼出)

文章目录 ChatGPT键盘快捷键- 打开新聊天: Ctrl Shift O- 聚焦聊天输入: Shift Esc- 复制最后一个代码块: Ctrl Shift ;- 复制最后一个回复: Ctrl Shift C- 设置自定义指令: Ctrl Shift I- 切换边栏: Ctrl Shift S- 删除聊天: Ctrl Shift ⌫- 显示快捷方式: Ctrl …

AWTK-HarmonyOS NEXT 发布

AWTK 全称为 Toolkit AnyWhere&#xff0c;是 ZLG 倾心打造的一套基于 C 语言开发的 GUI 框架。旨在为用户提供一个功能强大、高效可靠、简单易用、可轻松做出炫酷效果的 GUI 引擎&#xff0c;支持跨平台同步开发&#xff0c;一次编程&#xff0c;到处编译&#xff0c;跨平台使…

CSRF详解

CSRF&#xff0c;全称是Cross-Site Request Forgery&#xff0c;即跨站请求伪造&#xff0c;也被称为“one click attack”或者session riding&#xff0c;是一种网络攻击方式。它允许攻击者诱导用户在已登录的Web应用程序上执行非预期的操作。 工作原理CSRF攻击通常涉及三个主…

JSON格式

JSON&#xff08;JavaScript Object Notation&#xff09;是一种轻量级的数据交换格式&#xff0c;易于人和机器阅读和解析。它基于JavaScript的对象表示法&#xff0c;但被广泛用于多种编程语言。 JSON中的数据类型 字符串&#xff08;String&#xff09;&#xff1a;用双引…

【开发】Java的内存溢出

Java之内存溢出 Java之内存溢出 | OutOfMemoryError 我们都知道&#xff0c;在Java的世界里&#xff0c;由JVM管理着Java中的“垃圾对象”&#xff0c;也就是不被引用的对象&#xff1b;当一个对象不被其它对象引用的时候&#xff0c;该对象就会被垃圾回收器清理掉。但是在某些…

EasyExcel的AbstractColumnWidthStyleStrategy注入CellStyle不生效

设置背景色 CellStyle style workbook.createCellStyle();style.setFillForegroundColor(IndexedColors.RED.getIndex()); // 是设置前景色不是背景色style.setFillPattern(FillPatternType.SOLID_FOREGROUND)EasyExcel.writerTable(0).head(Head1.class).registerWriteHandl…

【51单片机】LED点阵屏 原理 + 使用

学习使用的开发板&#xff1a;STC89C52RC/LE52RC 编程软件&#xff1a;Keil5 烧录软件&#xff1a;stc-isp 开发板实图&#xff1a; 文章目录 LED点阵屏显示原理74HC595 编码LED点阵屏显示笑脸LED点阵屏显示动画 LED点阵屏 点阵屏在开发板的右上角&#xff0c;注意使用前需要…

算法求解-最大和子序列问题(C#)

1、算法目标&#xff1a; 题目描述 设数组a是有n个元素的整数数组&#xff0c;如何从中找出最大和子序列的和&#xff1f; 输入 第一行输入n 第二行输入n个数 输出 输出最大和子序列的和 样例输入 4 2 -1 3 -4 样例输出 4 2、算法实现&#xff1a; using System;class Progr…

深度学习-张量相关

一. 张量的创建 张量简介 张量是pytorch的基本数据结构 张量&#xff0c;英文为Tensor&#xff0c;是机器学习的基本构建模块&#xff0c;是以数字方式表示数据的形式。 例如&#xff0c;图像可以表示为形状为 [3, 224, 224] 的张量&#xff0c;这意味着 [colour_channels, h…

使用vite构建一个react网站,并部署到Netlify上

这篇教程中&#xff0c;我会教你如何用vite快速构建一个react网站&#xff0c;并把网站免费部署到Netlify上&#xff0c;让别人可以经由网址访问你的react网站。 1. 使用vite构建基础框架 npm create vitelatestcd vite-project npm install npm run dev2. 网站内容设计 3. 构…

Fastify Swagger:自动化API文档生成与展示

在现代软件开发中&#xff0c;API文档的生成和维护是一个不可或缺的环节。Fastify Swagger 是一个专为 Fastify 框架设计的插件&#xff0c;它能够自动生成符合 Swagger&#xff08;OpenAPI v2 或 v3&#xff09;规范的文档&#xff0c;从而帮助开发者轻松创建和维护API文档。本…

【网络原理】万字详解 UDP 和 TCP

&#x1f970;&#x1f970;&#x1f970;来都来了&#xff0c;不妨点个关注叭&#xff01; &#x1f449;博客主页&#xff1a;欢迎各位大佬!&#x1f448; 文章目录 1. UDP1.1 UDP 报文格式1.1.1 源端口/目的端口1.1.2 报文长度1.1.3 校验和 2. TCP2.1 TCP 报文结构2.2 TCP 特…

Go 中的泛型,日常如何使用

泛型从 go 的 1.18 开始支持 什么是泛型编程 在泛型出现之前&#xff0c;如果需要计算两数之和&#xff0c;可能会这样写&#xff1a; func Add(a, b int) int {returb a b } 这个很简单&#xff0c;但是只能两个参数都是 int 类型的时候才能调用 如果想要计算两个浮点数…

IoTDB 与 HBase 对比详解:架构、功能与性能

五大方向&#xff0c;洞悉 IoTDB 与 HBase 的详尽对比&#xff01; 在物联网&#xff08;IoT&#xff09;领域&#xff0c;数据的采集、存储和分析是确保系统高效运行和决策准确的重要环节。随着物联网设备数量的增加和数据量的爆炸式增长&#xff0c;开发者和决策者们需要选择…

单片机串口接收状态机STM32

单片机串口接收状态机stm32 前言 项目的芯片stm32转国产&#xff0c;国产芯片的串口DMA接收功能测试不通过&#xff0c;所以要由原本很容易配置的串口空闲中断触发DMA接收数据的方式转为串口逐字节接收的状态机接收数据 两种方式各有优劣&#xff0c;不过我的芯片已经主频跑…

【Python编程实例】-深入理解Python线程安全

深入理解Python线程安全 文章目录 深入理解Python线程安全1、Python中的线程2、线程安全2.1 GIL 及其对线程的影响2.2 竞态条件3、同步原语3.1 线程锁3.2 信号量4、使用同步原语进行通信和协调4.1 事件用于信号通知4.2 条件变量用于条件等待4.3 协调用屏障(Barriers for Coord…

词嵌入方法(Word Embedding)

词嵌入方法&#xff08;Word Embedding&#xff09; Word Embedding是NLP中的一种技术&#xff0c;通过将单词映射到一个空间向量来表示每个单词 ✨️常见的词嵌入方法&#xff1a; &#x1f31f;Word2Vec&#xff1a;由谷歌提出的方法&#xff0c;分为CBOW&#xff08;conti…

【go从零单排】实现枚举类型(Enum)

&#x1f308;Don’t worry , just coding! 内耗与overthinking只会削弱你的精力&#xff0c;虚度你的光阴&#xff0c;每天迈出一小步&#xff0c;回头时发现已经走了很远。 &#x1f4d7;概念 在Go语言中&#xff0c;并没有内置的枚举类型&#xff08;Enum&#xff09;&…

Python爬虫如何处理验证码与登录

Python爬虫如何处理验证码与登录 Python 爬虫在抓取需要登录的网站数据时&#xff0c;通常会遇到两个主要问题&#xff1a;登录验证和验证码处理。这些机制是网站用来防止自动化程序过度抓取数据的主要手段。本文将详细讲解如何使用 Python 处理登录与验证码&#xff0c;以便进…