【Kafka】常用操作

1、基本概念

在这里插入图片描述

1. 消息: Kafka是一个分布式流处理平台,它通过消息进行数据的传输和存储。消息是Kafka中的基本单元,可以包含任意类型的数据。

2. 生产者(Producer): 生产者负责向Kafka主题发送消息。它将消息发布到指定的主题,可以按照自定义的逻辑生成消息,并决定消息发送的频率和顺序。

3. 消费者(Consumer): 消费者从Kafka主题订阅并接收消息。它可以以不同的方式消费消息,如批量拉取、实时流式处理或订阅特定的消息主题。

4. 主题(Topic): 主题是Kafka中消息的分类标签,用于组织消息。每个主题可以有多个生产者和多个消费者。主题通常与特定的业务领域或数据类型相关联。

5. 分区(Partition): 主题可以被分割成多个分区,每个分区都是一个有序且持久化的消息队列。分区允许Kafka对消息进行水平扩展,并提供了并行处理和负载均衡的能力。

6. 偏移量(Offset): 偏移量是消息在分区中的唯一标识符,用于表示消息在分区内的顺序位置。消费者可以跟踪偏移量来记录已经读取的消息,以便实现精确的消费位置控制。

7. 消费者组(Consumer Group): 消费者组是一组具有相同逻辑的消费者,它们共同消费一个或多个主题中的消息。消费者组允许Kafka进行水平扩展和负载均衡,在该组内的每个消费者负责处理不同的分区。

8. 副本(Replication): Kafka使用副本机制来提供数据冗余和高可用性。每个分区都可以配置多个副本,这些副本保持分区数据的一致性,并可以替代主副本以提供故障恢复功能。

2、安装部署

参考:
https://juejin.cn/post/7158663198411849741

https://www.cnblogs.com/linjiqin/p/13196347.html

3、常用命令

配置文件解析:cat server.properties

#broker 的全局唯一编号,不能重复
broker.id=0
#删除 topic 功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600 #kafka 运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接 Zookeeper 集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181

启动/关闭 kafka:

cd /usr/local/kafka/kafka_2.12-3.5.0/bin/
bin/kafka-server-start.sh config/server.properties
bin/kafka-server-stop.sh stop

验证kafka是否可以使用,仍在bin目录下

运行kafka生产者发送消息

./kafka-console-producer.sh --broker-list localhost:9092 --topic sun

运行kafka消费者接收消息

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sun --from-beginning

4、常用操作API

创建生产者并发送消息

from kafka import KafkaProducer
import time
# 创建生产者
producer = KafkaProducer(bootstrap_servers='localhost:9092')# 发送单条消息
producer.send('my_topic', b'Hello, Kafka!')# Kafka的发送实际上是异步的
# 生产者在发送消息之后并不会等待确认消息是否已经成功到达Kafka broker
# 而是立即继续执行下一行代码或退出程序
# 在生产者发送完消息后,给消费者足够的时间来连接到Kafka broker并订阅主题# 等待消费者订阅主题
time.sleep(2)  # 延迟2秒钟,给消费者足够的时间连接到Kafka并订阅主题# 发送多条消息
messages = [b'Message 1', b'Message 2', b'Message 3']
for message in messages:producer.send('my_topic', message)
time.sleep(2)  # 延迟2秒钟,给消费者足够的时间连接到Kafka并订阅主题

创建消费者并订阅主题并消费消息

from kafka import KafkaConsumer# 创建消费者
consumer = KafkaConsumer('my_topic', bootstrap_servers='localhost:9092')# 消费消息
for message in consumer:print(message.value.decode())

指定消费者组和自动提交偏移量

from kafka import KafkaConsumer# 创建消费者,并指定消费者组和自动提交偏移量
consumer = KafkaConsumer('my_topic', group_id='my_consumer_group',bootstrap_servers='localhost:9092',enable_auto_commit=True)# 消费消息
for message in consumer:print(message.value.decode())

指定消费者组和自动提交偏移量

为什么需要指定消费者组呢?

在Kafka中,消费者组是一组消费者的逻辑名称,它们共同协作来消费一个或多个主题中的消息。通过将消费者组绑定到特定主题上,Kafka能够提供高可用性、负载均衡和容错能力。

