Kafka和Spark Streaming的组合使用学习笔记(Spark 3.5.1)

一、安装Kafka

1.执行以下命令完成Kafka的安装:
cd ~  //默认压缩包放在根目录
sudo tar -zxf  kafka_2.12-2.6.0.tgz -C /usr/local
cd /usr/local
sudo mv kafka_2.12-2.6.0 kafka-2.6.0
sudo chown -R qiangzi ./kafka-2.6.0

二、启动Kafaka

1.首先需要启动Kafka,打开一个终端,输入下面命令启动Zookeeper服务:
cd  /usr/local/kafka-2.6.0
./bin/zookeeper-server-start.sh  config/zookeeper.properties

注意:以上现象是Zookeeper服务器已经启动,正在处于服务状态。不要关闭!

2.打开第二个终端输入下面命令启动Kafka服务:
cd  /usr/local/kafka-2.6.0
./bin/kafka-server-start.sh  config/server.properties//加了“&”的命令,Kafka就会在后台运行,即使关闭了这个终端,Kafka也会一直在后台运行。
bin/kafka-server-start.sh  config/server.properties  &

注意:同样不要误以为死机了,而是Kafka服务器已经启动,正在处于服务状态。

三、创建Topic

1.再打开第三个终端,然后输入下面命令创建一个自定义名称为“wordsender”的Topic:
cd /usr/local/kafka-2.6.0
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wordsender
2.然后,可以执行如下命令,查看名称为“wordsender”的Topic是否已经成功创建:
./bin/kafka-topics.sh --list --zookeeper localhost:2181

3.再新开一个终端(记作“监控输入终端”),执行如下命令监控Kafka收到的文本:
cd /usr/local/kafka-2.6.0
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wordsender

注意,所有这些终端窗口都不要关闭,要继续留着后面使用。

四、Spark准备工作

Kafka和Flume等高级输入源,需要依赖独立的库(jar文件),因此,需要为Spark添加相关jar包。访问MVNREPOSITORY官网(http://mvnrepository.com),下载spark-streaming-kafka-0-10_2.12-3.5.1.jar和spark-token-provider-kafka-0-10_2.12-3.5.1.jar文件,其中,2.12表示Scala的版本号,3.5.1表示Spark版本号。然后,把这两个文件复制到Spark目录的jars目录下(即“/usr/local/spark-3.5.1/jars”目录)。此外,还需要把“/usr/local/kafka-2.6.0/libs”目录下的kafka-clients-2.6.0.jar文件复制到Spark目录的jars目录下。

cd ~  .jar文件默认放在根目录
sudo mv ./spark-streaming-kafka-0-10_2.12-3.5.1.jar /usr/local/spark-3.5.1/jars/
sudo mv ./spark-token-provider-kafka-0-10_2.12-3.5.1.jar /usr/local/spark-3.5.1/jars/
sudo cp /usr/local/kafka-2.6.0/libs/kafka-clients-2.6.0.jar /usr/local/spark-3.5.1/jars/

spark-streaming-kafka-0-10_2.12-3.5.1.jar的下载页面:

Maven Repository: org.apache.spark » spark-streaming-kafka-0-10_2.12 » 3.5.1 (mvnrepository.com)

spark-streaming-kafka-0-10_2.12-3.5.1.jar的下载页面:

Maven Repository: org.apache.spark » spark-token-provider-kafka-0-10_2.12 » 3.5.1 (mvnrepository.com)

进入下载页面以后,如下图所示,点击红色方框内的“jar”,就可以下载JAR包了。

五、编写Spark Streaming程序使用Kafka数据源

1.编写生产者(Producer)程序
(1)新打开一个终端,然后,执行如下命令创建代码目录和代码文件:
cd /usr/local/spark-3.5.1
mkdir mycode
cd ./mycode
mkdir kafka
mkdir -p kafka/src/main/scala
vi kafka/src/main/scala/KafkaWordProducer.scala
(2)使用vi编辑器新建了KafkaWordProducer.scala

它是用来产生一系列字符串的程序,会产生随机的整数序列,每个整数被当作一个单词,提供给KafkaWordCount程序去进行词频统计。请在KafkaWordProducer.scala中输入以下代码:

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
object KafkaWordProducer {def main(args: Array[String]) {if (args.length < 4) {System.err.println("Usage: KafkaWordProducer <metadataBrokerList> <topic> " +"<messagesPerSec> <wordsPerMessage>")System.exit(1)}val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args// Zookeeper connection propertiesval props = new HashMap[String, Object]()props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")val producer = new KafkaProducer[String, String](props)// Send some messageswhile(true) {(1 to messagesPerSec.toInt).foreach { messageNum =>val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).
toString).mkString(" ")print(str)println()val message = new ProducerRecord[String, String](topic, null, str)producer.send(message)}Thread.sleep(1000)}}
}
2.编写消费者(Consumer)程序

