2023_Spark_实验二十四:SparkStreaming读取Kafka数据源:使用Direct方式

SparkStreaming读取Kafka数据源:使用Direct方式

一、前提工作

  • 安装了zookeeper

  • 安装了Kafka

  • 实验环境:kafka + zookeeper + spark

  • 实验流程

二、实验内容

实验要求:实现的从kafka读取实现wordcount程序

启动zookeeper

zk.sh start# zk.sh脚本 参考教程 https://blog.csdn.net/pblh123/article/details/134730738?spm=1001.2014.3001.5502

启动Kafka

kf.sh start# kf.sh 参照教程 https://blog.csdn.net/pblh123/article/details/134730738?spm=1001.2014.3001.5502

 (测试用,实验不做)创建Kafka主题,如test,可参考:Kafka的安装与基本操作

--topic 定义topic名

--replication-factor  定义副本数

--partitions  定义分区数

--bootstrap-server  连接的Kafka Broker主机名称和端口号

--create 创建主题

--describe 查看主题详细描述

# 创建kafka主题测试
/opt/module/kafka_2.12-3.0.0/bin/kafka-topics.sh --create --bootstrap-server hd1:9092 --replication-factor 3 --partitions 1 --topic gnutest2# 再次查看first主题的详情
/opt/module/kafka_2.12-3.0.0/bin/kafka-topics.sh --bootstrap-server hd1:9092 --describe --topic gnutest2

启动Kafka控制台生产者,可参考:Kafka的安装与基本操作

# 创建kafka生产者
/opt/module/kafka_2.12-3.0.0/bin/kafka-console-producer.sh --bootstrap-server hd1:9092 --topic gnutest2

创建maven项目

添加kafka依赖

       <!--- 添加streaming依赖 ---><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.13</artifactId><version>${spark.version}</version></dependency><!--- 添加streaming kafka依赖 ---><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.13</artifactId><version>3.4.1</version></dependency>

编写程序,如下所示:

