2023_Spark_实验二十五:SparkStreaming读取Kafka数据源:使用Direct方式

SparkStreaming读取Kafka数据源:使用Direct方式

一、前提工作

  • 安装了zookeeper

  • 安装了Kafka

  • 实验环境:kafka + zookeeper + spark

  • 实验流程

二、实验内容

实验要求:实现的从kafka读取实现wordcount程序

启动zookeeper

zk.sh start# zk.sh脚本 参考教程 https://blog.csdn.net/pblh123/article/details/134730738?spm=1001.2014.3001.5502

启动Kafka

kf.sh start# kf.sh 参照教程 https://blog.csdn.net/pblh123/article/details/134730738?spm=1001.2014.3001.5502

 (测试用,实验不做)创建Kafka主题,如test,可参考:Kafka的安装与基本操作

--topic 定义topic名

--replication-factor  定义副本数

--partitions  定义分区数

--bootstrap-server  连接的Kafka Broker主机名称和端口号

--create 创建主题

--describe 查看主题详细描述

# 创建kafka主题测试
/opt/module/kafka_2.12-3.0.0/bin/kafka-topics.sh --create --bootstrap-server hd1:9092 --replication-factor 3 --partitions 1 --topic gnutest2# 再次查看first主题的详情
/opt/module/kafka_2.12-3.0.0/bin/kafka-topics.sh --bootstrap-server hd1:9092 --describe --topic gnutest2

启动Kafka控制台生产者,可参考:Kafka的安装与基本操作

# 创建kafka生产者
/opt/module/kafka_2.12-3.0.0/bin/kafka-console-producer.sh --bootstrap-server hd1:9092 --topic gnutest2

创建maven项目

添加kafka依赖

       <!--- 添加streaming依赖 ---><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.13</artifactId><version>${spark.version}</version></dependency><!--- 添加streaming kafka依赖 ---><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.13</artifactId><version>3.4.1</version></dependency>

编写程序,如下所示:

package examsimport org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribeimport java.lang/*** @projectName SparkLearning2023  * @package exams  * @className exams.SparkStreamingReadKafka  * @description ${description}  * @author pblh123* @date 2023/12/1 15:19* @version 1.0**/object SparkStreamingReadKafka {def main(args: Array[String]): Unit = {//  1. 创建spark,sc对象if (args.length != 2) {println("您需要输入一个参数")System.exit(5)}val musrl: String = args(0)val spark: SparkSession = new SparkSession.Builder().appName(s"${this.getClass.getSimpleName}").master(musrl).getOrCreate()val sc: SparkContext = spark.sparkContext// 生成streamingContext对象val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))//  2. 代码主体val bststrapServers = args(1)val kafkaParms: Map[String, Object] = Map[String, Object]("bootstrap.servers" -> bststrapServers, //kafka列表"key.deserializer" -> classOf[StringDeserializer], k和v 的序列化类型"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "use_a_separate_group_id_for_each_stream", //消费者组"auto.offset.reset" -> "latest", //如果没有记录偏移量,第一次从最开始读,有偏移量,接着偏移量读"enable.auto.commit" -> (true: java.lang.Boolean) // 消费者不自动提交偏移量)val topics = Array("gnutest2", "t100")// createDirectStream: 主动拉取数据val stream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParms))val mapDStream: DStream[(String, String)] = stream.map(record => (record.key, record.value))//kafka 是一个key value 格式的, 默认key 为null ,一般用不上val resultRDD: DStream[(String, Int)] = mapDStream.flatMap(_._2.split(" ")).map((_, 1)).reduceByKey(_ + _)// 打印resultRDD.print()//  3. 关闭sc,spark对象ssc.start()ssc.awaitTermination()ssc.stop()sc.stop()spark.stop()}
}

配置输入参数

生产者追加数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/207232.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SNMP陷阱监控工具

SNMP&#xff08;简单网络管理协议&#xff09;是网络管理的一个重要方面&#xff0c;其中网络设备&#xff08;包括路由器、交换机和服务器&#xff09;在满足预定义条件时将SNMP陷阱作为异步通知发送到中央管理系统。简而言之&#xff0c;每当发生关键服务器不可用或硬件高温…

microblaze仿真

verdivcs (1) vlogan/vcs增加编译选项 -debug_accessall -kdb -lca (2) 在 simulation 选项中加入下面三个选项 -guiverdi UVM_VERDI_TRACE"UVM_AWARERALHIERCOMPWAVE" UVM_TR_RECORD 这里 -guiverdi是启动verdi 和vcs联合仿真。UVM_VERDI_TRACE 这里是记录 U…

linux高级篇基础理论七(Tomcat)

♥️作者&#xff1a;小刘在C站 ♥️个人主页&#xff1a; 小刘主页 ♥️不能因为人生的道路坎坷,就使自己的身躯变得弯曲;不能因为生活的历程漫长,就使求索的 脚步迟缓。 ♥️学习两年总结出的运维经验&#xff0c;以及思科模拟器全套网络实验教程。专栏&#xff1a;云计算技…

vue pc官网顶部导航栏组件

官网顶部导航分为一级导航和二级导航 导航的样子 文件的层级 router 文件层级 header 组件代码 <h1 class"logo-wrap"><router-link to"/"><img class"logo" :src"$config.company.logo" alt"" /><i…

直面双碳目标,优维科技携手奥意建筑打造绿色低碳建筑数智云平台

优维“双碳”战略合作建筑 为落实创新驱动发展战略&#xff0c;增强深圳工程建设领域科技创新能力&#xff0c;促进技术进步、科技成果转化和推广应用&#xff0c;根据《深圳市工程建设领域科技计划项目管理办法》《深圳市住房和建设局关于组织申报2022年深圳市工程建设领域科…

IO流(Java)

IO流 在学习IO流之前&#xff0c;我们首先了解一下File File File即文件或文件夹路径对象&#xff0c;其示例类可以是存在路径也可以是未创造路径 File有什么用 用于创建或操作文件或文件夹 File常用API API部分看得懂会查会用即可 IO流 IO(Input 读数据 Output写数据…

Qt/QML编程学习之心得:工程中的文件(十二)

Qt生成了工程之后,尤其在QtCreator产生对应的project项目之后,就如同VisualStudio一样,会产生相关的工程文件,那么这些工程文件都是做什么的呢?这里介绍一下。比如产生了一个Qt Widget application,当然如果Qt Quick Application工程会有所不同。 一、.pro和.pro.user …

企业计算机服务器中了360勒索病毒如何解密,勒索病毒解密数据恢复

网络技术的不断应用与发展&#xff0c;为企业的生产运营提供了极大便利&#xff0c;但随之而来的网络安全威胁也不断增加。近期&#xff0c;云天数据恢复中心接到很多企业的求助&#xff0c;企业的计算机服务器遭到了360后缀勒索病毒攻击&#xff0c;导致企业的所有数据被加密&…

『PyTorch学习笔记』如何快速下载huggingface模型/数据—全方法总结

如何快速下载huggingface模型/数据—全方法总结 文章目录 一. 如何快速下载huggingface大模型1.1. IDM(Windows)下载安装连接1.2. 推荐 huggingface 镜像站1.3. 管理huggingface_hub cache-system(缓存系统) 二. 参考文献 一. 如何快速下载huggingface大模型 推荐 huggingface…

希亦洗地机跟追觅洗地机入手哪个更好?追觅跟希亦洗地机深度评估

近年来&#xff0c;洗地机可以同时处理干湿垃圾&#xff0c;同时降低用户在清洁过程中的劳动强度&#xff0c;成为了家居清洁的新宠&#xff0c;但是目前市场上的品牌和型号层出不穷。用户往往很难挑选&#xff0c;本文挑选了两款目前口碑最好的两款洗地机给大家做一个全面的评…

外贸行业的CRM系统和其它CRM有什么区别?

外贸行业对客户管理的追求日益提高&#xff0c;为了应对客户需求的变化和多元性&#xff0c;外贸企业需要借助CRM管理系统实现智能管理。下面&#xff0c;我们将详细探讨外贸CRM的概念、特点和具体应用。 什么是外贸CRM&#xff1f; 外贸CRM是指针对外贸行业的客户关系管理系…

Nginx+Promtail+Loki+Grafana 升级ELK强大工具

最近客户有个新需求,就是想查看网站的访问情况,由于网站没有做google的统计和百度的统计,所以访问情况,只能通过日志查看,通过脚本的形式给客户导出也不太实际,给客户写个简单的页面,咱也做不到 成熟的日志解决方案,那就是ELK,还有现在比较火的Loki,(当然还有很多其…

两电脑共享鼠标键盘方案

一开始使用的是shareMouse 但是需要注册还有很多不稳定问题 后来想买个双拷线&#xff0c;又太贵&#xff0c;感觉不值的。 再后来&#xff0c;发现微软有自己的系统上的 共享方案 &#xff0c;叫做 Mouse without Borders ,而且是免费的&#xff0c;只能在window电脑上使用…

Linus:我休假的时候也会带着电脑,否则会感觉很无聊

目录 Linux 内核最新版本动态 关于成为内核维护者 代码好写&#xff0c;人际关系难处理 内核维护者老龄化 内核中 Rust 的使用 关于 AI 的看法 参考 12.5-12.6 日&#xff0c;Linux 基金会组织的开源峰会&#xff08;OSS&#xff0c;Open Source Summit&#xff09;在日…

报名学历的同学,月底前记得申请抵扣个税!

2024年度专项附加扣除开始确认啦&#xff01; 已经报名学历&#xff08;自考、成考、开放大学&#xff09;的同学&#xff0c;记得去申请抵扣个税哦&#xff01; 每个月的应纳税额可以减免400元呢&#xff0c;学历提升在读这几年算下来&#xff0c;可以省不少钱。 注意&#x…

轮播插件Slick.js使用方法详解

相比于Swiper而选择使用Slick.js的原因主要是因为其兼容不错并且在手机端的滑动效果更顺畅 参数&#xff1a; 1.基本使用&#xff1a;一般使用只需前十个属性 $(.box ul).slick({autoplay: true, //是否自动播放pauseOnHover: false, //鼠标悬停暂停自动播放speed: 1500, //…

C#网络编程(System.Net命名空间)

目录 一、System.Net命名空间 1.Dns类 &#xff08;1&#xff09;示例源码 &#xff08;2&#xff09;生成效果 2.IPAddress类 &#xff08;1&#xff09;示例源码 &#xff08;2&#xff09;生成效果 3.IPEndPoint类 &#xff08;1&#xff09; 示例源码 &#xff0…

【动态规划系列】子数组的最大和

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

自动化测试框架需要具备哪些功能?

我们经常听说使用了某某框架&#xff0c;那框架究竟是什么呢&#xff1f;框架有什么优势和功能&#xff1f; 什么是自动化框架 自动化框架是包含了自动化测试的组织、执行、监控以及报告等流程的工具&#xff0c;是由多个工具、库、模块和API等组成的工具集。自动化框架的目标…

线性代数入门与学习笔记

该内容为重拾部分线性代数知识的学习笔记&#xff0c;内容上更多的是为了解决问题而学习的内容&#xff0c;并非系统化的学习。 针对的问题为&#xff1a;Music算法推导求解过程中的矩阵计算知识。 学习的内容包括&#xff1a;矩阵原理、矩阵行列式、矩阵的秩、线性变换矩阵变换…