指定消费者组有以下几个原因:

  1. 负载均衡: 当多个消费者以相同的消费者组订阅同一个主题时,Kafka会自动分配分区给每个消费者,从而实现负载均衡。每个消费者只处理被分配的分区,这样可以确保所有分区被均匀地消费。
  2. 容错能力: 如果有消费者发生故障或离线,指定消费者组可以确保其他消费者接管该消费者组失去的分区,从而实现容错能力。这意味着即使某些消费者不可用,消息仍然可以被处理。
  3. 消费者协作: 消费者组允许多个消费者协同工作,以实现更高的消费并行度。每个消费者可以独立地处理其分配的分区,并且可以扩展系统的整体处理能力。

需要注意的是,如果您没有为消费者指定消费者组,则它将成为一个独立的消费者。这种情况下,每个消费者将独立地消费所有分区中的消息,而不会共享负载或具备容错能力。

因此,在大多数情况下,为了实现负载均衡、容错和提高处理能力,您应该指定消费者组,尤其是在需要同时处理大量消息或要求高可用性的场景中。如果您只需要简单地消费主题中的消息,而不关注这些特性,那么可以选择不指定消费者组。

手动提交偏移量

from kafka import KafkaConsumer# 创建消费者,并禁用自动提交偏移量
consumer = KafkaConsumer('my_topic', group_id='my_consumer_group',bootstrap_servers='localhost:9092',enable_auto_commit=False)# 消费消息并手动提交偏移量
for message in consumer:print(message.value.decode())consumer.commit()

自动提交偏移量和手动提交偏移量有什么区别呢?

自动提交偏移量(Auto Commit Offset)和手动提交偏移量(Manual Commit Offset)是两种不同的消费者偏移量管理方式。

自动提交偏移量:

  • 在自动提交模式下,消费者会定期自动将已消费的消息偏移量提交给Kafka。
  • 消费者无需显式调用提交偏移量的方法,Kafka会在后台自动处理。
  • 自动提交偏移量可以简化代码,减少了手动提交的复杂性。
  • 然而,自动提交偏移量可能会导致一些问题。例如,如果消费者在处理消息之前发生故障,那么已经消费但尚未提交的偏移量将丢失,造成消息重复或丢失。

手动提交偏移量:

  • 在手动提交模式下,消费者需要显式地调用提交偏移量的方法,将已消费的消息偏移量提交给Kafka。
  • 手动提交偏移量提供了更好的控制能力,可以确保消息的准确处理和可靠提交。
  • 消费者可以在适当的时机调用commit()方法来提交偏移量。通常,在成功处理消息后再进行提交是一个常见的模式。
  • 手动提交偏移量需要额外的代码来管理和处理偏移量的提交,但它提供了更高的灵活性和可靠性。

选择使用自动提交偏移量还是手动提交偏移量取决于具体的使用场景和需求。如果您的应用程序对消息处理的准确性和可靠性要求较高,或者需要更精细的控制以避免重复消费或消息丢失,那么手动提交偏移量可能更适合。否则,自动提交偏移量可以提供一种简化的方式来管理偏移量,尤其在简单的消费者应用中很常见。

手动提交偏移量与自动提交偏移量在性能方面可能存在一些差异,但这取决于具体的使用情况和配置。

性能方面的考虑:

  1. 提交频率: 自动提交偏移量会定期提交偏移量到Kafka服务器,默认情况下是每隔一段时间提交一次。相比之下,手动提交偏移量可以根据应用程序的需求选择何时提交,可以控制提交的频率。如果手动提交偏移量过于频繁,可能会影响性能。
  2. 网络延迟: 手动提交偏移量需要与Kafka服务器进行通信来提交偏移量。如果手动提交偏移量的操作导致频繁的网络调用,而且网络延迟较高,可能会对性能产生一定的影响。
  3. 消息处理时间: 如果消息处理时间很长,手动提交偏移量可能会在处理消息之前进行提交,以保证消息处理的可靠性。然而,这样也会增加提交偏移量的开销,可能降低整体性能。

需要注意的是,性能差异通常是微小的,并且在大多数情况下不会成为主要限制因素。如果性能是一个关键问题,可以根据实际情况进行测试和优化。

此外,可以通过调整参数来改善性能,例如增加自动提交的间隔时间、批量提交偏移量等。使用合适的配置和优化技术可以平衡性能和可靠性之间的权衡。

