2023_Spark_实验二十五:SparkStreaming读取Kafka数据源:使用Direct方式

SparkStreaming读取Kafka数据源:使用Direct方式

一、前提工作

  • 安装了zookeeper

  • 安装了Kafka

  • 实验环境:kafka + zookeeper + spark

  • 实验流程

二、实验内容

实验要求:实现的从kafka读取实现wordcount程序

启动zookeeper

zk.sh start# zk.sh脚本 参考教程 https://blog.csdn.net/pblh123/article/details/134730738?spm=1001.2014.3001.5502

启动Kafka

kf.sh start# kf.sh 参照教程 https://blog.csdn.net/pblh123/article/details/134730738?spm=1001.2014.3001.5502

 (测试用,实验不做)创建Kafka主题,如test,可参考:Kafka的安装与基本操作

--topic 定义topic名

--replication-factor  定义副本数

--partitions  定义分区数

--bootstrap-server  连接的Kafka Broker主机名称和端口号

--create 创建主题

--describe 查看主题详细描述

# 创建kafka主题测试
/opt/module/kafka_2.12-3.0.0/bin/kafka-topics.sh --create --bootstrap-server hd1:9092 --replication-factor 3 --partitions 1 --topic gnutest2# 再次查看first主题的详情
/opt/module/kafka_2.12-3.0.0/bin/kafka-topics.sh --bootstrap-server hd1:9092 --describe --topic gnutest2

启动Kafka控制台生产者,可参考:Kafka的安装与基本操作

# 创建kafka生产者
/opt/module/kafka_2.12-3.0.0/bin/kafka-console-producer.sh --bootstrap-server hd1:9092 --topic gnutest2

创建maven项目

添加kafka依赖

       <!--- 添加streaming依赖 ---><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.13</artifactId><version>${spark.version}</version></dependency><!--- 添加streaming kafka依赖 ---><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.13</artifactId><version>3.4.1</version></dependency>

编写程序,如下所示:

