Kafka和Spark Streaming的组合使用学习笔记(Spark 3.5.1)

一、安装Kafka

1.执行以下命令完成Kafka的安装:
cd ~  //默认压缩包放在根目录
sudo tar -zxf  kafka_2.12-2.6.0.tgz -C /usr/local
cd /usr/local
sudo mv kafka_2.12-2.6.0 kafka-2.6.0
sudo chown -R qiangzi ./kafka-2.6.0

二、启动Kafaka

1.首先需要启动Kafka,打开一个终端,输入下面命令启动Zookeeper服务:
cd  /usr/local/kafka-2.6.0
./bin/zookeeper-server-start.sh  config/zookeeper.properties

注意:以上现象是Zookeeper服务器已经启动,正在处于服务状态。不要关闭!

2.打开第二个终端输入下面命令启动Kafka服务:
cd  /usr/local/kafka-2.6.0
./bin/kafka-server-start.sh  config/server.properties//加了“&”的命令,Kafka就会在后台运行,即使关闭了这个终端,Kafka也会一直在后台运行。
bin/kafka-server-start.sh  config/server.properties  &

注意:同样不要误以为死机了,而是Kafka服务器已经启动,正在处于服务状态。

三、创建Topic

1.再打开第三个终端,然后输入下面命令创建一个自定义名称为“wordsender”的Topic:
cd /usr/local/kafka-2.6.0
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wordsender
2.然后,可以执行如下命令,查看名称为“wordsender”的Topic是否已经成功创建:
./bin/kafka-topics.sh --list --zookeeper localhost:2181

3.再新开一个终端(记作“监控输入终端”),执行如下命令监控Kafka收到的文本:
cd /usr/local/kafka-2.6.0
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wordsender

注意,所有这些终端窗口都不要关闭,要继续留着后面使用。

四、Spark准备工作

Kafka和Flume等高级输入源,需要依赖独立的库(jar文件),因此,需要为Spark添加相关jar包。访问MVNREPOSITORY官网(http://mvnrepository.com),下载spark-streaming-kafka-0-10_2.12-3.5.1.jar和spark-token-provider-kafka-0-10_2.12-3.5.1.jar文件,其中,2.12表示Scala的版本号,3.5.1表示Spark版本号。然后,把这两个文件复制到Spark目录的jars目录下(即“/usr/local/spark-3.5.1/jars”目录)。此外,还需要把“/usr/local/kafka-2.6.0/libs”目录下的kafka-clients-2.6.0.jar文件复制到Spark目录的jars目录下。

cd ~  .jar文件默认放在根目录
sudo mv ./spark-streaming-kafka-0-10_2.12-3.5.1.jar /usr/local/spark-3.5.1/jars/
sudo mv ./spark-token-provider-kafka-0-10_2.12-3.5.1.jar /usr/local/spark-3.5.1/jars/
sudo cp /usr/local/kafka-2.6.0/libs/kafka-clients-2.6.0.jar /usr/local/spark-3.5.1/jars/

spark-streaming-kafka-0-10_2.12-3.5.1.jar的下载页面:

Maven Repository: org.apache.spark » spark-streaming-kafka-0-10_2.12 » 3.5.1 (mvnrepository.com)

spark-streaming-kafka-0-10_2.12-3.5.1.jar的下载页面:

Maven Repository: org.apache.spark » spark-token-provider-kafka-0-10_2.12 » 3.5.1 (mvnrepository.com)

进入下载页面以后,如下图所示,点击红色方框内的“jar”,就可以下载JAR包了。

五、编写Spark Streaming程序使用Kafka数据源

1.编写生产者(Producer)程序
(1)新打开一个终端,然后,执行如下命令创建代码目录和代码文件:
cd /usr/local/spark-3.5.1
mkdir mycode
cd ./mycode
mkdir kafka
mkdir -p kafka/src/main/scala
vi kafka/src/main/scala/KafkaWordProducer.scala
(2)使用vi编辑器新建了KafkaWordProducer.scala

它是用来产生一系列字符串的程序,会产生随机的整数序列,每个整数被当作一个单词,提供给KafkaWordCount程序去进行词频统计。请在KafkaWordProducer.scala中输入以下代码:

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
object KafkaWordProducer {def main(args: Array[String]) {if (args.length < 4) {System.err.println("Usage: KafkaWordProducer <metadataBrokerList> <topic> " +"<messagesPerSec> <wordsPerMessage>")System.exit(1)}val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args// Zookeeper connection propertiesval props = new HashMap[String, Object]()props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")val producer = new KafkaProducer[String, String](props)// Send some messageswhile(true) {(1 to messagesPerSec.toInt).foreach { messageNum =>val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).
toString).mkString(" ")print(str)println()val message = new ProducerRecord[String, String](topic, null, str)producer.send(message)}Thread.sleep(1000)}}
}
2.编写消费者(Consumer)程序