总而言之,手动提交偏移量可能会稍微影响性能,但仍然取决于具体的使用情况和配置。对于大多数应用程序而言,差异通常是可以接受的,并且可以根据实际需求进行调整和优化。

查看当前有哪些topic

from kafka import KafkaAdminClient# 创建AdminClient连接到Kafka集群
admin_client = KafkaAdminClient(bootstrap_servers='localhost:9092')# 获取主题列表
topic_list = admin_client.list_topics()# 打印主题列表
print(topic_list)# ['my_topic', 'sun', '__consumer_offsets']
# __consumer_offsets是Kafka中的一个系统内置主题
# 这个特殊的主题用于存储消费者组的偏移量(offsets)
# 以跟踪消费者在每个分区中读取消息的位置
# __consumer_offsets主题的目的是为了支持Kafka的消费者组功能
# 当消费者组启用自动提交偏移量时,Kafka会将消费者组的偏移量信息存储在__consumer_offsets主题中
# 以便能够在重平衡、故障恢复等情况下为消费者提供正确的偏移量。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/13794.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python web实战 | Docker+Nginx部署python Django Web项目详细步骤【干货】

概要 在这篇文章中,我将介绍如何使用 Docker 和 Nginx 部署 Django Web 项目。一步步讲解如何构建 Docker 镜像、如何编写 Docker Compose 文件和如何配置 Nginx。 1. Docker 构建 Django Web 项目 1.1 配置 Django 项目 在开始之前,我们需要有一个 D…

QT自定义控件实现并导入

QT自定义控件 介绍 QT Creator自定义控件和designer控件导入 1.安装QT5.7.1 2.将QT编译器目录、lib目录、include目录导入path 使用说明 使用说明按照 1.创建QtDesigner自定义控件工程,打开Qt Creator,创建一个Qt 设计师自定义控件,如下图所示&#xf…

CK_03靶机详解

CK_03靶机详解 靶场下载地址:https://download.vulnhub.com/ck/MyFileServer_3.zip 这个靶机开放的端口特别多,所以给我们的误导也很多,我直接按照正确的思路来。 因为开着445所以就枚举了一下靶机上共享的东西,发现两个share的…

elment-ui的侧边栏 开关及窗口联动

<template><div class"asders"><el-aside width"200px"><div class"boxbody"><div>源码外卖</div><el-switch v-model"isCollapse" :active-value"true" :inactive-value"fals…

通过Filebeat进行日志监控

对系统的日志监控&#xff0c;通用做法是使用ELK&#xff08;Elasticsearch、Logstash、Kibana&#xff09;进行监控和搜索日志&#xff0c;这里给出另一种方案&#xff1a;通过Filebeat接收日志到Kafka&#xff0c;监控平台接收Kafka&#xff0c;并通过WebSocket实时展示。 这…

CAD Voronoi3D V1.0.1 版本更新说明

更新说明 CAD Voronoi3D V1.0.1版本对泰森多边形晶格进行进一步的优化。 采用新算法大幅度减少形体边界出现小晶格的可能性&#xff0c;使区块更均匀&#xff1a; 优化曲边边界晶格曲率问题&#xff0c;消除曲边形体晶格边界曲率过大现象&#xff1a; 优化生成算法&#xff…

Bash编程

目录&#xff1a; bash编程语法bash脚本编写 1.bash编程语法 Bash 编程基础 变量引号数组控制语句函数 Bash 变量 语法&#xff1a; Variable_namevalue Bash 变量定义的规则 变量名区分大小写&#xff0c;a和A为两个不同的变量。变量名可以使用大小写字母混编的形式进行…

iOS - 解压ipa包中的Assert.car文件

项目在 Archive 打包后&#xff0c;生成ipa包 将 xxx.ipa文件修改为zip后缀即 xxx.zip &#xff0c;然后再双击解压&#xff0c;会生成一个 Payload 文件夹&#xff0c;里面一个文件 如下图&#xff1a; 然后显示改文件的包内容&#xff1a; 解压 Assets.car 文件的方式&…

Linux操作系统1-命令篇