package examsimport org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribeimport java.lang/*** @projectName SparkLearning2023  * @package exams  * @className exams.SparkStreamingReadKafka  * @description ${description}  * @author pblh123* @date 2023/12/1 15:19* @version 1.0**/object SparkStreamingReadKafka {def main(args: Array[String]): Unit = {//  1. 创建spark,sc对象if (args.length != 2) {println("您需要输入一个参数")System.exit(5)}val musrl: String = args(0)val spark: SparkSession = new SparkSession.Builder().appName(s"${this.getClass.getSimpleName}").master(musrl).getOrCreate()val sc: SparkContext = spark.sparkContext// 生成streamingContext对象val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))//  2. 代码主体val bststrapServers = args(1)val kafkaParms: Map[String, Object] = Map[String, Object]("bootstrap.servers" -> bststrapServers, //kafka列表"key.deserializer" -> classOf[StringDeserializer], k和v 的序列化类型"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "use_a_separate_group_id_for_each_stream", //消费者组"auto.offset.reset" -> "latest", //如果没有记录偏移量,第一次从最开始读,有偏移量,接着偏移量读"enable.auto.commit" -> (true: java.lang.Boolean) // 消费者不自动提交偏移量)val topics = Array("gnutest2", "t100")// createDirectStream: 主动拉取数据val stream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParms))val mapDStream: DStream[(String, String)] = stream.map(record => (record.key, record.value))//kafka 是一个key value 格式的, 默认key 为null ,一般用不上val resultRDD: DStream[(String, Int)] = mapDStream.flatMap(_._2.split(" ")).map((_, 1)).reduceByKey(_ + _)// 打印resultRDD.print()//  3. 关闭sc,spark对象ssc.start()ssc.awaitTermination()ssc.stop()sc.stop()spark.stop()}
}

配置输入参数

生产者追加数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/207232.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

生成元(Digit Generator, ACM/ICPC Seoul 2005, UVa1583)

如果x加上x的各个数字之和得到y&#xff0c;就说x是y的生成元。 给出n&#xff08;1≤n≤100000&#xff09;&#xff0c;求最小生成元。 无解输出0。 例如&#xff0c;n216&#xff0c;121&#xff0c;2005时的解分别为198&#xff0c;0&#xff0c;1979。 我的思路很简单&am…

element-UI中el-scrollbar的使用

在elment-ui中有这么一个滚动条&#xff0c;当鼠标over到内容部分才会显示&#xff0c;移开鼠标之后滚动条就会隐藏起来&#xff0c;相较于原生的滚动条比较美观。 <el-scrollbar> //将滚动条的内部的内容放在里面即可 </el-scrollbar> 在使用过程中&#xff…

SNMP陷阱监控工具

SNMP&#xff08;简单网络管理协议&#xff09;是网络管理的一个重要方面&#xff0c;其中网络设备&#xff08;包括路由器、交换机和服务器&#xff09;在满足预定义条件时将SNMP陷阱作为异步通知发送到中央管理系统。简而言之&#xff0c;每当发生关键服务器不可用或硬件高温…

microblaze仿真

verdivcs (1) vlogan/vcs增加编译选项 -debug_accessall -kdb -lca (2) 在 simulation 选项中加入下面三个选项 -guiverdi UVM_VERDI_TRACE"UVM_AWARERALHIERCOMPWAVE" UVM_TR_RECORD 这里 -guiverdi是启动verdi 和vcs联合仿真。UVM_VERDI_TRACE 这里是记录 U…

第四十二篇,MATLAB on Linux

最近在Ubuntu上安装了一把MATLAB&#xff0c;以下操作亲测有效。 一、版本 Linux&#xff1a;Ubuntu 18.04 MATLAB&#xff1a;R2021a Linux版&#xff0c;910 MATLAB下载链接&#xff1a;提取码MUYU&#xff0c;感谢大佬无私奉献&#xff01; 二、安装 详细的安装步骤不…

linux高级篇基础理论七(Tomcat)

♥️作者&#xff1a;小刘在C站 ♥️个人主页&#xff1a; 小刘主页 ♥️不能因为人生的道路坎坷,就使自己的身躯变得弯曲;不能因为生活的历程漫长,就使求索的 脚步迟缓。 ♥️学习两年总结出的运维经验&#xff0c;以及思科模拟器全套网络实验教程。专栏&#xff1a;云计算技…

算法题,文本左右对齐

/*** 给定一个单词数组 words 和一个长度 maxWidth &#xff0c;重新排版单词&#xff0c;使其成为每行恰好有 maxWidth 个字符&#xff0c;且左右两端对齐的文本。** 你应该使用 “贪心算法” 来放置给定的单词&#xff1b;也就是说&#xff0c;尽可能多地往每行中放置单词。必…

ubuntu22.04系统更改完resolv.conf后 重启网络服务后resolv.conf被重置

vi /etc/systemd/resolved.conf&#xff0c; [Resolve] DNS8.8.8.8 114.114.114.114 192.168.4.2 2.重启域名解析服务 systemctl restart systemd-resolved systemctl enable systemd-resolved 3.备份当前的/etc/resolve.conf&#xff0c;并重新设置/run/systemd/resolve/res…

Docker 安装 Centos和宝塔

1. 安装centos docker pull centos:centos7 2. 创建docker容器&#xff1a;newbt 代表容器名 docker run -i -t -d --name newbt -p 2000:20 -p 2100:21 -p 8000:80 -p 4430:443 -p 8880:888 -p 8888:8888 -p 38444:38444 -p 2200:22 -p 2300:23 -p 2500:25 -p 3306:3306 -p 6…

c++ 解析zip文件,实现对流式文件pptx内容的修改

libzip 官网地址&#xff1a;示例代码 #include <iostream> #include <cstdlib> #include <cstring> #include <ctime> #include <zip.h>//解析原始zip内容&#xff0c;保存为新的zip文件 int ziptest(const char* inputPath, const char* out…

vue pc官网顶部导航栏组件

官网顶部导航分为一级导航和二级导航 导航的样子 文件的层级 router 文件层级 header 组件代码 <h1 class"logo-wrap"><router-link to"/"><img class"logo" :src"$config.company.logo" alt"" /><i…

直面双碳目标,优维科技携手奥意建筑打造绿色低碳建筑数智云平台

优维“双碳”战略合作建筑 为落实创新驱动发展战略&#xff0c;增强深圳工程建设领域科技创新能力&#xff0c;促进技术进步、科技成果转化和推广应用&#xff0c;根据《深圳市工程建设领域科技计划项目管理办法》《深圳市住房和建设局关于组织申报2022年深圳市工程建设领域科…

K8S集群优化的可执行优化

目录 前期环境优化 1.永久关闭交换分区 2.#加载 ip_vs 模块 3.调整内核参数 4.#使用Systemd管理的Cgroup来进行资源控制与管理 5.开机自启kubelet 6.内核参数优化方案 7.etcd优化 默认etcd空间配额大小为 2G&#xff0c;超过 2G 将不再写入数据。通过给etcd配置 --quo…

IO流(Java)

IO流 在学习IO流之前&#xff0c;我们首先了解一下File File File即文件或文件夹路径对象&#xff0c;其示例类可以是存在路径也可以是未创造路径 File有什么用 用于创建或操作文件或文件夹 File常用API API部分看得懂会查会用即可 IO流 IO(Input 读数据 Output写数据…

Qt/QML编程学习之心得:工程中的文件(十二)

Qt生成了工程之后,尤其在QtCreator产生对应的project项目之后,就如同VisualStudio一样,会产生相关的工程文件,那么这些工程文件都是做什么的呢?这里介绍一下。比如产生了一个Qt Widget application,当然如果Qt Quick Application工程会有所不同。 一、.pro和.pro.user …

企业计算机服务器中了360勒索病毒如何解密,勒索病毒解密数据恢复

网络技术的不断应用与发展&#xff0c;为企业的生产运营提供了极大便利&#xff0c;但随之而来的网络安全威胁也不断增加。近期&#xff0c;云天数据恢复中心接到很多企业的求助&#xff0c;企业的计算机服务器遭到了360后缀勒索病毒攻击&#xff0c;导致企业的所有数据被加密&…

游戏策划常用的ChatGPT通用提示词模板

游戏设计&#xff1a;请帮助我设计一个有趣的游戏。 游戏玩法&#xff1a;如何设计游戏的玩法&#xff1f; 游戏机制&#xff1a;如何设计游戏的机制&#xff1f; 游戏平衡&#xff1a;如何平衡游戏中的各种元素&#xff1f; 游戏美术&#xff1a;如何设计游戏的美术风格&a…

『PyTorch学习笔记』如何快速下载huggingface模型/数据—全方法总结

如何快速下载huggingface模型/数据—全方法总结 文章目录 一. 如何快速下载huggingface大模型1.1. IDM(Windows)下载安装连接1.2. 推荐 huggingface 镜像站1.3. 管理huggingface_hub cache-system(缓存系统) 二. 参考文献 一. 如何快速下载huggingface大模型 推荐 huggingface…

希亦洗地机跟追觅洗地机入手哪个更好?追觅跟希亦洗地机深度评估

近年来&#xff0c;洗地机可以同时处理干湿垃圾&#xff0c;同时降低用户在清洁过程中的劳动强度&#xff0c;成为了家居清洁的新宠&#xff0c;但是目前市场上的品牌和型号层出不穷。用户往往很难挑选&#xff0c;本文挑选了两款目前口碑最好的两款洗地机给大家做一个全面的评…

Android 记录一些Framework开发的命令

源码编译流程 1. "source build/envsetup.sh" (source可以用 . 代替&#xff0c;即". build/envsetup.sh") 2. "lunch"&#xff0c;并选择要编译的项目或者"choosecombo" 3. "make idegen -j4" (这里的 -j4 表示用4线程来…