在“/usr/local/spark-3.5.1/mycode/kafka/src/main/scala”目录下创建文件KafkaWordCount.scala,用于单词词频统计,它会把KafkaWordProducer发送过来的单词进行词频统计,代码内容如下:

cd /usr/local/spark-3.5.1/mycode
vi kafka/src/main/scala/KafkaWordCount.scala
import org.apache.spark._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribeobject KafkaWordCount{def main(args:Array[String]){val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")val sc = new SparkContext(sparkConf)sc.setLogLevel("ERROR")val ssc = new StreamingContext(sc,Seconds(10))ssc.checkpoint("file:///usr/local/spark-3.5.1/mycode/kafka/checkpoint") //设置检查点,如果存放在HDFS上面,则写成类似ssc.checkpoint("/user/hadoop/checkpoint")这种形式,但是,要启动Hadoopval kafkaParams = Map[String, Object]("bootstrap.servers" -> "localhost:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "use_a_separate_group_id_for_each_stream","auto.offset.reset" -> "latest","enable.auto.commit" -> (true: java.lang.Boolean))val topics = Array("wordsender")val stream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams))stream.foreachRDD(rdd => {val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRangesval maped: RDD[(String, String)] = rdd.map(record => (record.key,record.value))val lines = maped.map(_._2)val words = lines.flatMap(_.split(" "))val pair = words.map(x => (x,1))val wordCounts = pair.reduceByKey(_+_)wordCounts.foreach(println)})ssc.startssc.awaitTermination}
}
3.在路径“file:///usr/local/spark/mycode/kafka/”下创建“checkpoint”目录作为预写式日志的存放路径。
cd ./kafka
mkdir checkpoint
4.继续在当前目录下创建StreamingExamples.scala代码文件,用于设置log4j:
cd /usr/local/spark-3.5.1/mycode/
vi kafka/src/main/scala/StreamingExamples.scala/*StreamingExamples.scala*/
package org.apache.spark.examples.streaming
import org.apache.spark.internal.Logging
import org.apache.log4j.{Level, Logger}                                                                                 /** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {/** Set reasonable logging levels for streaming if the user has not configured log4j. */def setStreamingLogLevels() {val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElementsif (!log4jInitialized) {// We first log something to initialize Spark's default logging, then we override the// logging level.logInfo("Setting log level to [WARN] for streaming example." +" To override add a custom log4j.properties to the classpath.")Logger.getRootLogger.setLevel(Level.WARN)}                                                                                                                     }                                                                                                                     } 
5.编译打包程序

现在在“/usr/local/spark-3.5.1/mycode/kafka/src/main/scala”目录下,就有了如下3个scala文件:

然后,执行下面命令新建一个simple.sbt文件:

cd /usr/local/spark-3.5.1/mycode/kafka/
vim simple.sbt

在simple.sbt中输入以下代码:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.12.18"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.5.1" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.5.1"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.6.0"

然后执行下面命令,进行编译打包:

cd  /usr/local/spark-3.5.1/mycode/kafka/
/usr/local/sbt-1.9.0/sbt/sbt  package

打包成功界面

6. 运行程序

首先,启动Hadoop,因为如果前面KafkaWordCount.scala代码文件中采用了ssc.checkpoint
("/user/hadoop/checkpoint")这种形式,这时的检查点是被写入HDFS,因此需要启动Hadoop。启动Hadoop的命令如下:

cd  /usr/local/hadoop-2.10.1
./sbin/start-dfs.sh
或者
start-dfs.sh
start-yarn.sh

启动Hadoop成功以后,就可以测试刚才生成的词频统计程序了。
要注意,之前已经启动了Zookeeper服务和Kafka服务,因为之前那些终端窗口都没有关闭,所以,这些服务一直都在运行。如果不小心关闭了之前的终端窗口,那就参照前面的内容,再次启动Zookeeper服务,启动Kafka服务。
然后,新打开一个终端,执行如下命令,运行“KafkaWordProducer”程序,生成一些单词(是一堆整数形式的单词):

cd  /usr/local/spark-3.5.1/mycode/kafka/
/usr/local/spark-3.5.1/bin/spark-submit --class "KafkaWordProducer" ./target/scala-2.12/sime-project_2.12-1.0.jar localhost:9092 wordsender  3  5

注意,上面命令中,“localhost:9092 wordsender 3 5”是提供给KafkaWordProducer程序的4个输入参数,第1个参数“localhost:9092”是Kafka的Broker的地址,第2个参数“wordsender”是Topic的名称,我们在KafkaWordCount.scala代码中已经把Topic名称写死掉,所以,KafkaWordCount程序只能接收名称为“wordsender”的Topic。第3个参数“3”表示每秒发送3条消息,第4个参数“5”表示每条消息包含5个单词(实际上就是5个整数)。
执行上面命令后,屏幕上会不断滚动出现类似如下的新单词:

不要关闭这个终端窗口,让它一直不断发送单词。然后,再打开一个终端,执行下面命令,运行KafkaWordCount程序,执行词频统计:

cd  /usr/local/spark-3.5.1/mycode/kafka/
/usr/local/spark-3.5.1/bin/spark-submit --class "KafkaWordCount" ./target/scala-2.12/simple-oject_2.12-1.0.jar
运行上面命令以后,就启动了词频统计功能,屏幕上就会显示如下类似信息:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/10533.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机毕业设计Python地震预测系统 地震数据分析可视化 地震爬虫 大数据毕业设计 Flink Hadoop 深度学习 机器学习 人工智能 知识图谱

学生信息 姓名&#xff1a;  祁浩 题目&#xff1a; 基于Python的中国地震数据分析与可视化系统的设计与实现 学号&#xff1a; 2020135211 班级&#xff1a; 20大数据本科2班 指导教师&#xff1a; 刘思思 答辩过程 学生开题陈述 为了让学习者更好的了解了解地震…

Coze扣子开发指南:AI零代码编程创建插件

在Coze扣子中创建插件&#xff0c;有两种方式&#xff0c;一是用API&#xff0c;具体方式参照上一篇文章《Coze扣子开发指南&#xff1a;用免费API自己创建插件》&#xff0c;还有一种方式就是编程&#xff0c;不过有了AI的帮助&#xff0c;即使不会编程的人&#xff0c;也可以…

HarmonyOS开发案例:【生活健康app之获取成就】(3)

获取成就 本节将介绍成就页面。 功能概述 成就页面展示用户可以获取的所有勋章&#xff0c;当用户满足一定的条件时&#xff0c;将点亮本页面对应的勋章&#xff0c;没有得到的成就勋章处于熄灭状态。共有六种勋章&#xff0c;当用户连续完成任务打卡3天、7天、30天、50天、…

用大于meilisearch-java-0.7.0.jar的报错的解决

Elasticsearch 做为老牌搜索引擎&#xff0c;功能基本满足&#xff0c;但复杂&#xff0c;重量级&#xff0c;适合大数据量。 MeiliSearch 设计目标针对数据在 500GB 左右的搜索需求&#xff0c;极快&#xff0c;单文件&#xff0c;超轻量。 所以&#xff0c;对于中小型项目来说…

阿里云服务器在线安装nginx

⛰️个人主页: 蒾酒 &#x1f525;系列专栏&#xff1a;《nginx实战》 目录 内容简介 安装步骤 1.root用户登录连接阿里云服务器 2.在usr/local下新建nginx目录 3.安装 1安装下载工具 2下载nginx压缩包 3解压 4安装nginx依赖的库 5编译并安装 6启动nginx 7开启…

怎么使用远程桌面传输文件?

微软提供的远程桌面功能是一项强大的工具&#xff0c;可让您在同一网络下远程访问和管理其他计算机。除了远程控制&#xff0c;它还支持文件传输功能&#xff0c;为Windows用户提供了极大的便利。在接下来的内容中&#xff0c;我们将介绍如何使用远程桌面传输文件。 如何从远程…

PADS:生成自交叉平面区域

根据板外形铺铜方法&#xff1a; pads根据板外形铺铜_铺铜如何根据板子形状改变-CSDN博客 根据板外形创建平面区域出现问题&#xff1a; 解决方法&#xff1a;去找结构&#xff0c;让他把出图之前把线合并了

https免费证书获取

获取免费证书的网址&#xff1a; Certbot 1. 进入你的linux系统&#xff0c;先安装snapd&#xff0c; yum install snapd 2. 启动snapd service snapd start 3.安装 Certbot snap install --classic certbot 注意如下出现此错误时&#xff0c;需要先建立snap 软连接后&am…

山东大学软件学院创新项目实训开发日志——第11周

山东大学软件学院创新项目实训开发日志——第11周 项目名称&#xff1a;ModuFusion Visionary&#xff1a;实现跨模态文本与视觉的相关推荐 -------项目目标&#xff1a; 本项目旨在开发一款跨模态交互式应用&#xff0c;用户可以上传图片或视频&#xff0c;并使用文本、点、…

Golang | Leetcode Golang题解之第84题柱状图中最大的矩形

题目&#xff1a; 题解&#xff1a; func largestRectangleArea(heights []int) int {n : len(heights)left, right : make([]int, n), make([]int, n)for i : 0; i < n; i {right[i] n}mono_stack : []int{}for i : 0; i < n; i {for len(mono_stack) > 0 &&am…

JavaScript之数据类型(3)——object进阶

前言&#xff1a; 利用基础知识来构建对象会发现十分复杂&#xff0c;我们可以结合其他的知识点来为我们object的构建进行优化。 <1>工厂法&#xff1a; 基本格式&#xff1a; function creatObject(属性值1,属性值2,属性值3,...,属性值n) {var 对象名 new Object();对…

在IDEA中使用 Spring Initializr 新建 spring boots 项目

【在IDEA中使用 Spring Initializr 新建 spring boots 项目 - CSDN Apphttp://t.csdnimg.cn/mVs5P Spring Initializr 创建spring boots项目 添加到pom.xml <dependency> <groupId>mysql</groupId> <artifactId>mysql-connec…

Python | Leetcode Python题解之第84题柱状图中最大的矩形

题目&#xff1a; 题解&#xff1a; class Solution:def largestRectangleArea(self, heights: List[int]) -> int:n len(heights)left, right [0] * n, [n] * nmono_stack list()for i in range(n):while mono_stack and heights[mono_stack[-1]] > heights[i]:righ…

Rust学习笔记(中)

前言 笔记的内容主要参考与《Rust 程序设计语言》&#xff0c;一些也参考了《通过例子学 Rust》和《Rust语言圣经》。 Rust学习笔记分为上中下&#xff0c;其它两个地址在Rust学习笔记&#xff08;上&#xff09;和Rust学习笔记&#xff08;下&#xff09;。 错误处理 pani…

01、什么是ip、协议、端口号知道吗?计算机网络通信的组成是什么?

声明&#xff1a;本教程不收取任何费用&#xff0c;欢迎转载&#xff0c;尊重作者劳动成果&#xff0c;不得用于商业用途&#xff0c;侵权必究&#xff01;&#xff01;&#xff01; 目录 前言 计算机网络 网络ip地址 网络协议 网络端口号 前言 最近有个项目要用到相关文章…

蓝桥杯单片机之模块代码《多样点灯方式》

过往历程 历程1&#xff1a;秒表 历程2&#xff1a;按键显示时钟 历程3&#xff1a;列矩阵按键显示时钟 历程4&#xff1a;行矩阵按键显示时钟 历程5&#xff1a;新DS1302 历程6&#xff1a;小数点精确后两位ds18b20 历程7&#xff1a;35定时器测量频率 历程8&#xff…

大数据Scala教程从入门到精通第六篇:Scala编译结果反编译分析

一&#xff1a;Scala编译结果反编译分析 问题&#xff1a;为什么Scalac之后的生成的class文件有两个&#xff0c;一个带$的&#xff0c;一个不带$的&#xff1f; 不能直接java 执行scala编译的字节码文件。 直接运行的话就会报错&#xff0c;会报一个类没有被找到。 引入类库就…

JavaScript 防抖与节流——以游戏智慧解锁实战奥秘

&#x1f525; 个人主页&#xff1a;空白诗 文章目录 &#x1f3ae; 引言❓ 什么是防抖和节流&#x1f3f9; 防抖(Debounce) - 锁定追击&#xff0c;精确无误&#x1f4cc; 基础概念&#x1f4cc; 适用场景&#x1f4cc; 实战代码&#xff1a;防抖 应用于输入框的实时搜索 &…

Java基础入门day48

day48 JDBC调用关系 tomcat 简介 tomcat是Apache下的一个核心项目&#xff0c;免费开源&#xff0c;支持servlet和jsp。 tomcat技术先进&#xff0c;性能稳定&#xff0c;目前比较流行的web应用服务器 安装 官网&#xff1a; Apache Tomcat - Welcome! 下载 tomcat8.5 解压&a…

Linux入门攻坚——23、DNS和BIND基础入门1

DNS——Domain Name Service&#xff0c;协议&#xff08;C/S&#xff0c;53/udp&#xff0c;53/tcp&#xff09; BIND——Berkeley Internet Name Domain&#xff0c;ISC&#xff08;www.isc.org&#xff09; 互联网络上主机之间的通信依靠的是IP&#xff0c;而人或程序一般使…