spark-streaming(二)

DStream创建(kafka数据源)

1.在idea中的 pom.xml 中添加依赖

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.0.0</version>
</dependency>

2.创建一个新的object,并写入以下代码

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.clients.consumer.ConsumerRecord/*** 通过 DirectAPI 0 - 10 消费 Kafka 数据* 消费的 offset 保存在 _consumer_offsets 主题中*/
object DirectAPI {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("direct")val ssc = new StreamingContext(sparkConf, Seconds(3))// 定义 Kafka 相关参数val kafkaPara: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "node01:9092,node02:9092,node03:9092",ConsumerConfig.GROUP_ID_CONFIG -> "kafka","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer])// 通过读取 Kafka 数据,创建 DStreamval kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Set("kafka"), kafkaPara))// 提取出数据中的 value 部分val valueDStream = kafkaDStream.map(record => record.value())// WordCount 计算逻辑valueDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()ssc.start()ssc.awaitTermination()}
}    

3.在虚拟机中,开启kafka、zookeeper、yarn、dfs集群

4.创建一个新的topic---kafka,用于接下来的操作

查看所有的topic(是否创建成功)

开启kafka生产者,用于产生数据

启动idea中的代码,在虚拟机中输入数据

输入后可以在idea中查看到

查看消费进度

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/76969.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JAVA聚焦OutOfMemoryError 异常

个人主页 文章专栏 在正文开始前&#xff0c;我想多说几句&#xff0c;也就是吐苦水吧…最近这段时间一直想写点东西&#xff0c;停下来反思思考一下。 心中万言&#xff0c;真正执笔时又不知先写些什么。通常这个时候&#xff0c;我都会随便写写&#xff0c;文风极像散文&…

如何在Spring Boot中配置自定义端口运行应用程序

Spring Boot 应用程序默认在端口 8080 上运行嵌入式 Web 服务器&#xff08;如 Tomcat、Jetty 或 Undertow&#xff09;。然而&#xff0c;在开发、测试或生产环境中&#xff0c;开发者可能需要将应用程序配置为在自定义端口上运行&#xff0c;例如避免端口冲突、适配微服务架构…

linux嵌入式(进程与线程1)

Linux进程 进程介绍 1. 进程的基本概念 定义&#xff1a;进程是程序的一次执行过程&#xff0c;拥有独立的地址空间、资源&#xff08;如内存、文件描述符&#xff09;和唯一的进程 ID&#xff08;PID&#xff09;。 组成&#xff1a; 代码段&#xff1a;程序的指令。 数据…

智驭未来:NVIDIA自动驾驶安全白皮书与实验室创新实践深度解析

一、引言&#xff1a;自动驾驶安全的范式革新 在当今数字化浪潮的推动下&#xff0c;全球自动驾驶技术正大步迈入商业化的深水区。随着越来越多的自动驾驶车辆走上道路&#xff0c;其安全性已成为整个行业乃至社会关注的核心命题。在这个关键的转折点上&#xff0c;NVIDIA 凭借…

多模态大模型 Qwen2.5-VL 的学习之旅

Qwen-VL 是阿里云研发的大规模视觉语言模型&#xff08;Large Vision Language Model, LVLM&#xff09;。Qwen-VL 可以以图像、文本、检测框作为输入&#xff0c;并以文本和检测框作为输出。Qwen-VL 系列模型性能强大&#xff0c;具备多语言对话、多图交错对话等能力&#xff…

Redis 与 Memcache 全面对比:功能、性能与应用场景解析

Redis 和 Memcache 都是常用的内存数据库&#xff0c;以下是它们在多个方面的能力比较&#xff1a; 一、数据类型 Redis&#xff1a;支持丰富的数据类型&#xff0c;如字符串&#xff08;String&#xff09;、哈希&#xff08;Hash&#xff09;、列表&#xff08;List&#x…

Oracle--PL/SQL编程

前言&#xff1a;本博客仅作记录学习使用&#xff0c;部分图片出自网络&#xff0c;如有侵犯您的权益&#xff0c;请联系删除 PL/SQL&#xff08;Procedural Language/SQL&#xff09;是Oracle数据库中的一种过程化编程语言&#xff0c;构建于SQL之上&#xff0c;允许编写包含S…

新增优惠券

文章目录 概要整体架构流程技术细节小结 概要 接口分析 一个基本的新增接口&#xff0c;按照Restful风格设计即可&#xff0c;关键是请求参数。之前表分析时已经详细介绍过这个页面及其中的字段&#xff0c;这里不再赘述。 需要特别注意的是&#xff0c;如果优惠券限定了使…

力扣面试经典150题(第二十三题)- KMP算法

问题 给你两个字符串 haystack 和 needle &#xff0c;请你在 haystack 字符串中找出 needle 字符串的第一个匹配项的下标&#xff08;下标从 0 开始&#xff09;。如果 needle 不是 haystack 的一部分&#xff0c;则返回 -1 。 示例 1&#xff1a; 输入&#xff1a;haysta…

