【数据采集与预处理】数据接入工具Kafka

目录

一、Kafka简介

(一)消息队列

(二)什么是Kafka

二、Kafka架构

三、Kafka工作流程分析

(一)Kafka核心组成

(二)写入流程

(三)Zookeeper 存储结构

(四)Kafka 消费过程

四、Kafka准备工作

(一)Kafka安装配置

(二)启动Kafka

(三)测试Kafka是否正常工作

五、编写Spark Streaming程序使用Kafka数据源


一、Kafka简介

(一)消息队列

消息队列内部实现原理

1、点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)
        点对点模型通常是一个基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息,而不是将消息推送到客户端。这个模型的特点是发送到队列的消息被一个且只有一个接收者接收处理,即使有多个消息监听者也是如此。

2、发布/订阅模式(一对多,数据生产后,推送给所有订阅者)
        发布订阅模型则是一个基于推送的消息传送模型。发布订阅模型可以有多种不同的订阅者,临时订阅者只在主动监听主题时才接收消息,而持久订阅者则监听主题的所有消息,即使当前订阅者不可用,处于离线状态。

(二)什么是Kafka

        Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

在流式计算中,Kafka 一般用来缓存数据,Storm 通过消费 Kafka 的数据进行计算。
1、Apache Kafka 是一个开源消息系统。是由 Apache 软件基金会开发的一个开源消息系统项目。
2、Kafka 最初是由 LinkedIn 公司开发,并于 2011 年初开源。2012 年 10 月从 Apache Incubator 毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。
3、Kafka 是一个分布式消息队列。Kafka 对消息保存时根据 Topic 进行归类,发送消息者称为 Producer,消息接受者称为 Consumer,此外 kafka 集群有多个 kafka 实例组成,每个实例(server)称为 broker。
4、无论是 kafka 集群,还是 consumer 都依赖于 zookeeper 集群保存一些 meta 信息,来保证系统可用性。

二、Kafka架构

1、Producer :消息生产者,就是向 kafka broker 发消息的客户端;
2、Consumer :消息消费者,向 kafka broker 取消息的客户端;
3、Topic :可以理解为一个队列;
4、Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个 consumer)的手段。一个 topic 可以有多个 CG。topic 的消息会复制(不是真的复制,是概念上的)到所有的 CG,但每个 partion 只会把消息发给该 CG 中的一个 consumer。如果需要实现广播,只要每个 consumer 有一个独立的 CG 就可以了。要实现单播只要所有的 consumer 在同一个 CG。用 CG 还可以将 consumer 进行自由的分组而不需要多次发送消息到不同的 topic;
5、Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic;
6、Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的 id(offset)。kafka 只保证按一个 partition 中的顺序将消息发给consumer,不保证一个 topic 的整体(多个 partition 间)的顺序;
7、Offset:kafka 的存储文件都是按照 offset.kafka 来命名,用 offset 做名字的好处是方便查找。例如你想找位于 2049 的位置,只要找到 2048.kafka 的文件即可。当然 the first offset 就是 00000000000.kafka。

三、Kafka工作流程分析

(一)Kafka核心组成

(二)写入流程

Producer写入流程:

1)producer 先从 zookeeper 的 "/brokers/.../state"节点找到该 partition 的 leader
2)producer 将消息发送给该 leader
3)leader 将消息写入本地 log
4)followers 从 leader pull 消息,写入本地 log 后向 leader 发送 ACK
5)leader 收到所有 ISR 中的 replication 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset)并向 producer 发送 ACK

(三)Zookeeper 存储结构

注意:producer 不在 zk 中注册,消费者在 zk 中注册。 

(四)Kafka 消费过程

消费者组:

        消费者是以 consumer group 消费者组的方式工作,由一个或者多个消费者组成一个组,共同消费一个 topic。每个分区在同一时间只能由 group 中的一个消费者读取,但是多个 group 可以同时消费这个 partition。在图中,有一个由三个消费者组成的 group,有一个消费者读取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也可以叫做某个消费者是某个分区的拥有者。
        在这种情况下,消费者可以通过水平扩展的方式同时读取大量的消息。另外,如果一个消费者失败了,那么其他的 group 成员会自动负载均衡读取之前失败的消费者读取的分区。

四、Kafka准备工作

(一)Kafka安装配置

1、到官网下载jar包,保存至“/usr/local/uploads”目录下。

