【数据采集与预处理】数据接入工具Kafka

目录

一、Kafka简介

(一)消息队列

(二)什么是Kafka

二、Kafka架构

三、Kafka工作流程分析

(一)Kafka核心组成

(二)写入流程

(三)Zookeeper 存储结构

(四)Kafka 消费过程

四、Kafka准备工作

(一)Kafka安装配置

(二)启动Kafka

(三)测试Kafka是否正常工作

五、编写Spark Streaming程序使用Kafka数据源


一、Kafka简介

(一)消息队列

消息队列内部实现原理

1、点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)
        点对点模型通常是一个基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息,而不是将消息推送到客户端。这个模型的特点是发送到队列的消息被一个且只有一个接收者接收处理,即使有多个消息监听者也是如此。

2、发布/订阅模式(一对多,数据生产后,推送给所有订阅者)
        发布订阅模型则是一个基于推送的消息传送模型。发布订阅模型可以有多种不同的订阅者,临时订阅者只在主动监听主题时才接收消息,而持久订阅者则监听主题的所有消息,即使当前订阅者不可用,处于离线状态。

(二)什么是Kafka

        Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

在流式计算中,Kafka 一般用来缓存数据,Storm 通过消费 Kafka 的数据进行计算。
1、Apache Kafka 是一个开源消息系统。是由 Apache 软件基金会开发的一个开源消息系统项目。
2、Kafka 最初是由 LinkedIn 公司开发,并于 2011 年初开源。2012 年 10 月从 Apache Incubator 毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。
3、Kafka 是一个分布式消息队列。Kafka 对消息保存时根据 Topic 进行归类,发送消息者称为 Producer,消息接受者称为 Consumer,此外 kafka 集群有多个 kafka 实例组成,每个实例(server)称为 broker。
4、无论是 kafka 集群,还是 consumer 都依赖于 zookeeper 集群保存一些 meta 信息,来保证系统可用性。

二、Kafka架构

1、Producer :消息生产者,就是向 kafka broker 发消息的客户端;
2、Consumer :消息消费者,向 kafka broker 取消息的客户端;
3、Topic :可以理解为一个队列;
4、Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个 consumer)的手段。一个 topic 可以有多个 CG。topic 的消息会复制(不是真的复制,是概念上的)到所有的 CG,但每个 partion 只会把消息发给该 CG 中的一个 consumer。如果需要实现广播,只要每个 consumer 有一个独立的 CG 就可以了。要实现单播只要所有的 consumer 在同一个 CG。用 CG 还可以将 consumer 进行自由的分组而不需要多次发送消息到不同的 topic;
5、Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic;
6、Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的 id(offset)。kafka 只保证按一个 partition 中的顺序将消息发给consumer,不保证一个 topic 的整体(多个 partition 间)的顺序;
7、Offset:kafka 的存储文件都是按照 offset.kafka 来命名,用 offset 做名字的好处是方便查找。例如你想找位于 2049 的位置,只要找到 2048.kafka 的文件即可。当然 the first offset 就是 00000000000.kafka。

三、Kafka工作流程分析

(一)Kafka核心组成

(二)写入流程

Producer写入流程:

1)producer 先从 zookeeper 的 "/brokers/.../state"节点找到该 partition 的 leader
2)producer 将消息发送给该 leader
3)leader 将消息写入本地 log
4)followers 从 leader pull 消息,写入本地 log 后向 leader 发送 ACK
5)leader 收到所有 ISR 中的 replication 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset)并向 producer 发送 ACK

(三)Zookeeper 存储结构

注意:producer 不在 zk 中注册,消费者在 zk 中注册。 

(四)Kafka 消费过程

消费者组:

        消费者是以 consumer group 消费者组的方式工作,由一个或者多个消费者组成一个组,共同消费一个 topic。每个分区在同一时间只能由 group 中的一个消费者读取,但是多个 group 可以同时消费这个 partition。在图中,有一个由三个消费者组成的 group,有一个消费者读取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也可以叫做某个消费者是某个分区的拥有者。
        在这种情况下,消费者可以通过水平扩展的方式同时读取大量的消息。另外,如果一个消费者失败了,那么其他的 group 成员会自动负载均衡读取之前失败的消费者读取的分区。

四、Kafka准备工作

(一)Kafka安装配置

1、到官网下载jar包,保存至“/usr/local/uploads”目录下。

