Kafka收发消息核心参数详解

文章目录

  • 1、从基础的客户端说起
    • 1.1、消息发送者主流程
    • 1.2、消息消费者主流程
  • 2、从客户端属性来梳理客户端工作机制
    • 2.1、消费者分组消费机制

1、从基础的客户端说起

Kafka提供了非常简单的客户端API。只需要引入一个Maven依赖即可:

  <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.13</artifactId><version>3.4.0</version></dependency>

1.1、消息发送者主流程

​ 然后可以使用Kafka提供的Producer类,快速发送消息。

public class MyProducer {private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);CountDownLatch latch = new CountDownLatch(5);for(int i = 0; i < 5; i++) {//Part2:构建消息ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);//Part3:发送消息//单向发送:不关心服务端的应答。producer.send(record);System.out.println("message "+i+" sended");//同步发送:获取服务端应答消息前,会阻塞当前线程。RecordMetadata recordMetadata = producer.send(record).get();String topic = recordMetadata.topic();int partition = recordMetadata.partition();long offset = recordMetadata.offset();String message = recordMetadata.toString();System.out.println("message:["+ message+"] sended with topic:"+topic+"; partition:"+partition+ ";offset:"+offset);//异步发送:消息发送后不阻塞,服务端有应答后会触发回调函数producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(null != e){System.out.println("消息发送失败,"+e.getMessage());e.printStackTrace();}else{String topic = recordMetadata.topic();long offset = recordMetadata.offset();String message = recordMetadata.toString();System.out.println("message:["+ message+"] sended with topic:"+topic+";offset:"+offset);}latch.countDown();}});}//消息处理完才停止发送者。latch.await();producer.close();}
}

​ 整体来说,构建Producer分为三个步骤:

  • 设置Producer核心属性 :Producer可选的属性都可以由ProducerConfig类管理。比如ProducerConfig.BOOTSTRAP_SERVERS_CONFIG属性,显然就是指发送者要将消息发到哪个Kafka集群上。这是每个Producer必选的属性。在ProducerConfig中,对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。
  • 构建消息:Kafka的消息是一个Key-Value结构的消息。其中,key和value都可以是任意对象类型。其中,key主要是用来进行Partition分区的,业务上更关心的是value。
  • 使用Producer发送消息:通常用到的就是单向发送、同步发送和异步发送者三种发送方式。

1.2、消息消费者主流程

​ 接下来可以使用Kafka提供的Consumer类,快速消费消息。

public class MyConsumer {private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) {//PART1:设置发送者相关属性Properties props = new Properties();//kafka地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//每个消费者要指定一个groupprops.put(ConsumerConfig.GROUP_ID_CONFIG, "test");//key序列化类props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//value序列化类props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");Consumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(TOPIC));while (true) {//PART2:拉取消息// 100毫秒超时时间ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));//PART3:处理消息for (ConsumerRecord<String, String> record : records) {System.out.println("offset = " + record.offset() + ";key = " + record.key() + "; value= " + record.value());}//提交offset,消息就不会重复推送。consumer.commitSync(); //同步提交,表示必须等到offset提交完毕,再去消费下一批数据。
//            consumer.commitAsync(); //异步提交,表示发送完提交offset请求后,就开始消费下一批数据了。不用等到Broker的确认。}}
}

​ 整体来说,Consumer同样是分为三个步骤:

  • 设置Consumer核心属性 :可选的属性都可以由ConsumerConfig类管理。在这个类中,同样对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。同样BOOTSTRAP_SERVERS_CONFIG是必须设置的属性。
  • 拉取消息:Kafka采用Consumer主动拉取消息的Pull模式。consumer主动从Broker上拉取一批感兴趣的消息。
  • 处理消息,提交位点:消费者将消息拉取完成后,就可以交由业务自行处理对应的这一批消息了。只是消费者需要向Broker提交偏移量offset。如果不提交Offset,Broker会认为消费者端消息处理失败了,还会重复进行推送。
    ​ Kafka的客户端基本就是固定的按照这三个大的步骤运行。在具体使用过程中,最大的变数基本上就是给生产者和消费者的设定合适的属性。这些属性极大的影响了客户端程序的执行方式。

2、从客户端属性来梳理客户端工作机制

​ 渔与鱼:Kafka的客户端API的重要目的就是想要简化客户端的使用方式,所以对于API的使用,尽量熟练就可以了。对于其他重要的属性,都可以通过源码中的描述去学习,并且可以设计一些场景去进行验证。其重点,是要逐步在脑海之中建立一个Message在Kafka集群中进行流转的基础模型。