package examsimport org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribeimport java.lang/*** @projectName SparkLearning2023  * @package exams  * @className exams.SparkStreamingReadKafka  * @description ${description}  * @author pblh123* @date 2023/12/1 15:19* @version 1.0**/object SparkStreamingReadKafka {def main(args: Array[String]): Unit = {//  1. 创建spark,sc对象if (args.length != 2) {println("您需要输入一个参数")System.exit(5)}val musrl: String = args(0)val spark: SparkSession = new SparkSession.Builder().appName(s"${this.getClass.getSimpleName}").master(musrl).getOrCreate()val sc: SparkContext = spark.sparkContext// 生成streamingContext对象val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))//  2. 代码主体val bststrapServers = args(1)val kafkaParms: Map[String, Object] = Map[String, Object]("bootstrap.servers" -> bststrapServers, //kafka列表"key.deserializer" -> classOf[StringDeserializer], k和v 的序列化类型"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "use_a_separate_group_id_for_each_stream", //消费者组"auto.offset.reset" -> "latest", //如果没有记录偏移量,第一次从最开始读,有偏移量,接着偏移量读"enable.auto.commit" -> (true: java.lang.Boolean) // 消费者不自动提交偏移量)val topics = Array("gnutest2", "t100")// createDirectStream: 主动拉取数据val stream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParms))val mapDStream: DStream[(String, String)] = stream.map(record => (record.key, record.value))//kafka 是一个key value 格式的, 默认key 为null ,一般用不上val resultRDD: DStream[(String, Int)] = mapDStream.flatMap(_._2.split(" ")).map((_, 1)).reduceByKey(_ + _)// 打印resultRDD.print()//  3. 关闭sc,spark对象ssc.start()ssc.awaitTermination()ssc.stop()sc.stop()spark.stop()}
}

配置输入参数

生产者追加数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/190346.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【JavaSE学习专栏】第04篇 Java面向对象

文章目录 1 面向过程&面向对象2 类和对象2.1 对象的特征2.2 java类及类的成员2.3 类的语法格式 3 创建与初始化对象3.1 类的成员之一&#xff1a;属性3.2 类的成员之二&#xff1a;方法3.3 类的成员之三&#xff1a;构造器&#xff08;构造方法&#xff09;3.3.1 无参构造方…

hexo博客部署到云服务器

欢迎大家到我的博客浏览。hexo博客部署到云服务器 | YinKais Blog 这篇文章带大家将hexo博客部署到云服务器上&#xff01; 一、服务器环境安装 1、安装 node js yum install gcc-c make yum -y install nodejs yum -y install npm 验证 node -v npm -v 2、安装git、ngin…

Linux 命令stat

命令作用 stat命令用于显示文件的状态信息。stat命令的输出信息比ls命令的输出信息要更详细。 查看的信息内容: File 显示文件名 Size 显示文件大小 Blocks 文件使用的数据块总数 IO Block IO块大小 regular file 文件类型&#xff08;常规文件&#xff09; Device …

Postman Post请求上传文件

Postman Post请求上传文件 一、选择post请求方式&#xff0c;输入请求地址 二、填写Headers Key&#xff1a;Content-Type Value&#xff1a;multipart/form-data [{"key":"Content-Type","value":"multipart/form-data","de…

WEBAPI返回图片显示在VUE前端

WEBAPI部分 通过nuget安装Opencvsharp &#xff0c;这部分用来做图像处理 在controller中写如下方法&#xff0c;我要对原图进行旋转使用了Opencv&#xff0c;如果不需要旋转可以用注释的代码 [HttpGet(Name "ShowImage")]public async Task<IActionResult> …

vscode 调试jlink

文章目录 软件使用说明1、启动GDB Server2、下载gdb3、vscode配置4、调试 软件 vscodejlink - (JLinkGDBServer.exe)gcc-arm-none-eabi-10-2020-q4-major (arm-none-eabi-gdb.exe) 使用说明 vscode通过TCP端口调用JLinkGDBServer通过jlink连接和操作设备&#xff0c;vscode不…

怎么让百度快速收录,百度SEO收录工具

百度收录对于一个网站的重要性不言而喻。拥有良好的百度收录意味着网站能够更好地被搜索引擎收录&#xff0c;为用户提供更精准的搜索结果。而怎样实现百度快速收录成为了许多网站管理员关注的焦点。 百度收录的重要性 百度是国内最大的搜索引擎之一&#xff0c;拥有数以亿计的…

工业机器视觉megauging(向光有光)使用说明书(三,轻量级的visionpro)

下来我们说说第二个相机的添加&#xff1a; 第一步&#xff0c;点击相机二&#xff0c;如下&#xff1a; 第二步&#xff0c;点击&#xff1a;加载工具组.xml&#xff0c;加载toolgroupxml2目录下的&#xff1a;工具组.xml 注意&#xff0c;一个相机只能用一个toolgroupxml,第…

【Python表白系列】如何实现爱心光波的表白效果(完整代码)

文章目录 爱心光波环境需求完整代码详细分析系列文章爱心光波 环境需求 python3.11.4PyCharm Community Edition 2023.2.5pyinstaller6.2.0(可选,这个库用于打包,使程序没有python环境也可以运行,如果想发给好朋友的话需要这个库哦~)【注】 python环境搭建请见:https://w…

Python生成器:优雅而高效的迭代器

Python是一种强大而灵活的编程语言&#xff0c;拥有丰富的标准库和特性功能&#xff0c;其中之一就是 生成器。 生成器 是Python中一种非常实用的特性&#xff0c;它能帮助我们编写高效的代码&#xff0c;尤其是在处理大量数据时&#xff0c;它能够帮助我们更有效地处理迭代任…

初识Linux(下).妈妈再也不用担心我Linux找不到门了

文章目录 前言1. date时间相关的指令1.1 date1.2 在设定时间方面示例如下&#xff1a; 1.3 时间戳示例如下&#xff1a; 2. Cal指令示例如下&#xff1a;类似windows 3. find指令&#xff1a;&#xff08;非常重要&#xff09; -name示例如下&#xff1a;类似windows 4. grep指…

AirServer怎么用?如何AirServer进行手机投屏

什么是 AirServer&#xff1f; AirServer 是适用于 Mac 和 PC 的先进的屏幕镜像接收器。 它允许您接收 AirPlay 和 Google Cast 流&#xff0c;类似于 Apple TV 或 Chromecast 设备。AirServer 可以将一个简单的大屏幕或投影仪变成一个通用的屏幕镜像接收器 &#xff0c;是一款…

数据结构:图文详解单链表的各种操作(头插法,尾插法,任意位置插入,删除节点,查询节点,求链表的长度,清空链表)

目录 一.什么是链表 二.链表的实现 节点的插入 头插法 尾插法 指定位置插入 节点的删除 删除第一次出现的关键字节点 删除所有关键字节点 节点的查找 链表的清空 链表的长度 前言&#xff1a;在上一篇文章中&#xff0c;我们认识了线性数据结构中的顺序表&#xff0…

接口测试 —— requests 的基本了解

● requests介绍及安装 ● requests原理及源码介绍 ● 使用requests发送请求 ● 使用requests处理响应 ● get请求参数 ● 发送post请求参数 ● 请求header设置 ● cookie的处理 ● https证书的处理 ● 文件上传、下载 requests介绍 ● requests是python第三方的HTTP…

树莓派4B机器狗的串口通信驱动库pyserial实践

pyserial是一个串口通信驱动库&#xff0c;常用的Windows、Linux、MacOS等都可以安装&#xff0c;这里我使用的是树莓派4B来测试&#xff0c;这块板子还是很强大的&#xff0c;我们可以通过pyserial这个库来操作基于这块板子上的机器狗之类的设备。 1、四足机器狗 本人的是四…

【开源】基于JAVA语言的考研专业课程管理系统

项目编号&#xff1a; S 035 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S035&#xff0c;文末获取源码。} 项目编号&#xff1a;S035&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 数据中心模块2.2 考研高校模块2.3 高…

深信服AD负载均衡频繁掉线故障分析

一个由114.114.114.114引起的AD异常 客户反馈深信服负载均衡链路频繁掉线&#xff0c;具体故障现象如下 可以获取到IP地址、网关 两分钟掉一次&#xff0c;持续一个多月&#xff0c;求IT的心理阴影面积&#xff01; 链路监视器只设置了一个114.114.114.114 处理流程&#xff…

SCA技术进阶系列(四):DSDX SBOM供应链安全应用实践

一、SBOM的发展趋势 数字时代&#xff0c;软件已经成为维持生产生活正常运行的必备要素之一。随着容器、中间件、微服务、 DevOps等技术理念的演进&#xff0c;软件行业快速发展&#xff0c;但同时带来软件设计开发复杂度不断提升&#xff0c;软件供应链愈发复杂&#xff0c;软…

MySQL 基础、进阶、运维的学习笔记

1. MySQL 基础篇 1.1 MySQL 概述 1.1.1 数据库相关概念 数据库(Database, 简称 DB): 存储数据的仓库&#xff0c;数据是有组织的进行存储。 数据库管理系统(Database Management System, 简称 DBMS): 操作和管理数据库的大型软件。 SQL(Structured Query Language, 简称 S…

Linux:进程间通信

目录 一、关于进程间通信 二、管道 pipe函数 管道的特点 匿名管道 命名管道 mkfifo 三、system v共享内存 shmget函数(创建) ftok函数(生成key) shmctl函数(删除) shmat/dt函数(挂接/去关联) 四、初识信号量 一、关于进程间通信 首先我们都知道&#xff0c;进程运…