不同领域的主流操作系统 桌面操作系统 Windos Mac os Linux服务器操作系统 Unix Linux(免费、稳定、占有率高) Windows Server移动设备操作系统 Android(基于Linux,开源) ios嵌入式操作系统 Linux(机顶盒、路由器、交换机) Linux 特点&#xff1a;免费、开源、多用户、多任务…

flutter:BottomNavigationBar和TabBar

区别 BottomNavigationBarr和TabBar都是用于创建导航栏的组件&#xff0c;但它们有一些区别。 位置不同&#xff1a;BottomNavigationBar通常位于屏幕底部&#xff0c;用于主要导航&#xff1b;而TabBar通常位于屏幕顶部或底部&#xff0c;用于切换不同的视图或页面。 样式不…

java设计模式-观察者模式

什么是观察者模式 观察者模式&#xff08;Observer&#xff09;是软件设计中的一种行为模式。 它定义了对象之间的一对多关系&#xff0c;其中如果一个对象改变了状态&#xff0c;所有依赖它的对象都会自动被通知并更新。 这种模式包含了两种主要的角色&#xff0c;即被观察…

AI 绘画Stable Diffusion 研究(二)sd模型ControlNet1.1 介绍与安装

部署包作者:秋葉aaaki 免责声明: 本安装包及启动器免费提供 无任何盈利目的 大家好&#xff0c;我是风雨无阻。 众所周知&#xff0c;StableDiffusion 是非常强大的AI绘图工具&#xff0c;需要详细了解StableDiffusion的朋友&#xff0c;可查看我之前的这篇文章&#xff1a; …

springboot框架下,请使用@ConfigurationProperties替代@Value加载配置

一、背景 程序启动时&#xff0c;详细报错见下&#xff1a; 10:40:31.965 [main] ERROR org.springframework.boot.SpringApplication - Application run failed org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name ‘redisDi…

Tribon二次开发- tbbatchjob

在Tribon安装目录下C:\Tribon\M3\Bin里面有许多未知用途的exe,有的双击后时一个DOS终端,有的一闪而过,有的需要按照提示输入信息,有的需要提前在指定的目录配置文件,该如何使用呢? 这些exe大多可以在Tribon以外通过.NET来使用,有的可以通过添加.NET项目引用来使用,有的…

Spring Cloud Alibaba - Nacos源码分析(三)

目录 一、Nacos客户端服务订阅的事件机制 1、监听事件的注册 2、ServiceInfo处理 serviceInfoHolder.processServiceInfo 一、Nacos客户端服务订阅的事件机制 Nacos客户端订阅的核心流程&#xff1a;Nacos客户端通过一个定时任务&#xff0c;每6秒从注册中心获取实例列表&…

filebeat介绍

1、filebeat概述 Filebeat是用于转发和集中日志数据的轻量级传送工具。Filebeat监视您指定的日志文件或位置&#xff0c;收集日志事件&#xff0c;并将它们转发到Elasticsearch或 Logstash或kafka进行索引 1.1 Filebeat两个主要组件 prospector 和 harvester。 prospector&a…

Flink CEP(二) 运行源码解析

通过DemoApp学习一下&#xff0c;CEP的源码执行逻辑。为下一篇实现CEP动态Pattern奠定理论基础。 1. Pattern的定义 Pattern<Tuple3<String, Long, String>,?> pattern Pattern.<Tuple3<String, Long, String>>begin("begin").where(new…

数据分析-关于指标和指标体系

一、电商指标体系 二、指标体系的作用 三、统计学中基本的分析手段

移远通信推出新一代高算力智能模组SG885G-WF,为工业和消费级IoT应用带来全新性能标杆

2023年7月24日&#xff0c;全球领先的物联网整体解决方案供应商移远通信宣布&#xff0c;正式推出其新一代旗舰级安卓智能模组SG885G-WF。该智能模组具有高达48 TOPS 的AI综合算力、强大性能及丰富的多媒体功能&#xff0c;非常适用于需要高处理能力和多媒体功能的工业和消费者…

如何在win10环境下配置强化学习gym库(使用vscode)

我是通过anacondavscode完成的gym库的使用&#xff0c;只是把案例跑起来了&#xff0c;具体步骤如下&#xff1a; 1、安装anaconda,参考链接&#xff1a;https://www.jianshu.com/p/2f3be7781451 我其实就是生安装的&#xff0c;也没有去配置环境啥的&#xff0c;就是下载安…