​ 其实Kafka的设计精髓,是在网络不稳定,服务也随时会崩溃的这些作死的复杂场景下,如何保证消息的高并发、高吞吐,那才是Kafka最为精妙的地方。但是要理解那些复杂的问题,都是需要建立在这个基础模型基础上的。

2.1、消费者分组消费机制

​ 这是我们在使用kafka时,最为重要的一个机制,因此最先进行梳理。

​ 在Consumer中,都需要指定一个GROUP_ID_CONFIG属性,这表示当前Consumer所属的消费者组。他的描述是这样的:

  public static final String GROUP_ID_CONFIG = "group.id";public static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using <code>subscribe(topic)</code> or the Kafka-based offset management strategy.";

既然这里提到了kafka-based offset management strategy,那是不是也有非Kafka管理Offset的策略呢?

另外,还有一个相关的参数GROUP_INSTANCE_ID_CONFIG,可以给组成员设置一个固定的instanceId,这个参数通常可以用来减少Kafka不必要的rebalance。

​ 从这段描述中看到,对于Consumer,如果需要在subcribe时使用组管理功能以及Kafka提供的offset管理策略,那就必须要配置GROUP_ID_CONFIG属性。这个分组消费机制简单描述就是这样的:

在这里插入图片描述

​ 生产者往Topic下发消息时,会尽量均匀的将消息发送到Topic下的各个Partition当中。而这个消息,会向所有订阅了该Topic的消费者推送。推送时,每个ConsumerGroup中只会推送一份。也就是同一个消费者组中的多个消费者实例,只会共同消费一个消息副本。而不同消费者组之间,会重复消费消息副本。这就是消费者组的作用。

​ 与之相关的还有Offset偏移量。这个偏移量表示每个消费者组在每个Partiton中已经消费处理的进度。在Kafka中,可以看到消费者组的Offset记录情况。

[oper@worker1 bin]$ ./kafka-consumer-groups.sh --bootstrap-server worker1:9092 --describe --group test

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/92810.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【RabbitMQ实战】06 RabbitMQ配置

一、概述 一般情况下&#xff0c;可以使用默认的内建配置来有效地运行RabbitMQ&#xff0c;并且大多数情况下也并不需要修改任何 RabbitMQ的配置。当然&#xff0c;为了更加有效地操控 RabbitMQ&#xff0c;也可以利用调节系统范围内的参数来达到定制化的需求。 RabbitMQ提供…

《大师级引导-应对困境的工具与技术》读书笔记1

《大师级引导-应对困境的工具与技术》这个书&#xff0c;十分不错&#xff0c;教练和非教练都可以学习。下面是其中的关于冲突的处理&#xff1a; 定义&#xff1a;双方以解决冲突、说明关系为目的而进行的积极的、具有建设性的对话。 目的&#xff1a;制定双方协议&#xf…

《CTFshow-Web入门》10. Web 91~110

Web 入门 索引web91题解总结 web92题解总结 web93题解 web94题解 web95题解 web96题解 web97题解 web98题解 web99题解总结 web100题解 web101题解 web102题解 web103题解 web104题解 web105题解总结 web106题解 web107题解 web108题解 web109题解 web110题解 ctf - web入门 索…

锚框_的标定

一、查漏补缺、熟能生巧&#xff1a; 1.关于fix.axis.add_patch在原来图像的坐标系同添加 边框的函数的使用&#xff1a; 2.torch.arange( h , device)生成tensor的等差数组: 3.torch.T&#xff08;&#xff09;就是transpose转置操作的函数咯: 4.torch.repeat操作&#xff0c…

Substructure‑aware subgraph reasoning for inductive relation prediction

摘要 关系预测的目的是推断知识图中实体之间缺失的关系,其中归纳关系预测因其适用于新兴实体的有效性而广受欢迎。大多数现有方法学习逻辑组合规则或利用子图来预测缺失关系。尽管在性能方面已经取得了很大的进展,但目前的模型仍然不是最优的,因为它们捕获拓扑信息的能力有…

【Axure高保真原型】3D圆柱图_中继器版

今天和大家分享3D圆柱图_中继器版的原型模板&#xff0c;图表在中继器表格里填写具体的数据&#xff0c;调整坐标系后&#xff0c;就可以根据表格数据自动生成对应高度的圆柱图&#xff0c;鼠标移入时&#xff0c;可以查看对应圆柱体的数据……具体效果可以打开下方原型地址体验…

Springboot+vue的在线试题题库管理系统(有报告),Javaee项目,springboot vue前后端分离项目。

演示视频&#xff1a; Springbootvue的在线试题题库管理系统&#xff08;有报告&#xff09;&#xff0c;Javaee项目&#xff0c;springboot vue前后端分离项目。 项目介绍&#xff1a; 本文设计了一个基于Springbootvue的前后端分离的在线试题题库管理系统&#xff0c;采用M&…