Apache Kafkaicon-default.png?t=N7T8https://kafka.apache.org/downloads

2、解压安装Kafka,并重命名解压后的文件夹。

[root@bigdata uploads]# tar -zxvf kafka_2.11-0.8.2.2.tgz -C /usr/local
[root@bigdata uploads]# cd ..
[root@bigdata local]# mv kafka_2.11-0.8.2.2/ kafka

3、配置Spark环境

[root@bigdata local]# cd ./spark/conf
[root@bigdata conf]# vi spark-env.sh

在文件的第一行接着添加如下内容: 

:/usr/local/spark/examples/jars/*:/usr/local/spark/jars/kafka/*:/usr/local/kafka/libs/*

接着,在“/usr/local/spark/jars”目录下新建文件夹kafka,并将“/usr/local/kafka/libs/”目录下的所有jar包都拷贝到“/usr/local/spark/jars/kafka”目录下。

[root@bigdata spark]# cd /usr/local/spark/jars
[root@bigdata jars]# mkdir kafka
[root@bigdata jars]# cd kafka
[root@bigdata kafka]# cp /usr/local/kafka/libs/* .

然后,将“/usr/local/uploads/”下的spark-streaming-kafka-0-8_2.11-2.4.0.jar包也拷贝到“/usr/local/spark/jars/kafka”目录下。

[root@bigdata kafka]# cp /usr/local/uploads/spark-streaming-kafka-0-8_2.11-2.4.0.jar .

spark-streaming-kafka-0-8_2.11-2.4.0.jar的下载地址:

http://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11/2.4.0

下图是拷贝完成后的“/usr/local/spark/jars/kafka”目录下的所有jar包。

这样,Spark环境就配好了。

(二)启动Kafka

1、启动Zookeeper服务

打开一个终端,输入下面命令启动Zookeeper服务:

[root@bigdata kafka]# cd /usr/local/kafka
[root@bigdata kafka]# ./bin/zookeeper-server-start.sh config/zookeeper.properties

千万不要关闭这个终端窗口,一旦关闭,Zookeeper服务就停止了。

2、启动Kafka服务

打开第二个终端,然后输入下面命令启动Kafka服务:

[root@bigdata zhc]# cd /usr/local/kafka
[root@bigdata kafka]# bin/kafka-server-start.sh config/server.properties

千万不要关闭这个终端窗口,一旦关闭,Kafka服务就停止了

(三)测试Kafka是否正常工作

再打开第三个终端,然后输入下面命令创建一个自定义名称为“wordsendertest”的Topic:

[root@bigdata zhc]# cd /usr/local/kafka
[root@bigdata kafka]# ./bin/kafka-topics.sh  --create  --zookeeper  localhost:2181 --replication-factor  1  --partitions  1  --topic  wordsendertest
#可以用list列出所有创建的Topic,验证是否创建成功
[root@bigdata kafka]# ./bin/kafka-topics.sh  --list  --zookeeper  localhost:2181

replication-factor:每个partition的副本个数 

下面用生产者(Producer)来产生一些数据,请在当前终端(记作“数据源终端”)内继续输入下面命令:

[root@bigdata kafka]# ./bin/kafka-console-producer.sh  --broker-list  localhost:9092  --topic  wordsendertest

上面命令执行后,就可以在当前终端内用键盘输入一些英文单词,比如可以输入:

hello hadoop

hello spark

现在可以启动一个消费者,来查看刚才生产者产生的数据。请另外打开第四个终端,输入下面命令:

[root@bigdata zhc]# cd /usr/local/kafka
[root@bigdata kafka]# ./bin/kafka-console-consumer.sh  --zookeeper  localhost:2181  --topic  wordsendertest  --from-beginning

可以看到,屏幕上会显示出如下结果,也就是刚才在另外一个终端里面输入的内容:

五、编写Spark Streaming程序使用Kafka数据源

在“/home/zhc/mycode/”路径下新建文件夹sparkstreaming,再在该文件夹下新建py文件KafkaWordCount.py。

#/home/zhc/mycode/sparkstreaming/KafkaWordCount.py
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtilsif __name__ == "__main__":if len(sys.argv) != 3:print("Usage: KafkaWordCount.py <zk> <topic>", file=sys.stderr)exit(-1)sc = SparkContext(appName="PythonStreamingKafkaWordCount")ssc = StreamingContext(sc, 1)zkQuorum, topic = sys.argv[1:]kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})lines = kvs.map(lambda x: x[1])counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)counts.pprint()ssc.start()ssc.awaitTermination()

新建一个终端(记作“流计算终端”),执行KafkaWordCount.py,命令如下:

[root@bigdata zhc]# cd /home/zhc/mycode
[root@bigdata mycode]# mkdir sparkstreaming
[root@bigdata mycode]# cd sparkstreaming
[root@bigdata sparkstreaming]# vi KafkaWordCount.py
[root@bigdata sparkstreaming]# spark-submit KafkaWordCount.py localhost:2181 wordsendertest

这时再切换到之前已经打开的“数据源终端”,用键盘手动敲入一些英文单词,在流计算终端内就可以看到类似如下的词频统计动态结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/598948.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

竞赛保研 基于机器视觉的银行卡识别系统 - opencv python

1 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 基于深度学习的银行卡识别算法设计 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学长非常推荐&#xff01; &#x1f9ff; 更多资料, 项目分享&#xff1a; https://gitee.com/dancheng…

Linux系统安全

作为一种开放源代码的操作系统&#xff0c;linux服务器以其安全、高效和稳定的显著优势而得以广泛应用。 账号安全控制 用户账号是计算机使用者的身份凭证或标识&#xff0c;每个要访问系统资源的人&#xff0c;必须凭借其用户账号 才能进入计算机.在Linux系统中&#xff0c;提…

MIGO向成本中心发料,从成本中心收货

向成本中心发料&#xff0c;首先在MM03查看物料是否有库存&#xff0c;物料的计价标准和产成品的计价标准价是否同一种&#xff0c;S价或者V价 首先&#xff0c;“会计1”视图&#xff0c;查看物料库存 “成本2”视图查看标准成本发布 1、MIGO发货&#xff0c;选&#xff1a;A…

Solid Converter 10.1(PDF转换器)软件安装包下载及安装教程

Solid Converter 10.1下载链接&#xff1a;https://docs.qq.com/doc/DUkdMbXRpZ255dXFT 1、选中下载好的安装包右键解压到【Solid Converter 10.1.11102.4312】文件夹。 2、选中"solidconverter"右键以管理员身份运行 3、选择”自定义安装”&#xff0c;勾选”我已阅…

MySql 1170-BLOB/TEXT 错误

MySql 1170-BLOB/TEXT column idused in key specification without a key length 原因&#xff1a;由于将主键id设置为 text类型&#xff0c;所以导致主键 的长度&#xff0c;没有设置。 解决方案&#xff1a;方案1&#xff1a;将主键id设置为varchar 类型的,设置对应的长度…

如何通过Python将各种数据写入到Excel工作表

在数据处理和报告生成等工作中&#xff0c;Excel表格是一种常见且广泛使用的工具。然而&#xff0c;手动将大量数据输入到Excel表格中既费时又容易出错。为了提高效率并减少错误&#xff0c;使用Python编程语言来自动化数据写入Excel表格是一个明智的选择。Python作为一种简单易…

揭秘人工智能:探索智慧未来

&#x1f308;个人主页&#xff1a;聆风吟 &#x1f525;系列专栏&#xff1a;数据结构、网络奇遇记 &#x1f516;少年有梦不应止于心动&#xff0c;更要付诸行动。 文章目录 &#x1f4cb;前言一. 什么是人工智能?二. 人工智能的关键技术2.1 机器学习2.2 深度学习2.1 计算机…

linux泡妞大法之Nginx网站服务

技能目标 学会 Nginx 网站服务的基本构建 了解 Nginx 访问控制实现的方法 掌握 Nginx 部署虚拟主机的方法 学会 LNMP 架构部署及应用的方法 在各种网站服务器软件中&#xff0c;除了 Apache HTTP Server 外&#xff0c;还有一款轻量级…

我是一片骂声中成长起来的专家,RocketMQ消息中间件实战派上下册!!

曾几何&#xff0c;我的技术真的很烂&#xff0c;烂到技术主管每次都是点名要Review我的业务代码。 曾几何&#xff0c;我对技术沉淀没有一点自我意识&#xff0c;总是觉得临时抱一下佛脚就可以了。 曾几何&#xff0c;我也觉得技术无用&#xff0c;看看那些业务领导&#xf…

2023年广东省网络安全A模块(笔记详解)

模块A 基础设施设置与安全加固 一、项目和任务描述&#xff1a; 假定你是某企业的网络安全工程师&#xff0c;对于企业的服务器系统&#xff0c;根据任务要求确保各服务正常运行&#xff0c;并通过综合运用登录和密码策略、流量完整性保护策略、事件监控策略、防火墙策略等多…

Linux习题4

解析&#xff1a; 用二进制表示 rwx&#xff0c;r 代表可读&#xff0c;w 代表可写&#xff0c;x 代表可执行。如果可读&#xff0c;权限二进制为 100&#xff0c;十进制是4&#xff1b;如果可写&#xff0c;权限二进制为 010&#xff0c;十进制是2&#xff1b; 如果可执行&a…

如何在Linux上部署1Panel面板并远程访问内网Web端管理界面

文章目录 前言1. Linux 安装1Panel2. 安装cpolar内网穿透3. 配置1Panel公网访问地址4. 公网远程访问1Panel管理界面5. 固定1Panel公网地址 前言 1Panel 是一个现代化、开源的 Linux 服务器运维管理面板。高效管理,通过 Web 端轻松管理 Linux 服务器&#xff0c;包括主机监控、…

ReactNative 常见问题及处理办法(加固混淆)

文章目录 摘要 引言 正文ScrollView内无法滑动RN热更新中的文件引用问题RN中获取高度的技巧RN强制横屏UI适配问题低版本RN&#xff08;0.63以下&#xff09;适配iOS14图片无法显示问题RN清理缓存RN navigation参数取值pod install 或者npm install 443问题处理 打开要处理的…

2023中国PostgreSQL数据库生态大会-核心PPT资料下载

一、峰会简介 大会以“极速进化融合新生”为主题&#xff0c;探讨了PostgreSQL数据库生态的发展趋势和未来方向。 在大会主论坛上&#xff0c;专家们就PostgreSQL数据库的技术创新、应用实践和生态发展进行了深入交流。同时&#xff0c;大会还设置了技术创新&云原生论坛、…

2023年后,AI 还有什么研究方向有前景?

什么是AI ​ AI代表人工智能&#xff0c;它是指通过计算机科学技术使机器能够执行需要智力的任务的一种技术。这些任务包括学习、推理、问题解决和感知等&#xff0c;通常是人类智能的表现。人工智能的目标是使计算机系统能够执行需要人类智力的任务&#xff0c;而不需要人类的…

国产高分七号光学影像产品预处理步骤

1.引言 高分七号卫星采用主被动光学复合测绘新体制&#xff0c;星上搭载了双线阵相机、激光测高仪等有效载荷&#xff0c;其中双线阵相机可有效获取20公里幅宽、优于0.8m&#xff08;后视&#xff1a;0.65m;前视&#xff1a;0.8m&#xff09;分辨率的全色立体影像和2.6m分辨率的…

Java中的Queue

Java中的Queue 在Java中&#xff0c;Queue 接口代表了一个队列数据结构&#xff0c;它按照先进先出&#xff08;First In, First Out&#xff0c;FIFO&#xff09;的原则进行元素的操作。Queue 接口扩展自 Collection 接口&#xff0c;定义了一系列方法&#xff0c;包括添加、删…

JavaWeb——后端之Mybatis

四、Mybatis 概念&#xff1a; Mybatis是一款持久层&#xff08;Dao层&#xff09;框架&#xff0c;用于简化JDBC&#xff08;Sun操作数据库的规范&#xff0c;较繁琐&#xff09;的开发 历史&#xff1a; Apache的一个开源项目iBatis&#xff0c;2010年由apache迁移到了goog…

Zookeeper(持续更新)

VIP-01 Zookeeper特性与节点数据类型详解 文章目录 VIP-01 Zookeeper特性与节点数据类型详解正文1. 什么是Zookeeper&#xff1f;2. Zookeeper 核心概念2.1、 文件系统数据结构2.2、监听通知机制2.3、Zookeeper 经典的应用场景3.2. 使用命令行操作zookeeper 正文 什么是Zookee…

初学编程,到底选Java还是C++?

初学编程&#xff0c;到底选Java还是C? 在开始前我有一些资料&#xff0c;是我根据网友给的问题精心整理了一份「C的资料从专业入门到高级教程」&#xff0c; 点个关注在评论区回复“888”之后私信回复“888”&#xff0c;全部无偿共享给大家&#xff01;&#xff01;&#x…