PostgreSQL 的 MVCC 机制了解

PostgreSQL 的 MVCC 机制了解 PostgreSQL 使用多版本并发控制(MVCC)作为其核心并发控制机制&#xff0c;这是它与许多其他数据库系统的关键区别之一。MVCC 允许读操作不阻塞写操作&#xff0c;写操作也不阻塞读操作&#xff0c;从而提供高度并发性。 一 MVCC 基本原理 1.1 M…

互联网大厂Java面试:RocketMQ、RabbitMQ与Kafka的深度解析

互联网大厂Java面试&#xff1a;RocketMQ、RabbitMQ与Kafka的深度解析 面试场景 面试官&#xff1a;马架构&#xff0c;您好&#xff01;欢迎参加我们的面试。今天我们将围绕消息中间件展开讨论&#xff0c;尤其是RocketMQ、RabbitMQ和Kafka。您有十年的Java研发和架构设计经…

《巧用DeepSeek快速搞定数据分析》书籍分享

文章目录 前言内容简介作者简介购书链接书籍目录 前言 随着大数据时代的到来&#xff0c;数据分析和人工智能技术正迅速改变着各行各业的运作方式。DeepSeek作为先进的人工智能模型&#xff0c;不仅在自然语言处理领域具有广泛应用&#xff0c;还在数据分析、图像识别、推荐系…

4.Three.js 中 Camera 摄像机详解

一、什么是 Camera&#xff1f; 在 Three.js 中&#xff0c;Camera&#xff08;摄像机&#xff09;决定了我们如何观察三维场景。 你可以把它理解为我们“眼睛”的位置和方向&#xff0c;场景中的物体再复杂&#xff0c;如果没有摄像机&#xff0c;就没有“观察角度”&#x…

gem5-gpu教程03 当前的gem5-gpu软件架构(因为涉及太多专业名词所以用英语表达)

Current gem5-gpu Software Architecture 这是当前gem5-gpu软件架构的示意图。 Ruby是在gem5-gpu上下文中用于处理CPU和GPU之间内存访问的高度可配置的内存系统 CudaCore (src/gpu/gpgpu-sim/cuda_core.*, src/gpu/gpgpu-sim/CudaCore.py) Wrapper for GPGPU-Sim shader_cor…

负载均衡的实现方式有哪些?

负载均衡实现方式常见的有: 软件负载均衡、硬件负载均衡、DNS负载均衡 扩展 二层负载均衡&#xff1a;在数据链路层&#xff0c;基于MAC地址进行流量分发&#xff0c;较少见于实际应用中 三层负载均衡&#xff1a;在网络层&#xff0c;基于IP地址来分配流量&#xff0c;例如某…

MyBatis 和 MyBatis-Plus 在 Spring Boot 中的配置、功能对比及 SQL 日志输出的详细说明,重点对比日志输出的配置差异

以下是 MyBatis 和 MyBatis-Plus 在 Spring Boot 中的配置、功能对比及 SQL 日志输出的详细说明&#xff0c;重点对比日志输出的配置差异&#xff1a; 1. MyBatis 和 MyBatis-Plus 核心对比 特性MyBatisMyBatis-Plus定位基础持久层框架MyBatis 的增强版&#xff0c;提供代码生…

《数据结构世界的乐高积木:顺序表的奇幻旅程》

目录 1. 线性表 2. 顺序表 2.1 概念与结构 2.2 分类 2.2.1 静态顺序表 2.2.2 动态顺序表 2.3 动态顺序表的实现 1. 线性表 线性表&#xff08;linear list&#xff09;是n个具有相同特性的数据元素的有限序列。线性表是⼀种在实际中⼴泛使⽤的数据结构&#xff0c;常⻅的…

RHCE 练习二:通过 ssh 实现两台主机免密登录以及 nginx 服务通过多 IP 区分多网站

一、题目要求 1.配置ssh实现A&#xff0c;B主机互相免密登录 2.配置nginx服务&#xff0c;通过多ip区分多网站 二、实验 实验开始前需准备两台 linux 主机便于充当服务端以及客户端&#xff0c;两台主机 IP 如下图&#xff1a; 实验1&#xff1a;配置 ssh 实现 A&#xff0…

第十五届蓝桥杯 2024 C/C++组 好数

题目&#xff1a; 题目描述&#xff1a; 题目链接&#xff1a; 好数 思路&#xff1a; 第一种思路详解&#xff1a; 因为每次检查数都是从个位开始&#xff0c;所以对于每一个数都是先检查奇数位再检查偶数位&#xff0c;即存在先检查奇数位再检查偶数位的循环。注意一次完…

展锐Android13状态栏默认显示电池电量百分比

展锐Android13电池状态默认不显示电池电量百分比&#xff0c;打开 /frameworks/base/packages/SettingsProvider/res/values/defaults.xml 在xml的文件最后&#xff0c;增加一项配置def_show_battery_percent&#xff1a; <?xml version"1.0" encoding"u…