javaSwing销售管理

​ 目录 一、选题背景 近几年来&#xff0c;传统商业与电商似乎是水火不容&#xff0c;大有不是你死便是我活的劲头。一直以来舆论都是一边倒的电商将迅速取代传统零售的论调&#xff0c;然而几年过去&#xff0c;电商的发展确实值得侧目&#xff0c;但传统商业虽然受到不小的…

LeetCode 每日一题 2023/9/25-2023/10/1

记录了初步解题思路 以及本地实现代码&#xff1b;并不一定为最优 也希望大家能一起探讨 一起进步 目录 9/25 460. LFU 缓存9/26 2582. 递枕头9/27 1333. 餐厅过滤器9/28 2251. 花期内花的数目9/29 605. 种花问题9/30 2136. 全部开花的最早一天10/1 9/25 460. LFU 缓存 freqMap…

性能压力测试的定义及步骤是什么

在今天的数字化时代&#xff0c;软件系统的性能和稳定性对于企业的成功至关重要。为了确保软件在高负载和压力情况下的正常运行&#xff0c;性能压力测试成为了不可或缺的环节。本文将介绍性能压力测试的定义、步骤。 一、性能压力测试的定义和目标 性能压力测试是通过模拟实际…

现代c++手撸2309神经网络最简化版230901

用c++输入数据:vector<vector<float>> inputs = { {1, 1}, {1, 0} };数据targets={0,1}; 测试:vector<vector<float>> inputs22 = { {1, 0}, {1,1} }; 构建神经网络,例如:NeuralNetwork nn({ 2, 4, 1 }); 则网络有四层、输入层2个节点、输出层1个节…

网络工程师学习中但是发现这个岗位非常卷怎么办

大家好&#xff0c;我是网络工程师成长日记实验室的郑老师&#xff0c;您现在正在查看的是网络工程师成长日记专栏&#xff0c;记录网络工程师日常生活的点点滴滴 有个同学说&#xff0c;他说现在有很多培训机构搞的这个网络工程师也非常卷。他现在还没有入行&#xff0c;他现在…

【js/es6】合集

目录 随机生成字符串每十个字符换行 随机生成字符串 function generateRandomAlphaNum(len) {var rdmString "";for (; rdmString.length < len; rdmString Math.random().toString(36).substr(2));return rdmString.substr(0, len); }每十个字符换行 string.…

数据分析技能点-独立性检验拟合优度检验

在这个数据驱动的时代,数据分析已经成为了一个不可或缺的工具,无论是在商业决策、医疗研究还是日常生活中。然而数据分析并不仅仅是一堆数字和图表;它是一个需要严谨的科学方法和逻辑推理的过程。 本文将重点介绍两种广泛应用于数据分析的统计检验方法:独立性检验和拟合优…

指定vscode黏贴图片路径(VSCode 1.79 更新)

指定vscode黏贴图片路径(VSCode 1.79 更新) 设置中搜索"markdown.copyFiles.destination" 点击AddItem,配置你的key-value&#xff0c;完成。

快排三种递归及其优化,非递归和三路划分

个人主页&#xff1a;Lei宝啊 愿所有美好如期而遇 目录 快排简介&#xff1a; 快排的三种递归实现&#xff1a; Hoare&#xff1a; 挖坑&#xff1a; 双指针&#xff1a; 小区间优化&#xff1a; 三数取中优化&#xff1a; 快排非递归实现&#xff1a; 快排的三路划…

axios和fetch的区别

axios和fetch都是用于发起HTTP请求的工具&#xff0c;但是它们有一些区别&#xff1a; 语法和用法&#xff1a;axios是一个基于Promise的HTTP客户端&#xff0c;具有更简洁和直观的语法&#xff0c;可以方便地发送GET、POST、PUT等各种请求&#xff0c;并提供了更多的请求配置选…

Ubuntu Linux下安装 TensorFlow等开发环境

1. 安装基本工具 sudo apt-get install build-essential sudo apt-get install cmake git libgtk2.0-dev pkg-config libavcodec-dev libavformat-dev libswscale-dev sudo apt-get install python-dev python-numpy libtbb2 libtbb-dev libjpeg-dev libpng-dev libtiff-dev…

基于PSO算法的功率角摆动曲线优化研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

【设计模式】五、原型模式

文章目录 概述示例传统的方式的优缺点原型模式原理结构图-uml 类图 原型模式解决克隆羊问题的应用实例Sheep类实现clone()运行原型模式在 Spring 框架中源码分析 深入讨论-浅拷贝和深拷贝浅拷贝的介绍 小结 概述 示例 克隆羊问题 现在有一只羊 tom&#xff0c;姓名为: tom, 年…