在“/usr/local/spark-3.5.1/mycode/kafka/src/main/scala”目录下创建文件KafkaWordCount.scala,用于单词词频统计,它会把KafkaWordProducer发送过来的单词进行词频统计,代码内容如下:

cd /usr/local/spark-3.5.1/mycode
vi kafka/src/main/scala/KafkaWordCount.scala
import org.apache.spark._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribeobject KafkaWordCount{def main(args:Array[String]){val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")val sc = new SparkContext(sparkConf)sc.setLogLevel("ERROR")val ssc = new StreamingContext(sc,Seconds(10))ssc.checkpoint("file:///usr/local/spark-3.5.1/mycode/kafka/checkpoint") //设置检查点,如果存放在HDFS上面,则写成类似ssc.checkpoint("/user/hadoop/checkpoint")这种形式,但是,要启动Hadoopval kafkaParams = Map[String, Object]("bootstrap.servers" -> "localhost:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "use_a_separate_group_id_for_each_stream","auto.offset.reset" -> "latest","enable.auto.commit" -> (true: java.lang.Boolean))val topics = Array("wordsender")val stream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams))stream.foreachRDD(rdd => {val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRangesval maped: RDD[(String, String)] = rdd.map(record => (record.key,record.value))val lines = maped.map(_._2)val words = lines.flatMap(_.split(" "))val pair = words.map(x => (x,1))val wordCounts = pair.reduceByKey(_+_)wordCounts.foreach(println)})ssc.startssc.awaitTermination}
}
3.在路径“file:///usr/local/spark/mycode/kafka/”下创建“checkpoint”目录作为预写式日志的存放路径。
cd ./kafka
mkdir checkpoint
4.继续在当前目录下创建StreamingExamples.scala代码文件,用于设置log4j:
cd /usr/local/spark-3.5.1/mycode/
vi kafka/src/main/scala/StreamingExamples.scala/*StreamingExamples.scala*/
package org.apache.spark.examples.streaming
import org.apache.spark.internal.Logging
import org.apache.log4j.{Level, Logger}                                                                                 /** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {/** Set reasonable logging levels for streaming if the user has not configured log4j. */def setStreamingLogLevels() {val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElementsif (!log4jInitialized) {// We first log something to initialize Spark's default logging, then we override the// logging level.logInfo("Setting log level to [WARN] for streaming example." +" To override add a custom log4j.properties to the classpath.")Logger.getRootLogger.setLevel(Level.WARN)}                                                                                                                     }                                                                                                                     } 
5.编译打包程序

现在在“/usr/local/spark-3.5.1/mycode/kafka/src/main/scala”目录下,就有了如下3个scala文件:

然后,执行下面命令新建一个simple.sbt文件:

cd /usr/local/spark-3.5.1/mycode/kafka/
vim simple.sbt

在simple.sbt中输入以下代码:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.12.18"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.5.1" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.5.1"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.6.0"

然后执行下面命令,进行编译打包:

cd  /usr/local/spark-3.5.1/mycode/kafka/
/usr/local/sbt-1.9.0/sbt/sbt  package

打包成功界面

6. 运行程序

首先,启动Hadoop,因为如果前面KafkaWordCount.scala代码文件中采用了ssc.checkpoint
("/user/hadoop/checkpoint")这种形式,这时的检查点是被写入HDFS,因此需要启动Hadoop。启动Hadoop的命令如下:

cd  /usr/local/hadoop-2.10.1
./sbin/start-dfs.sh
或者
start-dfs.sh
start-yarn.sh

启动Hadoop成功以后,就可以测试刚才生成的词频统计程序了。
要注意,之前已经启动了Zookeeper服务和Kafka服务,因为之前那些终端窗口都没有关闭,所以,这些服务一直都在运行。如果不小心关闭了之前的终端窗口,那就参照前面的内容,再次启动Zookeeper服务,启动Kafka服务。
然后,新打开一个终端,执行如下命令,运行“KafkaWordProducer”程序,生成一些单词(是一堆整数形式的单词):

cd  /usr/local/spark-3.5.1/mycode/kafka/
/usr/local/spark-3.5.1/bin/spark-submit --class "KafkaWordProducer" ./target/scala-2.12/sime-project_2.12-1.0.jar localhost:9092 wordsender  3  5

注意,上面命令中,“localhost:9092 wordsender 3 5”是提供给KafkaWordProducer程序的4个输入参数,第1个参数“localhost:9092”是Kafka的Broker的地址,第2个参数“wordsender”是Topic的名称,我们在KafkaWordCount.scala代码中已经把Topic名称写死掉,所以,KafkaWordCount程序只能接收名称为“wordsender”的Topic。第3个参数“3”表示每秒发送3条消息,第4个参数“5”表示每条消息包含5个单词(实际上就是5个整数)。
执行上面命令后,屏幕上会不断滚动出现类似如下的新单词:

不要关闭这个终端窗口,让它一直不断发送单词。然后,再打开一个终端,执行下面命令,运行KafkaWordCount程序,执行词频统计:

cd  /usr/local/spark-3.5.1/mycode/kafka/
/usr/local/spark-3.5.1/bin/spark-submit --class "KafkaWordCount" ./target/scala-2.12/simple-oject_2.12-1.0.jar
运行上面命令以后,就启动了词频统计功能,屏幕上就会显示如下类似信息:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/10533.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机毕业设计Python地震预测系统 地震数据分析可视化 地震爬虫 大数据毕业设计 Flink Hadoop 深度学习 机器学习 人工智能 知识图谱

学生信息 姓名&#xff1a;  祁浩 题目&#xff1a; 基于Python的中国地震数据分析与可视化系统的设计与实现 学号&#xff1a; 2020135211 班级&#xff1a; 20大数据本科2班 指导教师&#xff1a; 刘思思 答辩过程 学生开题陈述 为了让学习者更好的了解了解地震…

Coze扣子开发指南:AI零代码编程创建插件

在Coze扣子中创建插件&#xff0c;有两种方式&#xff0c;一是用API&#xff0c;具体方式参照上一篇文章《Coze扣子开发指南&#xff1a;用免费API自己创建插件》&#xff0c;还有一种方式就是编程&#xff0c;不过有了AI的帮助&#xff0c;即使不会编程的人&#xff0c;也可以…

HarmonyOS开发案例:【生活健康app之获取成就】(3)

获取成就 本节将介绍成就页面。 功能概述 成就页面展示用户可以获取的所有勋章&#xff0c;当用户满足一定的条件时&#xff0c;将点亮本页面对应的勋章&#xff0c;没有得到的成就勋章处于熄灭状态。共有六种勋章&#xff0c;当用户连续完成任务打卡3天、7天、30天、50天、…

用大于meilisearch-java-0.7.0.jar的报错的解决

Elasticsearch 做为老牌搜索引擎&#xff0c;功能基本满足&#xff0c;但复杂&#xff0c;重量级&#xff0c;适合大数据量。 MeiliSearch 设计目标针对数据在 500GB 左右的搜索需求&#xff0c;极快&#xff0c;单文件&#xff0c;超轻量。 所以&#xff0c;对于中小型项目来说…

阿里云服务器在线安装nginx

⛰️个人主页: 蒾酒 &#x1f525;系列专栏&#xff1a;《nginx实战》 目录 内容简介 安装步骤 1.root用户登录连接阿里云服务器 2.在usr/local下新建nginx目录 3.安装 1安装下载工具 2下载nginx压缩包 3解压 4安装nginx依赖的库 5编译并安装 6启动nginx 7开启…

蓝桥杯-递增三元组(三种解法,二分, 双指针, 前缀和)

给定三个整数数组 A[A1,A2,…AN], B[B1,B2,…BN], C[C1,C2,…CN], 请你统计有多少个三元组 (i,j,k) 满足&#xff1a; 1≤i,j,k≤N Ai<Bj<Ck 输入格式 第一行包含一个整数 N。 第二行包含 N 个整数 A1,A2,…AN。 第三行包含 N 个整数 B1,B2,…BN。 第四行包含 N …

【图像畸变校正】

接上篇文章&#xff1a;【鱼眼&#xff0b;普通相机】相机标定 附代码&#xff1a; 方法一&#xff1a; 使用cv2.undistort """Create May 11, 2024author Wang Jiajun """import cv2 import numpy as npdef correct(img,camera_fileE:/cali…

怎么使用远程桌面传输文件?

微软提供的远程桌面功能是一项强大的工具&#xff0c;可让您在同一网络下远程访问和管理其他计算机。除了远程控制&#xff0c;它还支持文件传输功能&#xff0c;为Windows用户提供了极大的便利。在接下来的内容中&#xff0c;我们将介绍如何使用远程桌面传输文件。 如何从远程…

PADS:生成自交叉平面区域

根据板外形铺铜方法&#xff1a; pads根据板外形铺铜_铺铜如何根据板子形状改变-CSDN博客 根据板外形创建平面区域出现问题&#xff1a; 解决方法&#xff1a;去找结构&#xff0c;让他把出图之前把线合并了

【数据结构】顺序栈

顺序栈 一、相关概念 栈和队列是操作受限的线性表&#xff0c;是限定性的数据结构&#xff1b;栈分为顺序栈和链式栈栈只能在一端进行操作&#xff08;插入、删除&#xff09;栈是限定仅在表尾进行插入或删除操作的线性表&#xff0c;因此&#xff0c;对栈来说&#xff0c;表…

https免费证书获取

获取免费证书的网址&#xff1a; Certbot 1. 进入你的linux系统&#xff0c;先安装snapd&#xff0c; yum install snapd 2. 启动snapd service snapd start 3.安装 Certbot snap install --classic certbot 注意如下出现此错误时&#xff0c;需要先建立snap 软连接后&am…

山东大学软件学院创新项目实训开发日志——第11周

山东大学软件学院创新项目实训开发日志——第11周 项目名称&#xff1a;ModuFusion Visionary&#xff1a;实现跨模态文本与视觉的相关推荐 -------项目目标&#xff1a; 本项目旨在开发一款跨模态交互式应用&#xff0c;用户可以上传图片或视频&#xff0c;并使用文本、点、…

Golang | Leetcode Golang题解之第84题柱状图中最大的矩形

题目&#xff1a; 题解&#xff1a; func largestRectangleArea(heights []int) int {n : len(heights)left, right : make([]int, n), make([]int, n)for i : 0; i < n; i {right[i] n}mono_stack : []int{}for i : 0; i < n; i {for len(mono_stack) > 0 &&am…

SQLite索引名称重复(index already exists)

文章目录 概述报错信息解决方案 概述 SQLite中创建单列索引的方式&#xff0c;跟MySQL类似&#xff1a; CREATE INDEX index_name ON table_name (column_name);但是也有不同的地方&#xff1a; MySQL中索引名称在表内部不重复即可。 SQLite中索引名称在整个库中必须是不重复…

整理项目中经常用到的正则

目录 1、手机号码 2、Email 邮箱 3、QQ 号码 4、非零正整数 5、URL 地址 6、身份证号 项目中难免会经常使用到表单&#xff0c;而表单项校验就需要用到正则&#xff0c; 所以整理总结一下自己项目中使用比较频繁的一些正则校验逻辑。 正则表达式 是由一些具有特殊含义的…

JavaScript之数据类型(3)——object进阶

前言&#xff1a; 利用基础知识来构建对象会发现十分复杂&#xff0c;我们可以结合其他的知识点来为我们object的构建进行优化。 <1>工厂法&#xff1a; 基本格式&#xff1a; function creatObject(属性值1,属性值2,属性值3,...,属性值n) {var 对象名 new Object();对…

在IDEA中使用 Spring Initializr 新建 spring boots 项目

【在IDEA中使用 Spring Initializr 新建 spring boots 项目 - CSDN Apphttp://t.csdnimg.cn/mVs5P Spring Initializr 创建spring boots项目 添加到pom.xml <dependency> <groupId>mysql</groupId> <artifactId>mysql-connec…

Python | Leetcode Python题解之第84题柱状图中最大的矩形

题目&#xff1a; 题解&#xff1a; class Solution:def largestRectangleArea(self, heights: List[int]) -> int:n len(heights)left, right [0] * n, [n] * nmono_stack list()for i in range(n):while mono_stack and heights[mono_stack[-1]] > heights[i]:righ…

代码随想录算法训练营day21 | 513.找树左下角的值、112. 路径总和、106.从中序与后序遍历序列构造二叉树

513.找树左下角的值 迭代法比较简单&#xff0c;层序遍历&#xff0c;找到最下面一层的第一个节点。题目已经说明节点数>1了 class Solution:def findBottomLeftValue(self, root: Optional[TreeNode]) -> int:queue collections.deque()queue.append(root)result ro…

LeetCode题练习与总结:复原IP地址--93

一、题目描述 有效 IP 地址 正好由四个整数&#xff08;每个整数位于 0 到 255 之间组成&#xff0c;且不能含有前导 0&#xff09;&#xff0c;整数之间用 . 分隔。 例如&#xff1a;"0.1.2.201" 和 "192.168.1.1" 是 有效 IP 地址&#xff0c;但是 &qu…