Apache Kafkaicon-default.png?t=N7T8https://kafka.apache.org/downloads

2、解压安装Kafka,并重命名解压后的文件夹。

[root@bigdata uploads]# tar -zxvf kafka_2.11-0.8.2.2.tgz -C /usr/local
[root@bigdata uploads]# cd ..
[root@bigdata local]# mv kafka_2.11-0.8.2.2/ kafka

3、配置Spark环境

[root@bigdata local]# cd ./spark/conf
[root@bigdata conf]# vi spark-env.sh

在文件的第一行接着添加如下内容: 

:/usr/local/spark/examples/jars/*:/usr/local/spark/jars/kafka/*:/usr/local/kafka/libs/*

接着,在“/usr/local/spark/jars”目录下新建文件夹kafka,并将“/usr/local/kafka/libs/”目录下的所有jar包都拷贝到“/usr/local/spark/jars/kafka”目录下。

[root@bigdata spark]# cd /usr/local/spark/jars
[root@bigdata jars]# mkdir kafka
[root@bigdata jars]# cd kafka
[root@bigdata kafka]# cp /usr/local/kafka/libs/* .

然后,将“/usr/local/uploads/”下的spark-streaming-kafka-0-8_2.11-2.4.0.jar包也拷贝到“/usr/local/spark/jars/kafka”目录下。

[root@bigdata kafka]# cp /usr/local/uploads/spark-streaming-kafka-0-8_2.11-2.4.0.jar .

spark-streaming-kafka-0-8_2.11-2.4.0.jar的下载地址:

http://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11/2.4.0

下图是拷贝完成后的“/usr/local/spark/jars/kafka”目录下的所有jar包。

这样,Spark环境就配好了。

(二)启动Kafka

1、启动Zookeeper服务

打开一个终端,输入下面命令启动Zookeeper服务:

[root@bigdata kafka]# cd /usr/local/kafka
[root@bigdata kafka]# ./bin/zookeeper-server-start.sh config/zookeeper.properties

千万不要关闭这个终端窗口,一旦关闭,Zookeeper服务就停止了。

2、启动Kafka服务

打开第二个终端,然后输入下面命令启动Kafka服务:

[root@bigdata zhc]# cd /usr/local/kafka
[root@bigdata kafka]# bin/kafka-server-start.sh config/server.properties

千万不要关闭这个终端窗口,一旦关闭,Kafka服务就停止了

(三)测试Kafka是否正常工作

再打开第三个终端,然后输入下面命令创建一个自定义名称为“wordsendertest”的Topic:

[root@bigdata zhc]# cd /usr/local/kafka
[root@bigdata kafka]# ./bin/kafka-topics.sh  --create  --zookeeper  localhost:2181 --replication-factor  1  --partitions  1  --topic  wordsendertest
#可以用list列出所有创建的Topic,验证是否创建成功
[root@bigdata kafka]# ./bin/kafka-topics.sh  --list  --zookeeper  localhost:2181

replication-factor:每个partition的副本个数 

下面用生产者(Producer)来产生一些数据,请在当前终端(记作“数据源终端”)内继续输入下面命令:

[root@bigdata kafka]# ./bin/kafka-console-producer.sh  --broker-list  localhost:9092  --topic  wordsendertest

上面命令执行后,就可以在当前终端内用键盘输入一些英文单词,比如可以输入:

hello hadoop

hello spark

现在可以启动一个消费者,来查看刚才生产者产生的数据。请另外打开第四个终端,输入下面命令:

[root@bigdata zhc]# cd /usr/local/kafka
[root@bigdata kafka]# ./bin/kafka-console-consumer.sh  --zookeeper  localhost:2181  --topic  wordsendertest  --from-beginning

可以看到,屏幕上会显示出如下结果,也就是刚才在另外一个终端里面输入的内容:

五、编写Spark Streaming程序使用Kafka数据源

在“/home/zhc/mycode/”路径下新建文件夹sparkstreaming,再在该文件夹下新建py文件KafkaWordCount.py。

#/home/zhc/mycode/sparkstreaming/KafkaWordCount.py
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtilsif __name__ == "__main__":if len(sys.argv) != 3:print("Usage: KafkaWordCount.py <zk> <topic>", file=sys.stderr)exit(-1)sc = SparkContext(appName="PythonStreamingKafkaWordCount")ssc = StreamingContext(sc, 1)zkQuorum, topic = sys.argv[1:]kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})lines = kvs.map(lambda x: x[1])counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)counts.pprint()ssc.start()ssc.awaitTermination()

新建一个终端(记作“流计算终端”),执行KafkaWordCount.py,命令如下:

[root@bigdata zhc]# cd /home/zhc/mycode
[root@bigdata mycode]# mkdir sparkstreaming
[root@bigdata mycode]# cd sparkstreaming
[root@bigdata sparkstreaming]# vi KafkaWordCount.py
[root@bigdata sparkstreaming]# spark-submit KafkaWordCount.py localhost:2181 wordsendertest

这时再切换到之前已经打开的“数据源终端”,用键盘手动敲入一些英文单词,在流计算终端内就可以看到类似如下的词频统计动态结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/598948.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

竞赛保研 基于机器视觉的银行卡识别系统 - opencv python

1 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 基于深度学习的银行卡识别算法设计 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学长非常推荐&#xff01; &#x1f9ff; 更多资料, 项目分享&#xff1a; https://gitee.com/dancheng…

Mybatis动态SQL注解开发操作数据库

通过Mybatis的动态注解开发&#xff0c;只需要在映射文件中使用注解来配置映射关系&#xff0c;从而无需编写XML映射文件。常用的注解有Select&#xff0c;Update&#xff0c;Insert&#xff0c;Delete等&#xff0c;它们分别用于配置查询&#xff0c;更新&#xff0c;插入和删…

Linux系统安全

作为一种开放源代码的操作系统&#xff0c;linux服务器以其安全、高效和稳定的显著优势而得以广泛应用。 账号安全控制 用户账号是计算机使用者的身份凭证或标识&#xff0c;每个要访问系统资源的人&#xff0c;必须凭借其用户账号 才能进入计算机.在Linux系统中&#xff0c;提…

MIGO向成本中心发料,从成本中心收货

向成本中心发料&#xff0c;首先在MM03查看物料是否有库存&#xff0c;物料的计价标准和产成品的计价标准价是否同一种&#xff0c;S价或者V价 首先&#xff0c;“会计1”视图&#xff0c;查看物料库存 “成本2”视图查看标准成本发布 1、MIGO发货&#xff0c;选&#xff1a;A…

Solid Converter 10.1(PDF转换器)软件安装包下载及安装教程

Solid Converter 10.1下载链接&#xff1a;https://docs.qq.com/doc/DUkdMbXRpZ255dXFT 1、选中下载好的安装包右键解压到【Solid Converter 10.1.11102.4312】文件夹。 2、选中"solidconverter"右键以管理员身份运行 3、选择”自定义安装”&#xff0c;勾选”我已阅…

MySql 1170-BLOB/TEXT 错误

MySql 1170-BLOB/TEXT column idused in key specification without a key length 原因&#xff1a;由于将主键id设置为 text类型&#xff0c;所以导致主键 的长度&#xff0c;没有设置。 解决方案&#xff1a;方案1&#xff1a;将主键id设置为varchar 类型的,设置对应的长度…

如何通过Python将各种数据写入到Excel工作表

在数据处理和报告生成等工作中&#xff0c;Excel表格是一种常见且广泛使用的工具。然而&#xff0c;手动将大量数据输入到Excel表格中既费时又容易出错。为了提高效率并减少错误&#xff0c;使用Python编程语言来自动化数据写入Excel表格是一个明智的选择。Python作为一种简单易…

揭秘人工智能:探索智慧未来

&#x1f308;个人主页&#xff1a;聆风吟 &#x1f525;系列专栏&#xff1a;数据结构、网络奇遇记 &#x1f516;少年有梦不应止于心动&#xff0c;更要付诸行动。 文章目录 &#x1f4cb;前言一. 什么是人工智能?二. 人工智能的关键技术2.1 机器学习2.2 深度学习2.1 计算机…

《系统架构设计师教程(第2版)》第3章-信息系统基础知识-05-专家系统(ES)

文章目录 1. 先了解人工智能2.1 人工智能的特点2.2 人工智能的主要分支2. ES概述2.1 概述2.2 和一般系统的区别1)第一遍说了5点(理解为主)2)第二遍说的3点(主要记这个)3. ES的特点4. ES的组成4.1 知识库4.2 综合数据库4.3 推理机4.4 知识获取模块4.5 解释程序4.6 人一机接…

linux泡妞大法之Nginx网站服务

技能目标 学会 Nginx 网站服务的基本构建 了解 Nginx 访问控制实现的方法 掌握 Nginx 部署虚拟主机的方法 学会 LNMP 架构部署及应用的方法 在各种网站服务器软件中&#xff0c;除了 Apache HTTP Server 外&#xff0c;还有一款轻量级…

授权策略(authorize方法)

authorize方法&#xff08;授权策略的使用示例&#xff09; $this->authorize(destroy, $status) 要实现这个功能&#xff0c;你需要执行以下步骤&#xff1a; 1、创建一个授权策略&#xff1a; 在Laravel中&#xff0c;授权策略是用于定义用户对特定操作的权限的类。你可…

我是一片骂声中成长起来的专家,RocketMQ消息中间件实战派上下册!!

曾几何&#xff0c;我的技术真的很烂&#xff0c;烂到技术主管每次都是点名要Review我的业务代码。 曾几何&#xff0c;我对技术沉淀没有一点自我意识&#xff0c;总是觉得临时抱一下佛脚就可以了。 曾几何&#xff0c;我也觉得技术无用&#xff0c;看看那些业务领导&#xf…

每日一道算法题day-two(备战蓝桥杯)

今天带来的题目是&#xff1a; 填充 有一个长度为 n的 0101 串&#xff0c;其中有一些位置标记为 ?&#xff0c;这些位置上可以任意填充 0 或者 1&#xff0c;请问如何填充这些位置使得这个 0101 串中出现互不重叠的 00 和 11 子串最多&#xff0c;输出子串个数。 输入格式…

【Kubernetes】Kubernetes ConfigMap 实战指南

ConfigMap 是 Kubernetes 中一种用于存储配置信息的资源对象,它允许您将配置与应用程序解耦,轻松管理和更新配置。在这个实战指南中,我们将涵盖创建、更新、删除 ConfigMap,并探讨其原理、优点、不足。最后,我们将通过一个实际案例演示如何在 Node.js 应用程序中使用 Conf…

Spring框架-Spring Bean管理

文章目录 Spring Bean管理Spring Bean配置方式&#xff1a;使用XML配置方式&#xff1a;User.javaapplicationContext.xmlUserTest.java 使用注解配置方式&#xff1a;ComponentServiceRepositoryConfigurationScopeValueQualifierPrimary Bean的作用域和生命周期&#xff1a;B…

2023年广东省网络安全A模块(笔记详解)

模块A 基础设施设置与安全加固 一、项目和任务描述&#xff1a; 假定你是某企业的网络安全工程师&#xff0c;对于企业的服务器系统&#xff0c;根据任务要求确保各服务正常运行&#xff0c;并通过综合运用登录和密码策略、流量完整性保护策略、事件监控策略、防火墙策略等多…

企业数据资源入表,对数商企业有什么变化?释放了什么信号?

不是必须的&#xff0c;二者没有必然联系。 数据资产入表的专业术语是数据资产会计核算。在《企业数据资源相关会计处理暂行规定》出台之前&#xff0c;很多企业的数据产品研究和开发阶段所产生的支出大都是费用化&#xff0c;直接计入损益表&#xff0c;但企业有一部分数据产…

Linux习题4

解析&#xff1a; 用二进制表示 rwx&#xff0c;r 代表可读&#xff0c;w 代表可写&#xff0c;x 代表可执行。如果可读&#xff0c;权限二进制为 100&#xff0c;十进制是4&#xff1b;如果可写&#xff0c;权限二进制为 010&#xff0c;十进制是2&#xff1b; 如果可执行&a…

如何在Linux上部署1Panel面板并远程访问内网Web端管理界面

文章目录 前言1. Linux 安装1Panel2. 安装cpolar内网穿透3. 配置1Panel公网访问地址4. 公网远程访问1Panel管理界面5. 固定1Panel公网地址 前言 1Panel 是一个现代化、开源的 Linux 服务器运维管理面板。高效管理,通过 Web 端轻松管理 Linux 服务器&#xff0c;包括主机监控、…

JAVA那些事(八)异常处理

异常处理机制 Java中的异常处理机制是Java程序设计中非常重要的一个特性&#xff0c;它允许程序在出现错误或意外情况时进行适当的响应&#xff0c;而不是直接导致程序崩溃。异常处理遵循“捕获或者声明”原则&#xff0c;这意味着程序员要么捕获并处理可能发生的异常&#xf…